Budowanie narzędzia do walidacji odbiorców ptaków w trybie asynchronicznym masowo

Zachary Samuels

26 maj 2022

Email

1 min read

Budowanie narzędzia do walidacji odbiorców ptaków w trybie asynchronicznym masowo

Dla kogoś, kto szuka prostego, szybkiego programu, który przyjmuje plik CSV, wywołuje API weryfikacji odbiorcy i zwraca plik CSV, ten program jest dla Ciebie.

Podczas tworzenia aplikacji e-mailowych, deweloperzy często muszą integrować wiele usług i interfejsów API. Rozumienie podstaw API e-mail w infrastrukturze chmurowej zapewnia podstawy do tworzenia solidnych narzędzi, takich jak system masowej walidacji, który stworzymy w tym przewodniku.

Jedno z pytań, które czasami otrzymujemy, brzmi: jak mogę masowo walidować listy e-mail z walidacją odbiorcy? Istnieją dwie opcje: jedna to przesłanie pliku przez UI SparkPost do walidacji, a druga to wykonywanie pojedynczych wywołań dla każdego e-maila do API (ponieważ API to walidacja pojedynczego e-maila).

Pierwsza opcja działa świetnie, ale ma ograniczenie do 20 MB (około 500 000 adresów). Co jeśli ktoś ma listę e-mail zawierającą miliony adresów? Może to oznaczać podzielenie tego na tysiące przesyłek plików CSV.

Ponieważ przesłanie tysięcy plików CSV wydaje się mało prawdopodobne, zastanowiłem się, jak szybko mogę zmusić API do działania. W tym wpisie na blogu wyjaśnię, co próbowałem i jak w końcu doszedłem do programu, który mógłby wykonać około 100 000 weryfikacji w 55 sekund (podczas gdy w UI zdobyłem około 100 000 weryfikacji w 1 minutę 10 sekund). I chociaż zajmie to jeszcze około 100 godzin, aby ukończyć około 654 milionów weryfikacji, ten skrypt może działać w tle, oszczędzając znaczną ilość czasu.

Ostateczną wersję tego programu można znaleźć tutaj.

My first mistake: używanie Python

Python jest jednym z moich ulubionych języków programowania. Wyróżnia się w wielu obszarach i jest niezwykle prosty. Jednak jednym obszarem, w którym nie wyróżnia się, są procesy równoległe. Chociaż python ma możliwość uruchamiania funkcji asynchronicznych, posiada to, co jest znane jako Global Interpreter Lock Pythona, czyli GIL.

„Global Interpreter Lock Pythona, czyli GIL, w prostych słowach to mechanizm mutex (lub blokada), który pozwala tylko jednemu wątkowi na kontrolowanie interpretera Pythona.

Oznacza to, że tylko jeden wątek może być w stanie wykonywania w danym momencie. Wpływ GIL nie jest widoczny dla programistów, którzy wykonują programy jednowątkowe, ale może być wąskim gardłem wydajnościowym w kodzie zależnym od CPU i wielowątkowym.

Ponieważ Global Interpreter Lock (GIL) pozwala na wykonywanie w danym momencie tylko jednego wątku, nawet na systemach wielordzeniowych, zyskał reputację „niesławnej” cechy Pythona (zobacz artykuł Real Python na temat GIL).

Na początku, nie byłem świadomy GIL, więc zacząłem programować w pythonie. Na końcu, mimo iż mój program był asynchroniczny, został zablokowany i niezależnie od tego, ile dodałem wątków, nadal uzyskiwałem tylko około 12-15 iteracji na sekundę.

Główna część funkcji asynchronicznej w Python można zobaczyć poniżej:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

Więc zrezygnowałem z używania Pythona i wróciłem do tablicy kreślarskiej...

Zdecydowałem się na wykorzystanie NodeJS ze względu na jego zdolność do wykonywania operacji i/o bez blokowania. Inną doskonałą opcją do obsługi asynchronicznego przetwarzania API jest budowa konsumentów webhooków bezserwerowych z Azure Functions, które mogą efektywnie obsługiwać zmienne obciążenia. Jestem również dość zaznajomiony z programowaniem w NodeJS.

Wykorzystując asynchroniczne aspekty Node.js, podejście to działało dobrze. Więcej szczegółów na temat programowania asynchronicznego w Node.js można znaleźć w przewodniku RisingStack na temat programowania asynchronicznego w Node.js.

Mój drugi błąd: próba odczytania pliku do pamięci

