Bird’s real-time event-webhooks zijn een ongelooflijk waardevol hulpmiddel voor zenders om gegevens automatisch naar hun systemen te laten sturen. Dit kan downstream automatisering stimuleren, zoals het bijwerken van mailinglijsten, het activeren van geautomatiseerde e-mailtrajecten of het vullen van interne dashboards. Hoewel dezelfde evenementengegevens toegankelijk zijn via de Bird UI met behulp van Event Search, of programmeerbaar zijn door gebruik te maken van de Bird Events API, kunnen beperkingen gesteld aan het aantal records dat in een enkel verzoek wordt geretourneerd of snelheidslimieten die op het API-eindpunt worden geplaatst, beide methoden restrictief maken voor grote en geavanceerde zenders.
Real-time event-webhooks stellen een zender in staat om een eindpunt te configureren waar Bird de gegevens naartoe stuurt, en de gegevens kunnen worden geconsumeerd zonder het inplannen van cron-jobs die de gegevens ophalen. Er zijn ook logistieke compromissen bij het ophalen van de gegevens in plaats van ze naar jou te laten sturen, zoals het identificeren van welke tijdsperiode en parameters voor elk API-verzoek moeten worden gebruikt. Als de tijdsperioden niet perfect zijn afgestemd, loop je het risico gegevens te missen, en als de tijdsperioden elkaar overlappen, moet je dubbele gegevensrecords afhandelen. Met real-time webhooks worden de evenementengegevens eenvoudig naar jouw eindpunt gestuurd zodra ze beschikbaar zijn binnen Bird.
Hoewel de voordelen van het ontvangen van evenementengegevens in realtime om downstream automatiseringsprocessen aan te drijven door veel zenders direct kunnen worden begrepen, kan het werkelijke proces voor het implementeren en consumeren van webhooks intimiderend zijn. Dit kan vooral het geval zijn als je niet bekend bent met de technische componenten van het creëren van een eindpunt en het programmatisch verwerken van de gegevens. Er zijn diensten beschikbaar die Bird-webhookgegevens kunnen consumeren en automatisch in je database kunnen ETL'en – een voorbeeld zou StitchData zijn, waar we in het verleden over hebben geblogd. Als je echter meer controle over het proces wilt, kun je gemakkelijk zelf de componenten bouwen. Het volgende is een eenvoudige gids om zenders gerust te stellen wanneer ze een Bird events-webhook maken en de gegevens consumeren met behulp van de infrastructuur binnen AWS.
Configureren van het Webhook Doeleindpunt
Wanneer een Bird-evenement wordt gecreëerd, willen we dat die evenementengegevens in realtime naar een eindpunt in AWS worden gestreamd, zodat we die gegevens programmatisch kunnen consumeren en gebruiken. De gegevens worden door Bird naar een doeleindpunt verzonden, dat het payload doorstuurt naar een lambdafunctie die de gegevens verwerkt en opslaat in een S3-bucket. Een hoogwaardig diagram van de beschreven gegevensstroom is hieronder te zien:
Om deze workflow te implementeren, laten we ze in omgekeerde volgorde beginnen en eerst een S3-bucket maken waar we onze evenementengegevens opslaan en dan terugwerken - elke component toevoegen die ingrijpt op wat we hebben gebouwd.
Maak een S3-bucket voor het opslaan van de Webhook-gegevens
Voordat we onze load balancer maken om de gegevens te accepteren, of onze lambdafunctie maken voor het opslaan van de gegevens, moeten we eerst ons S3-bucket maken waar de gegevens worden opgeslagen. Om dit te doen, ga naar de S3-service binnen AWS en druk op "Create Bucket." Je wordt gevraagd een naam aan je bucket toe te wijzen en de regio in te stellen - zorg ervoor dat je dezelfde regio gebruikt als je ALB en lambdafunctie. Wanneer je S3-bucket is aangemaakt, is deze leeg - als je de gegevens in een map wilt organiseren, kun je nu de beoogde map aanmaken, of de map zal worden aangemaakt wanneer je lambdafunctie het bestand opslaat. In dit voorbeeld hebben we onze S3-bucket "bird-webhooks" genoemd en een map "B Event Data" aangemaakt om onze evenementengegevens op te slaan - je zult deze namen terugzien in onze lambdafunctie hieronder.
Maak een Lambda-functie om de Gegevens te Consumeren
De feitelijke verwerking en opslag van de gegevens zal worden uitgevoerd door een lambdafunctie die wordt aangeroepen door onze applicatie-load balancer (ALB).
De eerste stap is om je lambdafunctie te maken door naar de Lambda-service binnen AWS te gaan en op "Create Function" te klikken. Je wordt gevraagd een naam voor je lambdafunctie toe te wijzen en te selecteren in welke programmeertaal je je functie wilt schrijven. Voor dit voorbeeld gebruiken we Python als de runtime-taal.
Nu moeten we onze lambdafunctie ontwikkelen. Laten we voor een moment aannemen dat onze applicatie-load balancer is geconfigureerd en het webhook-payload doorstuurt naar onze lambdafunctie - de lambda ontvangt een payload inclusief de volledige headers en body. De payload wordt aan onze lambdafunctie doorgegeven met behulp van het object "event" als een woordenboek. Je kunt de headers en de body van de payload onafhankelijk van elkaar raadplegen door toegang te krijgen tot de "headers" en "body" objecten binnen de payload. In dit voorbeeld gaan we eenvoudigweg de "x-messagesystems-batch-id" header lezen, waarnaar de batch-ID een unieke waarde is die is aangemaakt door Bird voor de webhook-batch, en deze gebruiken als de bestandsnaam wanneer de body als een platte bestand in S3 wordt opgeslagen; echter, je kunt aanvullende functionaliteiten zoals autorisatiecontroles of foutafhandeling toevoegen, indien nodig.
Bij het opslaan van het payload naar een platte bestand op S3, moeten we de naam van de S3-bucket, locatie en bestandsnaam van het bestand definiëren waar de payloadgegevens worden opgeslagen. In onze voorbeeld-lambdafunctie doen we dit binnen de "store_batch" functie. In dit voorbeeld gaan we de gehele batch als een enkel bestand opslaan, wat helpt om ervoor te zorgen dat de gegevens worden verzameld en opgeslagen voordat de HTTP-verbinding tussen Bird en je eindpunt wordt verbroken. Hoewel je de connectietime-outinstellingen op je load balancer kunt aanpassen, zijn er geen garanties dat de connectie niet wordt verbroken aan de transmissiekant (in dit geval Bird) of dat de verbinding niet wordt verbroken voordat je lambdafunctie klaar is met uitvoeren. Het is een best practice om je consumerfunctie zo efficiënt mogelijk te houden en gegevensverwerkingstaken te reserveren voor downstreamprocessen waar mogelijk - zoals het converteren van de gebatchdete JSON-geformuleerd payload naar een CSV-bestand, of het laden van de evenementengegevens in een database.
Het is belangrijk op te merken dat je mogelijk de machtigingen voor je lambdafunctie moet bijwerken. Je uitvoeringsrol heeft PutObject- en GetObject-machtigingen voor S3 nodig. Het is een beste praktijk om het principe van minimalisatie af te dwingen, dus ik raad aan om deze machtigingen alleen in te stellen voor de S3-bucket waar de webhook-payloads worden opgeslagen.
Een voorbeeld van onze consumer-lambdafunctie is te vinden hier.
Als een korte opmerking over de batch-ID: Bird zal gebeurtenissen batchen in een enkele payload, waarbij elke batch 1 tot 350 of meer evenementenrecords kan bevatten. De batch krijgt een unieke batch-ID, die kan worden gebruik om de batch-status te bekijken door de Event Webhooks API aan te wenden of binnen je Bird-account door te klikken op een webhookstroom en "Batch Status" te selecteren. In het geval dat een webhook-payload niet kon worden afgeleverd, zoals bij een verbindingstime-out, zal Bird automatisch de batch opnieuw proberen af te leveren met dezelfde batch-ID. Dit kan gebeuren wanneer je lambdafunctie dicht bij de maximale roundtriptijd van 10 seconden werkt en is een reden om de consumerfunctie te optimaliseren om de uitvoeringstijd te verkorten.
Om alle gegevensverwerkingstaken af te handelen, raad ik aan om een aparte lambdafunctie aan te maken die wordt uitgevoerd telkens wanneer er een nieuw bestand wordt aangemaakt op de S3-bucket – op deze manier wordt de gegevensverwerking asynchroon op de dataoverdracht uitgevoerd, en is er geen risico op het verliezen van gegevens vanwege een verbroken verbinding. Ik bespreek de verwerkingslambdafunctie in een later gedeelte.
Maak een Applicatie Load Balancer
Om een webhook-payload te ontvangen, moeten we een eindpunt bieden om de payloads naar te sturen. Dit doen we door een applicatie-load balancer binnen AWS te maken door te navigeren naar EC2 > Load Balancers en op "Create Load Balancer" te klikken. Je wordt gevraagd om te kiezen welk type load balancer je wilt maken - hiervoor willen we een applicatie-load balancer aanmaken. We moeten een applicatie-load balancer (ALB) gebruiken om onze consument te bouwen omdat de event-webhooks worden verzonden als een HTTP-verzoek, en ALB's worden gebruikt voor het routeren van HTTP-verzoeken binnen AWS. We zouden een HTTP Gateway als alternatief kunnen implementeren; echter, we gebruiken een ALB voor dit project omdat het lichter en kosteneffectiever is dan HTTP Gateway. Het is belangrijk op te merken dat, als je ervoor kiest een HTTP Gateway te gebruiken, het gebeurtenisformaat mogelijk anders is dan met een ALB, en daarom moet je lambdafunctie het verzoekobject dienovereenkomstig afhandelen.
Als je ALB eenmaal is gemaakt, word je gevraagd een naam toe te wijzen aan je ALB en het schema en de toegangs-/beveiligingsinstellingen te configureren - aangezien we van plan zijn om evenementgegevens van een externe bron (Bird) te ontvangen, willen we dat onze ALB internetgericht is. Onder "Listeners en routing" moet de ALB luisteren naar HTTPS op poort 443, en we willen een doeleindgroepen maken die naar onze lambdafunctie wijst zodat onze ALB inkomende verzoeken naar de consumer-lambdafunctie doorstuurt die we hierboven hebben gemaakt. Je moet er ook voor zorgen dat de beveiligingsgroep toestemming heeft om verkeer via poort 443 te accepteren.
Maak een DNS Record voor de Load Balancer
Om het voor ons gemakkelijker te maken om onze ALB als eindpunt te gebruiken, zullen we een A record in DNS maken die naar onze ALB wijst. Hiervoor kunnen we de AWS Route 53-service gebruiken (of je huidige DNS-provider) en een A record aanmaken voor de hostnaam die je wilt gebruiken voor je eindpunt (bijv. spevents.<your_domain>). De A record moet worden geconfigureerd om naar de ALB te wijzen die we hebben gemaakt. Als je Route 53 gebruikt om de DNS-records te beheren, kun je de ALB-instantie direct verwijzen door "Alias" in te schakelen en de ALB te selecteren; anders, als je een externe DNS-provider gebruikt, moet je de A-record naar het openbare IP-adres van de ALB-instantie wijzen.
Ik raad aan om een tool zoals Postman te gebruiken om te testen of alles correct is geconfigureerd voordat je je Bird-webhook inschakelt. Je kunt een POST-verzoek naar je eindpunt doen en bevestigen dat er een reactie wordt ontvangen. Als je POST-verzoek geen reactie retourneert, moet je misschien controleren of je ALB naar de juiste poort luistert.
Maak een Bird Webhook
Nu zijn we klaar om de webhook in Bird te maken en de hostnaam te gebruiken die is gedefinieerd door de A record hierboven als ons doeleindpunt. Om de webhook te maken, navigeer naar het Webhooks-gedeelte binnen je Bird-account en klik op "Create Webhook." Je wordt gevraagd om een naam voor je webhook toe te kennen en een doeleind-URL op te geven - het doel moet de hostnaam zijn van de A record die je eerder hebt gemaakt. Merk op dat de doeleind-URL mogelijk "HTTPS://" moet bevatten in de URL.
Eenmaal voltooid, verifieer de correcte subaccount en evenementen zijn geselecteerd, en druk op "Create Webhook" om je configuratie op te slaan. De evenementengegevens voor alle geselecteerde gebeurtenistypen zullen nu naar onze doeleind-URL streamen en worden geconsumeerd door onze ALB voor downstreamverwerking.
Verwerking van Webhook Event Data
Afhankelijk van het beoogde doel voor het opslaan van de Bird evenementengegevens kunnen je vereisten worden vervuld door simpelweg de JSON-payload als een plat bestand op te slaan. Je kunt ook al een downstream ETL-proces hebben opgezet dat in staat is om gegevens in een JSON-indeling te consumeren en laden. In beide gevallen kun je misschien het platte bestand gebruiken dat door onze verwerkingslambda is gemaakt zoals het is.
Alternatief kun je de gegevens moeten transformeren - bijvoorbeeld om te converteren van een JSON- naar een CSV-indeling - of de gegevens direct in een database laden. In dit voorbeeld zullen we een eenvoudige lambdafunctie maken die de webhook-gegevens van het originele JSON-formaat naar een CSV-bestand converteert dat in een database geladen kan worden.
Maak een Lambda om de Gegevens te Verwerken
Net als bij de lambdafunctie om de webhook-gegevens te consumeren, moeten we een nieuwe lambdafunctie maken door te navigeren naar de Lambda-service binnen AWS en op "Create Function" te drukken. Deze nieuwe lambdafunctie wordt geactiveerd wanneer er een nieuw bestand wordt aangemaakt in onze S3-bucket - het zal de gegevens lezen en omzetten in een nieuw CSV-bestand.
De lambdafunctie accepteert de bestandsinformatie als een gebeurtenis. In de voorbeeld-lambdafunctie zul je zien dat we eerst een reeks validatiecontroles hebben om ervoor te zorgen dat de gegevens compleet en geformatteerd zijn zoals verwacht. Vervolgens zetten we de JSON-payload om naar een CSV-bestand door gebruik te maken van de "csv"-bibliotheek en te schrijven naar een tijdelijk bestand. Lambdafuncties kunnen alleen lokale bestanden schrijven naar de "/tmp"-directory, dus maken we een tijdelijk CSV-bestand en benoemen het met de conventie <batch_id>.csv. De reden dat we de batch_id hier gebruiken, is eenvoudigweg om ervoor te zorgen dat eventuele parallelle processen die draaien als gevolg van het ontvangen van meerdere webhook-payloads elkaar niet storen, omdat elke webhook-batch een unieke batch_id zal hebben.
Zodra de gegevens volledig naar CSV zijn omgezet, lezen we de CSV-gegevens als een byte-stroom, verwijderen we het tijdelijke bestand en slaan we de CSV-gegevens op als een nieuw bestand op S3. Het is belangrijk op te merken dat een andere S3-bucket nodig is voor de output, anders lopen we het risico een recursieve lus te creëren die kan resulteren in verhoogd lambdagebruik en hogere kosten. We moeten binnen onze lambdafunctie bepalen in welke S3-bucket en locatie we ons CSV-bestand willen opslaan. Volg dezelfde procedure als hierboven om een nieuwe S3-bucket te maken voor het opslaan van ons CSV-bestand.
Let op dat de tmp-directory beperkt is tot 512 MB ruimte, dus het is belangrijk dat het tijdelijke bestand daarna wordt verwijderd om voldoende ruimte te garanderen voor toekomstige uitvoeringen. De reden dat we een tijdelijk bestand gebruiken, in plaats van direct naar S3 te schrijven, is om de verbinding met S3 te vereenvoudigen door één enkel verzoek te hebben.
Net als bij de consumer-lambdafunctie moet je mogelijk de machtigingen voor je verwerkingslambdafunctie bijwerken. Deze lambdafunctie vereist dat de uitvoeringsrol GetObject-machtigingen heeft voor de input S3-bucket, en zowel PutObject als GetObject voor de output S3-bucket.
Een voorbeeld van onze verwerkingslambdafunctie is te vinden hier.
Configureer een Lambda om te Uitvoeren Wanneer Nieuwe Gegevens op S3 Worden Opgeslagen
Nu onze lambdafunctie voor het omzetten van het bestand van JSON naar CSV-formaat is gemaakt, moeten we deze configureren om te activeren wanneer er een nieuw bestand wordt aangemaakt op onze S3-bucket. Om dit te doen, moeten we een trigger toevoegen aan onze lambdafunctie door onze lambdafunctie te openen en op "Add Trigger" bovenaan de pagina te klikken. Selecteer "S3" en geef de naam op van de S3-bucket waar de ruwe webhook-payloads worden opgeslagen. Je hebt ook de optie om op bestandsvoorvoegsel en/of achtervoegsel te filteren. Zodra de instellingen zijn geconfigureerd, kun je de trigger toevoegen door op "Add" onderaan de pagina te klikken. Nu wordt je verwerkingslambdafunctie uitgevoerd telkens wanneer een nieuw bestand aan je S3-bucket wordt toegevoegd.
Gegevens Laden in een Database
In dit voorbeeld zal ik het laden van de gegevens in een database niet in detail behandelen, maar als je deze voorbeeld hebt gevolgd, heb je een paar opties:
Laad de gegevens direct in je database binnen je verwerkingslambdafunctie
Gebruik je CSV-bestand met een vastgesteld ETL-proces
Of je nu een AWS-databaseservice gebruikt, zoals RDS of DynamoDB, of je hebt je eigen PostgreSQL-database (of vergelijkbaar), je kunt verbinding maken met je databaseservice direct vanuit je verwerkingslambdafunctie. Op dezelfde manier als we de S3-service hebben aangeroepen met "boto3" in onze lambdafunctie, kun je ook "boto3" gebruiken om RDS of DynamoDB aan te roepen. De AWS Athena service kan ook worden gebruikt om de gegevensbestanden direct uit de platte bestanden te lezen en toegang tot de gegevens te krijgen met behulp van een querytaal die vergelijkbaar is met SQL. Ik raad aan om de desbetreffende documentatie voor de dienst die je gebruikt te raadplegen voor meer informatie over hoe dit het beste binnen je omgeving kan worden bereikt.
Evenzo zijn er veel diensten beschikbaar die kunnen helpen om CSV-bestanden te verwerken en de gegevens in een database te laden. Wellicht heb je al een gevestigd ETL-proces dat je kunt benutten.
We hopen dat je deze gids nuttig vond – veel succes met het verzenden!