Een Bulk Asynchroon Bird Ontvanger Validatie Tool Bouwen

Zachary Samuels

26 mei 2022

E-mail

1 min read

Een Bulk Asynchroon Bird Ontvanger Validatie Tool Bouwen

Voor iemand die op zoek is naar een eenvoudig en snel programma dat een csv-bestand verwerkt, de ontvanger-validatie API aanroept en een CSV oplevert, is dit programma voor jou.

Bij het bouwen van e-mailapplicaties moeten ontwikkelaars vaak meerdere services en API's integreren. Inzicht in de basisprincipes van e-mail-API in cloudinfrastructuur biedt de basis voor het bouwen van robuuste tools zoals het bulkvalidatiesysteem dat we in deze gids zullen maken.

Een van de vragen die we af en toe ontvangen is, hoe kan ik e-maillijsten bulk valideren met ontvanger validatie? Er zijn hier twee opties, één is om een bestand te uploaden via de SparkPost UI voor validatie, en de andere is om individuele oproepen per e-mail naar de API te doen (aangezien de API een enkele e-mail validatie is).

De eerste optie werkt prima, maar heeft een limiet van 20Mb (ongeveer 500.000 adressen). Wat als iemand een e-maillijst heeft met miljoenen adressen? Het kan betekenen dat dat moet worden opgesplitst in duizenden CSV-bestandsuploads.

Aangezien het uploaden van duizenden CSV-bestanden een beetje vergezocht lijkt, nam ik dat gebruiksscenario en begon ik me af te vragen hoe snel ik de API kon laten draaien. In deze blogpost zal ik uitleggen wat ik heb geprobeerd en hoe ik uiteindelijk tot een programma kwam dat ongeveer 100.000 validaties in 55 seconden kon krijgen (terwijl ik in de UI rond de 100.000 validaties in 1 minuut en 10 seconden kreeg). En hoewel dit nog steeds ongeveer 100 uur zou duren om ongeveer 654 miljoen validaties uit te voeren, kan dit script op de achtergrond draaien en aanzienlijk tijd besparen.

De laatste versie van dit programma is te vinden hier.

Mijn eerste fout: het gebruik van Python

Python is een van mijn favoriete programmeertalen. Het blinkt uit in veel gebieden en is ongelooflijk eenvoudig. Echter, een gebied waar het niet in uitblinkt, is gelijktijdige processen. Hoewel Python de mogelijkheid heeft om asynchrone functies uit te voeren, heeft het iets dat bekend staat als The Python Global Interpreter Lock of GIL.

“The Python Global Interpreter Lock of GIL, in eenvoudige woorden, is een mutex (of een slot) dat slechts één thread toestaat om de controle van de Python-interpreter te houden.

Dit betekent dat slechts één thread zich op elk moment in een uitvoeringsstaat kan bevinden. De impact van de GIL is niet zichtbaar voor ontwikkelaars die enkelvoudige threading-programma's uitvoeren, maar het kan een prestatieknelpunt zijn in CPU-gebonden en multithreaded code.

Aangezien de Global Interpreter Lock (GIL) slechts één thread tegelijkertijd toestaat om uit te voeren, zelfs op multi-core systemen, heeft het de reputatie als een "beruchte" eigenschap van Python (zie Real Python’s artikel over de GIL).

In het begin was ik niet op de hoogte van de GIL, dus begon ik in Python te programmeren. Aan het einde, hoewel mijn programma asynchroon was, raakte het geblokkeerd en ongeacht hoeveel threads ik toevoegde, kreeg ik nog steeds slechts ongeveer 12-15 iteraties per seconde.

Het hoofdgedeelte van de asynchrone functie in Python is hieronder te zien:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

Dus stopte ik met het gebruik van Python en ging terug naar de tekentafel…

Ik koos ervoor om NodeJS te gebruiken vanwege zijn vermogen om non-blocking i/o-operaties extreem goed uit te voeren. Een andere uitstekende optie voor het verwerken van asynchrone API-processen is het bouwen van serverloze webhook-consumers met Azure Functions, die variabele workloads efficiënt kunnen verwerken. Ik ben ook behoorlijk bekend met programmeren in NodeJS.

Door gebruik te maken van de asynchrone aspecten van Node.js, werkte deze aanpak goed. Voor meer details over asynchrone programmering in Node.js, zie RisingStack’s gids voor asynchrone programmering in Node.js.

