Product

Lösungen

Ressourcen

Company

Product

Lösungen

Ressourcen

Company

Entwicklung eines Bulk-Asynchronen Vogel-Empfänger-Validierungstools

Zachary Samuels

26.05.2022

E-Mail

1 min read

Entwicklung eines Bulk-Asynchronen Vogel-Empfänger-Validierungstools

Für jemanden, der nach einem einfachen, schnellen Programm sucht, das eine CSV-Datei einliest, die Empfänger-Validierungs-API aufruft und eine CSV-Datei ausgibt, ist dieses Programm genau das Richtige für Sie.

Beim Erstellen von E-Mail-Anwendungen müssen Entwickler oft mehrere Dienste und APIs integrieren. Das Verständnis von E-Mail-API-Grundlagen in der Cloud-Infrastruktur bietet die Grundlage für den Aufbau robuster Werkzeuge wie das Massenvalidierungssystem, das wir in diesem Leitfaden erstellen werden.

Eine der Fragen, die wir gelegentlich erhalten, ist: Wie kann ich E-Mail-Listen mit Empfängervalidierung in großen Mengen validieren? Es gibt hier zwei Möglichkeiten: Eine besteht darin, eine Datei über die SparkPost-Benutzeroberfläche zur Validierung hochzuladen, die andere besteht darin, einzelne Anfragen pro E-Mail an die API zu richten (da die API eine Einzel-E-Mail-Validierung ist).

Die erste Option funktioniert großartig, hat jedoch eine Begrenzung von 20 MB (etwa 500.000 Adressen). Was ist, wenn jemand eine E-Mail-Liste mit Millionen von Adressen hat? Es könnte bedeuten, dass man diese in Tausende von CSV-Datei-Uploads aufteilen muss.

Da das Hochladen von Tausenden von CSV-Dateien ein wenig weit hergeholt erscheint, habe ich diesen Anwendungsfall genommen und begann mich zu fragen, wie schnell ich die API zum Laufen bringen könnte. In diesem Blogbeitrag werde ich erklären, was ich ausprobiert habe und wie ich schließlich zu einem Programm gekommen bin, das ungefähr 100.000 Validierungen in 55 Sekunden durchführen konnte (während ich in der Benutzeroberfläche etwa 100.000 Validierungen in 1 Minute 10 Sekunden erreicht habe). Und obwohl es immer noch etwa 100 Stunden dauern würde, um etwa 654 Millionen Validierungen abzuschließen, kann dieses Skript im Hintergrund laufen und erheblich Zeit sparen.

Die endgültige Version dieses Programms finden Sie hier.

Mein erster Fehler: die Verwendung von Python

Python ist eine meiner Lieblingsprogrammiersprachen. Es glänzt in vielen Bereichen und ist unglaublich unkompliziert. In einem Bereich glänzt es jedoch nicht: bei parallelen Prozessen. Während Python die Fähigkeit hat, asynchrone Funktionen auszuführen, hat es das, was als der Python Global Interpreter Lock oder GIL bekannt ist.

„Der Python Global Interpreter Lock oder GIL, einfach ausgedrückt, ist ein Mutex (oder eine Sperre), die nur einem Thread erlaubt, die Kontrolle über den Python-Interpreter zu halten.

Das bedeutet, dass zu jedem Zeitpunkt nur ein Thread im Zustand der Ausführung sein kann. Die Auswirkung des GIL ist für Entwickler, die Programme mit einem einzigen Thread ausführen, nicht sichtbar, kann jedoch in CPU-intensivem und multi-threaded Codes eine Leistungsbremse darstellen.

Da der Global Interpreter Lock (GIL) nur einem Thread die Ausführung gleichzeitig erlaubt, selbst auf Multi-Core-Systemen, hat es den Ruf erlangt, ein „berüchtigtes“ Merkmal von Python zu sein (siehe Real Python’s Artikel über das GIL).

Zuerst war mir das GIL nicht bewusst, also begann ich mit dem Programmieren in Python. Am Ende, obwohl mein Programm asynchron war, wurde es blockiert, und egal wie viele Threads ich hinzufügte, ich erreichte trotzdem nur etwa 12-15 Iterationen pro Sekunde.

Der Hauptteil der asynchronen Funktion in Python ist unten zu sehen:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

Also habe ich den Einsatz von Python verworfen und bin zurück ans Reißbrett…

Ich entschied mich dafür, NodeJS zu nutzen, da es nicht-blockierende I/O-Operationen extrem gut ausführen kann. Eine weitere ausgezeichnete Option zur Handhabung von asynchronem API-Processing ist der Bau von Serverless-Webhook-Konsumenten mit Azure Functions, die variable Arbeitslasten effizient verarbeiten können. Außerdem bin ich ziemlich vertraut mit dem Programmieren in NodeJS.

Unter Nutzung der asynchronen Aspekte von Node.js funktionierte dieser Ansatz gut. Für weitere Details über asynchrones Programmieren in Node.js, siehe RisingStack’s Leitfaden für asynchrones Programmieren in Node.js.

