创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

关键要点

    • Bird 的实时事件 webhooks 让发送方可以即时接收到事件数据——无需轮询,无需 cron 作业,也无需应对速率限制的麻烦。

    • Webhooks 消除了管理时间窗口的复杂性,防止事件遗漏和处理重复记录。

    • 非常适合下游自动化:更新名单、开始旅程、丰富仪表板或同步内部系统。

    • 教程通过构建一个完整的 AWS 传输管道为发送方提供指导:S3 存储、Lambda 处理以及应用程序负载均衡器。

    • S3 充当 webhook 负载的中心存储层,每个批次都被写入为一个平面 JSON 文件。

    • Lambda 函数处理摄入(存储原始批次)和转换(将 JSON 转换为 CSV)。

    • Bird 批量事件——每个 webhook 批次都包含一个唯一的 x-messagesystems-batch-id,允许重新检查和去重。

    • Consumer Lambda 必须保持高效,因为如果端点在约 10 秒内没有响应,Bird 会重试批次。

    • 推荐使用 AWS ALB(而不是 API Gateway),因为它简单、具有成本效益,并能直接处理 HTTP POST 请求。

    • DNS(Route 53 或外部供应商)被配置为将一个友好的主机名映射到 ALB 端点。

    • 设置完成后,Bird 直接且可靠地将事件数据推送到您的 AWS 管道以便异步处理。

    • 该指南还涵盖最佳实践:最小权限 IAM 权限、临时存储限制、避免递归触发器和组织多 lambda 工作流。

Q&A 精华

  • Bird 的实时事件 webhooks 的主要目的是什么?

    将事件数据直接推送到发送者的终端,实现即时自动化,无需轮询或受速率限制的API调用。

  • 为什么对于大型发送方来说,webhooks 比通过 API 拉取数据更好?

    由于API拉取需要时间窗口管理,存在数据遗漏或重复的风险,并可能达到速率限制——webhooks通过持续推送数据来消除所有这些问题。

  • 推荐的 webhook ingestion pipeline 使用了哪些 AWS services?

    Amazon S3(存储)、AWS Lambda(处理)、Application Load Balancer(HTTP 监听器),以及可选的 Route 53(DNS)。

  • Bird如何批量处理事件数据?

    Bird 将多个事件一起发送到负载中,每个事件分配一个唯一的批次 ID (x-messagesystems-batch-id) 用于跟踪、重试和去重。

  • 什么会触发消费者Lambda函数?

    ALB将传入的webhook POST请求直接转发到Lambda,Lambda提取有效负载并写入到S3。

  • 为什么在处理之前将原始webhook批存储在S3中?

    为了确保快速摄取(<10 秒),以免连接超时,并将较重的处理转移到单独的异步管道。

  • 第二个 Lambda 函数是做什么的?

    它由新的 S3 对象触发,验证 JSON,将其转换为 CSV,并将处理后的输出写入单独的 S3 存储桶。

  • 为什么要为处理后的CSV文件使用单独的S3 bucket?

    为了避免递归触发器(将新的已处理文件写回同一个存储区会导致 Lambda 无休止地重新触发)。

  • Lambda函数需要什么权限?

    消费者Lambda需要S3 PutObject的权限;处理Lambda需要源存储桶的GetObject和目标存储桶的PutObject权限。

  • 为何推荐使用 AWS ALB 而不是 API Gateway?

    ALBs 更便宜、更简单,并且非常适合直接进行 HTTPS POST 转发;API Gateway 可能会改变负载格式并增加复杂性。

  • 发送方如何在 Bird 中配置 webhook?

    通过提供HTTPS端点(ALB的DNS记录),选择一个子账户,选择事件,并保存Webhook配置。

  • 用于存储或分析已处理数据的下游选项有哪些?

    加载到数据库(PostgreSQL、DynamoDB、RDS),输入到ETL系统,或使用像Athena这样的工具直接查询。

