Product

Soluzioni

Risorse

Company

Product

Soluzioni

Risorse

Company

Costruzione di uno strumento di convalida per destinatari di uccelli in blocco e in modo asincrono

Zachary Samuels

26 mag 2022

Email

1 min read

Costruzione di uno strumento di convalida per destinatari di uccelli in blocco e in modo asincrono

Zachary Samuels

26 mag 2022

Email

1 min read

Costruzione di uno strumento di convalida per destinatari di uccelli in blocco e in modo asincrono

Per chi cerca un programma semplice e veloce che prenda un CSV, chiami l'API di convalida del destinatario e produca un CSV, questo programma fa per te.

Quando si costruiscono applicazioni email, gli sviluppatori spesso devono integrare più servizi e API. Comprendere i fondamenti dell'email API nell'infrastruttura cloud fornisce la base per costruire strumenti robusti come il sistema di validazione in massa che creeremo in questa guida.

Una delle domande che riceviamo occasionalmente è, come posso validare in massa liste di email con la validazione dei destinatari? Ci sono due opzioni qui, una è caricare un file tramite l'interfaccia utente SparkPost per la validazione, e l'altra è effettuare chiamate individuali per email all'API (dato che l'API è per la validazione di singole email).

La prima opzione funziona alla grande ma ha un limite di 20 Mb (circa 500.000 indirizzi). E se qualcuno avesse una lista di email contenente milioni di indirizzi? Potrebbe significare suddividere il tutto in migliaia di caricamenti di file CSV.

Poiché caricare migliaia di file CSV sembra un po' inverosimile, ho preso questo caso d'uso e ho cominciato a chiedermi quanto velocemente potessi far funzionare l'API. In questo post del blog, spiegherò cosa ho provato e come sono arrivato infine a un programma che potrebbe ottenere circa 100.000 validazioni in 55 secondi (mentre nell'interfaccia utente ho ottenuto circa 100.000 validazioni in 1 minuto e 10 secondi). E mentre questo richiederebbe ancora circa 100 ore per completare circa 654 milioni di validazioni, questo script può essere eseguito in background risparmiando tempo considerevole.

La versione finale di questo programma può essere trovata qui.

Il mio primo errore: usare Python

Python è uno dei miei linguaggi di programmazione preferiti. Eccelle in molte aree ed è incredibilmente semplice. Tuttavia, un'area in cui non eccelle è nei processi concorrenti. Sebbene Python abbia la capacità di eseguire funzioni asincrone, ha ciò che è noto come The Python Global Interpreter Lock o GIL.

“The Python Global Interpreter Lock o GIL, in parole semplici, è un mutex (o un blocco) che consente a un solo thread di tenere il controllo dell'interprete Python.

Ciò significa che solo un thread può essere in uno stato di esecuzione in qualsiasi momento. L'impatto del GIL non è visibile agli sviluppatori che eseguono programmi single-threaded, ma può essere un collo di bottiglia delle prestazioni nel codice vincolato dalla CPU e multi-thread.