Mój początkowy pomysł był następujący:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, załaduj e-maile do tablicy i sprawdź, czy są w poprawnym formacie. Po trzecie, asynchronicznie wywołaj API weryfikacji odbiorcy. Po czwarte, poczekaj na wyniki i załaduj je do zmiennej. I na koniec, wyprowadź tę zmienną do pliku CSV.

Działało to bardzo dobrze dla mniejszych plików. Problem pojawił się, gdy próbowałem przetworzyć 100 000 e-maili. Program zatrzymał się na około 12 000 weryfikacji. Z pomocą jednego z naszych programistów front-end, zauważyłem, że problemem było ładowanie wszystkich wyników do zmiennej (a zatem szybko kończyła się pamięć). Jeśli chciałbyś zobaczyć pierwszą iterację tego programu, zamieściłem ją tutaj: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, policz liczbę e-maili w pliku do celów raportowania. Po trzecie, gdy każda linia jest czytana asynchronicznie, wywołaj API weryfikacji odbiorcy i wyprowadź wyniki do pliku CSV.

Dlatego, dla każdej czytanej linii, wywołuję API i zapisuję wyniki asynchronicznie, aby nie przechowywać żadnej z tych danych w pamięci długoterminowej. Usunąłem również sprawdzanie składni e-maili po rozmowie z zespołem ds. weryfikacji odbiorców, ponieważ poinformowali mnie, że weryfikacja odbiorców już zawiera wbudowane sprawdzenia, czy e-mail jest prawidłowy, czy nie.

Mój początkowy pomysł był następujący:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, załaduj e-maile do tablicy i sprawdź, czy są w poprawnym formacie. Po trzecie, asynchronicznie wywołaj API weryfikacji odbiorcy. Po czwarte, poczekaj na wyniki i załaduj je do zmiennej. I na koniec, wyprowadź tę zmienną do pliku CSV.

Działało to bardzo dobrze dla mniejszych plików. Problem pojawił się, gdy próbowałem przetworzyć 100 000 e-maili. Program zatrzymał się na około 12 000 weryfikacji. Z pomocą jednego z naszych programistów front-end, zauważyłem, że problemem było ładowanie wszystkich wyników do zmiennej (a zatem szybko kończyła się pamięć). Jeśli chciałbyś zobaczyć pierwszą iterację tego programu, zamieściłem ją tutaj: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, policz liczbę e-maili w pliku do celów raportowania. Po trzecie, gdy każda linia jest czytana asynchronicznie, wywołaj API weryfikacji odbiorcy i wyprowadź wyniki do pliku CSV.

Dlatego, dla każdej czytanej linii, wywołuję API i zapisuję wyniki asynchronicznie, aby nie przechowywać żadnej z tych danych w pamięci długoterminowej. Usunąłem również sprawdzanie składni e-maili po rozmowie z zespołem ds. weryfikacji odbiorców, ponieważ poinformowali mnie, że weryfikacja odbiorców już zawiera wbudowane sprawdzenia, czy e-mail jest prawidłowy, czy nie.

Mój początkowy pomysł był następujący:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, załaduj e-maile do tablicy i sprawdź, czy są w poprawnym formacie. Po trzecie, asynchronicznie wywołaj API weryfikacji odbiorcy. Po czwarte, poczekaj na wyniki i załaduj je do zmiennej. I na koniec, wyprowadź tę zmienną do pliku CSV.

Działało to bardzo dobrze dla mniejszych plików. Problem pojawił się, gdy próbowałem przetworzyć 100 000 e-maili. Program zatrzymał się na około 12 000 weryfikacji. Z pomocą jednego z naszych programistów front-end, zauważyłem, że problemem było ładowanie wszystkich wyników do zmiennej (a zatem szybko kończyła się pamięć). Jeśli chciałbyś zobaczyć pierwszą iterację tego programu, zamieściłem ją tutaj: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Po pierwsze, wczytaj listę e-maili z pliku CSV. Po drugie, policz liczbę e-maili w pliku do celów raportowania. Po trzecie, gdy każda linia jest czytana asynchronicznie, wywołaj API weryfikacji odbiorcy i wyprowadź wyniki do pliku CSV.

Dlatego, dla każdej czytanej linii, wywołuję API i zapisuję wyniki asynchronicznie, aby nie przechowywać żadnej z tych danych w pamięci długoterminowej. Usunąłem również sprawdzanie składni e-maili po rozmowie z zespołem ds. weryfikacji odbiorców, ponieważ poinformowali mnie, że weryfikacja odbiorców już zawiera wbudowane sprawdzenia, czy e-mail jest prawidłowy, czy nie.

Rozbijanie końcowego kodu

