构建一个批量异步鸟类接收者验证工具
扎卡里·萨缪尔斯
2022年5月26日
电子邮件
1 min read

关键要点
作者构建了一个大批量收件人验证工具,使用Bird的收件人验证API来高效验证数百万个电子邮件地址。
由于其非阻塞I/O和缺乏GIL限制,Node.js被证明比Python更快且更具可扩展性。
该工具异步读取CSV文件,为每个电子邮件调用验证API,并实时将结果写入新的CSV中。
该方法避免了内存瓶颈,并将吞吐量提高到不到一分钟完成约100,000次验证。
未来的改进可以包括更好的重试处理、用户友好的UI,或迁移到无服务器环境以实现可扩展性。
Q&A 精华
Bulk Asynchronous Recipient Validation Tool 的目的是什么?
通过直接集成 Bird 的收件人验证 API 来验证大量的电子邮件地址,快速输出经过验证的结果,无需手动上传。
为什么最初使用Python,后来被Node.js取代?
Python 的全局解释器锁 (GIL) 限制了并发性,而 Node.js 允许真正的异步执行,从而导致更快的并行 API 调用。
这个工具如何在不耗尽内存的情况下处理大型文件?
与其一次性加载所有数据,脚本逐行处理每个CSV行——发送验证请求并立即将结果写入新的CSV文件。
这个工具解决了开发人员的什么问题?
它支持大规模的电子邮件列表验证,克服了SparkPost基于UI的验证器的20MB限制,并且无需手动上传多个文件。
最终版本的程序有多快?
大约100,000次验证在55秒内完成,而使用UI版本则需要超过一分钟。
在 Windows 系统上遇到了什么问题?
Node.js HTTP 客户端连接池在大量并发请求后导致“ENOBUFS”错误,这些错误通过配置 axios 连接重用得以修复。
建议哪些未来的增强功能?
添加错误处理和重试机制,创建前端界面,或将该工具实现为无服务器Azure Function,以提高可扩展性和弹性。
对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。
在构建电子邮件应用程序时,开发人员通常需要集成多个服务和API。了解云基础设施中的电子邮件API基础知识为构建强大的工具(例如,我们将在本指南中创建的批量验证系统)提供了基础。
我们偶尔收到的一个问题是:如何使用收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost UI上传文件进行验证,另一个是为每封电子邮件单独调用API(因为API是单个电子邮件验证)。
第一个选项效果很好,但限制为20Mb(约500,000个地址)。如果有人有包含数百万地址的电子邮件列表怎么办?这可能意味着将其分成数千个CSV文件上传。
由于上传数千个CSV文件似乎有些遥不可及,我考虑了这种使用情况,并开始想象我能让API运行得多快。在这篇博客文章中,我将解释我尝试了什么以及最终如何形成一个能够在55秒内进行100,000次验证的程序(而在UI中我的验证速度约为1分钟10秒)。
方法 | 测试验证次数 | 完成时间 | 大约吞吐量 |
|---|---|---|---|
批量异步Node.js工具 | 100,000 | 55秒 | ~1,818验证/秒 |
SparkPost UI上传 | 100,000 | 1分钟10秒 | ~1,428验证/秒 |
虽然这仍然需要大约100小时来完成约6.54亿次验证,但该脚本可以在后台运行,节省大量时间。
该程序的最终版本可以在这里找到。
在构建电子邮件应用程序时,开发人员通常需要集成多个服务和API。了解云基础设施中的电子邮件API基础知识为构建强大的工具(例如,我们将在本指南中创建的批量验证系统)提供了基础。
我们偶尔收到的一个问题是:如何使用收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost UI上传文件进行验证,另一个是为每封电子邮件单独调用API(因为API是单个电子邮件验证)。
第一个选项效果很好,但限制为20Mb(约500,000个地址)。如果有人有包含数百万地址的电子邮件列表怎么办?这可能意味着将其分成数千个CSV文件上传。
由于上传数千个CSV文件似乎有些遥不可及,我考虑了这种使用情况,并开始想象我能让API运行得多快。在这篇博客文章中,我将解释我尝试了什么以及最终如何形成一个能够在55秒内进行100,000次验证的程序(而在UI中我的验证速度约为1分钟10秒)。
方法 | 测试验证次数 | 完成时间 | 大约吞吐量 |
|---|---|---|---|
批量异步Node.js工具 | 100,000 | 55秒 | ~1,818验证/秒 |
SparkPost UI上传 | 100,000 | 1分钟10秒 | ~1,428验证/秒 |
虽然这仍然需要大约100小时来完成约6.54亿次验证,但该脚本可以在后台运行,节省大量时间。
该程序的最终版本可以在这里找到。
在构建电子邮件应用程序时,开发人员通常需要集成多个服务和API。了解云基础设施中的电子邮件API基础知识为构建强大的工具(例如,我们将在本指南中创建的批量验证系统)提供了基础。
我们偶尔收到的一个问题是:如何使用收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost UI上传文件进行验证,另一个是为每封电子邮件单独调用API(因为API是单个电子邮件验证)。
第一个选项效果很好,但限制为20Mb(约500,000个地址)。如果有人有包含数百万地址的电子邮件列表怎么办?这可能意味着将其分成数千个CSV文件上传。
由于上传数千个CSV文件似乎有些遥不可及,我考虑了这种使用情况,并开始想象我能让API运行得多快。在这篇博客文章中,我将解释我尝试了什么以及最终如何形成一个能够在55秒内进行100,000次验证的程序(而在UI中我的验证速度约为1分钟10秒)。
方法 | 测试验证次数 | 完成时间 | 大约吞吐量 |
|---|---|---|---|
批量异步Node.js工具 | 100,000 | 55秒 | ~1,818验证/秒 |
SparkPost UI上传 | 100,000 | 1分钟10秒 | ~1,428验证/秒 |
虽然这仍然需要大约100小时来完成约6.54亿次验证,但该脚本可以在后台运行,节省大量时间。
该程序的最终版本可以在这里找到。
My first mistake: 使用 Python
Python 是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直观。然而,它在并发处理方面并不擅长。虽然 Python 确实具有运行异步函数的能力,但它拥有被称为 Python Global Interpreter Lock 或 GIL 的东西。
“Python Global Interpreter Lock 或 GIL,简单来说,是一个互斥锁(或一种锁),它只允许一个线程控制 Python 解释器。
这意味着在任意时间点只有一个线程可以处于执行状态。对于执行单线程程序的开发人员来说,GIL 的影响并不明显,但在 CPU 密集型和多线程代码中可能成为性能瓶颈。
由于 Global Interpreter Lock (GIL) 只允许一个线程同时执行,即使在多核系统上,它已成为 Python 的一个“臭名昭著”的功能(参见 Real Python 对 GIL 的文章)。
起初,我并不了解 GIL,于是开始使用 Python 编程。最后,即使我的程序是异步的,它还是被锁住了,无论我增加多少个线程,我每秒仍只能获得大约 12-15 个迭代。
Python 中异步函数的主要部分如下所示:
async def validateRecipients(f, fh, apiKey, snooze, count): h = { 'Authorization': apiKey, 'Accept': 'application/json' } with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq, headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃使用 Python,回到了起点……
我决定使用 NodeJS,因为它非常擅长执行非阻塞 I/O 操作。另一个处理异步 API 处理的极好选项是使用 Azure Functions 构建 无服务器的 webhook 消费者,它可以有效地处理可变的工作负载。我对 NodeJS 编程也相当熟悉。
利用 Node.js 的异步特性,这种方法效果很好。有关 Node.js 中异步编程的更多细节,请参见 RisingStack 关于 Node.js 异步编程的指南。
Python 是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直观。然而,它在并发处理方面并不擅长。虽然 Python 确实具有运行异步函数的能力,但它拥有被称为 Python Global Interpreter Lock 或 GIL 的东西。
“Python Global Interpreter Lock 或 GIL,简单来说,是一个互斥锁(或一种锁),它只允许一个线程控制 Python 解释器。
这意味着在任意时间点只有一个线程可以处于执行状态。对于执行单线程程序的开发人员来说,GIL 的影响并不明显,但在 CPU 密集型和多线程代码中可能成为性能瓶颈。
由于 Global Interpreter Lock (GIL) 只允许一个线程同时执行,即使在多核系统上,它已成为 Python 的一个“臭名昭著”的功能(参见 Real Python 对 GIL 的文章)。
起初,我并不了解 GIL,于是开始使用 Python 编程。最后,即使我的程序是异步的,它还是被锁住了,无论我增加多少个线程,我每秒仍只能获得大约 12-15 个迭代。
Python 中异步函数的主要部分如下所示:
async def validateRecipients(f, fh, apiKey, snooze, count): h = { 'Authorization': apiKey, 'Accept': 'application/json' } with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq, headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃使用 Python,回到了起点……
我决定使用 NodeJS,因为它非常擅长执行非阻塞 I/O 操作。另一个处理异步 API 处理的极好选项是使用 Azure Functions 构建 无服务器的 webhook 消费者,它可以有效地处理可变的工作负载。我对 NodeJS 编程也相当熟悉。
利用 Node.js 的异步特性,这种方法效果很好。有关 Node.js 中异步编程的更多细节,请参见 RisingStack 关于 Node.js 异步编程的指南。
Python 是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直观。然而,它在并发处理方面并不擅长。虽然 Python 确实具有运行异步函数的能力,但它拥有被称为 Python Global Interpreter Lock 或 GIL 的东西。
“Python Global Interpreter Lock 或 GIL,简单来说,是一个互斥锁(或一种锁),它只允许一个线程控制 Python 解释器。
这意味着在任意时间点只有一个线程可以处于执行状态。对于执行单线程程序的开发人员来说,GIL 的影响并不明显,但在 CPU 密集型和多线程代码中可能成为性能瓶颈。
由于 Global Interpreter Lock (GIL) 只允许一个线程同时执行,即使在多核系统上,它已成为 Python 的一个“臭名昭著”的功能(参见 Real Python 对 GIL 的文章)。
起初,我并不了解 GIL,于是开始使用 Python 编程。最后,即使我的程序是异步的,它还是被锁住了,无论我增加多少个线程,我每秒仍只能获得大约 12-15 个迭代。
Python 中异步函数的主要部分如下所示:
async def validateRecipients(f, fh, apiKey, snooze, count): h = { 'Authorization': apiKey, 'Accept': 'application/json' } with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq, headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃使用 Python,回到了起点……
我决定使用 NodeJS,因为它非常擅长执行非阻塞 I/O 操作。另一个处理异步 API 处理的极好选项是使用 Azure Functions 构建 无服务器的 webhook 消费者,它可以有效地处理可变的工作负载。我对 NodeJS 编程也相当熟悉。
利用 Node.js 的异步特性,这种方法效果很好。有关 Node.js 中异步编程的更多细节,请参见 RisingStack 关于 Node.js 异步编程的指南。
我的第二个错误:尝试将文件读入内存
我最初的想法如下:

