创建和使用鸟类 Webhook

2022年1月27日

电子邮件

1 min read

创建和使用鸟类 Webhook

关键要点

    • Bird 的实时事件 webhooks 让发送方可以即时接收到事件数据——无需轮询,无需 cron 作业,也无需应对速率限制的麻烦。

    • Webhooks 消除了管理时间窗口的复杂性,防止事件遗漏和处理重复记录。

    • 非常适合下游自动化:更新名单、开始旅程、丰富仪表板或同步内部系统。

    • 教程通过构建一个完整的 AWS 传输管道为发送方提供指导:S3 存储、Lambda 处理以及应用程序负载均衡器。

    • S3 充当 webhook 负载的中心存储层,每个批次都被写入为一个平面 JSON 文件。

    • Lambda 函数处理摄入(存储原始批次)和转换(将 JSON 转换为 CSV)。

    • Bird 批量事件——每个 webhook 批次都包含一个唯一的 x-messagesystems-batch-id,允许重新检查和去重。

    • Consumer Lambda 必须保持高效,因为如果端点在约 10 秒内没有响应,Bird 会重试批次。

    • 推荐使用 AWS ALB(而不是 API Gateway),因为它简单、具有成本效益,并能直接处理 HTTP POST 请求。

    • DNS(Route 53 或外部供应商)被配置为将一个友好的主机名映射到 ALB 端点。

    • 设置完成后,Bird 直接且可靠地将事件数据推送到您的 AWS 管道以便异步处理。

    • 该指南还涵盖最佳实践:最小权限 IAM 权限、临时存储限制、避免递归触发器和组织多 lambda 工作流。

Q&A 精华

  • Bird 的实时事件 webhooks 的主要目的是什么?

    将事件数据直接推送到发送者的终端,实现即时自动化,无需轮询或受速率限制的API调用。

  • 为什么对于大型发送方来说,webhooks 比通过 API 拉取数据更好?

    由于API拉取需要时间窗口管理,存在数据遗漏或重复的风险,并可能达到速率限制——webhooks通过持续推送数据来消除所有这些问题。

  • 推荐的 webhook ingestion pipeline 使用了哪些 AWS services?

    Amazon S3(存储)、AWS Lambda(处理)、Application Load Balancer(HTTP 监听器),以及可选的 Route 53(DNS)。

  • Bird如何批量处理事件数据?

    Bird 将多个事件一起发送到负载中,每个事件分配一个唯一的批次 ID (x-messagesystems-batch-id) 用于跟踪、重试和去重。

  • 什么会触发消费者Lambda函数?

    ALB将传入的webhook POST请求直接转发到Lambda,Lambda提取有效负载并写入到S3。

  • 为什么在处理之前将原始webhook批存储在S3中?

    为了确保快速摄取(<10 秒),以免连接超时,并将较重的处理转移到单独的异步管道。

  • 第二个 Lambda 函数是做什么的?

    它由新的 S3 对象触发,验证 JSON,将其转换为 CSV,并将处理后的输出写入单独的 S3 存储桶。

  • 为什么要为处理后的CSV文件使用单独的S3 bucket?

    为了避免递归触发器(将新的已处理文件写回同一个存储区会导致 Lambda 无休止地重新触发)。

  • Lambda函数需要什么权限?

    消费者Lambda需要S3 PutObject的权限;处理Lambda需要源存储桶的GetObject和目标存储桶的PutObject权限。

  • 为何推荐使用 AWS ALB 而不是 API Gateway?

    ALBs 更便宜、更简单,并且非常适合直接进行 HTTPS POST 转发;API Gateway 可能会改变负载格式并增加复杂性。

  • 发送方如何在 Bird 中配置 webhook?

    通过提供HTTPS端点(ALB的DNS记录),选择一个子账户,选择事件,并保存Webhook配置。

  • 用于存储或分析已处理数据的下游选项有哪些?

    加载到数据库(PostgreSQL、DynamoDB、RDS),输入到ETL系统,或使用像Athena这样的工具直接查询。

Bird 的实时事件 webhooks是一个非常有价值的工具,可以让发送者将数据自动推送到他们的系统中。这可以推动下游自动化,例如更新邮件列表、触发自动电子邮件流程或填充内部仪表板。虽然可以通过 Bird UI 使用事件搜索访问相同的事件数据,或者通过利用 Bird 事件 API以编程方式访问,但对单次请求返回的记录数量或对 API 端点速率限制的限制可能会使这两种方法对大型和复杂的发件人有一定限制。  

实时事件 webhooks 使发送者能够配置一个端点,Bird 将数据传输到该端点,并且可以在不需要调度调用数据的 cron 作业的情况下使用数据。与将数据推送给你相比,在拉取数据时也存在后勤上的折衷,例如需要识别每个 API 请求的时间段和参数。如果时间段不完全对齐,则可能会错过数据,如果时间段重叠,则需要处理重复的数据记录。通过实时 webhooks,事件数据在 Bird 中可用时,会简单地推送到你的端点。

虽然立即可以理解实时接收事件数据以推动下游自动化过程的好处,但实施和使用 webhooks 的实际过程可能让人望而却步。特别是在你不熟悉创建端点和以编程方式处理数据的技术组件时尤其如此。市场上有可以自动消费 Bird webhook 数据并将其 ETL 到你的数据库中的服务,例如StitchData,我们过去曾在博客中谈到过。然而,如果你更愿意掌控整个流程,你可以轻松构建自己的组件。以下是一个简单的指南,帮助发送者在使用 AWS 基础设施时自如地创建 Bird 事件 webhook 并使用数据。

