Reach

Grow

Manage

Automate

Reach

Grow

Manage

Automate

创建和使用鸟类 Webhook

电子邮件

1 min read

创建和使用鸟类 Webhook

电子邮件

1 min read

创建和使用鸟类 Webhook

以下是一个简单的指南,帮助发送者在创建鸟类事件Webhook时感到舒适,并使用AWS中的基础设施来消费数据。

Bird 的实时事件 webhook是发件人用来将数据自动推送到他们系统中的非常有价值的工具。这可以推动下游自动化,例如更新邮件列表、触发自动化电子邮件流程或填充内部仪表板。虽然通过 Bird UI 使用事件搜索或通过利用 Bird 事件 API以编程方式访问相同的事件数据,但单个请求返回的记录数量限制或对 API 端点施加的速率限制可能会使这两种方法对大型和复杂的发送者而言都是限制性的。  

实时事件 webhook 使发件人能够配置 Bird 将数据传输到的端点,并且无需安排提取数据的 cron 作业即可使用数据。当提取数据而不是将数据推送给您时,也存在后勤权衡,例如必须确定每个 API 请求使用的时间段和参数。如果时间段未完全对齐,那么您可能会遗漏数据;如果时间段重叠,那么您需要处理重复的数据记录。有了实时 webhook,当事件数据在 Bird 中可用时,它就会简单地推送到您的端点。

虽然许多发件人可能会立即理解实时接收事件数据以推动下游自动化过程的好处,但实现和使用 webhook 的实际过程可能令人望而生畏。如果您对创建端点和以编程方式处理数据的技术组成部分不熟悉,尤其如此。有可用的服务可以自动将 Bird webhook 数据 ETL 到您的数据库中 - 例如 StitchData,这是我们过去曾在博客中讨论过的。然而,如果您希望对该过程有更多控制权,您可以轻松地自己构建组件。以下是一个简单的指南,帮助发件人在使用 AWS 内的基础设施创建 Bird 事件 webhook 并使用数据时感到舒适。

配置 Webhook Target Endpoint

当创建一个Bird事件时,我们希望这些事件数据实时流向AWS中的一个端点,以便我们能够以编程方式消费和使用这些数据。数据将从Bird发送到一个目标端点,然后将载荷转发到一个lambda函数,处理并存储数据在S3桶中。下面可以看到所描述的数据流的高级图示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


要实现这个工作流程,让我们从后向前实际构建,首先创建一个S3桶用于存储我们的事件数据,然后向后回溯—添加喂入我们已经构建的每个组件。

创建一个S3桶存储Webhook数据

在创建我们的负载均衡器以接收数据或我们的lambda函数以存储数据之前,我们需要先创建一个S3桶来存储这些数据。为此,请导航到AWS中的S3服务并按“创建桶”。您将被提示为桶分配名称并设置区域—确保使用与您的ALB和lambda函数相同的区域。当您的S3桶创建后,它将是空的—如果您想在文件夹内组织数据,可以立即创建预期的目录,或目录将在lambda函数存储文件时创建。在此示例中,我们命名我们的S3桶为“bird-webhooks”并创建一个名为“B Event Data”的文件夹以存储我们的事件数据—您将在下文中看到这些名称在我们的lambda函数中被引用。

创建一个Lambda函数消费数据

数据的实际处理和存储将由一个lambda函数完成,该函数由我们的应用程序负载均衡器(ALB)调用。 

第一步是通过导航到AWS中的Lambda服务并点击“创建函数”来创建您的lambda函数。您将被提示为您的lambda函数分配名称并选择编写函数的编程语言。在此示例中,我们使用Python作为运行时语言。

现在我们需要开发我们的lambda函数。假设我们的应用负载均衡器已经配置并将Webhook载荷转发到我们的lambda函数—该lambda将收到包括完整头和主体的载荷。载荷以字典形式作为对象“event”传递到我们的lambda函数。您可以通过访问载荷内的“headers”和“body”对象独立引用载荷的头和主体。在这个示例中,我们只是要读取“x-messagesystems-batch-id”头,其中的批次ID是Bird为Webhook批次创建的唯一值,并在将正文作为平面文件存储在S3时将其用作文件名;然而,您可能需要根据需要添加其他功能,例如身份验证检查或错误处理

