Product

解决方案

资源

Company

Product

解决方案

资源

Company

构建一个批量异步鸟类接收者验证工具

扎卡里·萨缪尔斯

2022年5月26日

电子邮件

1 min read

构建一个批量异步鸟类接收者验证工具

对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。

在构建电子邮件应用程序时,开发人员通常需要集成多个服务和API。了解云基础设施中的电子邮件API基础为构建像我们将在本指南中创建的大量验证系统这样的强大工具提供了基础。

我们偶尔会收到的问题之一是,我如何使用收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost用户界面上传文件进行验证,另一个是对每个电子邮件单独调用API(因为API是单个电子邮件验证)。

第一个选项效果很好,但有20MB(约500,000个地址)的限制。如果某人的电子邮件列表包含数百万个地址怎么办?这可能意味着要将其拆分为数千个CSV文件上传。

由于上传数千个CSV文件似乎有点牵强,我考虑了这种使用情况,并开始想知道我能让API运行得有多快。在这篇博客文章中,我将解释我尝试了什么以及如何最终成为一个可以在55秒内完成大约100,000次验证的程序(而在用户界面中,我大约用了1分10秒完成了100,000次验证)。尽管这仍然需要大约100小时才能完成约6.54亿次验证,但这个脚本可以在后台运行,显著节省时间。

该程序的最终版本可以在这里找到。

My first mistake: 使用 Python

Python 是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直观。然而,它在并发处理方面并不擅长。虽然 Python 确实具有运行异步函数的能力,但它拥有被称为 Python Global Interpreter Lock 或 GIL 的东西。

“Python Global Interpreter Lock 或 GIL,简单来说,是一个互斥锁(或一种锁),它只允许一个线程控制 Python 解释器。

这意味着在任意时间点只有一个线程可以处于执行状态。对于执行单线程程序的开发人员来说,GIL 的影响并不明显,但在 CPU 密集型和多线程代码中可能成为性能瓶颈。

由于 Global Interpreter Lock (GIL) 只允许一个线程同时执行,即使在多核系统上,它已成为 Python 的一个“臭名昭著”的功能(参见 Real Python 对 GIL 的文章)。

起初,我并不了解 GIL,于是开始使用 Python 编程。最后,即使我的程序是异步的,它还是被锁住了,无论我增加多少个线程,我每秒仍只能获得大约 12-15 个迭代。

Python 中异步函数的主要部分如下所示:

async def validateRecipients(f, fh, apiKey, snooze, count):
    h = {
        'Authorization': apiKey,
        'Accept': 'application/json'
    }
    with tqdm(total=count) as pbar:
        async with aiohttp.ClientSession() as session:
            for address in f:
                for i in address:
                    thisReq = requests.compat.urljoin(url, i)
                    async with session.get(thisReq, headers=h, ssl=False) as resp:
                        content = await resp.json()
                        row = content['results']
                        row['email'] = i
                        fh.writerow(row)
                        pbar.update(1)

所以我放弃使用 Python,回到了起点……

我决定使用 NodeJS,因为它非常擅长执行非阻塞 I/O 操作。另一个处理异步 API 处理的极好选项是使用 Azure Functions 构建 无服务器的 webhook 消费者,它可以有效地处理可变的工作负载。我对 NodeJS 编程也相当熟悉。

利用 Node.js 的异步特性,这种方法效果很好。有关 Node.js 中异步编程的更多细节,请参见 RisingStack 关于 Node.js 异步编程的指南

我的第二个错误:尝试将文件读入内存

我最初的想法如下:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。

对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。

因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。

我最初的想法如下:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。

对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。

因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。

我最初的想法如下:

Flowchart illustrating the process of validating a CSV list of emails, starting with ingestion, format checking, asynchronous API validation, result aggregation, and concluding with outputting to a CSV file.


首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。

对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)


Flowchart illustrating an email processing workflow, showing steps from ingesting a CSV list of emails to outputting results to a CSV file, with asynchronous validation via an API.


首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。

因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。

分解最终代码

在读取并验证终端参数后,我运行以下代码。首先,我读取电子邮件的CSV文件并计算每行。这个函数有两个目的:1) 它可以让我准确报告文件进度【如我们稍后将看到的】,2) 它使我能够在文件中的电子邮件数量等于完成验证的数量时停止计时。我添加了计时器,以便能够运行基准测试并确保获得良好的结果。

let count = 0; // Line count
require("fs")
    .createReadStream(myArgs[1])
    .on("data", function (chunk) {
        for (let i = 0; i < chunk.length; ++i)
            if (chunk[i] == 10) count++;
    })
    // Reads the infile and increases the count for each line
    .on("close", function () {
        // At the end of the infile, after all lines have been counted, run the recipient validation function
        validateRecipients.validateRecipients(count, myArgs);
    });


 然后我调用validateRecipients函数。 注意这个函数是异步的。在验证infile和outfile是CSV后,我写入一个标题行,并使用JSDOM库启动一个程序计时器。