首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。
对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)。

首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。
因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。
我最初的想法如下:

首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。
对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)。

首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。
因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。
我最初的想法如下:

首先,导入电子邮件的 CSV 列表。其次,将电子邮件加载到数组中,并检查它们的格式是否正确。第三,异步调用收件人验证 API。第四,等待结果并将其加载到变量中。最后,将此变量输出到 CSV 文件。
对于较小的文件,这个方法效果很好。问题出现在我尝试处理 100,000 封电子邮件时。程序在大约 12,000 项验证时陷入停滞。在我们的一位前端开发人员的帮助下,我发现问题出在将所有结果加载到一个变量中(因此很快就会耗尽内存)。如果您想查看该程序的第一个迭代版本,我在此处链接了:Version 1(不推荐)。

首先,导入电子邮件的 CSV 列表。其次,为报告目的计算文件中的电子邮件数量。第三,当每行被异步读取时,调用收件人验证 API 并将结果输出到 CSV 文件中。
因此,对于每行读取的数据,我异步调用 API 并写出结果,以避免将任何数据长时间保留在内存中。在与收件人验证团队交谈后,我还删除了电子邮件语法检查,因为他们告诉我收件人验证已经内置了检查,以判断电子邮件是否有效。
分解最终代码
在读取并验证终端参数后,我运行以下代码。首先,我读取CSV文件的电子邮件并计算每一行。有两个目的,一是让我可以准确报告文件进度[如我们稍后将看到],二是让我可以在文件中的电子邮件数量等于完成验证时停止计时器。我添加了一个计时器,以便我可以运行基准测试并确保我获得良好的结果。
let count = 0; // Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) // Reads the infile and increases the count for each line .on("close", function () { // At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。注意此功能是异步的。在验证输入文件和输出文件是CSV后,我写入一个标题行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( // If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; // Counter for each API call email_count++; // Line counter returns #lines - 1, this corrects the number of lines // Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); // Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); // Write the headers in the outfile } }
以下脚本确实是程序的核心,因此我将其分解并解释发生了什么。对于输入文件的每一行:
异步处理该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }); // For each row read in from the infile, call the SparkPost Recipient Validation API });
然后,响应时
将电子邮件添加到JSON中(以便可以在CSV中打印出电子邮件)
验证原因是否为空,如果是,则填充空值(这是为了保持CSV格式的一致性,因为在某些情况下响应中给出了原因)
设置json2csv模块的选项和键。
将JSON转换为CSV并输出(使用json2csv)
在终端中写入进度
最后,如果文件中的电子邮件数量等于完成的验证,则停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); // Adds the email as a value/key pair to the response JSON for output response.data.results.reason ? null : (response.data.results.reason = ""); // If reason is null, set it to blank so the CSV is uniform // Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, // Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], // Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; // Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); // Output status of Completed / Total to the console without showing new lines // If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); // Stop the timer console.log( `All emails successfully validated in ${(stop - start) / 1000} seconds` ); } });
我发现的一个最终问题是,虽然这在Mac上能良好运作,但在使用Windows进行约10,000次验证后,我遇到了以下错误:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX
在做了进一步的研究后,似乎是NodeJS HTTP客户端连接池不重用连接的问题。我找到了一个关于此问题的Stackoverflow文章,经过进一步挖掘,发现了一个好的axios库默认配置,解决了这个问题。我仍然不确定为什么这个问题只发生在Windows上,而不是在Mac上。
在读取并验证终端参数后,我运行以下代码。首先,我读取CSV文件的电子邮件并计算每一行。有两个目的,一是让我可以准确报告文件进度[如我们稍后将看到],二是让我可以在文件中的电子邮件数量等于完成验证时停止计时器。我添加了一个计时器,以便我可以运行基准测试并确保我获得良好的结果。
let count = 0; // Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) // Reads the infile and increases the count for each line .on("close", function () { // At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。注意此功能是异步的。在验证输入文件和输出文件是CSV后,我写入一个标题行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( // If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; // Counter for each API call email_count++; // Line counter returns #lines - 1, this corrects the number of lines // Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); // Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); // Write the headers in the outfile } }
以下脚本确实是程序的核心,因此我将其分解并解释发生了什么。对于输入文件的每一行:
异步处理该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }); // For each row read in from the infile, call the SparkPost Recipient Validation API });
然后,响应时
将电子邮件添加到JSON中(以便可以在CSV中打印出电子邮件)
验证原因是否为空,如果是,则填充空值(这是为了保持CSV格式的一致性,因为在某些情况下响应中给出了原因)
设置json2csv模块的选项和键。
将JSON转换为CSV并输出(使用json2csv)
在终端中写入进度
最后,如果文件中的电子邮件数量等于完成的验证,则停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); // Adds the email as a value/key pair to the response JSON for output response.data.results.reason ? null : (response.data.results.reason = ""); // If reason is null, set it to blank so the CSV is uniform // Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, // Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], // Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; // Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); // Output status of Completed / Total to the console without showing new lines // If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); // Stop the timer console.log( `All emails successfully validated in ${(stop - start) / 1000} seconds` ); } });
我发现的一个最终问题是,虽然这在Mac上能良好运作,但在使用Windows进行约10,000次验证后,我遇到了以下错误:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX
在做了进一步的研究后,似乎是NodeJS HTTP客户端连接池不重用连接的问题。我找到了一个关于此问题的Stackoverflow文章,经过进一步挖掘,发现了一个好的axios库默认配置,解决了这个问题。我仍然不确定为什么这个问题只发生在Windows上,而不是在Mac上。
在读取并验证终端参数后,我运行以下代码。首先,我读取CSV文件的电子邮件并计算每一行。有两个目的,一是让我可以准确报告文件进度[如我们稍后将看到],二是让我可以在文件中的电子邮件数量等于完成验证时停止计时器。我添加了一个计时器,以便我可以运行基准测试并确保我获得良好的结果。
let count = 0; // Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) // Reads the infile and increases the count for each line .on("close", function () { // At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。注意此功能是异步的。在验证输入文件和输出文件是CSV后,我写入一个标题行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( // If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; // Counter for each API call email_count++; // Line counter returns #lines - 1, this corrects the number of lines // Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); // Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); // Write the headers in the outfile } }
以下脚本确实是程序的核心,因此我将其分解并解释发生了什么。对于输入文件的每一行:
异步处理该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }); // For each row read in from the infile, call the SparkPost Recipient Validation API });
然后,响应时
将电子邮件添加到JSON中(以便可以在CSV中打印出电子邮件)
验证原因是否为空,如果是,则填充空值(这是为了保持CSV格式的一致性,因为在某些情况下响应中给出了原因)
设置json2csv模块的选项和键。
将JSON转换为CSV并输出(使用json2csv)
在终端中写入进度
最后,如果文件中的电子邮件数量等于完成的验证,则停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); // Adds the email as a value/key pair to the response JSON for output response.data.results.reason ? null : (response.data.results.reason = ""); // If reason is null, set it to blank so the CSV is uniform // Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, // Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], // Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; // Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); // Output status of Completed / Total to the console without showing new lines // If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); // Stop the timer console.log( `All emails successfully validated in ${(stop - start) / 1000} seconds` ); } });
我发现的一个最终问题是,虽然这在Mac上能良好运作,但在使用Windows进行约10,000次验证后,我遇到了以下错误:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX
在做了进一步的研究后,似乎是NodeJS HTTP客户端连接池不重用连接的问题。我找到了一个关于此问题的Stackoverflow文章,经过进一步挖掘,发现了一个好的axios库默认配置,解决了这个问题。我仍然不确定为什么这个问题只发生在Windows上,而不是在Mac上。
下一步
对于正在寻找一个简单快速的程序的人,该程序接收CSV,调用收件人验证API,并输出一个CSV,这个程序就是为你准备的。
对此程序的一些补充功能包括以下内容:
构建一个前端或更简单的用户界面以供使用
更好的错误处理和重试机制,因为如果由于某种原因API抛出错误,当前程序不会重试调用
考虑将其实现为无服务器Azure Function,以实现自动扩展和减少基础设施管理
我也很想知道是否可以通过其他语言(比如Golang或Erlang/Elixir)实现更快的结果。除了语言选择,基础设施限制也可能影响性能——我们亲身体验到了这一点,当时我们在AWS遇到了未记录的DNS限制,这影响了我们的高容量电子邮件处理系统。
对于对将API处理与视觉工作流程工具结合的开发人员,请查看如何整合Flow Builder与Google Cloud Functions进行无代码自动化工作流程。
请随时向我提供任何反馈或建议,以扩展此项目。
对于正在寻找一个简单快速的程序的人,该程序接收CSV,调用收件人验证API,并输出一个CSV,这个程序就是为你准备的。
对此程序的一些补充功能包括以下内容:
构建一个前端或更简单的用户界面以供使用
更好的错误处理和重试机制,因为如果由于某种原因API抛出错误,当前程序不会重试调用
考虑将其实现为无服务器Azure Function,以实现自动扩展和减少基础设施管理
我也很想知道是否可以通过其他语言(比如Golang或Erlang/Elixir)实现更快的结果。除了语言选择,基础设施限制也可能影响性能——我们亲身体验到了这一点,当时我们在AWS遇到了未记录的DNS限制,这影响了我们的高容量电子邮件处理系统。
对于对将API处理与视觉工作流程工具结合的开发人员,请查看如何整合Flow Builder与Google Cloud Functions进行无代码自动化工作流程。
请随时向我提供任何反馈或建议,以扩展此项目。
对于正在寻找一个简单快速的程序的人,该程序接收CSV,调用收件人验证API,并输出一个CSV,这个程序就是为你准备的。
对此程序的一些补充功能包括以下内容:
构建一个前端或更简单的用户界面以供使用
更好的错误处理和重试机制,因为如果由于某种原因API抛出错误,当前程序不会重试调用
考虑将其实现为无服务器Azure Function,以实现自动扩展和减少基础设施管理
我也很想知道是否可以通过其他语言(比如Golang或Erlang/Elixir)实现更快的结果。除了语言选择,基础设施限制也可能影响性能——我们亲身体验到了这一点,当时我们在AWS遇到了未记录的DNS限制,这影响了我们的高容量电子邮件处理系统。
对于对将API处理与视觉工作流程工具结合的开发人员,请查看如何整合Flow Builder与Google Cloud Functions进行无代码自动化工作流程。
请随时向我提供任何反馈或建议,以扩展此项目。