Bird 的实时事件 webhooks是一个非常有价值的工具,可以让发送者将数据自动推送到他们的系统中。这可以推动下游自动化,例如更新邮件列表、触发自动电子邮件流程或填充内部仪表板。虽然可以通过 Bird UI 使用事件搜索访问相同的事件数据,或者通过利用 Bird 事件 API以编程方式访问,但对单次请求返回的记录数量或对 API 端点速率限制的限制可能会使这两种方法对大型和复杂的发件人有一定限制。  

实时事件 webhooks 使发送者能够配置一个端点,Bird 将数据传输到该端点,并且可以在不需要调度调用数据的 cron 作业的情况下使用数据。与将数据推送给你相比,在拉取数据时也存在后勤上的折衷,例如需要识别每个 API 请求的时间段和参数。如果时间段不完全对齐,则可能会错过数据,如果时间段重叠,则需要处理重复的数据记录。通过实时 webhooks,事件数据在 Bird 中可用时,会简单地推送到你的端点。

虽然立即可以理解实时接收事件数据以推动下游自动化过程的好处,但实施和使用 webhooks 的实际过程可能让人望而却步。特别是在你不熟悉创建端点和以编程方式处理数据的技术组件时尤其如此。市场上有可以自动消费 Bird webhook 数据并将其 ETL 到你的数据库中的服务,例如StitchData,我们过去曾在博客中谈到过。然而,如果你更愿意掌控整个流程,你可以轻松构建自己的组件。以下是一个简单的指南,帮助发送者在使用 AWS 基础设施时自如地创建 Bird 事件 webhook 并使用数据。

Bird 的实时事件 webhooks是一个非常有价值的工具,可以让发送者将数据自动推送到他们的系统中。这可以推动下游自动化,例如更新邮件列表、触发自动电子邮件流程或填充内部仪表板。虽然可以通过 Bird UI 使用事件搜索访问相同的事件数据,或者通过利用 Bird 事件 API以编程方式访问,但对单次请求返回的记录数量或对 API 端点速率限制的限制可能会使这两种方法对大型和复杂的发件人有一定限制。  

实时事件 webhooks 使发送者能够配置一个端点,Bird 将数据传输到该端点,并且可以在不需要调度调用数据的 cron 作业的情况下使用数据。与将数据推送给你相比,在拉取数据时也存在后勤上的折衷,例如需要识别每个 API 请求的时间段和参数。如果时间段不完全对齐,则可能会错过数据,如果时间段重叠,则需要处理重复的数据记录。通过实时 webhooks,事件数据在 Bird 中可用时,会简单地推送到你的端点。

虽然立即可以理解实时接收事件数据以推动下游自动化过程的好处,但实施和使用 webhooks 的实际过程可能让人望而却步。特别是在你不熟悉创建端点和以编程方式处理数据的技术组件时尤其如此。市场上有可以自动消费 Bird webhook 数据并将其 ETL 到你的数据库中的服务,例如StitchData,我们过去曾在博客中谈到过。然而,如果你更愿意掌控整个流程,你可以轻松构建自己的组件。以下是一个简单的指南,帮助发送者在使用 AWS 基础设施时自如地创建 Bird 事件 webhook 并使用数据。

配置 Webhook Target Endpoint

当创建一个 Bird 事件时,我们希望这些事件数据能够实时地流式传输到 AWS 中的一个终端点,以便我们能以编程方式消费和使用该数据。数据将从 Bird 发送到目标终端点,该终端点将把有效载荷转发到处理数据并将其存储在 S3 存储桶中的 Lambda 函数。所描述的数据流的高层次图示如下所示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


Component

Responsibility

Bird Webhooks

以 HTTP POST 请求方式发出实时事件批次

Application Load Balancer

接收外部 webhook 流量并路由请求

Lambda (Consumer)

高效地将原始 webhook 批次持久化到 S3

Amazon S3

将批次事件有效载荷存储为平面 JSON 文件

Lambda (Processor)

异步转换或加载存储的数据

要实现此工作流,让我们实际上以相反的顺序构建它们,从创建一个 S3 存储桶开始,我们将存储我们的事件数据,然后倒退——添加每个组件以便于我们已经构建的内容。

当创建一个 Bird 事件时,我们希望这些事件数据能够实时地流式传输到 AWS 中的一个终端点,以便我们能以编程方式消费和使用该数据。数据将从 Bird 发送到目标终端点,该终端点将把有效载荷转发到处理数据并将其存储在 S3 存储桶中的 Lambda 函数。所描述的数据流的高层次图示如下所示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


Component

Responsibility

Bird Webhooks

以 HTTP POST 请求方式发出实时事件批次

Application Load Balancer

接收外部 webhook 流量并路由请求

Lambda (Consumer)

高效地将原始 webhook 批次持久化到 S3

Amazon S3

将批次事件有效载荷存储为平面 JSON 文件

Lambda (Processor)

异步转换或加载存储的数据

要实现此工作流,让我们实际上以相反的顺序构建它们,从创建一个 S3 存储桶开始,我们将存储我们的事件数据,然后倒退——添加每个组件以便于我们已经构建的内容。

当创建一个 Bird 事件时,我们希望这些事件数据能够实时地流式传输到 AWS 中的一个终端点,以便我们能以编程方式消费和使用该数据。数据将从 Bird 发送到目标终端点,该终端点将把有效载荷转发到处理数据并将其存储在 S3 存储桶中的 Lambda 函数。所描述的数据流的高层次图示如下所示:


Flowchart for a system with components, showing a process starting with 'MessageBird', continuing through an 'Application Load Balancer', leading to a 'Lambda Script (Consumer)', connecting to an 'S3' storage, and optionally processed by another 'Lambda Script (Process)'.