async function validateRecipients(email_count, myArgs) {
    if (
        // If both the infile and outfile are in .csv format
        extname(myArgs[1]).toLowerCase() == ".csv" &&
        extname(myArgs[3]).toLowerCase() == ".csv"
    ) {
        let completed = 0; // Counter for each API call
        email_count++; // Line counter returns #lines - 1, this corrects the number of lines
        // Start a timer
        const { window } = new JSDOM();
        const start = window.performance.now();
        const output = fs.createWriteStream(myArgs[3]); // Outfile
        output.write(
            "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n"
        ); // Write the headers in the outfile
    }
}

以下脚本实际上是程序的主体,因此我将其分解并进行解释。对于infile的每一行:

fs.createReadStream(myArgs[1])
    .pipe(csv.parse({ headers: false }))
    .on("data", async (email) => {
        let url =
            SPARKPOST_HOST +
            "/api/v1/recipient-validation/single/" +
            email;
        await axios
            .get(url, {
                headers: {
                    Authorization: SPARKPOST_API_KEY,
                },
            });
        // For each row read in from the infile, call the SparkPost Recipient Validation API
    });

然后,在响应上

  • 将电子邮件添加到JSON中(以便能够在CSV中打印出电子邮件)

  • 验证原因是否为空,如果为空,则填充一个空值(这是为了使CSV格式一致,因为在某些情况下响应中给出了原因)

  • 设置json2csv模块的选项和键。

  • 转换JSON为CSV并输出(利用json2csv)

  • 在终端中写入进度

  • 最后,如果文件中的电子邮件数量=完成的验证,停止计时器并打印出结果


.then(function (response) {
    response.data.results.email = String(email); 
    // Adds the email as a value/key pair to the response JSON for output
    response.data.results.reason ? null : (response.data.results.reason = ""); 
    // If reason is null, set it to blank so the CSV is uniform
    // Utilizes json-2-csv to convert the JSON to CSV format and output
    let options = {
        prependHeader: false, // Disables JSON values from being added as header rows for every line
        keys: [
            "results.email",
            "results.valid",
            "results.result",
            "results.reason",
            "results.is_role",
            "results.is_disposable",
            "results.is_free",
            "results.delivery_confidence",
        ], // Sets the order of keys
    };
    let json2csvCallback = function (err, csv) {
        if (err) throw err;
        output.write(`${csv}\n`);
    };
    converter.json2csv(response.data, json2csvCallback, options);
    completed++; // Increase the API counter
    process.stdout.write(`Done with ${completed} / ${email_count}\r`); 
    // Output status of Completed / Total to the console without showing new lines
    // If all emails have completed validation
    if (completed == email_count) {
        const stop = window.performance.now(); // Stop the timer
        console.log(
            `All emails successfully validated in ${(stop - start) / 1000} seconds`
        );
    }
});

 

我发现的一个最后问题是,虽然这在Mac上效果很好,但在Windows上使用约10,000次验证后遇到了以下错误:

Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX

经过进一步研究,似乎是NodeJS HTTP客户端连接池没有重用连接的问题。我在这个问题上找到了一篇Stackoverflow文章,并在进一步挖掘后,找到了一个适合解决该问题的axios库默认配置。我仍然不确定为何该问题只发生在Windows上而不是Mac上。

下一步

对于正在寻找一个简单快速的程序的人,该程序接收CSV,调用收件人验证API,并输出一个CSV,这个程序就是为你准备的。

对此程序的一些补充功能包括以下内容:

  • 构建一个前端或更简单的用户界面以供使用

  • 更好的错误处理和重试机制,因为如果由于某种原因API抛出错误,当前程序不会重试调用

  • 考虑将其实现为无服务器Azure Function,以实现自动扩展和减少基础设施管理


我也很想知道是否可以通过其他语言(比如Golang或Erlang/Elixir)实现更快的结果。除了语言选择,基础设施限制也可能影响性能——我们亲身体验到了这一点,当时我们在AWS遇到了未记录的DNS限制,这影响了我们的高容量电子邮件处理系统。

对于对将API处理与视觉工作流程工具结合的开发人员,请查看如何整合Flow Builder与Google Cloud Functions进行无代码自动化工作流程。

请随时向我提供任何反馈或建议,以扩展此项目。

其他新闻

阅读更多来自此类别的内容

A person is standing at a desk while typing on a laptop.

这个完整的AI原生平台可以随着您的业务进行扩展。

Product

解决方案

资源

社交

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

Signup

© 2025 Bird

A person is standing at a desk while typing on a laptop.

这个完整的AI原生平台可以随着您的业务进行扩展。

Product

解决方案

资源

社交

Newsletter

通过每周更新到您的收件箱,随时了解 Bird 的最新动态。

Signup

© 2025 Bird