Poiché il Global Interpreter Lock (GIL) consente a un solo thread di essere eseguito alla volta, anche su sistemi multi-core, ha guadagnato una reputazione come una caratteristica “infame” di Python (vedi l'articolo di Real Python sul GIL).

All'inizio non ero a conoscenza del GIL, quindi ho iniziato a programmare in Python. Alla fine, anche se il mio programma era asincrono, si bloccava, e indipendentemente da quanti thread aggiungevo, ottenevo comunque solo circa 12-15 iterazioni al secondo.

La parte principale della funzione asincrona in Python può essere vista qui sotto:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

Così ho abbandonato l'uso di Python e sono tornato al tavolo da disegno...

Ho deciso di utilizzare NodeJS per la sua capacità di eseguire operazioni di I/O non bloccanti estremamente bene. Un'altra eccellente opzione per gestire l'elaborazione asincrona delle API è la costruzione di consumatori di webhook serverless con Azure Functions, che possono gestire efficacemente carichi di lavoro variabili. Inoltre, sono piuttosto familiare con la programmazione in NodeJS.

Utilizzando aspetti asincroni di Node.js, questo approccio ha funzionato bene. Per ulteriori dettagli sulla programmazione asincrona in Node.js, vedi la guida di RisingStack alla programmazione asincrona in Node.js.

Il mio secondo errore: cercare di leggere il file nella memoria

La mia idea iniziale era la seguente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, caricare le email in un array e verificare che siano nel formato corretto. In terzo luogo, chiamare in modo asincrono l'API di validazione del destinatario. In quarto luogo, attendere i risultati e caricarli in una variabile. E infine, esportare questa variabile in un file CSV.

Questo ha funzionato molto bene per file più piccoli. Il problema è sorto quando ho cercato di elaborare 100.000 email. Il programma si è bloccato intorno alle 12.000 validazioni. Con l'aiuto di uno dei nostri sviluppatori front-end, ho visto che il problema era nel caricare tutti i risultati in una variabile (esaurendo quindi rapidamente la memoria). Se desideri vedere la prima iterazione di questo programma, l'ho collegata qui: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, contare il numero di email nel file per scopi di reporting. In terzo luogo, man mano che ogni riga viene letta in modo asincrono, chiamare l'API di validazione del destinatario e esportare i risultati in un file CSV.

Quindi, per ogni riga letta, chiamo l'API e scrivo i risultati in modo asincrono per non mantenere nessuno di questi dati nella memoria a lungo termine. Ho anche rimosso il controllo della sintassi delle email dopo aver parlato con il team di validazione del destinatario, poiché mi hanno informato che la validazione del destinatario ha già controlli incorporati per verificare se un'email è valida o meno.

La mia idea iniziale era la seguente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, caricare le email in un array e verificare che siano nel formato corretto. In terzo luogo, chiamare in modo asincrono l'API di validazione del destinatario. In quarto luogo, attendere i risultati e caricarli in una variabile. E infine, esportare questa variabile in un file CSV.

Questo ha funzionato molto bene per file più piccoli. Il problema è sorto quando ho cercato di elaborare 100.000 email. Il programma si è bloccato intorno alle 12.000 validazioni. Con l'aiuto di uno dei nostri sviluppatori front-end, ho visto che il problema era nel caricare tutti i risultati in una variabile (esaurendo quindi rapidamente la memoria). Se desideri vedere la prima iterazione di questo programma, l'ho collegata qui: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, contare il numero di email nel file per scopi di reporting. In terzo luogo, man mano che ogni riga viene letta in modo asincrono, chiamare l'API di validazione del destinatario e esportare i risultati in un file CSV.

Quindi, per ogni riga letta, chiamo l'API e scrivo i risultati in modo asincrono per non mantenere nessuno di questi dati nella memoria a lungo termine. Ho anche rimosso il controllo della sintassi delle email dopo aver parlato con il team di validazione del destinatario, poiché mi hanno informato che la validazione del destinatario ha già controlli incorporati per verificare se un'email è valida o meno.

La mia idea iniziale era la seguente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, caricare le email in un array e verificare che siano nel formato corretto. In terzo luogo, chiamare in modo asincrono l'API di validazione del destinatario. In quarto luogo, attendere i risultati e caricarli in una variabile. E infine, esportare questa variabile in un file CSV.

Questo ha funzionato molto bene per file più piccoli. Il problema è sorto quando ho cercato di elaborare 100.000 email. Il programma si è bloccato intorno alle 12.000 validazioni. Con l'aiuto di uno dei nostri sviluppatori front-end, ho visto che il problema era nel caricare tutti i risultati in una variabile (esaurendo quindi rapidamente la memoria). Se desideri vedere la prima iterazione di questo programma, l'ho collegata qui: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Per prima cosa, ingerire un elenco di email in formato CSV. In secondo luogo, contare il numero di email nel file per scopi di reporting. In terzo luogo, man mano che ogni riga viene letta in modo asincrono, chiamare l'API di validazione del destinatario e esportare i risultati in un file CSV.

Quindi, per ogni riga letta, chiamo l'API e scrivo i risultati in modo asincrono per non mantenere nessuno di questi dati nella memoria a lungo termine. Ho anche rimosso il controllo della sintassi delle email dopo aver parlato con il team di validazione del destinatario, poiché mi hanno informato che la validazione del destinatario ha già controlli incorporati per verificare se un'email è valida o meno.

Scomposizione del codice finale

Dopo aver letto e validato gli argomenti del terminale, eseguo il seguente codice. Innanzitutto, leggo il file CSV delle email e conto ogni riga. Ci sono due scopi per questa funzione: 1) mi permette di riportare accuratamente il progresso del file [come vedremo più avanti], e 2) mi permette di fermare un timer quando il numero di email nel file è uguale alle validazioni completate. Ho aggiunto un timer in modo da poter eseguire benchmark e garantire buoni risultati.

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


 Chiamo quindi la funzione validateRecipients. Nota che questa funzione è asincrona. Dopo aver validato che infile e outfile siano CSV, scrivo una riga di intestazione e avvio un timer del programma utilizzando la libreria JSDOM.

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