以下是一个简单的指南,帮助发送者在创建鸟类事件Webhook时感到舒适,并使用AWS中的基础设施来消费数据。

Bird 的实时事件 webhooks是一个非常有价值的工具,可以让发送者将数据自动推送到他们的系统中。这可以推动下游自动化,例如更新邮件列表、触发自动电子邮件流程或填充内部仪表板。虽然可以通过 Bird UI 使用事件搜索访问相同的事件数据,或者通过利用 Bird 事件 API以编程方式访问,但对单次请求返回的记录数量或对 API 端点速率限制的限制可能会使这两种方法对大型和复杂的发件人有一定限制。  

实时事件 webhooks 使发送者能够配置一个端点,Bird 将数据传输到该端点,并且可以在不需要调度调用数据的 cron 作业的情况下使用数据。与将数据推送给你相比,在拉取数据时也存在后勤上的折衷,例如需要识别每个 API 请求的时间段和参数。如果时间段不完全对齐,则可能会错过数据,如果时间段重叠,则需要处理重复的数据记录。通过实时 webhooks,事件数据在 Bird 中可用时,会简单地推送到你的端点。

虽然立即可以理解实时接收事件数据以推动下游自动化过程的好处,但实施和使用 webhooks 的实际过程可能让人望而却步。特别是在你不熟悉创建端点和以编程方式处理数据的技术组件时尤其如此。市场上有可以自动消费 Bird webhook 数据并将其 ETL 到你的数据库中的服务,例如StitchData,我们过去曾在博客中谈到过。然而,如果你更愿意掌控整个流程,你可以轻松构建自己的组件。以下是一个简单的指南,帮助发送者在使用 AWS 基础设施时自如地创建 Bird 事件 webhook 并使用数据。

配置 Webhook Target Endpoint

当创建Bird事件时,我们希望将该事件数据实时传输到AWS中的一个端点,以便我们能以编程方式消费和使用这些数据。数据将从Bird发送到目标端点,目标端点会将负载转发到一个lambda函数,该函数将处理并将数据存储在S3存储桶中。下面可以看到所描述的数据流的高级图示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


为了实现这个工作流,让我们实际上以相反的顺序构建它们,首先创建一个S3存储桶,用于存储我们的事件数据,然后倒着工作——添加每个组件到我们已经构建的内容中。

创建一个S3 Bucket以存储Webhook数据

在创建我们的负载均衡器以接受数据,或创建我们的 lambda 函数来存储数据之前,我们首先需要创建我们的S3 bucket来存储数据。虽然 S3 为 webhook 数据提供了出色的存储,组织还使用 PostgreSQL 数据库进行事件处理时,应该实施适当的backup and restore procedures来保护他们的结构化数据,同时采用他们的 S3 存储策略。要做到这一点,请导航到 AWS 内的 S3 服务,然后按“Create Bucket”。系统会提示您为您的 bucket 分配一个名称并设置区域 - 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 被创建时,它将是空的 - 如果您希望将数据组织在文件夹中,可以立即创建目标目录,或者当您的 lambda 函数存储文件时将创建目录。在这个例子中,我们将我们的 S3 bucket 命名为“bird-webhooks”,并创建了一个名为“B Event Data”的文件夹来存储我们的事件数据 - 您将在下面的 lambda 函数中看到这些名称的引用。

在创建我们的负载均衡器以接受数据,或创建我们的 lambda 函数来存储数据之前,我们首先需要创建我们的S3 bucket来存储数据。虽然 S3 为 webhook 数据提供了出色的存储,组织还使用 PostgreSQL 数据库进行事件处理时,应该实施适当的backup and restore procedures来保护他们的结构化数据,同时采用他们的 S3 存储策略。要做到这一点,请导航到 AWS 内的 S3 服务,然后按“Create Bucket”。系统会提示您为您的 bucket 分配一个名称并设置区域 - 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 被创建时,它将是空的 - 如果您希望将数据组织在文件夹中,可以立即创建目标目录,或者当您的 lambda 函数存储文件时将创建目录。在这个例子中,我们将我们的 S3 bucket 命名为“bird-webhooks”,并创建了一个名为“B Event Data”的文件夹来存储我们的事件数据 - 您将在下面的 lambda 函数中看到这些名称的引用。