Component

Responsibility

Bird Webhooks

以 HTTP POST 请求方式发出实时事件批次

Application Load Balancer

接收外部 webhook 流量并路由请求

Lambda (Consumer)

高效地将原始 webhook 批次持久化到 S3

Amazon S3

将批次事件有效载荷存储为平面 JSON 文件

Lambda (Processor)

异步转换或加载存储的数据

要实现此工作流,让我们实际上以相反的顺序构建它们,从创建一个 S3 存储桶开始,我们将存储我们的事件数据,然后倒退——添加每个组件以便于我们已经构建的内容。

创建一个S3 Bucket来存储Webhook数据

在创建我们的负载均衡器以接受数据或我们的 lambda 函数以存储数据之前,我们需要先创建我们的 S3 bucket,数据将在其中存储。虽然 S3 为 webhook 数据提供了出色的存储,但也使用 PostgreSQL 数据库进行事件处理的组织应实施适当的 backup and restore procedures 来保护他们的结构化数据以及他们的 S3 存储策略。要执行此操作,请导航到 AWS 中的 S3 服务并按“Create Bucket”。系统会提示您为存储桶分配名称并设置区域 – 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 创建完成后,它会为空 – 如果您想在一个文件夹中组织数据,您可以立即创建预期目录,或者目录将在您的 lambda 函数存储文件时创建。在此示例中,我们将我们的 S3 bucket 命名为 “bird-webhooks” 并创建了一个名为 “B Event Data” 的文件夹来存储我们的事件数据 – 您将在下面的 lambda 函数中看到这些名称被引用。

在创建我们的负载均衡器以接受数据或我们的 lambda 函数以存储数据之前,我们需要先创建我们的 S3 bucket,数据将在其中存储。虽然 S3 为 webhook 数据提供了出色的存储,但也使用 PostgreSQL 数据库进行事件处理的组织应实施适当的 backup and restore procedures 来保护他们的结构化数据以及他们的 S3 存储策略。要执行此操作,请导航到 AWS 中的 S3 服务并按“Create Bucket”。系统会提示您为存储桶分配名称并设置区域 – 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 创建完成后,它会为空 – 如果您想在一个文件夹中组织数据,您可以立即创建预期目录,或者目录将在您的 lambda 函数存储文件时创建。在此示例中,我们将我们的 S3 bucket 命名为 “bird-webhooks” 并创建了一个名为 “B Event Data” 的文件夹来存储我们的事件数据 – 您将在下面的 lambda 函数中看到这些名称被引用。

在创建我们的负载均衡器以接受数据或我们的 lambda 函数以存储数据之前,我们需要先创建我们的 S3 bucket,数据将在其中存储。虽然 S3 为 webhook 数据提供了出色的存储,但也使用 PostgreSQL 数据库进行事件处理的组织应实施适当的 backup and restore procedures 来保护他们的结构化数据以及他们的 S3 存储策略。要执行此操作,请导航到 AWS 中的 S3 服务并按“Create Bucket”。系统会提示您为存储桶分配名称并设置区域 – 请确保使用与您的 ALB 和 lambda 函数相同的区域。当您的 S3 bucket 创建完成后,它会为空 – 如果您想在一个文件夹中组织数据,您可以立即创建预期目录,或者目录将在您的 lambda 函数存储文件时创建。在此示例中,我们将我们的 S3 bucket 命名为 “bird-webhooks” 并创建了一个名为 “B Event Data” 的文件夹来存储我们的事件数据 – 您将在下面的 lambda 函数中看到这些名称被引用。

创建一个 Lambda Function 来消费数据

数据的实际处理和存储将由一个lambda function执行,该函数由我们的应用程序负载均衡器 (ALB) 调用。

第一步是通过导航到 AWS 的 Lambda 服务并单击“Create Function”来创建您的 lambda function。系统会提示您为 lambda function 指定一个名称,并选择以哪种编程语言编写函数。在此示例中,我们使用 Python 作为运行时语言。

现在我们需要开发我们的 lambda function。假设我们的应用程序负载均衡器已配置好,并且正在将 webhook 负载转发到我们的 lambda function —— lambda 将接收包括完整标头和主体在内的负载。负载是使用“event”对象作为字典传递给我们的 lambda function 的。您可以通过访问负载内的“headers”和“body”对象来独立引用负载的标头和主体。在此示例中,我们只是简单读取“x-messagesystems-batch-id”标头,其中批 ID 由 Bird 为 webhook 批次创建的唯一值,并在将主体作为扁平文件存储在 S3 时将其用作文件名;然而,您可能需要根据需要添加额外的功能,例如身份验证检查或错误处理

将负载存储为 S3 上的扁平文件时,我们需要定义存储负载数据的 S3 存储桶的名称、位置和文件名。在我们的示例 lambda function 中,我们在“store_batch”函数中执行此操作。在此示例中,我们将整个批次存储为单个文件,这有助于确保数据在 Bird 与您的端点之间的 HTTP 连接超时之前被收集和存储。虽然您可以调整负载均衡器上的连接超时设置,但不能保证连接不会在传输端(在这种情况下是 Bird)超时,或者连接不会在您的 lambda function 完成执行之前终止。最佳做法是尽可能提高您的消费者功能效率,并将数据处理活动保留给下游流程——如将批量 JSON 格式的负载转换为 CSV 文件,或将事件数据加载到数据库中。

需要注意的是,您可能需要更新 lambda function 的权限。您的执行角色需要 S3 的 PutObject 和 GetObject 权限。最佳实践是执行least privilege 原则,因此我建议仅对存储 webhook 负载的 S3 存储桶设置这些权限。

