Reach

Grow

Manage

Automate

Reach

Grow

Manage

Automate

创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

以下是一个简单的指南,帮助发送者在创建鸟类事件Webhook时感到舒适,并使用AWS中的基础设施来消费数据。

Bird 的实时事件 webhook是发件人用来将数据自动推送到他们系统中的非常有价值的工具。这可以推动下游自动化,例如更新邮件列表、触发自动化电子邮件流程或填充内部仪表板。虽然通过 Bird UI 使用事件搜索或通过利用 Bird 事件 API以编程方式访问相同的事件数据,但单个请求返回的记录数量限制或对 API 端点施加的速率限制可能会使这两种方法对大型和复杂的发送者而言都是限制性的。  

实时事件 webhook 使发件人能够配置 Bird 将数据传输到的端点,并且无需安排提取数据的 cron 作业即可使用数据。当提取数据而不是将数据推送给您时,也存在后勤权衡,例如必须确定每个 API 请求使用的时间段和参数。如果时间段未完全对齐,那么您可能会遗漏数据;如果时间段重叠,那么您需要处理重复的数据记录。有了实时 webhook,当事件数据在 Bird 中可用时,它就会简单地推送到您的端点。

虽然许多发件人可能会立即理解实时接收事件数据以推动下游自动化过程的好处,但实现和使用 webhook 的实际过程可能令人望而生畏。如果您对创建端点和以编程方式处理数据的技术组成部分不熟悉,尤其如此。有可用的服务可以自动将 Bird webhook 数据 ETL 到您的数据库中 - 例如 StitchData,这是我们过去曾在博客中讨论过的。然而,如果您希望对该过程有更多控制权,您可以轻松地自己构建组件。以下是一个简单的指南,帮助发件人在使用 AWS 内的基础设施创建 Bird 事件 webhook 并使用数据时感到舒适。

配置 Webhook Target Endpoint

当创建一个Bird事件时,我们希望事件数据能够实时流式传输到AWS中的一个端点,以便我们能够通过编程来使用和消费这些数据。这些数据将从Bird发送到一个目标端点,目标端点将把负载转发到一个lambda函数,解析并存储到一个S3桶中。如下是所描述数据流的高层次图表:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


要实现这个工作流程,我们实际上是以逆序构建它们,从创建我们存储事件数据的S3桶开始,然后倒退——添加各个组件以馈入构建的内容。

创建一个S3桶以存储Webhook数据

在创建负载均衡器接受数据,或Lambda函数存储数据之前,我们需要先创建一个S3桶以存储数据。尽管S3为Webhook数据提供了优秀的存储,组织还使用PostgreSQL数据库进行事件处理,应实施适当的备份和恢复程序以保护其结构化数据以及S3存储策略。为此,请在AWS内浏览到S3服务并点击“创建桶”。系统将提示您为桶分配名称并设置区域——确保使用与ALB和lambda函数相同的区域。当创建S3桶时,它将是空的,如果您想将数据组织到一个文件夹中,您可以立即创建所需的目录,或者在lambda函数存储文件时创建目录。在此示例中,我们为S3桶命名为“bird-webhooks”,并创建了名为“B Event Data”的文件夹来存储我们的事件数据——您将在下面的lambda函数中看到这些名称被引用。

创建一个Lambda函数以消费数据

数据的实际处理和存储将由一个由我们程序负载均衡器(ALB)调用的lambda函数来执行。

第一步是通过浏览到AWS中的Lambda服务并点击“创建函数”来创建lambda函数。系统将提示您为lambda函数分配名称并选择编程语言来编写函数。在此示例中,我们使用Python作为运行时语言。

现在我们需要开发lambda函数。暂时假设我们的程序负载均衡器已经配置并将Webhook负载转发给我们的lambda函数——lambda将接收到一个包含完整头和主体的负载。负载使用对象“事件”作为字典传递给lambda函数。您可以通过访问负载内的“headers”和“body”对象来独立引用负载的头和主体。在此示例中,我们仅仅读取“x-messagesystems-batch-id”头,其中的批次ID是Bird为Webhook批次创建的唯一值,并在S3中以平面文件存储主体时使用它作为文件名;不过,您可能要根据需要添加其他功能,例如身份验证检查或错误处理