Mijn tweede fout: proberen het bestand in het geheugen te lezen

Mijn oorspronkelijke idee was als volgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede de e-mails in een array laden en controleren of ze in het juiste formaat zijn. Ten derde de ontvanger-validatie-API asynchroon aanroepen. Ten vierde wachten op de resultaten en deze in een variabele laden. En ten slotte deze variabele naar een CSV-bestand uitvoeren.

Dit werkte erg goed voor kleinere bestanden. Het probleem ontstond toen ik 100.000 e-mails probeerde te verwerken. Het programma liep vast bij ongeveer 12.000 validaties. Met de hulp van een van onze front-end ontwikkelaars ontdekte ik dat het probleem lag bij het laden van alle resultaten in een variabele (en dus snel zonder geheugen kwam te zitten). Als je de eerste iteratie van dit programma wilt zien, heb ik deze hier gelinkt: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede het aantal e-mails in het bestand tellen voor rapportagedoeleinden. Ten derde, terwijl elke regel asynchroon wordt gelezen, de ontvanger-validatie-API aanroepen en de resultaten naar een CSV-bestand uitvoeren.

Dus, voor elke ingelezen regel roep ik de API aan en schrijf ik de resultaten asynchroon weg om geen van deze gegevens in het langetermijngeheugen te bewaren. Ik heb ook de e-mailsyntaxcontrole verwijderd na overleg met het ontvanger-validatieteam, aangezien ze me vertelden dat ontvanger-validatie al ingebouwde controles heeft om te controleren of een e-mail geldig is of niet.

Mijn oorspronkelijke idee was als volgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede de e-mails in een array laden en controleren of ze in het juiste formaat zijn. Ten derde de ontvanger-validatie-API asynchroon aanroepen. Ten vierde wachten op de resultaten en deze in een variabele laden. En ten slotte deze variabele naar een CSV-bestand uitvoeren.

Dit werkte erg goed voor kleinere bestanden. Het probleem ontstond toen ik 100.000 e-mails probeerde te verwerken. Het programma liep vast bij ongeveer 12.000 validaties. Met de hulp van een van onze front-end ontwikkelaars ontdekte ik dat het probleem lag bij het laden van alle resultaten in een variabele (en dus snel zonder geheugen kwam te zitten). Als je de eerste iteratie van dit programma wilt zien, heb ik deze hier gelinkt: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede het aantal e-mails in het bestand tellen voor rapportagedoeleinden. Ten derde, terwijl elke regel asynchroon wordt gelezen, de ontvanger-validatie-API aanroepen en de resultaten naar een CSV-bestand uitvoeren.

Dus, voor elke ingelezen regel roep ik de API aan en schrijf ik de resultaten asynchroon weg om geen van deze gegevens in het langetermijngeheugen te bewaren. Ik heb ook de e-mailsyntaxcontrole verwijderd na overleg met het ontvanger-validatieteam, aangezien ze me vertelden dat ontvanger-validatie al ingebouwde controles heeft om te controleren of een e-mail geldig is of niet.

Mijn oorspronkelijke idee was als volgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede de e-mails in een array laden en controleren of ze in het juiste formaat zijn. Ten derde de ontvanger-validatie-API asynchroon aanroepen. Ten vierde wachten op de resultaten en deze in een variabele laden. En ten slotte deze variabele naar een CSV-bestand uitvoeren.

Dit werkte erg goed voor kleinere bestanden. Het probleem ontstond toen ik 100.000 e-mails probeerde te verwerken. Het programma liep vast bij ongeveer 12.000 validaties. Met de hulp van een van onze front-end ontwikkelaars ontdekte ik dat het probleem lag bij het laden van alle resultaten in een variabele (en dus snel zonder geheugen kwam te zitten). Als je de eerste iteratie van dit programma wilt zien, heb ik deze hier gelinkt: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Eerst een CSV-lijst met e-mails inlezen. Ten tweede het aantal e-mails in het bestand tellen voor rapportagedoeleinden. Ten derde, terwijl elke regel asynchroon wordt gelezen, de ontvanger-validatie-API aanroepen en de resultaten naar een CSV-bestand uitvoeren.

Dus, voor elke ingelezen regel roep ik de API aan en schrijf ik de resultaten asynchroon weg om geen van deze gegevens in het langetermijngeheugen te bewaren. Ik heb ook de e-mailsyntaxcontrole verwijderd na overleg met het ontvanger-validatieteam, aangezien ze me vertelden dat ontvanger-validatie al ingebouwde controles heeft om te controleren of een e-mail geldig is of niet.