在创建我们的负载均衡器以接受数据,或创建我们的 lambda 函数来存储数据之前,我们首先需要创建我们的S3 bucket来存储数据。虽然 S3 为 webhook 数据提供了出色的存储,组织还使用 PostgreSQL 数据库进行事件处理时,应该实施适当的backup and restore procedures来保护他们的结构化数据,同时采用他们的 S3 存储策略。要做到这一点,请导航到 AWS 内的 S3 服务,然后按“Create Bucket”。系统会提示您为您的 bucket 分配一个名称并设置区域 - 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 被创建时,它将是空的 - 如果您希望将数据组织在文件夹中,可以立即创建目标目录,或者当您的 lambda 函数存储文件时将创建目录。在这个例子中,我们将我们的 S3 bucket 命名为“bird-webhooks”,并创建了一个名为“B Event Data”的文件夹来存储我们的事件数据 - 您将在下面的 lambda 函数中看到这些名称的引用。

创建 Lambda Function 来消费数据

实际的数据处理和存储将由我们的应用程序负载均衡器(ALB)调用的lambda 函数执行。

第一步是通过导航到 AWS 内的 Lambda 服务并点击“Create Function”来创建你的 lambda 函数。系统会提示你为你的 lambda 函数分配一个名称并选择一种编程语言来编写你的函数。在本例中,我们使用 Python 作为运行时语言。

现在我们需要开发我们的 lambda 函数。暂时假设我们的应用程序负载均衡器已经配置好,并将 webhook 负载转发到我们的 lambda 函数——lambda 将收到包括完整标题和正文的负载。负载通过字典对象“event”传递到我们的 lambda 函数中。你可以通过访问负载内的“headers”和“body”对象来分别引用负载的标题和正文。在这个例子中,我们只是打算读取“x-messagesystems-batch-id”标题,其中 batch ID 是 Bird 为 webhook 批次创建的唯一值,并在将正文作为扁平文件存储在 S3 中时,使用它作为文件名;然而,你可能希望根据需要添加额外的功能,如身份验证检查或错误处理

在将负载存储为 S3 上的扁平文件时,我们需要定义 S3 存储桶的名称、位置和将存储负载数据的文件名。在我们的示例 lambda 函数中,我们在“store_batch”函数中完成此操作。在本例中,我们打算将整个批次作为单个文件存储,这有助于确保在 Bird 和你的端点之间的 HTTP 连接超时之前收集和存储数据。尽管你可以调整负载均衡器上的连接超时设置,但无法保证连接不会在传输端(在这种情况下是 Bird)超时,或者在你的 lambda 函数完成执行之前连接不会被终止。最好的做法是尽可能提高你的消费者函数的效率,并在可能的情况下将数据处理活动保留给下游进程——例如将批次的 JSON 格式负载转换为 CSV 文件,或将事件数据加载到数据库中。

值得注意的是,你可能需要更新你的 lambda 函数的权限。你的执行角色需要对 S3 的 PutObject 和 GetObject 权限。最佳实践是实施最小特权原则,所以我建议仅为存储 webhook 负载的 S3 存储桶设置这些权限。

我们的消费者 lambda 函数示例可以在这里找到。

关于批次 ID 的快速说明:Bird 会批量事件到单个负载中,其中每个批次可能包含 1 到 350 或更多事件记录。批次将被赋予一个唯一的批次 ID,可以通过利用Event Webhooks API或在你的 Bird 账户内点击webhook stream并选择“Batch Status”来查看批次状态。如果无法交付 webhook 负载,例如在连接超时时,Bird 将自动使用相同的批次 ID重试批次。当你的 lambda 函数运行接近 10 秒的最大往返时间时,这种情况可能会发生,这也是优化消费者函数以减少执行时间的原因之一。