Mein zweiter Fehler: der Versuch, die Datei in den Speicher zu lesen

Meine anfängliche Idee war wie folgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, laden Sie die E-Mails in ein Array und prüfen Sie, ob sie im richtigen Format sind. Drittens, asynchron die Empfängerüberprüfungs-API aufrufen. Viertens, auf die Ergebnisse warten und sie in eine Variable laden. Und schließlich, diese Variable in eine CSV-Datei ausgeben.

Dies funktionierte sehr gut für kleinere Dateien. Das Problem trat auf, als ich versuchte, 100.000 E-Mails durchzuführen. Das Programm blieb bei etwa 12.000 Validierungen hängen. Mit Hilfe eines unserer Frontend-Entwickler sah ich, dass das Problem darin bestand, alle Ergebnisse in eine Variable zu laden (und daher schnell den Speicher zu erschöpfen). Wenn Sie die erste Version dieses Programms sehen möchten, habe ich sie hier verlinkt: Version 1 (NICHT EMPFOHLEN).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, zählen Sie die Anzahl der E-Mails in der Datei zu Berichtszwecken. Drittens, während jede Zeile asynchron gelesen wird, die Empfängerüberprüfungs-API aufrufen und die Ergebnisse in eine CSV-Datei ausgeben.

So rufe ich für jede gelesene Zeile die API auf und schreibe die Ergebnisse asynchron aus, um diese Daten nicht im Langzeitspeicher zu behalten. Ich habe auch die Syntaxüberprüfung der E-Mail entfernt, nachdem ich mit dem Empfängerüberprüfungsteam gesprochen habe, da sie mir mitteilten, dass die Empfängerüberprüfung bereits über Prüfungen verfügt, um zu überprüfen, ob eine E-Mail gültig ist oder nicht.

Meine anfängliche Idee war wie folgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, laden Sie die E-Mails in ein Array und prüfen Sie, ob sie im richtigen Format sind. Drittens, asynchron die Empfängerüberprüfungs-API aufrufen. Viertens, auf die Ergebnisse warten und sie in eine Variable laden. Und schließlich, diese Variable in eine CSV-Datei ausgeben.

Dies funktionierte sehr gut für kleinere Dateien. Das Problem trat auf, als ich versuchte, 100.000 E-Mails durchzuführen. Das Programm blieb bei etwa 12.000 Validierungen hängen. Mit Hilfe eines unserer Frontend-Entwickler sah ich, dass das Problem darin bestand, alle Ergebnisse in eine Variable zu laden (und daher schnell den Speicher zu erschöpfen). Wenn Sie die erste Version dieses Programms sehen möchten, habe ich sie hier verlinkt: Version 1 (NICHT EMPFOHLEN).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, zählen Sie die Anzahl der E-Mails in der Datei zu Berichtszwecken. Drittens, während jede Zeile asynchron gelesen wird, die Empfängerüberprüfungs-API aufrufen und die Ergebnisse in eine CSV-Datei ausgeben.

So rufe ich für jede gelesene Zeile die API auf und schreibe die Ergebnisse asynchron aus, um diese Daten nicht im Langzeitspeicher zu behalten. Ich habe auch die Syntaxüberprüfung der E-Mail entfernt, nachdem ich mit dem Empfängerüberprüfungsteam gesprochen habe, da sie mir mitteilten, dass die Empfängerüberprüfung bereits über Prüfungen verfügt, um zu überprüfen, ob eine E-Mail gültig ist oder nicht.

Meine anfängliche Idee war wie folgt:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, laden Sie die E-Mails in ein Array und prüfen Sie, ob sie im richtigen Format sind. Drittens, asynchron die Empfängerüberprüfungs-API aufrufen. Viertens, auf die Ergebnisse warten und sie in eine Variable laden. Und schließlich, diese Variable in eine CSV-Datei ausgeben.

Dies funktionierte sehr gut für kleinere Dateien. Das Problem trat auf, als ich versuchte, 100.000 E-Mails durchzuführen. Das Programm blieb bei etwa 12.000 Validierungen hängen. Mit Hilfe eines unserer Frontend-Entwickler sah ich, dass das Problem darin bestand, alle Ergebnisse in eine Variable zu laden (und daher schnell den Speicher zu erschöpfen). Wenn Sie die erste Version dieses Programms sehen möchten, habe ich sie hier verlinkt: Version 1 (NICHT EMPFOHLEN).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Zuerst eine CSV-Liste von E-Mails einlesen. Zweitens, zählen Sie die Anzahl der E-Mails in der Datei zu Berichtszwecken. Drittens, während jede Zeile asynchron gelesen wird, die Empfängerüberprüfungs-API aufrufen und die Ergebnisse in eine CSV-Datei ausgeben.

So rufe ich für jede gelesene Zeile die API auf und schreibe die Ergebnisse asynchron aus, um diese Daten nicht im Langzeitspeicher zu behalten. Ich habe auch die Syntaxüberprüfung der E-Mail entfernt, nachdem ich mit dem Empfängerüberprüfungsteam gesprochen habe, da sie mir mitteilten, dass die Empfängerüberprüfung bereits über Prüfungen verfügt, um zu überprüfen, ob eine E-Mail gültig ist oder nicht.