Het uiteindelijke code ontleden

Nadat ik de terminalargumenten heb gelezen en gevalideerd, voer ik de volgende code uit. Eerst lees ik het CSV-bestand met e-mails in en tel ik elke regel. Er zijn twee doelstellingen van deze functie, 1) het stelt me in staat om nauwkeurig te rapporteren over de voortgang van het bestand [zoals we later zullen zien], en 2) het stelt me in staat om een timer te stoppen wanneer het aantal e-mails in het bestand gelijk is aan voltooide validaties. Ik heb een timer toegevoegd zodat ik benchmarks kan uitvoeren en ervoor kan zorgen dat ik goede resultaten krijg.

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


 Daarna roep ik de functie validateRecipients aan. Let op, deze functie is asynchroon. Nadat ik heb gevalideerd dat het in- en uitvoerbestand CSV zijn, schrijf ik een kopregel en start ik een programma-timer met behulp van de JSDOM-bibliotheek.

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

Het volgende script is eigenlijk de kern van het programma, dus ik zal het opsplitsen en uitleggen wat er gebeurt. Voor elke regel van het infile:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

Dan, bij de respons

  • Voeg de e-mail toe aan de JSON (om de e-mail in de CSV te kunnen afdrukken)

  • Valideer of reason null is, en als dat zo is, vul dan een lege waarde in (dit is zodat het CSV-formaat consistent is, omdat in sommige gevallen reason in de respons wordt gegeven)

  • Stel de opties en sleutels in voor de json2csv-module.

  • Zet de JSON om naar CSV en voer het uit (gebruikmakend van json2csv)

  • Schrijf voortgang in de terminal

  • Ten slotte, als het aantal e-mails in het bestand = voltooide validaties, stop de timer en druk de resultaten af


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

Een laatste probleem dat ik ontdekte was dat hoewel dit prima werkte op Mac, ik de volgende fout tegenkwam bij het gebruik van Windows na ongeveer 10.000 validaties:

Fout: connect ENOBUFS XX.XX.XXX.XXX:443 – Lokaal (undefined:undefined) met e-mail XXXXXXX@XXXXXXXXXX.XXX

Na verder onderzoek blijkt het een probleem te zijn met de NodeJS HTTP-client connectiepool die geen verbindingen hergebruikt. Ik vond dit Stackoverflow-artikel over het probleem, en na verder graven, vond ik een goede standaardconfiguratie voor de axios-bibliotheek die dit probleem oploste. Ik weet nog steeds niet zeker waarom dit probleem alleen op Windows en niet op Mac optreedt.

Volgende Stappen

Voor iemand die op zoek is naar een eenvoudig snel programma dat een csv als invoer neemt, de ontvangersvalidatie-API aanroept en een CSV uitvoert, is dit programma voor jou.

Enkele aanvullingen op dit programma zouden het volgende zijn:

  • Bouw een front-end of eenvoudigere UI voor gebruik

  • Beter fout- en retrybeheer omdat het programma momenteel de oproep niet opnieuw uitvoert als de API om een of andere reden een fout geeft

  • Overweeg implementatie als een serverless Azure Function voor automatische schaalvergroting en verminderd infrastructuurbeheer


Ik ben ook benieuwd of snellere resultaten zouden kunnen worden bereikt met een andere taal, zoals Golang of Erlang/Elixir. Buiten de taalkeuze kunnen infrastructuurbeperkingen ook de prestaties beïnvloeden - dit hebben we uit de eerste hand geleerd toen we ongedocumenteerde DNS-beperkingen in AWS tegenkwamen die onze systemen voor het verwerken van e-mails met een hoog volume beïnvloedden.

Voor ontwikkelaars die geïnteresseerd zijn in het combineren van API-verwerking met visuele workflowtools, bekijk hoe u Flow Builder kunt integreren met Google Cloud Functions voor no-code automatiseringsworkflows.

Voel je vrij om mij feedback of suggesties te geven voor het uitbreiden van dit project.

Andere nieuws

Lees meer uit deze categorie

A person is standing at a desk while typing on a laptop.

Het complete AI-native platform dat met uw bedrijf meegroeit.

A person is standing at a desk while typing on a laptop.

Het complete AI-native platform dat met uw bedrijf meegroeit.