为了处理所有数据处理活动,我建议创建一个独立的 lambda 函数,每当在 S3 存储桶上创建新文件时执行——这样,数据处理就与数据传输异步进行,并且不会因为连接终止而丢失数据。我将在后面的部分中讨论处理 lambda 函数。

创建一个 Application Load Balancer

为了接收 webhook 有效负载,我们需要提供一个终端来发送有效负载。我们通过在 AWS 中创建一个应用负载均衡器来实现这一点,方法是导航到 EC2 > 负载均衡器并点击“创建负载均衡器”。系统会提示您选择要创建的负载均衡器类型——在此情况下,我们需要创建一个应用负载均衡器。我们需要使用应用负载均衡器(ALB)来构建我们的消费者,因为事件 webhooks 将作为 HTTP 请求发送,并且 ALB 用于在 AWS 内部路由 HTTP 请求。我们可以实现 HTTP 网关作为替代方案;然而,我们在这个项目中使用 ALB,因为它比 HTTP 网关更轻量且更具成本效益。需要注意的是,如果您选择使用 HTTP 网关,事件格式可能会与 ALB 不同,因此您的 lambda 函数需要相应地处理请求对象。

一旦创建了您的 ALB,您将被提示为您的 ALB 分配名称并配置方案和访问/安全设置——由于我们计划从外部来源(Bird)接收事件数据,我们希望我们的 ALB 面向互联网。在“监听器和路由”下,ALB 应该在端口 443 上监听 HTTPS,我们希望创建一个指向我们的 lambda 函数的目标组,以便我们的 ALB 将传入请求转发到我们上面创建的消费者 lambda 函数。您还需要确保安全组有权通过端口 443 接受流量。

创建 DNS Record 用于 Load Balancer

为了让我们更方便地将我们的ALB用作终端节点,我们将在DNS中创建一个指向我们ALB的A记录。为此,我们可以使用AWS Route 53服务(或您当前的DNS提供商)并为您希望用于终端节点的主机名创建一个A记录(例如,spevents.<your_domain>)。在AWS上大规模使用DNS时,请注意存在未记录的DNS限制,可能会影响高流量应用程序,特别是那些处理大量出站流量的系统,如电子邮件传递系统。A记录应配置为指向我们创建的ALB。如果您使用Route 53管理DNS记录,可以通过启用“Alias”并选择ALB直接引用ALB实例;否则,如果您使用的是外部DNS提供商,则应将A记录指向ALB实例的公共IP地址。

我建议使用Postman等工具在启用您的Bird webhook之前测试一切是否已正确配置。您可以向您的终端节点发送POST请求,并确认是否收到响应。如果您的POST请求没有返回响应,您可能需要仔细检查您的ALB是否监听正确的端口。

Create a Bird Webhook

现在我们准备在Bird中创建webhook,并使用上面A记录定义的主机名作为我们的目标端点。要创建webhook,请导航到Bird帐户中的Webhooks部分,然后点击“Create Webhook”。系统将提示您为webhook分配一个名称并提供目标URL——该目标应为您之前创建的A记录的主机名。请注意,目标URL可能需要在URL中包含“HTTPS://”。  

完成后,请验证选择了正确的子账户和事件,然后按“Create Webhook”保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL,并被我们的ALB用于下游处理。

处理 Webhook Event Data

根据存储Bird事件数据的预期目的,您的需求可能只需将JSON有效负载存储为平面文件即可满足。您也可能已经建立了一个下游ETL流程,该流程能够以JSON格式消费和加载数据。在这两种情况下,您可能可以直接使用我们上面创建的处理lambda生成的平面文件。