我们的消费者 lambda function 的一个示例可以在这里找到。

关于批 ID 的简要说明:Bird 将批处理事件为单个负载,其中每批可能包含 1 到 350 或更多事件记录。批次将被赋予一个唯一的批 ID,您可以利用事件 Webhooks API查看批次状态,或者通过单击一个webhook stream并选择“Batch Status”在您的 Bird 账户中查看。如果某个 webhook 负载未能传送,例如在连接超时时,Bird 将自动使用相同的批 ID 重试该批次。当您的 lambda function 接近 10 秒的最大往返时间运行时,这可能会发生,因此优化消费者功能以减少执行时间非常重要。

为了处理所有数据处理活动,我建议创建一个单独的 lambda function,以便在 S3 存储桶上创建新文件时执行——这样,数据处理是异步执行的,与数据传输无关,并且没有因连接终止而丢失数据的风险。我将在后面的部分中讨论处理 lambda function。

数据的实际处理和存储将由一个lambda function执行,该函数由我们的应用程序负载均衡器 (ALB) 调用。

第一步是通过导航到 AWS 的 Lambda 服务并单击“Create Function”来创建您的 lambda function。系统会提示您为 lambda function 指定一个名称,并选择以哪种编程语言编写函数。在此示例中,我们使用 Python 作为运行时语言。

现在我们需要开发我们的 lambda function。假设我们的应用程序负载均衡器已配置好,并且正在将 webhook 负载转发到我们的 lambda function —— lambda 将接收包括完整标头和主体在内的负载。负载是使用“event”对象作为字典传递给我们的 lambda function 的。您可以通过访问负载内的“headers”和“body”对象来独立引用负载的标头和主体。在此示例中,我们只是简单读取“x-messagesystems-batch-id”标头,其中批 ID 由 Bird 为 webhook 批次创建的唯一值,并在将主体作为扁平文件存储在 S3 时将其用作文件名;然而,您可能需要根据需要添加额外的功能,例如身份验证检查或错误处理

将负载存储为 S3 上的扁平文件时,我们需要定义存储负载数据的 S3 存储桶的名称、位置和文件名。在我们的示例 lambda function 中,我们在“store_batch”函数中执行此操作。在此示例中,我们将整个批次存储为单个文件,这有助于确保数据在 Bird 与您的端点之间的 HTTP 连接超时之前被收集和存储。虽然您可以调整负载均衡器上的连接超时设置,但不能保证连接不会在传输端(在这种情况下是 Bird)超时,或者连接不会在您的 lambda function 完成执行之前终止。最佳做法是尽可能提高您的消费者功能效率,并将数据处理活动保留给下游流程——如将批量 JSON 格式的负载转换为 CSV 文件,或将事件数据加载到数据库中。

需要注意的是,您可能需要更新 lambda function 的权限。您的执行角色需要 S3 的 PutObject 和 GetObject 权限。最佳实践是执行least privilege 原则,因此我建议仅对存储 webhook 负载的 S3 存储桶设置这些权限。

我们的消费者 lambda function 的一个示例可以在这里找到。

关于批 ID 的简要说明:Bird 将批处理事件为单个负载,其中每批可能包含 1 到 350 或更多事件记录。批次将被赋予一个唯一的批 ID,您可以利用事件 Webhooks API查看批次状态,或者通过单击一个webhook stream并选择“Batch Status”在您的 Bird 账户中查看。如果某个 webhook 负载未能传送,例如在连接超时时,Bird 将自动使用相同的批 ID 重试该批次。当您的 lambda function 接近 10 秒的最大往返时间运行时,这可能会发生,因此优化消费者功能以减少执行时间非常重要。

为了处理所有数据处理活动,我建议创建一个单独的 lambda function,以便在 S3 存储桶上创建新文件时执行——这样,数据处理是异步执行的,与数据传输无关,并且没有因连接终止而丢失数据的风险。我将在后面的部分中讨论处理 lambda function。

数据的实际处理和存储将由一个lambda function执行,该函数由我们的应用程序负载均衡器 (ALB) 调用。

第一步是通过导航到 AWS 的 Lambda 服务并单击“Create Function”来创建您的 lambda function。系统会提示您为 lambda function 指定一个名称,并选择以哪种编程语言编写函数。在此示例中,我们使用 Python 作为运行时语言。

现在我们需要开发我们的 lambda function。假设我们的应用程序负载均衡器已配置好,并且正在将 webhook 负载转发到我们的 lambda function —— lambda 将接收包括完整标头和主体在内的负载。负载是使用“event”对象作为字典传递给我们的 lambda function 的。您可以通过访问负载内的“headers”和“body”对象来独立引用负载的标头和主体。在此示例中,我们只是简单读取“x-messagesystems-batch-id”标头,其中批 ID 由 Bird 为 webhook 批次创建的唯一值,并在将主体作为扁平文件存储在 S3 时将其用作文件名;然而,您可能需要根据需要添加额外的功能,例如身份验证检查或错误处理

将负载存储为 S3 上的扁平文件时,我们需要定义存储负载数据的 S3 存储桶的名称、位置和文件名。在我们的示例 lambda function 中,我们在“store_batch”函数中执行此操作。在此示例中,我们将整个批次存储为单个文件,这有助于确保数据在 Bird 与您的端点之间的 HTTP 连接超时之前被收集和存储。虽然您可以调整负载均衡器上的连接超时设置,但不能保证连接不会在传输端(在这种情况下是 Bird)超时,或者连接不会在您的 lambda function 完成执行之前终止。最佳做法是尽可能提高您的消费者功能效率,并将数据处理活动保留给下游流程——如将批量 JSON 格式的负载转换为 CSV 文件,或将事件数据加载到数据库中。

