Reach

Grow

Manage

Automate

Reach

Grow

Manage

Automate

创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

以下是一个简单的指南,帮助发送者在创建鸟类事件Webhook时感到舒适,并使用AWS中的基础设施来消费数据。

Bird 的实时事件 webhooks是一个非常有价值的工具,可以让发送者将数据自动推送到他们的系统中。这可以推动下游自动化,例如更新邮件列表、触发自动电子邮件流程或填充内部仪表板。虽然可以通过 Bird UI 使用事件搜索访问相同的事件数据,或者通过利用 Bird 事件 API以编程方式访问,但对单次请求返回的记录数量或对 API 端点速率限制的限制可能会使这两种方法对大型和复杂的发件人有一定限制。  

实时事件 webhooks 使发送者能够配置一个端点,Bird 将数据传输到该端点,并且可以在不需要调度调用数据的 cron 作业的情况下使用数据。与将数据推送给你相比,在拉取数据时也存在后勤上的折衷,例如需要识别每个 API 请求的时间段和参数。如果时间段不完全对齐,则可能会错过数据,如果时间段重叠,则需要处理重复的数据记录。通过实时 webhooks,事件数据在 Bird 中可用时,会简单地推送到你的端点。

虽然立即可以理解实时接收事件数据以推动下游自动化过程的好处,但实施和使用 webhooks 的实际过程可能让人望而却步。特别是在你不熟悉创建端点和以编程方式处理数据的技术组件时尤其如此。市场上有可以自动消费 Bird webhook 数据并将其 ETL 到你的数据库中的服务,例如StitchData,我们过去曾在博客中谈到过。然而,如果你更愿意掌控整个流程,你可以轻松构建自己的组件。以下是一个简单的指南,帮助发送者在使用 AWS 基础设施时自如地创建 Bird 事件 webhook 并使用数据。

配置 Webhook Target Endpoint

当创建一个Bird事件时,我们希望将该事件数据实时传输到AWS中的一个端点,以便我们可以通过编程方式消费和使用这些数据。数据将从Bird发送到目标端点,该端点将把负载转发到一个lambda函数,该函数将处理并将数据存储在S3桶中。描述的数据流的高层次图示如下所示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


要实施此工作流程,我们实际上从后向前开始构建,从创建一个我们将存储事件数据的S3桶开始,然后向后添加我们已经构建的每个组件。

创建S3桶以存储Webhook数据

在创建我们的负载均衡器以接受数据或我们的lambda函数以存储数据之前,我们需要首先创建一个S3桶来存储数据。虽然S3为Webhook数据提供了出色的存储,但组织也使用PostgreSQL数据库进行事件处理,应实施适当的备份和恢复程序,以保护他们的结构化数据与S3存储策略一起。为此,请导航到AWS中的S3服务,然后按“Create Bucket”。系统会提示您为桶分配一个名称并设置区域—确保使用与您的ALB和lambda函数相同的区域。创建S3桶后,它将为空—如果您希望将数据组织在一个文件夹中,可以立即创建预期目录,或者在您的lambda函数存储文件时将创建目录。在此示例中,我们将我们的S3桶命名为“bird-webhooks”并创建一个名为“B事件数据”的文件夹来存储我们的事件数据—您将在下文中的lambda函数中看到这些名称。

创建一个Lambda函数以消费数据

数据的实际处理和存储将由我们的应用程序负载均衡器(ALB)调用的lambda函数执行。

第一步是通过导航到AWS中的Lambda服务并单击“Create Function”来创建您的lambda函数。系统将提示您为lambda函数分配一个名称并选择要使用的编程语言。在此示例中,我们使用Python作为运行时语言。

现在我们需要开发我们的lambda函数。假设我们的应用程序负载均衡器已经配置好并将Webhook负载转发到我们的lambda函数—lambda将接收一个包含完整头和主体的负载。负载使用对象“event”作为字典传递到我们的lambda函数。您可以通过访问负载内的“headers”和“body”对象分别引用负载的头和主体。在此示例中,我们只是读取“x-messagesystems-batch-id”头,其中Batch ID是Bird为Webhook批创建的唯一值,并在将主体作为平面文件存储在S3时使用它作为文件名;但是,您可能需要根据需要添加其他功能,例如身份验证检查或错误处理