或者,您可能需要转换数据,例如将JSON格式转换为CSV格式,或者直接将数据加载到数据库中。在这个例子中,我们将创建一个简单的lambda函数,将webhook数据从原始JSON格式转换为可以加载到数据库中的CSV文件。

创建一个Lambda来处理数据

与用于消费 webhook 数据的 lambda 函数类似,我们需要通过导航至 AWS 中的 Lambda 服务并按下“Create Function”来创建一个新的 lambda 函数。这个新的 lambda 函数将在我们的 S3 桶中新文件创建时触发——它将读取数据并将其转换为一个新的 CSV 文件。

lambda 函数接受文件信息作为一个事件。在示例的 lambda 函数中,您将看到我们首先进行了一系列的验证检查,以确保数据是完整的并且格式如预期。接下来,我们使用“csv”库并写入临时文件,将 JSON 负载转换成 CSV 文件。Lambda 函数只能将本地文件写入到“/tmp”目录,因此我们创建一个临时 CSV 文件,并用 <batch_id>.csv 的约定命名它。我们在此使用 batch_id 的原因只是为了确保由于接收到多个 webhook 负载而运行的任何并行进程不会相互干扰,因为每个 webhook 批处理将拥有一个唯一的 batch_id。

一旦数据完全转换为 CSV,我们将以字节流的形式读取 CSV 数据,删除临时文件,并将 CSV 数据保存为 S3 上的新文件。需要注意的是,输出需要一个不同的 S3 桶,否则我们可能会创建一个递归循环,导致 Lambda 使用量增加和成本上升。我们需要在我们的 lambda 函数中识别我们希望存储 CSV 文件的 S3 桶及位置。按照上述相同的过程创建一个新的 S3 桶以存储我们的 CSV 文件。

注意 tmp 目录的空间限制为 512 MB,因此为确保将来的执行有足够的空间,删除临时文件很重要。与直接写入 S3 相比,我们使用临时文件的原因是通过一次请求简化与 S3 的连接。

与消费 lambda 函数一样,您可能需要更新流程 lambda 函数的权限。此 lambda 函数要求执行角色具备输入 S3 桶的 GetObject 权限,并且具备输出 S3 桶的 PutObject 和 GetObject 权限。

我们处理 lambda 功能的一个示例可以在这里找到。

配置一个Lambda在新数据存储到S3时执行

现在,我们已经创建了将文件从JSON格式转换为CSV格式的lambda函数,我们需要配置它以便在我们S3桶上创建新文件时触发。为此,我们需要通过打开我们的lambda函数并点击页面顶部的“Add Trigger”来添加触发器。选择“S3”并提供存储原始webhook有效负载的S3桶的名称。您还可以选择指定文件前缀和/或后缀进行过滤。设置配置完成后,您可以通过点击页面底部的“Add”来添加触发器。现在,每当新的文件被添加到您的S3桶时,您的处理lambda函数就会执行。

将数据加载到数据库

在这个示例中,我不会详细介绍如何将数据加载到数据库中,但是,如果您一直在关注此示例,您有几个选项:

  1. 在您的处理lambda函数中直接将数据加载到您的数据库中

  2. 使用已建立的ETL流程来处理您的CSV文件

无论您是使用AWS数据库服务,比如RDSDynamoDB,还是您拥有自己的PostgreSQL数据库(或类似的),都可以直接从您的处理lambda函数连接到您的数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您也可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似SQL的查询语言访问数据。我建议参考您使用的服务的相应文档,以获取更多有关如何最好地在您环境中实现此功能的信息。

同样,有许多服务可以帮助处理CSV文件并将数据加载到数据库中。您可能已经拥有可以利用的ETL流程。

我们希望您发现本指南有所帮助——发送愉快!

其他新闻

阅读更多来自此类别的内容

A person is standing at a desk while typing on a laptop.

完整的AI原生平台,可与您的业务一起扩展。

© 2025 Bird

A person is standing at a desk while typing on a laptop.

完整的AI原生平台,可与您的业务一起扩展。

© 2025 Bird