需要注意的是,您可能需要更新 lambda function 的权限。您的执行角色需要 S3 的 PutObject 和 GetObject 权限。最佳实践是执行least privilege 原则,因此我建议仅对存储 webhook 负载的 S3 存储桶设置这些权限。

我们的消费者 lambda function 的一个示例可以在这里找到。

关于批 ID 的简要说明:Bird 将批处理事件为单个负载,其中每批可能包含 1 到 350 或更多事件记录。批次将被赋予一个唯一的批 ID,您可以利用事件 Webhooks API查看批次状态,或者通过单击一个webhook stream并选择“Batch Status”在您的 Bird 账户中查看。如果某个 webhook 负载未能传送,例如在连接超时时,Bird 将自动使用相同的批 ID 重试该批次。当您的 lambda function 接近 10 秒的最大往返时间运行时,这可能会发生,因此优化消费者功能以减少执行时间非常重要。

为了处理所有数据处理活动,我建议创建一个单独的 lambda function,以便在 S3 存储桶上创建新文件时执行——这样,数据处理是异步执行的,与数据传输无关,并且没有因连接终止而丢失数据的风险。我将在后面的部分中讨论处理 lambda function。

创建一个 Application Load Balancer

为了接收 webhook 负载,我们需要提供一个端点来发送负载。我们通过在 AWS 中导航到 EC2 > 负载均衡器并点击“Create Load Balancer”来创建应用负载均衡器以实现此目的。系统会提示您选择要创建的负载均衡器类型——在这种情况下,我们需要创建一个应用负载均衡器。我们需要使用应用负载均衡器 (ALB) 来构建我们的消费者,因为事件 webhook 将作为 HTTP 请求发送,而 ALB 用于在 AWS 内路由 HTTP 请求。我们可以实现一个 HTTP 网关作为替代方案;然而,我们在此项目中使用 ALB 因为它比 HTTP 网关更轻量且更具成本效益。需要注意的是,如果您选择使用 HTTP 网关,事件格式可能与 ALB 不同,因此您的 lambda 函数需要相应地处理请求对象。

一旦您的 ALB 创建完成,系统将提示您为 ALB 指定名称并配置方案和访问/安全设置——由于我们计划从外部来源 (Bird) 接收事件数据,我们希望 ALB 面向互联网。 在“Listeners and routing”下,该 ALB 应通过端口 443 监听 HTTPS,我们希望创建一个目标组指向我们的 lambda 函数,以便我们的 ALB 将传入请求转发到我们在上面创建的消费 lambda 函数。同时,您还需要确保安全组有权限通过端口 443 接受流量。

为了接收 webhook 负载,我们需要提供一个端点来发送负载。我们通过在 AWS 中导航到 EC2 > 负载均衡器并点击“Create Load Balancer”来创建应用负载均衡器以实现此目的。系统会提示您选择要创建的负载均衡器类型——在这种情况下,我们需要创建一个应用负载均衡器。我们需要使用应用负载均衡器 (ALB) 来构建我们的消费者,因为事件 webhook 将作为 HTTP 请求发送,而 ALB 用于在 AWS 内路由 HTTP 请求。我们可以实现一个 HTTP 网关作为替代方案;然而,我们在此项目中使用 ALB 因为它比 HTTP 网关更轻量且更具成本效益。需要注意的是,如果您选择使用 HTTP 网关,事件格式可能与 ALB 不同,因此您的 lambda 函数需要相应地处理请求对象。

一旦您的 ALB 创建完成,系统将提示您为 ALB 指定名称并配置方案和访问/安全设置——由于我们计划从外部来源 (Bird) 接收事件数据,我们希望 ALB 面向互联网。 在“Listeners and routing”下,该 ALB 应通过端口 443 监听 HTTPS,我们希望创建一个目标组指向我们的 lambda 函数,以便我们的 ALB 将传入请求转发到我们在上面创建的消费 lambda 函数。同时,您还需要确保安全组有权限通过端口 443 接受流量。

为了接收 webhook 负载,我们需要提供一个端点来发送负载。我们通过在 AWS 中导航到 EC2 > 负载均衡器并点击“Create Load Balancer”来创建应用负载均衡器以实现此目的。系统会提示您选择要创建的负载均衡器类型——在这种情况下,我们需要创建一个应用负载均衡器。我们需要使用应用负载均衡器 (ALB) 来构建我们的消费者,因为事件 webhook 将作为 HTTP 请求发送,而 ALB 用于在 AWS 内路由 HTTP 请求。我们可以实现一个 HTTP 网关作为替代方案;然而,我们在此项目中使用 ALB 因为它比 HTTP 网关更轻量且更具成本效益。需要注意的是,如果您选择使用 HTTP 网关,事件格式可能与 ALB 不同,因此您的 lambda 函数需要相应地处理请求对象。

一旦您的 ALB 创建完成,系统将提示您为 ALB 指定名称并配置方案和访问/安全设置——由于我们计划从外部来源 (Bird) 接收事件数据,我们希望 ALB 面向互联网。 在“Listeners and routing”下,该 ALB 应通过端口 443 监听 HTTPS,我们希望创建一个目标组指向我们的 lambda 函数,以便我们的 ALB 将传入请求转发到我们在上面创建的消费 lambda 函数。同时,您还需要确保安全组有权限通过端口 443 接受流量。

为Load Balancer创建一个DNS Record