Po przeczytaniu i zweryfikowaniu argumentów terminala, uruchamiam następujący kod. Najpierw odczytuję plik CSV z emailami i zliczam każdą linię. Istnieją dwa cele tej funkcji: 1) pozwala mi dokładnie raportować postęp pliku [jak zobaczymy później], oraz 2) pozwala mi zatrzymać stoper, gdy liczba emaili w pliku równa się zakończonym walidacjom. Dodałem stoper, aby móc przeprowadzać testy wydajności i upewnić się, że otrzymuję dobre wyniki.

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


 Następnie wywołuję funkcję validateRecipients. Należy zauważyć, że funkcja ta jest asynchroniczna. Po zweryfikowaniu, że infile i outfile są w formacie CSV, zapisuję wiersz nagłówkowy i uruchamiam stoper programu korzystając z biblioteki JSDOM.

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

Następujący skrypt to naprawdę główna część programu, więc podzielę go i wyjaśnię, co się dzieje. Dla każdej linii pliku infile:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

Następnie, na odpowiedzi

  • Dodaj email do JSON (aby móc wydrukować email w formacie CSV)

  • Sprawdź, czy reason jest puste, a jeśli tak, wprowadź wartość pustą (po to, aby format CSV był spójny, ponieważ w niektórych przypadkach reason jest podawane w odpowiedzi)

  • Ustaw opcje i klucze dla modułu json2csv.

  • Skonwertuj JSON na CSV i wyprowadź (wykorzystując json2csv)

  • Zapisz postęp w terminalu

  • Ostatecznie, jeśli liczba emaili w pliku = zakończone walidacje, zatrzymaj stoper i wydrukuj wyniki


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

Jedynym problemem, który znalazłem, było to, że podczas gdy to działało świetnie na Macu, napotkałem następujący błąd podczas używania Windows po około 10 000 walidacji:

Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX

Po przeprowadzeniu dalszych badań, wydaje się, że problem tkwi w zestawie połączeń HTTP klienta NodeJS, który nie ponownie używa połączeń. Znalazłem ten artykuł na Stackoverflow na temat tego problemu, a po dalszym zgłębianiu, znalazłem dobrą domyślną konfigurację dla biblioteki axios, która rozwiązała ten problem. Nadal nie jestem pewien, dlaczego ten problem występuje tylko na Windows i nie na Mac.

Następne Kroki

Dla kogoś, kto szuka prostego szybkiego programu, który przyjmuje plik csv, wykorzystuje API walidacji odbiorców, a następnie zapisuje wynik jako CSV, ten program jest dla Ciebie.

Do programu można dodać następujące elementy:

  • Zbuduj front-end lub łatwiejszy interfejs użytkownika

  • Lepsza obsługa błędów i ponowne próby, ponieważ jeśli z jakiegoś powodu API zgłasza błąd, program obecnie nie ponawia wywołania

  • Rozważ wdrożenie jako serverless Azure Function dla automatycznego skalowania i zmniejszonego zarządzania infrastrukturą


Byłbym również ciekaw, czy można osiągnąć szybsze rezultaty z innym językiem programowania, takim jak Golang lub Erlang/Elixir. Oprócz wyboru języka, ograniczenia infrastruktury mogą również wpływać na wydajność - nauczyliśmy się tego z pierwszej ręki, kiedy napotkaliśmy niemadokumentowane limity DNS w AWS, które wpłynęły na nasze systemy przetwarzania dużej ilości e-maili.

Dla deweloperów zainteresowanych łączeniem przetwarzania API z narzędziami do wizualnego przepływu pracy, sprawdź, jak zintegrować Flow Builder z Google Cloud Functions dla bezkodowych przepływów automatyzacji.

Proszę, nie wahaj się przesłać mi uwag lub sugestii dotyczących rozszerzenia tego projektu.

Inne wiadomości

Czytaj więcej z tej kategorii

A person is standing at a desk while typing on a laptop.

Kompletna, AI-native platforma, która skaluje się wraz z Twoim business.

Produkt

Rozwiązania

Zasoby

Company

Ustawienia prywatności

Już wkrótce

Social

Biuletyn

Bądź na bieżąco z Bird dzięki cotygodniowym aktualizacjom do Twojej skrzynki odbiorczej.

Zarejestruj się

A person is standing at a desk while typing on a laptop.

Kompletna, AI-native platforma, która skaluje się wraz z Twoim business.

Produkt

Rozwiązania

Zasoby

Company

Ustawienia prywatności

Social

Biuletyn

Bądź na bieżąco z Bird dzięki cotygodniowym aktualizacjom do Twojej skrzynki odbiorczej.

Zarejestruj się