Jednym z pytań, które czasami otrzymujemy jest, jak mogę masowo weryfikować listy e-mailowe za pomocą walidacji odbiorcy? Istnieją dwie opcje, jedna to przesłanie pliku przez interfejs SparkPost w celu walidacji, a druga to wykonywanie indywidualnych wywołań do API dla każdego e-maila (ponieważ API weryfikuje pojedyncze e-maile).
Pierwsza opcja działa świetnie, ale ma ograniczenie do 20MB (około 500,000 adresów). Co jeśli ktoś ma listę e-mailową zawierającą miliony adresów? Może to oznaczać podzielenie tego na tysiące przesyłek plików CSV.
Skoro przesyłanie tysięcy plików CSV wydaje się nieco niewykonalne, wziąłem ten przypadek i zacząłem się zastanawiać, jak szybko mogę uruchomić API. W tym wpisie na blogu wyjaśnię, co próbowałem i jak ostatecznie doszedłem do programu, który mógł wykonac około 100,000 walidacji w 55 sekund (podczas gdy w interfejsie uzyskałem około 100,000 walidacji w 1 minutę 10 sekund). I chociaż nadal zajęłoby to około 100 godzin, aby zrealizować około 654 miliona walidacji, ten skrypt może działać w tle, oszczędzając znacząco czas.
Ostateczną wersję tego programu można znaleźć tutaj.
Moja pierwsza pomyłka: użycie Pythona
Python jest jednym z moich ulubionych języków programowania. Doskonale sprawdza się w wielu obszarach i jest niezwykle prosty. Jednak jednym z obszarów, w którym nie jest najlepszy, są procesy współbieżne. Chociaż Python ma możliwość uruchamiania funkcji asynchronicznych, ma to, co nazywa się Globalnym Blokadą Interpretatora Pythona lub GIL.
„Globalna blokada interpretera Pythona lub GIL, w prostych słowach, jest mutexem (lub blokadą), który pozwala tylko jednemu wątkowi kontrolować interpreter Pythona.
Oznacza to, że tylko jeden wątek może być w stanie wykonania w danym momencie. Wpływ GIL jest niewidoczny dla programistów, którzy wykonują programy jednowątkowe, ale może być wąskim gardłem wydajności w kodzie związanym z CPU oraz wielowątkowym.
Ponieważ GIL pozwala tylko jednemu wątkowi wykonywać kod w danym momencie, nawet w architekturze wielowątkowej z więcej niż jednym rdzeniem CPU, GIL zyskał reputację „niechcianej” cechy Pythona.” (https://realpython.com/python-gil/)”
Początkowo nie byłem świadomy GIL, więc zacząłem programować w Pythonie. Na końcu, mimo że mój program był asynchroniczny, zaciął się, a bez względu na to, ile wątków dodałem, uzyskałem tylko około 12-15 iteracji na sekundę.
Główna część funkcji asynchronicznej w Pythonie może być zobaczona poniżej:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1
Dlatego zrezygnowałem z Pythona i wróciłem do planszy...
Zdecydowałem się na wykorzystanie NodeJS z powodu jego zdolności do wykonywania operacji I/O bez blokowania. Jestem także dość zaznajomiony z programowaniem w NodeJS.
Używając asynchronicznych aspektów NodeJS, to działało dobrze. Aby uzyskać więcej informacji na temat programowania asynchronicznego w NodeJS, zobacz https://blog.risingstack.com/node-hero-async-programming-in-node-js/
Moja druga pomyłka: próba załadowania pliku do pamięci
Moja początkowa idea wyglądała następująco:
Najpierw zaingestuj listę e-maili w formacie CSV. Po drugie, załaduj e-maile do tablicy i sprawdź, czy są w poprawnym formacie. Po trzecie, wywołaj asynchronicznie API walidacji odbiorcy. Po czwarte, poczekaj na wyniki i załaduj je do zmiennej. I na koniec, wyprowadź tę zmienną do pliku CSV.
To działało bardzo dobrze dla mniejszych plików. Problem pojawił się, gdy próbowałem uruchomić 100,000 e-maili. Program zaciął się przy około 12,000 walidacjach. Z pomocą jednego z naszych deweloperów front-end, zrozumiałem, że problem wynikał z ładowania wszystkich wyników do zmiennej (a zatem szybko kończył się pamięć). Jeśli chciałbyś zobaczyć pierwszą iterację tego programu, zamieściłem ją tutaj: Wersja 1 (NIE REKOMENDOWANA).
Najpierw zaingestuj listę e-maili w formacie CSV. Po drugie, policz liczbę e-maili w pliku w celach raportowych. Po trzecie, gdy każda linia jest odczytywana asynchronicznie, wywołaj API walidacji odbiorcy i wyprowadź wyniki do pliku CSV.
W związku z tym, dla każdej odczytanej linii, wywołuję API i zapisuję wyniki asynchronicznie, aby nie przechowywać tych danych w pamięci długoterminowej. Po rozmowie z zespołem walidacji odbiorców usunąłem również sprawdzanie składni e-maili, ponieważ poinformowali mnie, że walidacja odbiorców ma już wbudowane kontrole do sprawdzenia, czy e-mail jest ważny, czy nie.
Rozbicie ostatecznego kodu
Po przeczytaniu i walidacji argumentów terminalowych, uruchamiam następujący kod. Najpierw wczytuję plik CSV e-maili i zliczam każdą linię. Istnieją dwa cele tej funkcji, 1) pozwala mi dokładnie raportować postęp pliku [jak zobaczymy później], oraz 2) pozwala mi zatrzymać stoper, gdy liczba e-maili w pliku równa się zakończonym walidacjom. Dodałem stoper, aby mogłem przeprowadzić benchmarki i upewnić się, że otrzymuję dobre wyniki.
let count = 0; //Licznik linii require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //Odczytuje plik wejściowy i zwiększa licznik dla każdej linii .on("close", function () { //Na końcu pliku wejściowego, po zliczeniu wszystkich linii, uruchom funkcję walidacji odbiorcy validateRecipients.validateRecipients(count, myArgs); });
Następnie dzwonię do funkcji validateRecipients. Zauważ, że ta funkcja jest asynchroniczna. Po potwierdzeniu, że plik wejściowy i wyjściowy są w formacie CSV, zapisuję wiersz nagłówka i uruchamiam stoper programu przy użyciu biblioteki JSDOM.
async function validateRecipients(email_count, myArgs) { if ( //Jeśli oba pliki wejściowe i wyjściowe są w formacie .csv extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //Licznik dla każdego wywołania API email_count++; //Licznik linii zwraca #linii - 1, to jest zrobione, aby poprawić liczbę linii //Uruchom stoper const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //Plik wyjściowy output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //Zapisz nagłówki w pliku wyjściowym
Następny skrypt jest w rzeczywistości główną częścią programu, więc rozdzielę go i wyjaśnię, co się dzieje. Dla każdej linii pliku wejściowego:
Asynchronicznie weź tę linię i wywołaj API walidacji odbiorcy.
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //Dla każdego wiersza odczytanego z pliku wejściowego, wywołaj API walidacji odbiorców SparkPost
Następnie, w odpowiedzi
Dodaj e-mail do JSON (aby móc wydrukować e-mail w CSV)
Sprawdź, czy przyczyna jest null, a jeśli tak, wypełnij pustą wartością (to zapewnia, że format CSV jest spójny, ponieważ w niektórych przypadkach przyczyna jest podana w odpowiedzi)
Ustaw opcje i klucze dla modułu json2csv.
Przekształć JSON na CSV i wyprowadź (używając json2csv)
Zapisz postęp w terminalu
Kiedy liczba e-maili w pliku = zakończone walidacje, zatrzymaj stoper i wydrukuj wyniki
.then(function (response) { response.data.results.email = String(email); //Dodaje e-mail jako wartość/parę kluczy do JSON odpowiedzi, aby można go było użyć do wyjścia response.data.results.reason ? null : (response.data.results.reason = ""); //Jeśli przyczyna jest null, ustaw ją na pustą, aby CSV był jednolity //Używa json-2-csv do przekształcenia JSON w format CSV i wyjścia let options = { prependHeader: false, //Wyłącza dodawanie wartości JSON jako wierszy nagłówków dla każdego wiersza keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //Ustawia kolejność kluczy }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //Zwiększa licznik API process.stdout.write(`Zrobione z ${completed} / ${email_count}\r`); //Raportuje status Zakończone / Całkowite w konsoli bez pokazywania nowych linii //Jeśli wszystkie e-maile ukończyły walidację if (completed == email_count) { const stop = window.performance.now(); //Zatrzymaj stoper console.log( `Wszystkie e-maile pomyślnie zweryfikowane w ${ (stop - start) / 1000 } sekund` ); } })
Jednym z ostatnich problemów, które odkryłem, było to, że podczas gdy wszystko działało świetnie na Macu, natrafiłem na następujący błąd podczas korzystania z Windowsa po około 10,000 walidacjach:
Błąd: connect ENOBUFS XX.XX.XXX.XXX:443 – Lokalny (nieokreślony:nieokreślony) z e-mailem XXXXXXX@XXXXXXXXXX.XXX
Po dalszym badaniu okazuje się, że to problem z pulą połączeń HTTP klienta NodeJS, który nie ponownie używa połączeń. Znalazłem ten artykuł na Stackoverflow na ten temat, a po dalszym poszukiwaniu, znalazłem dobry domyślny konfigurację dla biblioteki axios, która rozwiązała ten problem. Nadal nie jestem pewien, dlaczego ten problem występuje tylko w Windowsie, a nie na Macu.
Kolejne kroki
Dla kogoś, kto szuka prostego, szybkiego programu, który przyjmuje csv, wywołuje API walidacji odbiorcy i wyprowadza CSV, ten program jest dla Ciebie.
Niektóre dodatki do tego programu mogłyby obejmować:
Stworzenie front-endu lub łatwiejszego interfejsu użytkownika do użycia
Lepsze obsługiwanie błędów i ponowne próby, ponieważ jeśli z jakiegoś powodu API zgłasza błąd, program obecnie nie próbuje ponownie wywołać
Byłbym również ciekaw, czy szybsze wyniki można osiągnąć przy użyciu innego języka, takiego jak Golang lub Erlang/Elixir.
Proszę, śmiało podziel się ze mną wszelkimi uwagami lub sugestiami dotyczącymi rozszerzenia tego projektu.