为了使我们更容易将我们的 ALB 用作端点,我们将在 DNS 中创建一个 A 记录,指向我们的 ALB。对于此操作,我们可以使用 AWS Route 53 服务(或您当前的 DNS 提供商)并为您想要用作端点的主机名创建一个 A 记录(例如 spevents.<your_domain>)。在 AWS 上大规模处理 DNS 时,请注意存在未公开的 DNS 限制,可能会影响大流量应用程序,尤其是那些处理大量出站流量的邮件传递系统。该 A 记录应配置为指向我们创建的 ALB。如果您使用 Route 53 来管理 DNS 记录,可以通过启用“Alias”并选择 ALB 来直接引用 ALB 实例;否则,如果您使用外部 DNS 提供商,则应将 A 记录指向 ALB 实例的公共 IP 地址。

我建议使用像 Postman 这样的工具来测试所有配置是否正确,然后再启用您的 Bird webhook。您可以向您的端点发出一个 POST 请求,并确认收到响应。如果您的 POST 请求没有返回响应,您可能需要仔细检查您的 ALB 是否在正确的端口上侦听。

为了使我们更容易将我们的 ALB 用作端点,我们将在 DNS 中创建一个 A 记录,指向我们的 ALB。对于此操作,我们可以使用 AWS Route 53 服务(或您当前的 DNS 提供商)并为您想要用作端点的主机名创建一个 A 记录(例如 spevents.<your_domain>)。在 AWS 上大规模处理 DNS 时,请注意存在未公开的 DNS 限制,可能会影响大流量应用程序,尤其是那些处理大量出站流量的邮件传递系统。该 A 记录应配置为指向我们创建的 ALB。如果您使用 Route 53 来管理 DNS 记录,可以通过启用“Alias”并选择 ALB 来直接引用 ALB 实例;否则,如果您使用外部 DNS 提供商,则应将 A 记录指向 ALB 实例的公共 IP 地址。

我建议使用像 Postman 这样的工具来测试所有配置是否正确,然后再启用您的 Bird webhook。您可以向您的端点发出一个 POST 请求,并确认收到响应。如果您的 POST 请求没有返回响应,您可能需要仔细检查您的 ALB 是否在正确的端口上侦听。

为了使我们更容易将我们的 ALB 用作端点,我们将在 DNS 中创建一个 A 记录,指向我们的 ALB。对于此操作,我们可以使用 AWS Route 53 服务(或您当前的 DNS 提供商)并为您想要用作端点的主机名创建一个 A 记录(例如 spevents.<your_domain>)。在 AWS 上大规模处理 DNS 时,请注意存在未公开的 DNS 限制,可能会影响大流量应用程序,尤其是那些处理大量出站流量的邮件传递系统。该 A 记录应配置为指向我们创建的 ALB。如果您使用 Route 53 来管理 DNS 记录,可以通过启用“Alias”并选择 ALB 来直接引用 ALB 实例;否则,如果您使用外部 DNS 提供商,则应将 A 记录指向 ALB 实例的公共 IP 地址。

我建议使用像 Postman 这样的工具来测试所有配置是否正确,然后再启用您的 Bird webhook。您可以向您的端点发出一个 POST 请求,并确认收到响应。如果您的 POST 请求没有返回响应,您可能需要仔细检查您的 ALB 是否在正确的端口上侦听。

创建一个 Bird Webhook

现在我们准备在Bird中创建webhook,并使用上面A记录定义的主机名作为我们的目标端点。要创建webhook,请导航到您的Bird帐户中的Webhooks部分,然后点击“Create Webhook”。系统会提示您为您的webhook指定一个名称,并提供一个目标URL——目标应该是您之前创建的A记录的主机名。请注意,目标URL可能需要包含“HTTPS://”。  

完成后,验证已选择正确的子账户和事件,然后按“Create Webhook”保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL,并由我们的ALB进行下游处理。

现在我们准备在Bird中创建webhook,并使用上面A记录定义的主机名作为我们的目标端点。要创建webhook,请导航到您的Bird帐户中的Webhooks部分,然后点击“Create Webhook”。系统会提示您为您的webhook指定一个名称,并提供一个目标URL——目标应该是您之前创建的A记录的主机名。请注意,目标URL可能需要包含“HTTPS://”。  

完成后,验证已选择正确的子账户和事件,然后按“Create Webhook”保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL,并由我们的ALB进行下游处理。

现在我们准备在Bird中创建webhook,并使用上面A记录定义的主机名作为我们的目标端点。要创建webhook,请导航到您的Bird帐户中的Webhooks部分,然后点击“Create Webhook”。系统会提示您为您的webhook指定一个名称,并提供一个目标URL——目标应该是您之前创建的A记录的主机名。请注意,目标URL可能需要包含“HTTPS://”。  

完成后,验证已选择正确的子账户和事件,然后按“Create Webhook”保存您的配置。所有选定事件类型的事件数据现在将流向我们的目标URL,并由我们的ALB进行下游处理。

处理 Webhook Event 数据

根据存储Bird事件数据的预期目的,您的需求可能仅通过将JSON有效负载存储为平面文件即可满足。您可能已经建立了可消费和加载JSON格式数据的下游ETL流程。在这些情况下,您可以直接使用我们上面创建的处理lambda生成的平面文件。

或者,您可能需要转换数据,例如将JSON格式转换为CSV格式,或者直接将数据加载到数据库。在此示例中,我们将创建一个简单的lambda函数,将webhook数据从原始JSON格式转换为可以加载到数据库的CSV文件。

根据存储Bird事件数据的预期目的,您的需求可能仅通过将JSON有效负载存储为平面文件即可满足。您可能已经建立了可消费和加载JSON格式数据的下游ETL流程。在这些情况下,您可以直接使用我们上面创建的处理lambda生成的平面文件。