在将负载存储为S3上的平面文件时,我们需要定义S3桶的名称、位置和存储负载数据的文件名。在我们的示例lambda函数中,我们在“store_batch”函数中执行此操作。在此示例中,我们将整个批作为单一文件存储,这有助于确保在Bird和您的端点之间的HTTP连接超时之前收集并存储数据。虽然您可以调整负载均衡器上的连接超时设置,但不能保证连接不会在传输端(在这种情况下是Bird)超时或在您的lambda函数执行结束前终止。作为最佳实践,保持您的消费函数尽可能高效,并将数据处理活动保留给下游过程,例如将批量的JSON格式化负载转换为CSV文件,或将事件数据加载到数据库中。

需要注意的是,您可能需要更新lambda函数的权限。您的执行角色将需要S3的PutObject和GetObject权限。推荐的最佳做法是实施最小权限原则,因此我建议仅为将存储Webhook负载的S3桶设置这些权限。

我们消费者lambda函数的样本可以在这里找到。

关于Batch ID的简短说明: Bird将批量事件到一个负载中,其中每个批可能包含1到350个或更多事件记录。该批将获得一个唯一的Batch ID,可以利用事件Webhooks API或通过单击Webhook流并选择“批状态”来查看批状态。如果Webhook负载无法传递,例如在连接超时期间,Bird将通过同一Batch ID自动重试该批。这可能发生在您的lambda函数接近最长往返时间10秒时,并且是优化消费者功能以减少执行时间的原因之一。

为了处理所有数据处理活动,我建议创建一个单独的lambda函数,每当在S3桶上创建新文件时执行—这样,数据处理在数据传输时异步执行,并且没有因为连接中断而丢失数据的风险。我在后面的部分讨论了处理lambda函数。

创建一个应用程序负载均衡器

为了接收Webhook负载,我们需要提供一个端点以发送负载。我们通过在AWS中创建一个应用程序负载均衡器实现这一点,导航到EC2 > 负载均衡器并单击“Create Load Balancer”。系统将提示您选择要创建的负载均衡器类型—for this, we want to create an application load balancer. 我们需要使用一个应用程序负载均衡器(ALB)来构建我们的消费者,因为事件Webhooks将以HTTP请求形式发送,ALB用于在AWS中路由HTTP请求。作为替代方案,我们可以实施HTTP Gateway;然而,我们在此项目中使用ALB,因为它比HTTP Gateway更轻量化和成本效益更高。需要注意的是,如果您选择使用HTTP Gateway,事件格式可能与ALB不同,因此您的lambda函数将需要相应地处理请求对象。

一旦创建了ALB,您将被提示为ALB分配一个名称并配置方案和访问/安全设置—由于我们计划从外部来源(Bird)接收事件数据,因此我们希望我们的ALB是面向互联网的。 在“Listeners and routing”下,ALB应该在端口443上监听HTTPS,我们希望创建一个目标组指向我们的lambda函数,以便我们的ALB将入站请求转发到我们上面创建的消费lambda函数。 您还需要确保安全组有权接受通过端口443的流量。

为负载均衡器创建DNS记录

为了更容易将我们的ALB用作端点,我们将在DNS中创建一个指向ALB的A记录。为此,我们可以使用AWS Route 53服务(或您当前的DNS提供商)并为您要用于端点的主机名(例如spevents.<your_domain>)创建A记录。在AWS上大规模操作DNS时,请注意存在未记录的DNS限制,可能会影响大流量应用,尤其是那些处理大量出站流量的应用,如电子邮件发送系统。A记录应配置为指向我们创建的ALB。如果您使用Route 53管理DNS记录,可以通过启用“同类项”和选择ALB直接引用ALB实例;否则,如果您使用外部DNS提供商,则应将A记录指向ALB实例的公有IP地址。

我建议使用类似Postman的工具来测试一切是否配置正确,然后再启用您的Bird Webhook。您可以向您的端点发出一个POST请求,并确认收到响应。如果您的POST请求未返回响应,则可能需要重新检查您的ALB是否正在监听正确的端口。