在S3上存储载荷为平面文件时,我们需要定义S3桶的名称、位置以及载荷数据将被存储的文件名。在我们的示例lambda函数中,我们在“store_batch”函数内执行此操作。在此示例中,我们将整个批次存储为单个文件,这有助于确保数据在Bird和您的端点之间的HTTP连接超时之前被收集和存储。虽然您可以调整负载均衡器上的连接超时设置,但不能保证在传输侧(在此场景中为Bird)连接不会超时,或连接在您的lambda函数完成执行之前不会被终止。最佳实践是让您的消费函数尽可能高效,并将数据处理活动保留给可能的下游进程—例如将批量的JSON格式载荷转换为CSV文件,或将事件数据加载到数据库中。

需要注意,您可能需要更新您lambda函数的权限。您的执行角色将需要S3的PutObject和GetObject权限。最佳实践是强制执行最小权限原则,因此我建议仅为存储Webhook载荷的S3桶设置这些权限。

有关我们的消费者lambda函数的示例可以在这里找到。

关于批次ID的快速说明:Bird将批量事件集合为单个载荷,每批次可能包含1到350或更多事件记录。该批次将被赋予一个唯一的批次ID,可以通过利用事件Webhooks API或者在您的Bird账户中点击Webhook流并选择“批次状态”来查看批次状态。如果Webhook载荷无法传递,例如在连接超时时,Bird将自动使用相同的批次ID重试该批次。这可能发生在您的lambda函数运行接近10秒最大往返时间时,是优化消费者函数以减少执行时间的原因之一。

为处理所有数据处理活动,我建议创建一个单独的lambda函数,当在S3桶上创建新文件时执行—这样,数据处理在数据传输的异步过程中进行,并且没有因连接终止而丢失数据的风险。我在后面的章节中讨论了处理lambda函数。

创建应用程序负载均衡器

为了接收Webhook载荷,我们需要提供一个端点以发送载荷。我们通过在AWS中创建一个应用程序负载均衡器来做到这一点,方法是导航到EC2 > 负载均衡器并点击“创建负载均衡器”。您将被提示选择要创建的负载均衡器类型—对此,我们要创建一个应用程序负载均衡器。我们需要使用应用程序负载均衡器(ALB)来构建我们的消费者,因为事件Webhooks将作为HTTP请求发送,而ALBs用于在AWS中路由HTTP请求。我们可以另作HTTP网关为替代方案;然而,我们在此项目中使用ALB是因为它比HTTP网关更轻量且具成本效益。值得注意的是,如果选择使用HTTP网关,事件格式可能与ALB的不同,因此您的lambda函数需要据此处理请求对象。

一旦创建了ALB,您将被提示为您的ALB分配名称并配置方案及访问/安全设置—既然我们计划从外部源(Bird)接收事件数据,我们会希望我们的ALB是面向互联网的。 在“监听和路由”下,ALB应在端口443上侦听HTTPS,并且我们希望创建一个指向我们创建的消费lambda函数的目标组,以便我们的ALB将入站请求转发到我们上述创建的消费lambda函数。还需要确保安全组有权限通过端口443接受流量。

为负载均衡器创建DNS记录

为了简化对ALB作为端点的使用,我们将在DNS中为我们的ALB创建A记录。为此,我们可以使用AWS Route 53服务(或您当前的DNS提供商)并为要用作端点的主机名创建A记录(例如,spevents.<your_domain>)。A记录应配置为指向我们创建的ALB。如果您使用Route 53管理DNS记录,您可以启用“别名”并选择ALB以直接引用ALB实例;否则,如果使用外部DNS提供商,您应将A记录指向ALB实例的公共IP地址。

我建议使用诸如Postman之类的工具测试一切配置正确,然后再启用您的Bird Webhook。您可以向您的端点发出一个POST请求并确认收到回复。如果您的POST请求未返回响应,您可能需要重新检查您的ALB是否正在监听正确的端口。

创建Bird Webhook

现在我们已准备好在Bird中创建Webhook并将之前定义的A记录的主机名用作我们的目标端点。要创建Webhook,请导航到Bird账户内的Webhooks部分并点击“创建Webhook”。您将被提示为您的Webhook分配一个名称并提供一个目标URL—目标应是您先前创建的A记录的主机名。请注意,目标URL可能需要在URL中包含“HTTPS://”。

完成后,验证选择了正确的子账户和事件,并按“创建Webhook”以保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL并被我们的ALB消费以进行下游处理。

处理 Webhook Event Data

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

R

Reach

G

Grow

M

Manage

A

Automate

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。