
Dla kogoś, kto szuka prostego, szybkiego programu, który przyjmuje plik CSV, wywołuje API weryfikacji odbiorcy i zwraca plik CSV, ten program jest dla Ciebie.
Podczas tworzenia aplikacji e-mailowych, deweloperzy często muszą integrować wiele usług i interfejsów API. Rozumienie podstaw API e-mail w infrastrukturze chmurowej zapewnia podstawy do tworzenia solidnych narzędzi, takich jak system masowej walidacji, który stworzymy w tym przewodniku.
Jedno z pytań, które czasami otrzymujemy, brzmi: jak mogę masowo walidować listy e-mail z walidacją odbiorcy? Istnieją dwie opcje: jedna to przesłanie pliku przez UI SparkPost do walidacji, a druga to wykonywanie pojedynczych wywołań dla każdego e-maila do API (ponieważ API to walidacja pojedynczego e-maila).
Pierwsza opcja działa świetnie, ale ma ograniczenie do 20 MB (około 500 000 adresów). Co jeśli ktoś ma listę e-mail zawierającą miliony adresów? Może to oznaczać podzielenie tego na tysiące przesyłek plików CSV.
Ponieważ przesłanie tysięcy plików CSV wydaje się mało prawdopodobne, zastanowiłem się, jak szybko mogę zmusić API do działania. W tym wpisie na blogu wyjaśnię, co próbowałem i jak w końcu doszedłem do programu, który mógłby wykonać około 100 000 weryfikacji w 55 sekund (podczas gdy w UI zdobyłem około 100 000 weryfikacji w 1 minutę 10 sekund). I chociaż zajmie to jeszcze około 100 godzin, aby ukończyć około 654 milionów weryfikacji, ten skrypt może działać w tle, oszczędzając znaczną ilość czasu.
Ostateczną wersję tego programu można znaleźć tutaj.
My first mistake: używanie Python
Python jest jednym z moich ulubionych języków programowania. Wyróżnia się w wielu obszarach i jest niezwykle prosty. Jednak nie wyróżnia się w kwestii procesów równoległych. Chociaż Python ma możliwość uruchamiania funkcji asynchronicznych, posiada to, co jest znane jako The Python Global Interpreter Lock lub GIL.
„The Python Global Interpreter Lock lub GIL, mówiąc prostymi słowami, to jest mutex (lub blokada), która pozwala tylko jednemu wątkowi kontrolować interpreter Pythona.
Oznacza to, że tylko jeden wątek może być w stanie wykonywania w danym momencie. Wpływ GIL nie jest widoczny dla programistów, którzy wykonują programy jednowątkowe, ale może być wąskim gardłem wydajnościowym w kodzie intensywnie wykorzystującym CPU i wielowątkowym.
Ponieważ GIL pozwala na wykonywanie tylko jednego wątku naraz, nawet w architekturze wielowątkowej z więcej niż jednym rdzeniem procesora, GIL zdobył reputację „niesławnej” cechy Pythona.” (https://realpython.com/python-gil/)”
Na początku nie byłem świadomy istnienia GIL, więc zacząłem programować w Pythonie. Na końcu, mimo że mój program był asynchroniczny, był zablokowany, i niezależnie od tego, ile wątków dodałem, nadal uzyskiwałem tylko około 12-15 iteracji na sekundę.
Główna część funkcji asynchronicznej w Pythonie jest pokazana poniżej:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq, headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
Więc porzuciłem używanie Pythona i wróciłem do rysowania…
Zdecydowałem się na wykorzystanie NodeJS ze względu na jego zdolność do wykonywania operacji nieblokujących i/o niezwykle dobrze. Inną doskonałą opcją do obsługi asynchronicznego przetwarzania API jest budowanie serverless webhook consumers with Azure Functions, które mogą efektywnie obsługiwać zmienne obciążenia. Również jestem dość zaznajomiony z programowaniem w NodeJS.
Wykorzystując asynchroniczne aspekty NodeJS, to rozwiązanie działało dobrze. Więcej szczegółów na temat programowania asynchronicznego w NodeJS można znaleźć na https://blog.risingstack.com/node-hero-async-programming-in-node-js/
Mój drugi błąd: próba odczytania pliku do pamięci
Rozbijanie końcowego kodu
Po przeczytaniu i zweryfikowaniu argumentów terminalu, uruchamiam następujący kod. Najpierw wczytuję plik CSV z e-mailami i zliczam każdą linię. Istnieją dwa cele tej funkcji: 1) pozwala mi dokładnie raportować postęp pracy z plikiem [co zobaczymy później], oraz 2) pozwala mi zatrzymać stoper, gdy liczba e-maili w pliku równa się ukończonym weryfikacjom. Dodałem stoper, aby móc przeprowadzać testy porównawcze i upewnić się, że uzyskuję dobre wyniki.
let count = 0; //Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //Reads the infile and increases the count for each line .on("close", function () { //At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
Następnie wywołuję funkcję validateRecipients. Należy zauważyć, że ta funkcja jest asynchroniczna. Po zweryfikowaniu, że infile i outfile są w formacie CSV, zapisuję wiersz nagłówkowy i uruchamiam stoper programu za pomocą biblioteki JSDOM.
async function validateRecipients(email_count, myArgs) { if ( //If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //Counter for each API call email_count++; //Line counter returns #lines - 1, this is done to correct the number of lines //Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //Write the headers in the outfile
Następujący skrypt jest naprawdę główną częścią programu, więc podzielę go i wyjaśnię, co się dzieje. Dla każdej linii w pliku wejściowym:
Asynchronicznie pobierz tę linię i wywołaj API walidacji odbiorców.
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //For each row read in from the infile, call the SparkPost Recipient Validation API
Następnie, na odpowiedzi
Dodaj e-mail do JSON (aby móc wydrukować e-mail w CSV)
Sprawdź, czy powód jest null, a jeśli tak, uzupełnij pustą wartość (jest to konieczne, aby format CSV był spójny, ponieważ w niektórych przypadkach powód jest podany w odpowiedzi)
Ustaw opcje i klucze dla modułu json2csv.
Konwertuj JSON na CSV i wyprowadź (wykorzystując json2csv)
Zapisz postęp w terminalu
Wreszcie, jeśli liczba e-maili w pliku = ukończone weryfikacje, zatrzymaj stoper i wydrukuj wyniki
.then(function (response) { response.data.results.email = String(email); //Adds the email as a value/key pair to the response JSON to be used for output response.data.results.reason ? null : (response.data.results.reason = ""); //If reason is null, set it to blank so the CSV is uniform //Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, //Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); //Output status of Completed / Total to the console without showing new lines //If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); //Stop the timer console.log( `All emails successfully validated in ${ (stop - start) / 1000 } seconds` ); } })
Ostatnim problemem, który znalazłem, było to, że działało to świetnie na Mac, ale napotkałem następujący błąd podczas korzystania z Windows po około 10 000 weryfikacji:
Błąd: connect ENOBUFS XX.XX.XXX.XXX:443 – Lokalny (undefined:undefined) z e-mailem XXXXXXX@XXXXXXXXXX.XXX
Po dokładniejszych badaniach wydaje się, że jest to problem z puli połączeń klienta HTTP NodeJS nie ponownie używającego połączeń. Znalazłem ten artykuł na Stackoverflow na temat tego problemu, a po dalszych poszukiwaniach znalazłem dobrą konfigurację domyślną dla biblioteki axios, która rozwiązała ten problem. Nadal nie jestem pewien, dlaczego ten problem występuje tylko na Windows, a nie na Mac.
Następne Kroki
Dla kogoś, kto szuka prostego szybkiego programu, który przyjmuje plik csv, wykorzystuje API walidacji odbiorców, a następnie zapisuje wynik jako CSV, ten program jest dla Ciebie.
Do programu można dodać następujące elementy:
Zbuduj front-end lub łatwiejszy interfejs użytkownika
Lepsza obsługa błędów i ponowne próby, ponieważ jeśli z jakiegoś powodu API zgłasza błąd, program obecnie nie ponawia wywołania
Rozważ wdrożenie jako serverless Azure Function dla automatycznego skalowania i zmniejszonego zarządzania infrastrukturą
Byłbym również ciekaw, czy można osiągnąć szybsze rezultaty z innym językiem programowania, takim jak Golang lub Erlang/Elixir. Oprócz wyboru języka, ograniczenia infrastruktury mogą również wpływać na wydajność - nauczyliśmy się tego z pierwszej ręki, kiedy napotkaliśmy niemadokumentowane limity DNS w AWS, które wpłynęły na nasze systemy przetwarzania dużej ilości e-maili.
Dla deweloperów zainteresowanych łączeniem przetwarzania API z narzędziami do wizualnego przepływu pracy, sprawdź, jak zintegrować Flow Builder z Google Cloud Functions dla bezkodowych przepływów automatyzacji.
Proszę, nie wahaj się przesłać mi uwag lub sugestii dotyczących rozszerzenia tego projektu.