Construcción de una herramienta de validación de destinatarios de aves asíncrona masiva

Zacarías Samuels

26 may 2022

Correo electrónico

1 min read

Construcción de una herramienta de validación de destinatarios de aves asíncrona masiva

Para alguien que busca un programa simple y rápido que acepte un csv, llame a la API de validación de receptores y produzca un CSV, este programa es para ti.

Cuando se construyen aplicaciones de correo electrónico, los desarrolladores a menudo necesitan integrar múltiples servicios y APIs. Comprender los fundamentos de la API de correo electrónico en la infraestructura de nube proporciona la base para construir herramientas robustas como el sistema de validación masiva que crearemos en esta guía.

Una de las preguntas que recibimos ocasionalmente es, ¿cómo puedo validar masivamente listas de correo electrónico con validación de destinatarios? Hay dos opciones aquí, una es cargar un archivo a través de la interfaz de SparkPost para la validación, y la otra es hacer llamadas individuales por correo electrónico a la API (ya que la API es de validación de correo electrónico individual).

La primera opción funciona muy bien pero tiene una limitación de 20 Mb (alrededor de 500,000 direcciones). ¿Qué pasa si alguien tiene una lista de correos electrónicos que contiene millones de direcciones? Podría significar dividir eso en miles de cargas de archivos CSV.

Como cargar miles de archivos CSV parece un poco descabellado, tomé ese caso de uso y comencé a preguntarme qué tan rápido podría hacer que la API funcionara. En esta publicación del blog, explicaré lo que probé y cómo finalmente llegué a un programa que podría realizar 100,000 validaciones en 55 segundos (mientras que en la interfaz obtuve alrededor de 100,000 validaciones en 1 minuto y 10 segundos). Y aunque esto aún tomaría aproximadamente 100 horas parahacer cerca de 654 millones de validaciones, este script puede ejecutarse en segundo plano, ahorrando tiempo significativo.

La versión final de este programa se puede encontrar aquí.

Mi primer error: usar Python

Python es uno de mis lenguajes de programación favoritos. Sobresale en muchas áreas y es increíblemente sencillo. Sin embargo, un área en la que no sobresale es en procesos concurrentes. Aunque python tiene la capacidad de ejecutar funciones asincrónicas, tiene lo que se conoce como The Python Global Interpreter Lock o GIL.

“The Python Global Interpreter Lock o GIL, en palabras simples, es un mutex (o un bloqueo) que permite que solo un hilo tenga el control del intérprete de Python.

Esto significa que solo un hilo puede estar en un estado de ejecución en cualquier momento dado. El impacto del GIL no es visible para los desarrolladores que ejecutan programas de un solo hilo, pero puede ser un cuello de botella de rendimiento en código controlado por CPU y multihilo.

Ya que el Global Interpreter Lock (GIL) permite que solo un hilo se ejecute a la vez, incluso en sistemas multi-núcleo, ha ganado reputación como una característica “infame” de Python (ver el artículo de Real Python sobre el GIL).

Al principio, no estaba al tanto del GIL, así que comencé a programar en python. Al final, aunque mi programa era asincrónico, se estaba bloqueando, y no importaba cuántos hilos añadiera, aún así solo obtenía unas 12-15 iteraciones por segundo.

La parte principal de la función asincrónica en Python se puede ver a continuación:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

Así que dejé de usar Python y volví a la mesa de dibujo…

Me decidí por utilizar NodeJS debido a su capacidad de realizar operaciones de i/o no bloqueantes extremadamente bien. Otra excelente opción para manejar el procesamiento asincrónico de API es construir consumidores de webhook sin servidor con Azure Functions, que pueden manejar eficientemente cargas de trabajo variables. También estoy bastante familiarizado con la programación en NodeJS.

Utilizando aspectos asincrónicos de Node.js, este enfoque funcionó bien. Para más detalles sobre la programación asincrónica en Node.js, ver la guía de RisingStack sobre programación asincrónica en Node.js.