当将负载存储为S3上的平面文件时,我们需要定义将存储负载数据的S3桶、位置和文件名。在我们的样本lambda函数中,我们在“store_batch”函数内执行此操作。在此示例中,我们将整个批次存储为单个文件,这有助于确保数据在Bird和您的端点之间的HTTP连接超时之前被收集和存储。尽管您可以调整负载均衡器上的连接超时设置,但不能保证连接不会在发送方(在本例中为Bird)超时,或在lambda函数执行完成之前终止连接。最佳实践是使消费者功能尽可能高效,并在可能的情况下将数据处理活动保留为下游进程——例如将批处理的JSON格式负载转换为CSV文件,或将事件数据加载到数据库中。

需要注意的是,您可能需要更新lambda函数的权限。执行角色需要S3的PutObject和GetObject权限。最佳实践是执行最小特权原则,因此我建议仅为存储Webhook负载的S3桶设置这些权限。

我们的消费者lambda函数的示例可以在这里找到。

关于批次ID的快速说明:Bird将批量事件到单个负载中,其中每个批次可能包含1到350个或更多的事件记录。批次将分配一个唯一的批次ID,通过利用事件Webhooks API或通过点击Webhook流并选择“批次状态”在您的Bird帐户中查看批次状态。假设无法交付Webhook负载,例如连接超时期间,Bird将使用相同的批次ID自动重试批次。这可能发生在您的lambda函数接近10秒的最大往返时间运行时,这是优化消费者功能以减少执行时间的理由。

为处理所有数据处理活动,我建议创建一个单独的lambda函数,在S3桶中创建新文件时执行——这样,数据处理是与数据传输异步执行的,并且没有因连接终止而失去数据的风险。我将在后面的部分讨论处理lambda函数。

创建一个应用程序负载均衡器

为了接收Webhook负载,我们需要提供一个端点以便发送负载。对此,我们通过在AWS中创建一个应用程序负载均衡器,步骤是导航到EC2 > Load Balancers并点击“创建负载均衡器”。系统将提示您选择要创建的负载均衡器类型——为此,我们希望创建一个应用程序负载均衡器。我们需要使用应用程序负载均衡器(ALB)来构建我们的消费者,因为事件Webhooks将作为HTTP请求发送,并且ALB用于在AWS中路由HTTP请求。可以作为替代实现HTTP Gateway;然而,我们在该项目中选择使用ALB,因为它比HTTP Gateway更加轻量和经济。值得注意的是,如果选择使用HTTP Gateway,与ALB相比事件格式可能不同,因此需要相应地处理请求对象。

创建ALB后,系统将提示您为ALB指定名称并配置方案和访问/安全设置——由于我们计划从外部源(Bird)接收事件数据,因此我们希望ALB面向互联网。在“侦听器和路由”下,ALB应监听端口443上的HTTPS,并且我们希望创建一个指向我们的lambda函数的目标组,以便我们的ALB将入站请求转发给我们在上面创建的消费lambda函数。还需要确保安全组有接收443端口流量的权限。

为负载均衡器创建DNS记录

为了更容易地使用我们的ALB作为端点,我们将在DNS中创建一个指向ALB的A记录。对此,我们可以使用AWS Route 53服务(或当前的DNS提供商)并为要用于端点的主机名创建A记录(例如,spevents.<你的域名>)。在AWS上大规模处理DNS时,请注意可能影响高流量应用程序的未公开的DNS限制,尤其是那些处理大量出站流量的应用程序,如电子邮件传输系统。A记录应被配置为指向我们创建的ALB。如果您使用Route 53来管理DNS记录,可以通过启用“Alias”并选择ALB来直接引用ALB实例;否则,如果使用外部DNS提供商,应该将A记录指向ALB实例的公共IP地址。

我建议使用Postman等工具测试一切是否正确配置,然后再启用Bird Webhook。您可以向您的端点发送一个POST请求,并确认收到响应。如果您的POST请求未返回响应,可能需要仔细检查您的ALB是否监听正确的端口。

创建一个Bird Webhook

现在我们准备好在Bird中创建webhook,并使用上面A记录中定义的主机名作为我们目标端点。要创建webhook,请导航到您的Bird帐户中的Webhooks部分并点击“创建webhook”。系统将提示您为webhook分配名称并提供目标URL——目标应是先前创建的A记录的主机名。请注意,目标URL可能需要在URL中包含“HTTPS://”。

完成后,验证已选择正确的子帐户和事件,然后按“创建webhook”保存您的配置。所有选定事件类型的事件数据现在将流到我们的目标URL,并被我们的ALB消费以供后续处理。

处理 Webhook Event Data

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

R

Reach

G

Grow

M

Manage

A

Automate

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。