Aufschlüsselung des finalen Codes

Nach dem Einlesen und Überprüfen der Terminalargumente führe ich den folgenden Code aus. Zuerst lese ich die CSV-Datei der E-Mails ein und zähle jede Zeile. Diese Funktion hat zwei Zwecke: 1) Sie ermöglicht es mir, über den Fortschritt der Datei genau zu berichten [wie wir später sehen werden], und 2) es ermöglicht mir, einen Timer zu stoppen, wenn die Anzahl der E-Mails in der Datei der abgeschlossenen Validierungen entspricht. Ich habe einen Timer hinzugefügt, damit ich Benchmarks durchführen und sicherstellen kann, dass ich gute Ergebnisse erziele.

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


 Dann rufe ich die Funktion validateRecipients auf. Beachten Sie, dass diese Funktion asynchron ist. Nachdem ich überprüft habe, dass die infile und outfile CSV-Dateien sind, schreibe ich eine Kopfzeile und starte einen Programm-Timer mit der JSDOM-Bibliothek.

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

Das folgende Skript ist wirklich der Hauptteil des Programms, daher werde ich es aufbrechen und erklären, was passiert. Für jede Zeile der infile:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

Dann, bei der Antwort

  • Die E-Mail zum JSON hinzufügen (um die E-Mail in der CSV ausgeben zu können)

  • Wenn der Grund null ist, prüfen, und falls ja, einen leeren Wert einfügen (dies dient dazu, das CSV-Format konsistent zu halten, da in einigen Fällen ein Grund in der Antwort angegeben wird)

  • Die Optionen und Schlüssel für das json2csv-Modul festlegen.

  • Das JSON in CSV konvertieren und ausgeben (mithilfe von json2csv)

  • Fortschritt im Terminal schreiben

  • Schließlich, wenn die Anzahl der E-Mails in der Datei = abgeschlossenen Validierungen ist, den Timer stoppen und die Ergebnisse drucken


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

Ein letztes Problem, das ich fand, war, dass dies auf Mac hervorragend funktionierte, ich jedoch nach etwa 10.000 Validierungen auf Windows auf den folgenden Fehler stieß:

Fehler: connect ENOBUFS XX.XX.XXX.XXX:443 – Lokale (undefined:undefined) mit E-Mail XXXXXXX@XXXXXXXXXX.XXX

Nach weitergehenden Recherchen scheint es ein Problem mit dem NodeJS-HTTP-Client-Verbindungspool zu sein, der Verbindungen nicht wiederverwendet. Ich fand diesen Stackoverflow-Artikel zu dem Problem und nach weiterer Recherche eine gute Standardkonfiguration für die axios-Bibliothek, die dieses Problem gelöst hat. Ich bin mir immer noch nicht sicher, warum dieses Problem nur unter Windows und nicht auf Mac auftritt.

Nächste Schritte

Für jemanden, der nach einem einfachen, schnellen Programm sucht, das eine CSV einliest, die Empfänger-Validation API aufruft und eine CSV ausgibt, ist dieses Programm genau das Richtige.

Einige Ergänzungen zu diesem Programm wären folgende:

  • Erstellen Sie eine Benutzeroberfläche oder ein Frontend für die einfachere Nutzung

  • Bessere Fehlerbehandlung und erneute Versuche, da das Programm derzeit den Aufruf nicht wiederholt, falls die API aus irgendeinem Grund einen Fehler ausgibt

  • Erwägen Sie die Implementierung als eine serverlose Azure-Funktion für automatisches Skalieren und reduzierte Infrastrukturverwaltung


Ich wäre auch neugierig zu sehen, ob schnellere Ergebnisse mit einer anderen Sprache wie Golang oder Erlang/Elixir erzielt werden könnten. Neben der Sprachwahl können auch Infrastrukturbeschränkungen die Leistung beeinflussen - das haben wir aus erster Hand erfahren, als wir auf undokumentierte DNS-Limits in AWS stießen, die unsere hochvolumigen E-Mail-Verarbeitungssysteme beeinträchtigten.

Für Entwickler, die daran interessiert sind, API-Verarbeitung mit visuellen Workflow-Tools zu kombinieren, sehen Sie sich an, wie Sie Flow Builder mit Google Cloud Functions integrieren können, um No-Code-Automationsworkflows zu realisieren.

Bitte zögern Sie nicht, mir Feedback oder Vorschläge zur Erweiterung dieses Projekts zu geben.

Andere Neuigkeiten

Mehr lesen aus dieser Kategorie

A person is standing at a desk while typing on a laptop.

Die vollständige AI-native Plattform, die mit Ihrem Business skaliert.

A person is standing at a desk while typing on a laptop.

Die vollständige AI-native Plattform, die mit Ihrem Business skaliert.

Product

Lösungen

Ressourcen

Company

Sozial

Newsletter

Bleiben Sie mit Bird auf dem Laufenden durch wöchentliche Updates in Ihrem Posteingang.

Anmelden