Mi segundo error: intentar leer el archivo en memoria

Mi idea inicial fue la siguiente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, cargar los correos electrónicos en un array y verificar que están en el formato correcto. Tercero, llamar asincrónicamente la API de validación de destinatarios. Cuarto, esperar a los resultados y cargarlos en una variable. Y finalmente, exportar esta variable a un archivo CSV.

Esto funcionó muy bien para archivos más pequeños. El problema surgió cuando intenté ejecutar 100,000 correos electrónicos. El programa se detuvo alrededor de las 12,000 validaciones. Con la ayuda de uno de nuestros desarrolladores front-end, vi que el problema estaba en cargar todos los resultados en una variable (y, por lo tanto, quedarse sin memoria rápidamente). Si deseas ver la primera iteración de este programa, la he enlazado aquí: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, contar la cantidad de correos electrónicos en el archivo para propósitos de informes. Tercero, a medida que se lee cada línea de forma asincrónica, llamar a la API de validación de destinatarios y exportar los resultados a un archivo CSV.

Así, por cada línea leída, llamo a la API y escribo los resultados asincrónicamente para no mantener ninguno de estos datos en memoria a largo plazo. También eliminé la comprobación de sintaxis de correo electrónico después de hablar con el equipo de validación de destinatarios, ya que me informaron que la validación de destinatarios ya tiene comprobaciones integradas para verificar si un correo electrónico es válido o no.

Mi idea inicial fue la siguiente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, cargar los correos electrónicos en un array y verificar que están en el formato correcto. Tercero, llamar asincrónicamente la API de validación de destinatarios. Cuarto, esperar a los resultados y cargarlos en una variable. Y finalmente, exportar esta variable a un archivo CSV.

Esto funcionó muy bien para archivos más pequeños. El problema surgió cuando intenté ejecutar 100,000 correos electrónicos. El programa se detuvo alrededor de las 12,000 validaciones. Con la ayuda de uno de nuestros desarrolladores front-end, vi que el problema estaba en cargar todos los resultados en una variable (y, por lo tanto, quedarse sin memoria rápidamente). Si deseas ver la primera iteración de este programa, la he enlazado aquí: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, contar la cantidad de correos electrónicos en el archivo para propósitos de informes. Tercero, a medida que se lee cada línea de forma asincrónica, llamar a la API de validación de destinatarios y exportar los resultados a un archivo CSV.

Así, por cada línea leída, llamo a la API y escribo los resultados asincrónicamente para no mantener ninguno de estos datos en memoria a largo plazo. También eliminé la comprobación de sintaxis de correo electrónico después de hablar con el equipo de validación de destinatarios, ya que me informaron que la validación de destinatarios ya tiene comprobaciones integradas para verificar si un correo electrónico es válido o no.

Mi idea inicial fue la siguiente:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, cargar los correos electrónicos en un array y verificar que están en el formato correcto. Tercero, llamar asincrónicamente la API de validación de destinatarios. Cuarto, esperar a los resultados y cargarlos en una variable. Y finalmente, exportar esta variable a un archivo CSV.

Esto funcionó muy bien para archivos más pequeños. El problema surgió cuando intenté ejecutar 100,000 correos electrónicos. El programa se detuvo alrededor de las 12,000 validaciones. Con la ayuda de uno de nuestros desarrolladores front-end, vi que el problema estaba en cargar todos los resultados en una variable (y, por lo tanto, quedarse sin memoria rápidamente). Si deseas ver la primera iteración de este programa, la he enlazado aquí: Version 1 (NOT RECOMMENDED).


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


Primero, ingerir una lista de correos electrónicos en formato CSV. Segundo, contar la cantidad de correos electrónicos en el archivo para propósitos de informes. Tercero, a medida que se lee cada línea de forma asincrónica, llamar a la API de validación de destinatarios y exportar los resultados a un archivo CSV.