或者,您可能需要转换数据,例如将JSON格式转换为CSV格式,或者直接将数据加载到数据库。在此示例中,我们将创建一个简单的lambda函数,将webhook数据从原始JSON格式转换为可以加载到数据库的CSV文件。

根据存储Bird事件数据的预期目的,您的需求可能仅通过将JSON有效负载存储为平面文件即可满足。您可能已经建立了可消费和加载JSON格式数据的下游ETL流程。在这些情况下,您可以直接使用我们上面创建的处理lambda生成的平面文件。

或者,您可能需要转换数据,例如将JSON格式转换为CSV格式,或者直接将数据加载到数据库。在此示例中,我们将创建一个简单的lambda函数,将webhook数据从原始JSON格式转换为可以加载到数据库的CSV文件。

创建一个Lambda来处理数据

与用于消费 webhook 数据的 lambda 函数一样,我们需要通过导航到 AWS 内的 Lambda 服务并按“Create Function”来创建一个新的 lambda 函数。当在我们的 S3 存储桶上创建新文件时,此新 lambda 函数将被触发——它将读取数据并将其转换为新的 csv 文件。

lambda 函数接受文件信息作为事件。在示例 lambda 函数中,您将看到我们首先进行一系列验证检查,以确保数据完整并按预期格式化。接下来,我们使用“csv”库将 JSON 负载转换为 CSV 文件并写入临时文件。Lambda 函数只能将本地文件写入到“/tmp”目录,因此我们创建一个临时 csv 文件并使用 <batch_id>.csv 的惯例命名它。我们在这里使用 batch_id 的原因仅仅是为了确保由于接收到多个 webhook 负载而运行的任何并行进程不相互干扰,因为每个 webhook 批次都具有唯一的 batch_id。

一旦数据完全转换为 CSV,我们将 CSV 数据读取为字节流,删除临时文件,并将 CSV 数据保存为 S3 上的新文件。需要注意的是,输出需要一个不同的 S3 存储桶,否则,我们有可能创建一个递归循环,导致增加 lambda 使用量和增加成本。我们需要在 lambda 函数中确定我们希望将 CSV 文件存储在哪个 S3 存储桶和位置。按照上述相同的步骤创建一个新的 S3 存储桶以存储我们的 CSV 文件。

请注意 tmp 目录仅限于 512 MB 的空间,因此重要的是在删除临时文件之后,以确保未来执行有足够的空间。我们使用临时文件而不是直接写入 S3 的原因是通过单个请求简化与 S3 的连接。

就像消费 lambda 函数一样,您可能需要更新处理 lambda 函数的权限。此 lambda 函数要求执行角色具有输入 S3 存储桶的 GetObject 权限,并且具有输出 S3 存储桶的 PutObject 和 GetObject 权限。

我们处理 lambda 函数的示例可以在这里找到。

与用于消费 webhook 数据的 lambda 函数一样,我们需要通过导航到 AWS 内的 Lambda 服务并按“Create Function”来创建一个新的 lambda 函数。当在我们的 S3 存储桶上创建新文件时,此新 lambda 函数将被触发——它将读取数据并将其转换为新的 csv 文件。

lambda 函数接受文件信息作为事件。在示例 lambda 函数中,您将看到我们首先进行一系列验证检查,以确保数据完整并按预期格式化。接下来,我们使用“csv”库将 JSON 负载转换为 CSV 文件并写入临时文件。Lambda 函数只能将本地文件写入到“/tmp”目录,因此我们创建一个临时 csv 文件并使用 <batch_id>.csv 的惯例命名它。我们在这里使用 batch_id 的原因仅仅是为了确保由于接收到多个 webhook 负载而运行的任何并行进程不相互干扰,因为每个 webhook 批次都具有唯一的 batch_id。

一旦数据完全转换为 CSV,我们将 CSV 数据读取为字节流,删除临时文件,并将 CSV 数据保存为 S3 上的新文件。需要注意的是,输出需要一个不同的 S3 存储桶,否则,我们有可能创建一个递归循环,导致增加 lambda 使用量和增加成本。我们需要在 lambda 函数中确定我们希望将 CSV 文件存储在哪个 S3 存储桶和位置。按照上述相同的步骤创建一个新的 S3 存储桶以存储我们的 CSV 文件。

请注意 tmp 目录仅限于 512 MB 的空间,因此重要的是在删除临时文件之后,以确保未来执行有足够的空间。我们使用临时文件而不是直接写入 S3 的原因是通过单个请求简化与 S3 的连接。

就像消费 lambda 函数一样,您可能需要更新处理 lambda 函数的权限。此 lambda 函数要求执行角色具有输入 S3 存储桶的 GetObject 权限,并且具有输出 S3 存储桶的 PutObject 和 GetObject 权限。

我们处理 lambda 函数的示例可以在这里找到。

与用于消费 webhook 数据的 lambda 函数一样,我们需要通过导航到 AWS 内的 Lambda 服务并按“Create Function”来创建一个新的 lambda 函数。当在我们的 S3 存储桶上创建新文件时,此新 lambda 函数将被触发——它将读取数据并将其转换为新的 csv 文件。

lambda 函数接受文件信息作为事件。在示例 lambda 函数中,您将看到我们首先进行一系列验证检查,以确保数据完整并按预期格式化。接下来,我们使用“csv”库将 JSON 负载转换为 CSV 文件并写入临时文件。Lambda 函数只能将本地文件写入到“/tmp”目录,因此我们创建一个临时 csv 文件并使用 <batch_id>.csv 的惯例命名它。我们在这里使用 batch_id 的原因仅仅是为了确保由于接收到多个 webhook 负载而运行的任何并行进程不相互干扰,因为每个 webhook 批次都具有唯一的 batch_id。

