鸟类的实时 事件网络钩子 对于发送者来说是一个非常有价值的工具,可以将数据自动推送到他们的系统。这可以驱动下游自动化,例如更新邮件列表,触发自动电子邮件旅行或填充内部仪表板。虽然相同的事件数据可以通过鸟类用户界面访问,使用 事件搜索 或通过利用鸟类的 事件API 以编程方式访问,但在单个请求中返回的记录数量限制或对API端点施加的速率限制,可能会使这两种方法对于大型和复杂的发送者变得有限制。
实时事件网络钩子使发送者能够配置一个端点,鸟类将数据传送到该端点,数据可以消耗,而无需安排提取数据的cron作业。在提取数据时与推送数据到您那里的情况相比,还有物流方面的权衡,例如必须确定每个API请求使用的时间段和参数。如果时间段没有完美对齐,您可能会错过数据,而如果时间段重叠,则需要处理重复的数据记录。通过实时网络钩子,事件数据会在鸟类中可用时简单地推送到您的端点。
虽然许多发送者可能会立刻理解实时接收事件数据驱动下游自动化流程的好处,但实施和消费网络钩子的实际过程可能会令人感到畏惧。这对于不熟悉创建端点和以编程方式处理数据的技术组件的人来说尤其真实。有一些服务可以自动消费鸟类的网络钩子数据并将ETL入你的数据库——一个例子是 StitchData,我们在过去的博客中提到过。 但是,如果您希望对该过程有更多控制,可以轻松自己构建这些组件。以下是一个简单的指南,帮助发送者在创建鸟类事件网络钩子和使用AWS内部基础设施消费数据时感到舒适。
配置网络钩子目标端点
当创建鸟类事件时,我们希望将事件数据实时流式传输到AWS中的端点,以便我们可以以编程方式消费和使用该数据。数据将从鸟类发送到目标端点,该端点将把有效负载转发到一个lambda函数,该函数将处理并将数据存储在S3桶中。描述的数据流的高级图示如下所示:
为了实施这个工作流程,让我们实际上从后往前构建它,从创建一个我们将存储事件数据的S3桶开始,然后再反向添加每个组件,使其融入我们所构建的内容中。
创建一个S3桶来存储网络钩子数据
在创建我们的负载均衡器以接受数据之前,或者我们的lambda函数以存储数据之前,我们需要首先创建我们的 S3桶,数据将存储在这里。 为此,导航到AWS中的S3服务并按下“创建桶”。您将被提示为您的桶分配一个名称并设置区域——确保使用与您的ALB和lambda函数相同的区域。当您的S3桶创建时,它将是空的——如果您希望在文件夹内部组织数据,您可以现在创建预期的目录,或者当您的lambda函数存储文件时,该目录将被创建。在本例中,我们将S3桶命名为“bird-webhooks”,并创建一个名为“B事件数据”的文件夹来存储我们的事件数据——您将在下面的lambda函数中看到这些名称被引用。
创建一个Lambda函数以消费数据
实际的数据处理和存储将由一个 lambda函数 执行,该函数由我们的应用程序负载均衡器 (ALB) 调用。
第一步是通过导航到AWS中的Lambda服务并点击“创建函数”来创建您的lambda函数。您将被提示为您的lambda函数分配一个名称,并选择要用来编写函数的编程语言。在本示例中,我们使用Python作为运行时语言。
现在,我们需要开发我们的lambda函数。暂时假设我们的应用程序负载均衡器已配置,并将网络钩子有效负载转发到我们的lambda函数——lambda将接收到一个有效负载,包括完整的头部和主体。有效负载通过将对象“event”作为字典传递给我们的lambda函数。您可以独立访问有效负载中的“headers”和“body”对象来引用头部和主体。在本例中,我们将简单地读取“x-messagesystems-batch-id”头部,其中批次ID是鸟类为网络钩子批次创建的唯一值,并将其用作存储主体时的文件名;然而,您可能希望添加附加功能,例如身份验证检查或 错误处理,如有需要。
在将有效负载存储到S3的平面文件时,我们需要定义S3桶的名称、位置和要存储有效负载数据的文件名。在我们的示例lambda函数中,我们在“store_batch”函数内执行此操作。在这个例子中,我们将批量存储整个批次为一个文件,这有助于确保数据在鸟类和您的端点之间的HTTP连接超时之前被收集和存储。虽然您可以调整负载均衡器上的连接超时设置,但没有保证连接在传输方(在此情况下为鸟类)不会超时,或者在您的lambda函数完成执行之前不会被终止。保持消费者函数尽可能高效,并在可能的情况下将数据处理活动保留给下游处理过程是最佳实践——例如将批量的JSON格式有效负载转换为CSV文件,或将事件数据加载到数据库中。
值得注意的是,您可能需要更新lambda函数的权限。您的执行角色需要在S3上具有PutObject和GetObject权限。执行 最小权限原则 是最佳实践,因此我建议仅对存储网络钩子有效负载的S3桶设置这些权限。
我们的消费者lambda函数的示例可以在这里找到。
关于批次ID的快速说明: 鸟类会将 事件批量处理 为一个单一的有效负载,其中每个批次可能包含1到350或更多事件记录。 批次将被赋予一个唯一的批次ID,可以通过利用 事件网络钩子API 或在您的鸟类帐户中单击 网络钩子流 并选择“批次状态”来查看批次状态。如果无法传递网络钩子有效负载,例如在连接超时期间,鸟类将自动使用相同的批次ID 重试 批次。这可能发生在您的lambda函数运行接近最大往返时间的10秒时,是优化消费者函数以减少执行时间的原因。
为了处理所有数据处理活动,我建议创建一个单独的lambda函数,该函数在新的文件创建在S3桶中时执行 - 这样,数据处理是异步进行的,并且不必担心由于连接中断而丢失数据。我在后面的部分讨论处理的lambda函数。
创建一个应用程序负载均衡器
为了接收网络钩子有效负载,我们需要提供一个将有效负载发送到的端点。我们可以通过在AWS中创建一个应用程序负载均衡器来做到这一点,方法是导航到EC2 > 负载均衡器并点击“创建负载均衡器”。您将被提示选择要创建的负载均衡器类型——为此,我们希望创建一个应用程序负载均衡器。我们需要使用应用程序负载均衡器(ALB)构建我们的消费者,因为事件网络钩子将作为HTTP请求发送,而ALB用于在AWS中路由HTTP请求。我们可以实现HTTP网关作为替代;但是,我们在此项目中使用ALB,因为它比HTTP网关更轻量且更具性价比。值得注意的是,如果您选择使用HTTP网关,则事件格式可能与ALB有所不同,因此您的lambda函数需要相应地处理请求对象。
一旦创建了ALB,您将被提示为您的ALB分配名称并配置方案和访问/安全设置——由于我们计划从外部来源(鸟类)接收事件数据,我们希望我们的ALB是面向互联网的。 在“监听器和路由”下,ALB应在443端口上侦听HTTPS,并且我们希望创建一个目标组,指向我们的lambda函数,以便ALB将传入的请求转发到我们上面创建的消费lambda函数。 您还需要确保安全组有权限接受通过443端口的流量。
为负载均衡器创建DNS记录
为了更方便地将ALB用作端点,我们将创建一个指向ALB的A记录。在此,我们可以使用AWS Route 53服务(或您当前的DNS提供商)并为您想用于端点的主机名创建A记录(例如:spevents.<your_domain>)。A记录应配置为指向我们创建的ALB。如果您使用Route 53管理DNS记录,可以通过启用“别名”并选择ALB直接引用ALB实例;否则,如果您使用外部DNS提供商,则应将A记录指向ALB实例的公共IP地址。
我建议使用Postman等工具来测试一切是否已正确配置,然后再启用您的鸟类网络钩子。您可以向您的端点发出POST请求并确认收到响应。如果您的POST请求未返回响应,则可能需要仔细检查您的ALB是否在侦听正确的端口。
创建一个鸟类网络钩子
现在我们准备在鸟类中创建 网络钩子,并使用上述A记录定义的主机名作为我们的目标端点。要创建网络钩子,导航到您的鸟类帐户中的 网络钩子部分,然后点击“创建网络钩子”。您将被提示为您的网络钩子分配一个名称并提供目标URL——目标应为您之前创建的A记录的主机名。请注意,目标URL可能需要在URL中包含“HTTPS://”。
完成后,验证所选子账户和事件是否正确,并按下“创建网络钩子”以保存您的配置。所有选定事件类型的事件数据现在将流式传输到我们的目标URL,并由我们的ALB进行下游处理。
处理网络钩子事件数据
根据存储鸟类事件数据的预期目的,您的需求可能仅通过简单地将JSON有效负载存储为平面文件得到满足。您可能也已经建立了一个可以消费和加载JSON格式数据的下游ETL过程。在这两种情况下,您可能可以直接使用我们上面创建的处理lambda生成的平面文件。
或者,您可能需要转换数据——例如将其从JSON格式转换为CSV格式——或将数据直接加载到数据库中。在本示例中,我们将创建一个简单的lambda函数,将网络钩子数据从原始JSON格式转换为可以加载到数据库中的CSV文件。
创建一个Lambda来处理数据
与消费网络钩子数据的lambda函数一样,我们需要通过导航到AWS中的Lambda服务并按下“创建函数”来创建一个新的lambda函数。这个新的lambda函数将在我们的S3桶中创建新文件时被触发——它将读取数据并将其转换为新的csv文件。
lambda函数将文件信息作为事件接收。在示例lambda函数中,您会看到我们首先进行一系列验证检查,以确保数据完整且格式符合预期。接下来,我们使用“csv”库将JSON有效负载转换为CSV文件并写入临时文件。Lambda函数只能将本地文件写入“/tmp”目录,因此我们创建一个临时csv文件,并使用约定<batch_id>.csv命名它。我们在这里使用batch_id的原因,仅仅是为了确保由于接收到多个网络钩子有效负载而运行的任何并行流程不会相互干扰,因为每个网络钩子批次将具有唯一的batch_id。
一旦数据完全转换为CSV,我们将CSV数据读取为字节流,删除临时文件,并将CSV数据保存为S3上的新文件。需要注意的是,输出需要一个不同的S3桶,否则我们将面临创建递归循环的风险,这将导致增加lambda使用和费用。我们需要在lambda函数中确定希望在哪里存储我们的CSV文件以及在哪个S3桶和位置中。 按照上面的相同步骤创建一个新的S3桶以存储我们的CSV文件。
请注意,tmp目录的空间限制为512 MB,因此确保在之后删除临时文件以确保未来执行有足够的空间是非常重要的。我们使用临时文件而不是直接写入S3的原因是为了简化与S3的连接,使其成为单个请求。
就像消费lambda函数一样,您可能需要更新处理lambda函数的权限。此lambda函数要求执行角色对输入S3桶具有GetObject权限,而对输出S3桶同时具有PutObject和GetObject权限。
我们的处理lambda函数的示例可以在 这里 找到。
配置一个Lambda在S3上存储新数据时执行
现在我们的lambda函数已经创建,用于将文件从JSON转换为CSV格式,我们需要配置它,以便在我们的S3桶中创建新文件时触发。为此,我们需要通过打开我们的lambda函数并单击页面顶部的“添加触发器”来添加触发器。 选择“S3”,并提供存储原始网络钩子有效负载的S3桶的名称。您还可以选择指定文件前缀和/或后缀以进行过滤。一旦设置完成,您可以通过单击页面底部的“添加”来添加触发器。现在,当有新文件添加到您的S3桶时,您的处理lambda函数将执行。
将数据加载到数据库中
在本示例中,我不会详细介绍将数据加载到数据库中,但如果您一直跟随本示例,您有几个选项:
直接将数据加载到您处理的lambda函数中的数据库中
使用已建立的ETL过程消费您的CSV文件
无论您是使用AWS数据库服务,例如 RDS 或 DynamoDB,还是拥有自己的PostgreSQL数据库(或类似的),您都可以直接从您的处理lambda函数连接到数据库服务。例如,就像我们在lambda函数中使用“boto3”调用S3服务一样,您也可以使用“boto3”调用RDS或DynamoDB。AWS Athena 服务也可以用来直接从平面文件读取数据文件,并使用类似于SQL的查询语言访问数据。我建议查阅您所使用的服务的相关文档,以获取有关如何最好地在您的环境中完成此任务的更多信息。
同样,有许多服务可以帮助消费CSV文件并将数据加载到数据库中。您可能已经有可以利用的已建立ETL过程。
我们希望您发现本指南有帮助——发送愉快!