创建一个Bird Webhook

现在我们准备好在Bird中创建webhook并使用上面A记录定义的主机名作为我们的目标端点。要创建webhook,请导航到您Bird账户中的Webhooks部分并单击“Create Webhook”。系统将提示您为您的Webhook分配一个名称并提供目标URL—目标应是您之前创建的A记录的主机名。请注意,目标URL可能需要在URL中包含“HTTPS://”。

一旦完成,请验证已选择正确的子账户和事件,然后按“Create Webhook”保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL并被我们的ALB消费以进行下游处理。

处理 Webhook Event Data

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

根据存储Bird事件数据的预期用途,您的需求可能可以通过简单地将JSON有效负载存储为一个平面文件来满足。您可能已经建立了一个下游ETL过程,能够消费并加载JSON格式的数据。在这两种情况下,您可能可以按原样使用我们上面创建的处理lambda创建的平面文件。

或者,您可能需要转换数据——例如从JSON转换为CSV格式——或直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将来自原始JSON格式的webhook数据转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费webhook数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按“创建函数”来创建一个新的lambda函数。当我们S3桶上创建新文件时,新lambda函数将被触发——它将读取数据并将其转换为一个新的csv文件。

lambda函数接受文件信息作为一个事件。在示例lambda函数中,您将看到我们首先进行了一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,所以我们创建了一个临时csv文件,并使用<batch_id>.csv的命名约定命名。我们在这里使用batch_id的原因只是为了确保由于接收到多个webhook有效负载而运行的任何并行进程不会相互干扰,因为每个webhook批次将有一个唯一的batch_id。

一旦数据完全转换为CSV,我们读取CSV数据作为字节流,删除临时文件,并将CSV数据作为新文件保存到S3。需要注意的是,输出需要不同的S3桶,否则,我们有可能创建一个递归循环,这可能导致lambda使用增加和成本增加。我们需要在lambda函数中识别我们希望将CSV文件存储在哪个S3桶和位置。按照上述相同的程序创建一个新的S3桶以存储CSV文件。

注意,tmp目录的空间限制为512 MB,因此重要的是临时文件随后被删除,以确保将来执行时有足够的空间。我们使用临时文件而不是直接写入S3的原因是通过一个请求来简化与S3的连接。

就像使用consume lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数需要执行角色具有输入S3桶的GetObject权限,以及输出S3桶的PutObject和GetObject权限。

我们的处理lambda函数样本可以在此处找到。

配置一个Lambda以在S3上存储新数据时执行

现在,我们的lambda函数已创建,用于将文件从JSON格式转换为CSV格式,我们需要配置它以在我们的S3桶上创建新文件时触发。为此,我们需要通过打开lambda函数并单击页面顶部的“添加触发器”来为lambda函数添加触发器。选择“S3”,并提供存储原始webhook有效负载的S3桶名称。您还可以选择指定文件前缀和/或后缀以进行过滤。配置设置后,通过单击页面底部的“添加”来添加触发器。现在,每当您的S3 bucket中添加新文件时,您的处理lambda函数将执行。

将数据加载到数据库中

在这个例子中,我将不详细介绍将数据加载到数据库中,但如果您一直在关注这个例子,您有几个选项:

  1. 在您的处理lambda函数中将数据直接加载到数据库中

  2. 使用已建立的ETL过程消费您的CSV文件

无论您是使用AWS数据库服务,如RDSDynamoDB,还是您有自己的PostgreSQL数据库(或类似),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您还可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议参考您所使用服务的相关文档,以获取有关如何最好地在您的环境中完成此操作的更多信息。

同样,还有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的ETL过程。

我们希望您觉得本指南有帮助——愿您发送愉快!

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

让我们为您联系Bird专家。
在30分钟内见证Bird的全部威力。

通过提交,您同意 Bird 可能会就我们的产品和服务与您联系。

您可以随时取消订阅。查看Bird的隐私声明以获取有关数据处理的详细信息。

R

Reach

G

Grow

M

Manage

A

Automate

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。