Así, por cada línea leída, llamo a la API y escribo los resultados asincrónicamente para no mantener ninguno de estos datos en memoria a largo plazo. También eliminé la comprobación de sintaxis de correo electrónico después de hablar con el equipo de validación de destinatarios, ya que me informaron que la validación de destinatarios ya tiene comprobaciones integradas para verificar si un correo electrónico es válido o no.

Desglosando el código final

Después de leer y validar los argumentos del terminal, ejecuto el siguiente código. Primero, leo el archivo CSV de correos electrónicos y cuento cada línea. Hay dos propósitos para esta función, 1) me permite informar con precisión sobre el progreso del archivo [como veremos más adelante], y 2) me permite detener un temporizador cuando el número de correos electrónicos en el archivo es igual al de validaciones completadas. Agregué un temporizador para poder ejecutar benchmarks y asegurarme de obtener buenos resultados.

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


A continuación, llamo a la función validateRecipients. Nota que esta función es asincrónica. Después de validar que el infile y outfile son CSV, escribo una fila de encabezado y comienzo un temporizador del programa usando la biblioteca JSDOM.

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

El siguiente script es realmente la mayor parte del programa, así que lo dividiré y explicaré lo que está sucediendo. Para cada línea del infile:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

Luego, en la respuesta

  • Agrega el correo electrónico al JSON (para poder imprimir el correo electrónico en el CSV)

  • Valida si la razón es nula, y si es así, rellena un valor vacío (esto es para que el formato CSV sea consistente, ya que en algunos casos se da una razón en la respuesta)

  • Establece las opciones y claves para el módulo json2csv.

  • Convierte el JSON a CSV y lo exporta (utilizando json2csv)

  • Escribe el progreso en el terminal

  • Finalmente, si el número de correos electrónicos en el archivo = validaciones completadas, detén el temporizador y muestra los resultados


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

Un último problema que encontré fue que, aunque esto funcionó muy bien en Mac, me encontré con el siguiente error usando Windows después de alrededor de 10,000 validaciones:

Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (indefinido:indefinido) con correo electrónico XXXXXXX@XXXXXXXXXX.XXX

Después de investigar un poco más, parece ser un problema con el pool de conexiones del cliente HTTP de NodeJS que no reutiliza conexiones. Encontré este artículo de Stackoverflow sobre el problema, y tras investigar más, encontré una buena configuración por defecto para la biblioteca axios que resolvió este problema. Todavía no estoy seguro de por qué este problema solo sucede en Windows y no en Mac.

Siguientes pasos

Para alguien que está buscando un programa rápido y simple que tome un csv, llame a la API de validación de destinatarios y genere un CSV, este programa es para usted.

Algunas adiciones a este programa serían las siguientes:

  • Construir un front end o una interfaz de usuario más fácil de usar

  • Mejor manejo de errores y reintentos porque si por alguna razón la API arroja un error, el programa actualmente no reintenta la llamada

  • Considerar la implementación como una función de Azure sin servidor para escalamiento automático y gestión reducida de infraestructura


También me gustaría ver si se podrían lograr resultados más rápidos con otro lenguaje como Golang o Erlang/Elixir. Más allá de la elección del lenguaje, las limitaciones de infraestructura también pueden afectar el rendimiento - lo hemos aprendido de primera mano cuando enfrentamos límites de DNS no documentados en AWS que afectaron nuestros sistemas de procesamiento de correo electrónico de alto volumen.

Para desarrolladores interesados en combinar el procesamiento de API con herramientas de flujo de trabajo visual, descubran cómo integrar Flow Builder con Google Cloud Functions para flujos de trabajo de automatización sin código.

Por favor, no dude en proporcionarme cualquier comentario o sugerencia para expandir este proyecto.

Otras noticias

Leer más de esta categoría

A person is standing at a desk while typing on a laptop.

La plataforma completa nativa de AI que escala con tu negocio.

© 2025 Bird

A person is standing at a desk while typing on a laptop.

La plataforma completa nativa de AI que escala con tu negocio.

© 2025 Bird