Lo script seguente è davvero la parte principale del programma, quindi lo dividerò e spiegherò cosa sta accadendo. Per ogni riga di infile:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

Quindi, alla risposta

  • Aggiungere l'email al JSON (per poter stampare l'email nel CSV)

  • Validare se il motivo è nullo e, in tal caso, popolare un valore vuoto (questo è per mantenere il formato CSV coerente, poiché in alcuni casi il motivo è fornito nella risposta)

  • Impostare le opzioni e le chiavi per il modulo json2csv.

  • Convertire il JSON in CSV e eseguire l'output (utilizzando json2csv)

  • Scrivi il progresso nel terminale

  • Infine, se il numero di email nel file = validazioni completate, fermare il timer e stampare i risultati


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

Un ultimo problema che ho trovato è che mentre questo funzionava bene su Mac, ho riscontrato il seguente errore utilizzando Windows dopo circa 10.000 validazioni:

Errore: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) con email XXXXXXX@XXXXXXXXXX.XXX

Dopo aver effettuato ulteriori ricerche, sembra essere un problema con il pool di connessioni del client HTTP NodeJS che non riutilizza le connessioni. Ho trovato questo Stackoverflow article sul problema e, dopo ulteriori scavi, ho trovato una buona configurazione di default per la libreria axios che ha risolto questo problema. Non sono ancora certo del motivo per cui questo problema si verifica solo su Windows e non su Mac.

Passi successivi

Per chi cerca un programma semplice e veloce che accetta un CSV, chiama l'API di validazione del destinatario e produce un CSV, questo programma fa per te.

Alcune aggiunte a questo programma potrebbero essere le seguenti:

  • Costruire un front-end o un'interfaccia utente più semplice da usare

  • Migliorare la gestione degli errori e dei tentativi perché se per qualche motivo l'API genera un errore, attualmente il programma non ritenta la chiamata

  • Considera l'implementazione come una funzione serverless di Azure per scalabilità automatica e gestione ridotta dell'infrastruttura


Sarei anche curioso di vedere se risultati più rapidi potrebbero essere ottenuti con un altro linguaggio come Golang o Erlang/Elixir. Oltre alla scelta del linguaggio, le limitazioni dell'infrastruttura possono anche influire sulle prestazioni - lo abbiamo appreso in prima persona quando abbiamo incontrato i limiti DNS non documentati in AWS che hanno influenzato i nostri sistemi di elaborazione email ad alto volume.

Per gli sviluppatori interessati a combinare l'elaborazione delle API con strumenti di flusso di lavoro visivi, scopri come integrare Flow Builder con Google Cloud Functions per flussi di lavoro di automazione senza codice.

Non esitate a fornirmi qualsiasi feedback o suggerimento per ampliare questo progetto.

A person is standing at a desk while typing on a laptop.

La piattaforma AI-native completa che scala con il tuo business.

Product

Soluzioni

Risorse

Company

In arrivo

Sociale

Newsletter

Rimani aggiornato con Bird attraverso aggiornamenti settimanali nella tua inbox.

Registrati

© 2025 Bird

A person is standing at a desk while typing on a laptop.

La piattaforma AI-native completa che scala con il tuo business.

Product

Soluzioni

Risorse

Company

Sociale

Newsletter

Rimani aggiornato con Bird attraverso aggiornamenti settimanali nella tua inbox.

Registrati

© 2025 Bird