
Für jemanden, der nach einem einfachen, schnellen Programm sucht, das eine CSV-Datei einliest, die Empfänger-Validierungs-API aufruft und eine CSV-Datei ausgibt, ist dieses Programm genau das Richtige für Sie.
Beim Erstellen von E-Mail-Anwendungen müssen Entwickler häufig mehrere Dienste und APIs integrieren. Das Verständnis der E-Mail-API-Grundlagen in der Cloud-Infrastruktur bildet die Grundlage für den Aufbau robuster Tools wie das Massenvalidierungssystem, das wir in diesem Leitfaden erstellen werden.
Eine der Fragen, die wir gelegentlich erhalten, ist, wie kann ich E-Mail-Listen mit Empfänger-Validierung massenweise validieren? Es gibt hier zwei Optionen: die eine besteht darin, eine Datei über die SparkPost-Oberfläche zur Validierung hochzuladen, und die andere besteht darin, für jede E-Mail einen individuellen Aufruf an die API zu machen (da die API eine Einzel-E-Mail-Validierung ist).
Die erste Option funktioniert großartig, hat jedoch die Einschränkung von 20 MB (etwa 500.000 Adressen). Was ist, wenn jemand eine E-Mail-Liste mit Millionen von Adressen hat? Das könnte bedeuten, dass die Datei in Tausende von CSV-Uploads aufgeteilt werden muss.
Da das Hochladen von Tausenden von CSV-Dateien etwas weit hergeholt scheint, habe ich diesen Anwendungsfall betrachtet und mich gefragt, wie schnell ich die API zum Laufen bringen könnte. In diesem Blogbeitrag werde ich erläutern, was ich ausprobiert habe und wie ich schließlich zu einem Programm gekommen bin, das ungefähr 100.000 Validierungen in 55 Sekunden durchführen konnte (während ich in der Benutzeroberfläche etwa 100.000 Validierungen in 1 Minute 10 Sekunden geschafft habe). Und obwohl dies immer noch ungefähr 100 Stunden dauern würde, um etwa 654 Millionen Validierungen durchzuführen, kann dieses Skript im Hintergrund laufen und dabei erheblich Zeit sparen.
Die endgültige Version dieses Programms finden Sie hier.
Mein erster Fehler: die Verwendung von Python
Python ist eine meiner Lieblingsprogrammiersprachen. Es glänzt in vielen Bereichen und ist unglaublich einfach zu verstehen. Ein Bereich, in dem es nicht glänzt, sind jedoch gleichzeitige Prozesse. Während Python die Fähigkeit hat, asynchrone Funktionen auszuführen, gibt es das, was als The Python Global Interpreter Lock oder GIL bekannt ist.
„Der Python Global Interpreter Lock oder GIL ist einfach gesagt ein Mutex (oder ein Sperren), das nur einem Thread erlaubt, die Kontrolle über den Python-Interpreter zu halten.
Das bedeutet, dass sich zu jedem Zeitpunkt nur ein Thread in einem Ausführungszustand befinden kann. Die Auswirkungen des GIL sind für Entwickler, die Einzel-Thread-Programme ausführen, nicht sichtbar, können jedoch in CPU-intensivem und Multi-Thread-Code zu einem Leistungsengpass werden.
Da der GIL es nur einem Thread ermöglicht, gleichzeitig in einer Multi-Thread-Architektur mit mehr als einem CPU-Kern auszuführen, hat der GIL einen Ruf als „berüchtigtes“ Feature von Python erlangt.“ (https://realpython.com/python-gil/)“
Anfangs war mir der GIL nicht bewusst, deshalb begann ich, in Python zu programmieren. Am Ende, obwohl mein Programm asynchron war, wurde es blockiert und egal, wie viele Threads ich hinzufügte, ich erreichte immer noch nur etwa 12-15 Iterationen pro Sekunde.
Der Hauptteil der asynchronen Funktion in Python ist unten zu sehen:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
Also habe ich die Verwendung von Python aufgegeben und bin zurück an den Zeichentisch gegangen…
Ich entschied mich für die Verwendung von NodeJS aufgrund seiner Fähigkeit, nicht blockierende I/O-Operationen extrem gut auszuführen. Eine weitere ausgezeichnete Möglichkeit zur Behandlung asynchroner API-Verarbeitung besteht im Aufbau von serverlosen Webhook-Verbrauchern mit Azure Functions, die variable Arbeitslasten effizient handhaben können. Ich bin auch ziemlich vertraut mit der Programmierung in NodeJS.
Unter Nutzung der asynchronen Aspekte von NodeJS funktionierte dies gut. Für weitere Details über asynchrones Programmieren in NodeJS siehe https://blog.risingstack.com/node-hero-async-programming-in-node-js/
Mein zweiter Fehler: der Versuch, die Datei in den Speicher zu lesen
Aufschlüsselung des finalen Codes
Nachdem die Optionen im Terminal gelesen und validiert wurden, führe ich den folgenden Code aus. Zuerst lese ich die CSV-Datei der E-Mails ein und zähle jede Zeile. Diese Funktion hat zwei Zwecke, 1) ermöglicht sie es mir, genau über den Fortschritt der Datei zu berichten [wie wir später sehen werden], und 2) erlaubt es mir, einen Timer zu stoppen, wenn die Anzahl der E-Mails in der Datei den abgeschlossenen Validierungen entspricht. Ich habe einen Timer hinzugefügt, damit ich Benchmarks durchführen und sicherstellen kann, dass ich gute Ergebnisse erziele.
let count = 0; // Zeilenzähler require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) // Liest die Eingabedatei und erhöht den Zähler für jede Zeile .on("close", function () { // Am Ende der Eingabedatei, nachdem alle Zeilen gezählt wurden, die Empfänger-Validierungsfunktion ausführen validateRecipients.validateRecipients(count, myArgs); });
Dann rufe ich die validateRecipients-Funktion auf. Beachten Sie, dass diese Funktion asynchron ist. Nachdem geprüft wurde, dass die Eingabe- und Ausgabedateien CSV-Dateien sind, schreibe ich eine Kopfzeile und starte einen Programm-Timer unter Verwendung der JSDOM-Bibliothek.
async function validateRecipients(email_count, myArgs) { if ( // Wenn sowohl die Eingabedatei als auch die Ausgabedatei im .csv-Format sind extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; // Zähler für jeden API-Aufruf email_count++; // Zeilenzähler gibt #Zeilen - 1 zurück, dies wird gemacht, um die Anzahl der Zeilen zu korrigieren // Starten Sie einen Timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); // Ausgabedatei output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); // Schreibe die Überschriften in die Ausgabedatei
Das folgende Skript ist wirklich der Hauptteil des Programms, also werde ich es aufteilen und erklären, was passiert. Für jede Zeile in der Eingabedatei:
Asynchron diese Zeile nehmen und den Empfänger-Validierungs-API aufrufen.
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) // Für jede Zeile, die aus der Eingabedatei gelesen wird, rufe die SparkPost-Empfänger-Validierungs-API auf
Dann, bei der Antwort
Die E-Mail zum JSON hinzufügen (um die E-Mail in der CSV ausdrucken zu können)
Überprüfen, ob Reason null ist, und wenn ja, einen leeren Wert einfügen (dies ist, damit das CSV-Format konsistent ist, da in einigen Fällen Reason in der Antwort angegeben wird)
Die Optionen und Schlüssel für das json2csv-Modul festlegen.
Das JSON in CSV umwandeln und ausgeben (unter Verwendung von json2csv)
Fortschritt im Terminal ausgeben
Schließlich, wenn die Anzahl der E-Mails in der Datei den abgeschlossenen Validierungen entspricht, stoppen Sie den Timer und geben die Ergebnisse aus
.then(function (response) { response.data.results.email = String(email); // Fügt die E-Mail als Wert-Schlüssel-Paar zur Antwort-JSON hinzu, um für die Ausgabe verwendet zu werden response.data.results.reason ? null : (response.data.results.reason = ""); // Wenn Reason null ist, auf leer setzen, damit die CSV einheitlich ist // Nutzt json-2-csv, um das JSON in CSV-Format umzuwandeln und auszugeben let options = { prependHeader: false, // Deaktiviert JSON-Werte, die als Kopfzeilen für jede Zeile hinzugefügt werden keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], // Legt die Reihenfolge der Schlüssel fest }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; // Erhöhen des API-Zählers process.stdout.write(`Done with ${completed} / ${email_count}\r`); // Ausgabenstatus von Abgeschlossen / Gesamt auf die Konsole ohne neue Zeilen anzuzeigen // Wenn alle E-Mails validiert wurden if (completed == email_count) { const stop = window.performance.now(); // Timer stoppen console.log( `Alle E-Mails erfolgreich validiert in ${ (stop - start) / 1000 } Sekunden` ); } })
Ein letztes Problem, das ich festgestellt habe, war, dass dies auf Mac großartig funktionierte, aber ich stieß auf den folgenden Fehler bei der Verwendung von Windows nach etwa 10.000 Validierungen:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) mit E-Mail XXXXXXX@XXXXXXXXXX.XXX
Nach weiterer Recherche scheint es sich um ein Problem mit dem NodeJS HTTP-Client-Verbindungspool zu handeln, der Verbindungen nicht wiederverwendet. Ich fand diesen Stackoverflow-Artikel zu diesem Thema und nach weiterem Suchen fand ich eine gute Standardkonfiguration für die Axios-Bibliothek, die dieses Problem löste. Ich bin mir immer noch nicht sicher, warum dieses Problem nur auf Windows und nicht auf Mac auftritt.
Nächste Schritte
Für jemanden, der ein einfaches, schnelles Programm sucht, das eine CSV aufnimmt, die Empfänger-Validierungs-API aufruft und eine CSV ausgibt, ist dieses Programm genau das Richtige.
Einige Ergänzungen zu diesem Programm wären die folgenden:
Erstellen Sie ein Frontend oder eine einfachere Benutzeroberfläche zur Nutzung
Bessere Fehler- und Wiederholungsbehandlung, denn wenn die API aus irgendeinem Grund einen Fehler auslöst, wiederholt das Programm derzeit den Aufruf nicht
Erwägen Sie die Implementierung als serverlose Azure-Funktion für automatische Skalierung und reduzierte Infrastrukturverwaltung
Ich wäre auch neugierig zu sehen, ob schnellere Ergebnisse mit einer anderen Sprache wie Golang oder Erlang/Elixir erreicht werden könnten. Abgesehen von der Sprachwahl können auch Infrastrukturgrenzen die Leistung beeinflussen – das haben wir aus erster Hand erfahren, als wir auf undokumentierte DNS-Einschränkungen in AWS gestoßen sind, die unsere Systeme zur Verarbeitung von E-Mails in großem Umfang beeinträchtigten.
Für Entwickler, die an der Kombination von API-Verarbeitung mit visuellen Workflow-Werkzeugen interessiert sind, sehen Sie sich an, wie Sie Flow Builder mit Google Cloud Functions integrieren für No-Code-Automatisierungs-Workflows.
Bitte zögern Sie nicht, mir Feedback oder Vorschläge zur Erweiterung dieses Projekts zu geben.