一旦数据完全转换为 CSV,我们将 CSV 数据读取为字节流,删除临时文件,并将 CSV 数据保存为 S3 上的新文件。需要注意的是,输出需要一个不同的 S3 存储桶,否则,我们有可能创建一个递归循环,导致增加 lambda 使用量和增加成本。我们需要在 lambda 函数中确定我们希望将 CSV 文件存储在哪个 S3 存储桶和位置。按照上述相同的步骤创建一个新的 S3 存储桶以存储我们的 CSV 文件。

请注意 tmp 目录仅限于 512 MB 的空间,因此重要的是在删除临时文件之后,以确保未来执行有足够的空间。我们使用临时文件而不是直接写入 S3 的原因是通过单个请求简化与 S3 的连接。

就像消费 lambda 函数一样,您可能需要更新处理 lambda 函数的权限。此 lambda 函数要求执行角色具有输入 S3 存储桶的 GetObject 权限,并且具有输出 S3 存储桶的 PutObject 和 GetObject 权限。

我们处理 lambda 函数的示例可以在这里找到。

配置一个Lambda在新数据存储到S3时执行

现在,我们已经创建了将文件从JSON格式转换为CSV格式的lambda函数,我们需要配置它以便在我们S3桶上创建新文件时触发。为此,我们需要通过打开我们的lambda函数并点击页面顶部的“Add Trigger”来添加触发器。选择“S3”并提供存储原始webhook有效负载的S3桶的名称。您还可以选择指定文件前缀和/或后缀进行过滤。设置配置完成后,您可以通过点击页面底部的“Add”来添加触发器。现在,每当新的文件被添加到您的S3桶时,您的处理lambda函数就会执行。

现在,我们已经创建了将文件从JSON格式转换为CSV格式的lambda函数,我们需要配置它以便在我们S3桶上创建新文件时触发。为此,我们需要通过打开我们的lambda函数并点击页面顶部的“Add Trigger”来添加触发器。选择“S3”并提供存储原始webhook有效负载的S3桶的名称。您还可以选择指定文件前缀和/或后缀进行过滤。设置配置完成后,您可以通过点击页面底部的“Add”来添加触发器。现在,每当新的文件被添加到您的S3桶时,您的处理lambda函数就会执行。

现在,我们已经创建了将文件从JSON格式转换为CSV格式的lambda函数,我们需要配置它以便在我们S3桶上创建新文件时触发。为此,我们需要通过打开我们的lambda函数并点击页面顶部的“Add Trigger”来添加触发器。选择“S3”并提供存储原始webhook有效负载的S3桶的名称。您还可以选择指定文件前缀和/或后缀进行过滤。设置配置完成后,您可以通过点击页面底部的“Add”来添加触发器。现在,每当新的文件被添加到您的S3桶时,您的处理lambda函数就会执行。

将数据加载到数据库

在这个示例中,我不会详细介绍如何将数据加载到数据库中,但是,如果您一直在关注此示例,您有几个选项:

  1. 在您的处理lambda函数中直接将数据加载到您的数据库中

  2. 使用已建立的ETL流程来处理您的CSV文件

无论您是使用AWS数据库服务,比如RDSDynamoDB,还是您拥有自己的PostgreSQL数据库(或类似的),都可以直接从您的处理lambda函数连接到您的数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您也可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似SQL的查询语言访问数据。我建议参考您使用的服务的相应文档,以获取更多有关如何最好地在您环境中实现此功能的信息。

同样,有许多服务可以帮助处理CSV文件并将数据加载到数据库中。您可能已经拥有可以利用的ETL流程。

我们希望您发现本指南有所帮助——发送愉快!

在这个示例中,我不会详细介绍如何将数据加载到数据库中,但是,如果您一直在关注此示例,您有几个选项:

  1. 在您的处理lambda函数中直接将数据加载到您的数据库中

  2. 使用已建立的ETL流程来处理您的CSV文件

无论您是使用AWS数据库服务,比如RDSDynamoDB,还是您拥有自己的PostgreSQL数据库(或类似的),都可以直接从您的处理lambda函数连接到您的数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您也可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似SQL的查询语言访问数据。我建议参考您使用的服务的相应文档,以获取更多有关如何最好地在您环境中实现此功能的信息。

同样,有许多服务可以帮助处理CSV文件并将数据加载到数据库中。您可能已经拥有可以利用的ETL流程。

我们希望您发现本指南有所帮助——发送愉快!

在这个示例中,我不会详细介绍如何将数据加载到数据库中,但是,如果您一直在关注此示例,您有几个选项:

  1. 在您的处理lambda函数中直接将数据加载到您的数据库中

  2. 使用已建立的ETL流程来处理您的CSV文件

无论您是使用AWS数据库服务,比如RDSDynamoDB,还是您拥有自己的PostgreSQL数据库(或类似的),都可以直接从您的处理lambda函数连接到您的数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您也可以使用“boto3”调用RDS或DynamoDB。AWS Athena服务也可以用于直接从平面文件读取数据文件,并使用类似SQL的查询语言访问数据。我建议参考您使用的服务的相应文档,以获取更多有关如何最好地在您环境中实现此功能的信息。

同样,有许多服务可以帮助处理CSV文件并将数据加载到数据库中。您可能已经拥有可以利用的ETL流程。

我们希望您发现本指南有所帮助——发送愉快!

其他新闻

阅读更多来自此类别的内容

A person is standing at a desk while typing on a laptop.

完整的AI原生平台,可与您的业务一起扩展。

© 2025 Bird

A person is standing at a desk while typing on a laptop.

完整的AI原生平台,可与您的业务一起扩展。

© 2025 Bird