Webhooki w czasie rzeczywistym z Bird są niezwykle cennym narzędziem dla nadawców, które pozwala na automatyczne przesyłanie danych do ich systemów. Może to napędzać automatyzację w dół strumienia, taką jak aktualizacja list mailingowych, uruchamianie zautomatyzowanych ścieżek e-mailowych lub wypełnianie wewnętrznych pulpitów nawigacyjnych. Choć te same dane o wydarzeniach można uzyskać za pomocą interfejsu użytkownika Bird, korzystając z Wyszukiwania Wydarzeń, lub programowo, wykorzystując API Wydarzeń Bird, ograniczenia nałożone na liczbę rekordów zwracanych w jednym żądaniu lub limity szybkości nałożone na punkt końcowy API mogą sprawić, że obie te metody będą ograniczające dla dużych i zaawansowanych nadawców.
Webhooki zdarzeń w czasie rzeczywistym pozwalają nadawcy skonfigurować punkt końcowy, do którego Bird przesyła dane, a dane mogą być przetwarzane bez konieczności planowania zadań cron, które ściągają dane. Istnieją również ograniczenia logistyczne, gdy ściągasz dane w porównaniu do ich przesyłania do Ciebie, takie jak konieczność określenia, jaki okres czasu i jakie parametry zastosować dla każdego żądania API. Jeśli okresy czasu nie są idealnie zgrane, istnieje ryzyko pominięcia danych, a jeśli okresy czasu się pokrywają, musisz poradzić sobie z duplikatami danych. Dzięki webhookom w czasie rzeczywistym dane wydarzeń są po prostu przesyłane do Twojego punktu końcowego, gdy stają się dostępne w Bird.
Chociaż korzyści płynące z otrzymywania danych o wydarzeniach w czasie rzeczywistym, aby napędzać procesy automatyzacji w dół strumienia, mogą być od razu zrozumiane przez wielu nadawców, faktyczny proces implementacji i korzystania z webhooków może być onieśmielający. To może być szczególnie prawdziwe, jeśli nie jesteś zaznajomiony z technicznymi elementami tworzenia punktu końcowego i programowego przetwarzania danych. Istnieją usługi, które będą pobierać dane webhooków Bird i automatycznie ETL do twojej bazy danych — przykładem może być StitchData, o którym pisaliśmy w przeszłości. Jednak jeśli chcesz mieć większą kontrolę nad procesem, możesz łatwo zbudować komponenty samodzielnie. Oto prosty przewodnik, który pomoże nadawcom poczuć się komfortowo przy tworzeniu webhooku wydarzeń Bird i przetwarzaniu danych przy użyciu infrastruktury w AWS.
Konfigurowanie punktu końcowego webhooka
Kiedy tworzony jest event Bird, chcemy, aby dane tego wydarzenia były przesyłane w czasie rzeczywistym do punktu końcowego w AWS, abyśmy mogli programowo przetwarzać i używać tych danych. Dane będą wysyłane z Bird do punktu końcowego, który przekaże ładunek do funkcji lambda, która przetworzy i przechowa dane w koszyku S3. Wysokopoziomowy diagram opisanego przepływu danych można zobaczyć poniżej:
Aby zrealizować ten workflow, zacznijmy od budowy ich w odwrotnej kolejności, zaczynając od stworzenia koszyka S3, w którym będziemy przechowywać nasze dane wydarzeń, a następnie będziemy pracować wstecz — dodając każdy komponent, który wprowadza do tego, co już zbudowaliśmy.
Utwórz koszyk S3 do przechowywania danych webhooka
Zanim utworzymy nasz load balancer do akceptacji danych lub naszą funkcję lambda do przechowywania danych, musimy najpierw utworzyć nasz koszyk S3, w którym dane będą przechowywane. Aby to zrobić, przejdź do usługi S3 w AWS i naciśnij „Utwórz koszyk”. Zostaniesz poproszony o przypisanie nazwy do swojego koszyka i ustawienie regionu — upewnij się, że używasz tego samego regionu co twój ALB i funkcja lambda. Kiedy Twój koszyk S3 zostanie utworzony, będzie pusty — jeśli chcesz zorganizować dane wewnątrz folderu, możesz teraz utworzyć zamierzony katalog, lub katalog zostanie utworzony, gdy Twoja funkcja lambda zapisze plik. W tym przykładzie nazwaliśmy nasz koszyk S3 „bird-webhooks” i stworzyliśmy folder o nazwie „B Dane Wydarzeń”, aby przechować nasze dane wydarzeń — zobaczysz te nazwy odnoszące się do naszej funkcji lambda poniżej.
Utwórz funkcję lambda do przetwarzania danych
Faktyczne przetwarzanie i przechowywanie danych zostanie wykonane przez funkcję lambda, która zostanie wywołana przez nasz aplikacyjny load balancer (ALB).
Pierwszym krokiem jest utworzenie funkcji lambda, przechodząc do usługi Lambda w AWS i klikając „Utwórz funkcję.” Zostaniesz poproszony o przypisanie nazwy swojemu funkcjonować i wybranie, w jakim języku programowania chcesz napisać swoją funkcję. W tym przykładzie używamy Pythona jako języka uruchomieniowego.
Teraz musimy opracować naszą funkcję lambda. Na chwilę zakładamy, że nasz aplikacyjny load balancer został skonfigurowany i przekazuje ładunek webhooka do naszej funkcji lambda – lambda odbierze ładunek zawierający pełne nagłówki i treść. Ładunek jest przekazywany do naszej funkcji lambda za pomocą obiektu „event” jako słownik. Możesz odwołać się do nagłówków i treści ładunku niezależnie, uzyskując dostęp do obiektów „headers” i „body” w ładunku. W tym przykładzie po prostu odczytamy nagłówek „x-messagesystems-batch-id”, gdzie ID partii to unikalna wartość stworzone przez Bird dla partii webhooków, i użyjemy go jako nazwy pliku podczas przechowywania treści jako płaskiego pliku w S3; jednak możesz chcieć dodać dodatkowe funkcje, takie jak kontrole autoryzacji lub obsługa błędów, w zależności od potrzeb.
Podczas zapisywania ładunku jako płaskiego pliku w S3, będziemy musieli określić nazwę koszyka S3, lokalizację i nazwę pliku, w którym dane ładunku będą przechowywane. W naszej przykładowej funkcji lambda robimy to w ramach funkcji „store_batch”. W tym przykładzie zamierzamy przechować całą partię jako pojedynczy plik, co pomaga zapewnić, że dane są zbierane i przechowywane zanim połączenie HTTP między Bird a Twoim punktem końcowym wygaśnie. Chociaż moglibyśmy dostosować ustawienia limitu połączeń na swoim load balancerze, nie ma gwarancji, że połączenie nie wygaśnie po stronie przesyłania (w tym przypadku Bird) lub że połączenie nie zostanie zakończone przed zakończeniem wykonywania funkcji lambda. Najlepszą praktyką jest, aby funkcja konsumenta była jak najbardziej efektywna i rezerwowała działania przetwarzania danych dla procesów działających w dół strumienia, gdzie to możliwe — takich jak konwersja partii danych sformatowanych w JSON do pliku CSV lub załadowanie danych zdarzeń do bazy danych.
Ważne jest, aby zauważyć, że możesz potrzebować zaktualizować uprawnienia dla swojej funkcji lambda. Twoja rola wykonawcza będzie potrzebować uprawnień PutObject i GetObject dla S3. Najlepszą praktyką jest egzekwowanie zasady najmniejszych uprawnień, więc zalecam ustawienie tych uprawnień tylko dla koszyka S3, w którym będą przechowywane ładunki webhooków.
Przykład naszej konsumenckiej funkcji lambda można znaleźć tutaj.
Krótka uwaga dotycząca ID partii: Bird będzie grupować zdarzenia w pojedynczym ładunku, w którym każda partia może zawierać od 1 do 350 lub więcej rekordów wydarzeń. Partia otrzyma unikalne ID, które może być użyte do przeglądania statusu partii, wykorzystując API Webhooków Zdarzeń lub w ramach Twojego konta Bird, klikając na strumień webhooków i wybierając „Status partii”. W przypadku, gdy ładunek webhooków nie mógł zostać dostarczony, np. w wyniku przekroczenia limitu czasu połączenia, Bird automatycznie ponowi próbę partii, wykorzystując to samo ID partii. Może się to zdarzyć, gdy funkcja lambda działa blisko maksymalnego czasu przejazdu wynoszącego 10 sekund i jest powodem do optymalizacji funkcji konsumenta, aby zredukować czas wykonywania.
Aby zrealizować wszystkie działania przetwarzania danych, zalecam utworzenie osobnej funkcji lambda, która jest wywoływana za każdym razem, gdy nowy plik jest tworzony w koszyku S3 — w ten sposób przetwarzanie danych odbywa się asynchronicznie do przesyłania danych, a ryzyko utraty danych z powodu zakończonego połączenia nie istnieje. Omawiam funkcję przetwarzania lambda w późniejszej sekcji.
Utwórz aplikacyjny load balancer
Aby otrzymać ładunek webhooków, musimy zapewnić punkt końcowy do przesyłania ładunków. Robimy to, tworząc aplikacyjny load balancer w AWS, przechodząc do EC2 > Load Balancers i klikając „Utwórz Load Balancer.” Zostaniesz poproszony o wybór, jaki typ load balancera chcesz utworzyć — w tym przypadku chcemy utworzyć aplikacyjny load balancer. Musimy użyć aplikacyjnego load balancera (ALB), aby zbudować naszego konsumenta, ponieważ webhooki zdarzeń będą przesyłane jako żądanie HTTP, a ALB są używane do routingu żądań HTTP w AWS. Moglibyśmy wdrożyć bramę HTTP jako alternatywę; jednak w tym projekcie używamy ALB, ponieważ jest on lżejszy i bardziej opłacalny niż brama HTTP. Ważne jest, aby zauważyć, że jeśli zdecydujesz się użyć bramy HTTP, format zdarzenia może być inny niż w przypadku ALB, a zatem Twoja funkcja lambda będzie musiała odpowiednio obsłużyć obiekt żądania.
Gdy Twój ALB zostanie utworzony, zostaniesz poproszony o nadanie nazwy swojemu ALB oraz skonfigurowanie schematu i ustawień zabezpieczeń — ponieważ planujemy otrzymywać dane zdarzeń z zewnętrznego źródła (Bird), chcemy, aby nasz ALB był dostępny przez internet. W sekcji „Słuchacze i routing” ALB powinien nasłuchiwać HTTPS na porcie 443, a chcemy utworzyć grupę docelową, która wskazuje na naszą funkcję lambda, aby nasz ALB mógł przekazywać przychodzące żądania do funkcji konsumenckiej lambda, którą stworzyliśmy powyżej. Musisz również upewnić się, że grupa zabezpieczeń ma pozwolenie na akceptowanie ruchu przez port 443.
Utwórz rekord DNS dla Load Balancera
Aby ułatwić nam korzystanie z naszego ALB jako punktu końcowego, utworzymy rekord A w DNS, który wskazuje na nasze ALB. W tym celu możemy skorzystać z usługi AWS Route 53 (lub swojego aktualnego dostawcy DNS) i utworzyć rekord A dla nazwy hosta, której chcesz użyć dla swojego punktu końcowego (np. spevents.<twoja_domena>). Rekord A powinien być skonfigurowany, aby wskazywał na ALB, który utworzyliśmy. Jeśli używasz Route 53 do zarządzania rekordami DNS, możesz bezpośrednio odwołać się do instancji ALB, włączając „Alias” i wybierając ALB; w przeciwnym razie, jeśli korzystasz z zewnętrznego dostawcy DNS, powinieneś skierować rekord A na publiczny adres IP z instancji ALB.
Zalecam użycie narzędzia takiego jak Postman, aby przetestować, czy wszystko zostało poprawnie skonfigurowane przed włączeniem webhooku Bird. Możesz wysłać żądanie POST do swojego punktu końcowego i potwierdzić, że odpowiedź została odebrana. Jeśli Twoje żądanie POST nie zwraca odpowiedzi, możesz potrzebować podwójnie sprawdzić, czy Twój ALB nasłuchuje na odpowiednim porcie.
Utwórz webhook Bird
Teraz możemy utworzyć webhook w Bird i użyć nazwy hosta zdefiniowanej przez powyższy rekord A jako naszego docelowego punktu końcowego. Aby utworzyć webhook, przejdź do sekcji Webhooki w swoim koncie Bird i kliknij „Utwórz Webhook.” Zostaniesz poproszony o przypisanie nazwy do swojego webhooka i podanie adresu URL celu — cel powinien być nazwą hosta rekordu A, który stworzyłeś wcześniej. Zauważ, że adres URL celu może wymagać dodania „HTTPS://” do adresu URL.
Po zakończeniu sprawdź, czy wybrany jest właściwy podkonto i czy wybrane są odpowiednie zdarzenia, a następnie naciśnij „Utwórz Webhook”, aby zapisać swoją konfigurację. Dane wydarzeń dla wszystkich wybranych typów wydarzeń będą teraz przesyłane do naszego adresu URL docelowego i będą przetwarzane przez nasz ALB w dół strumienia.
Przetwarzanie danych zdarzeń webhooka
W zależności od zamierzonego celu przechowywania danych zdarzeń Bird, Twoje wymagania mogą być spełnione po prostu przez przechowywanie ładunku JSON jako płaskiego pliku. Możesz także mieć już ustalony proces ETL w dół strumienia, który jest w stanie konsumować i ładować dane w formacie JSON. W obu tych przypadkach możesz być w stanie użyć płaskiego pliku stworzonego przez naszą funkcję lambda przetwarzającą, którą stworzyliśmy wcześniej, tak jak jest.
Alternatywnie, możesz potrzebować przekształcić dane — na przykład, aby przekonwertować z formatu JSON na CSV — lub załadować dane bezpośrednio do bazy danych. W tym przykładzie stworzymy prostą funkcję lambda, która przekształci dane webhooka z oryginalnego formatu JSON do pliku CSV, który można załadować do bazy danych.
Utwórz lambdę do przetwarzania danych
Podobnie jak w przypadku funkcji lambda do konsumowania danych webhooków, musimy utworzyć nową funkcję lambda, przechodząc do usługi Lambda w AWS i naciskając „Utwórz funkcję.” Ta nowa funkcja lambda będzie wywoływana, gdy nowy plik zostanie utworzony w naszym koszyku S3 — odczyta dane i przekształci je w nowy plik csv.
Funkcja lambda przyjmuje informacje o pliku jako zdarzenie. W przykładowej funkcji lambda zobaczysz, że na początku mamy szereg kontroli walidacyjnych, aby upewnić się, że dane są kompletne i sformatowane zgodnie z oczekiwaniami. Następnie konwertujemy ładunek JSON do pliku CSV, korzystając z biblioteki „csv” i zapisując do pliku tymczasowego. Funkcje lambda mogą zapisywać lokalne pliki tylko w katalogu „/tmp”, dlatego tworzymy tymczasowy plik csv i nadajemy mu nazwę zgodnie z konwencją <batch_id>.csv. Powód, dla którego używamy ID partii tutaj, jest po prostu, aby upewnić się, że wszelkie równoległe procesy, które działają na skutek odbioru wielu ładunków webhooków, nie będą miały na siebie wpływu, ponieważ każda partia webhooków będzie miała unikalne ID partii.
Po pełnej konwersji danych na CSV odczytujemy dane CSV jako strumień bajtów, usuwamy plik tymczasowy i zapisujemy dane CSV jako nowy plik w S3. Ważne jest, aby zauważyć, że potrzebny jest inny koszyk S3 dla wyjścia, w przeciwnym razie ryzykujemy stworzenie rekurencyjnej pętli, która może skutkować zwiększonym użyciem lambdy i zwiększonymi kosztami. Będziemy musieli określić, w którym koszyku S3 i lokalizacji chcemy, aby nasz plik CSV był przechowywany w naszej funkcji lambda. Następnie zastosuj tę samą procedurę, co powyżej, aby stworzyć nowy koszyk S3, aby przechować nasz plik CSV.
Zauważ, że katalog tmp ma ograniczenie do 512 MB miejsca, więc ważne jest, aby plik tymczasowy został usunięty później, aby zapewnić wystarczającą ilość miejsca do przyszłych wykonania. Powodem, dla którego używamy pliku tymczasowego, w przeciwieństwie do bezpośredniego pisania do S3, jest uproszczenie połączenia z S3, mając jedną żądanie.
Podobnie jak w przypadku konsumpcyjnej funkcji lambda, może być konieczne zaktualizowanie uprawnień dla Twojej funkcji lambda do przetwarzania. Ta funkcja lambda wymaga, aby rola wykonawcza miała uprawnienia GetObject dla wejściowego koszyka S3 oraz zarówno PutObject, jak i GetObject dla wyjściowego koszyka S3.
Przykład naszej funkcji lambda do przetwarzania można znaleźć tutaj.
Skonfiguruj lambdę, aby uruchomić, gdy nowe dane są przechowywane w S3
Teraz, gdy nasza funkcja lambda do konwersji pliku z formatu JSON na CSV została utworzona, musimy ją skonfigurować, aby wywoływała się, gdy nowy plik zostanie stworzony w naszym koszyku S3. Aby to zrobić, musimy dodać wyzwalacz do naszej funkcji lambda, otwierając naszą funkcję lambda i klikając „Dodaj wyzwalacz” u góry strony. Wybierz „S3” i podaj nazwę koszyka S3, w którym przechowywane są surowe ładunki webhooków. Możesz także mieć opcję określenia prefiksu i/lub sufiksu pliku do filtrowania. Po skonfigurowaniu ustawień możesz dodać wyzwalacz, klikając „Dodaj” na dole strony. Teraz Twoja funkcja lambda przetwarzająca będzie się uruchamiać za każdym razem, gdy nowy plik zostanie dodany do Twojego koszyka S3.
Ładowanie danych do bazy danych
W tym przykładzie nie omówię ładowania danych do bazy danych szczegółowo, ale jeśli śledziłeś ten przykład, masz kilka opcji:
Ładuj dane bezpośrednio do swojej bazy danych w swojej funkcji lambda do przetwarzania
Konsumuj swój plik CSV przy użyciu ustalonego procesu ETL
Niezależnie od tego, czy używasz usługi bazy danych AWS, takiej jak RDS lub DynamoDB, czy masz własną bazę danych PostgreSQL (lub podobną), możesz bezpośrednio połączyć się z usługą bazy danych z funkcji lambda do przetwarzania. Na przykład, w ten sam sposób, w jaki w naszej funkcji lambda wywołałeś usługę S3 za pomocą „boto3”, możesz również użyć „boto3”, aby wywołać RDS lub DynamoDB. Usługa AWS Athena mogłaby również być używana do odczytywania danych z plików bezpośrednio z płaskich plików oraz dostępu do danych przy użyciu języka zapytań podobnego do SQL. Zalecam zapoznanie się z odpowiednią dokumentacją dla używanej usługi w celu uzyskania dalszych informacji na temat tego, jak najlepiej to zrealizować w Twoim środowisku.
Podobnie istnieje wiele usług, które mogą pomóc w konsumowaniu plików CSV i ładowaniu danych do bazy danych. Możesz już mieć ustalony proces ETL, który możesz wykorzystać.
Mamy nadzieję, że przewodnik ten okazał się pomocny – życzymy udanego wysyłania!