我们偶尔收到的问题之一是,我如何通过收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost用户界面上传文件进行验证,另一个是对每个电子邮件进行单独的API调用(因为该API是单邮箱验证)。
第一个选项效果很好,但限制为20Mb(大约500,000个地址)。如果有人有一个包含数百万个地址的电子邮件列表怎么办?这可能意味着将其拆分为1,000个CSV文件上传。
由于上传数千个CSV文件似乎有些不可行,我开始思考我能多快让API运行。在这篇博文中,我将解释我尝试的内容,以及我最终如何得到了一个能在55秒内完成约100,000次验证的程序(而在用户界面我大约在1分钟10秒内完成了约100,000次验证)。虽然这样仍然需要大约100个小时才能完成约654百万次验证,但这个脚本可以在后台运行,节省大量时间。
这个程序的最终版本可以在这里找到。
我的第一个错误:使用Python
Python是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直接。然而,它不擅长的一个领域是并发进程。虽然Python确实有能力运行异步函数,但它有一个被称为Python全局解释器锁(GIL)的问题。
“Python全局解释器锁或GIL,简单来说,是一个互斥锁(或锁),仅允许一个线程控制Python解释器。
这意味着在任何时候只有一个线程可以处于执行状态。对于运行单线程程序的开发人员而言,GIL的影响并不明显,但在CPU密集型和多线程代码中,它可能成为性能瓶颈。
由于GIL只允许一个线程在多线程架构中执行,即使在有多个CPU核心的情况下,GIL也因此获得了Python的“臭名昭著”的特性。”(https://realpython.com/python-gil/)
起初,我并不了解GIL,因此开始用Python编程。最终,尽管我的程序是异步的,但它陷入了阻塞,无论我添加多少线程,我每秒仍然只能得到约12-15次迭代。
异步函数的主要部分可以在下面看到:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃使用Python,回到起点…
我选择使用NodeJS,因为它能够极其出色地执行非阻塞的I/O操作。我对NodeJS的编程也相当熟悉。
利用NodeJS的异步特性,这个方案最终运行良好。有关NodeJS中异步编程的更多细节,请参见https://blog.risingstack.com/node-hero-async-programming-in-node-js/
我的第二个错误:尝试将文件读取到内存中
我最初的想法如下:
首先,导入一个CSV电子邮件列表。其次,将电子邮件加载到数组中并检查它们的格式是否正确。第三,异步调用收件人验证API。第四,等待结果并将其加载到一个变量中。最后,将这个变量输出到CSV文件中。
这对于较小的文件效果很好。问题在于当我尝试处理100,000个电子邮件时。程序在大约12,000次验证时停滞。在我们的前端开发人员的帮助下,我发现问题在于将所有结果加载到一个变量中(因此很快内存用完)。如果您想查看该程序的第一次迭代,我在此链接中提供了:版本1(不推荐)。
首先,导入一个CSV电子邮件列表。其次,统计文件中的电子邮件数量以便报告。第三,在每一行被异步读取时,调用收件人验证API,并将结果输出到CSV文件中。
因此,对于每一行读取,我调用API并异步地写出结果,从而不在长期内存中保留任何这些数据。在与收件人验证团队交谈后,我还去掉了电子邮件语法检查,因为他们告诉我收件人验证已经内置检查以验证电子邮件是否有效。
最后代码的分解
在读取和验证终端参数后,我运行以下代码。首先,我读取电子邮件的CSV文件并统计每一行。这个函数有两个目的,1)允许我准确报告文件进度[稍后我们会看到],2)在文件中的电子邮件数量等于完成验证时停止计时器。我添加了一个计时器,以便运行基准测试并确保获得良好的结果。
let count = 0; //行计数 require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //读取输入文件并为每一行增加计数 .on("close", function () { //在输入文件结束时,所有行都被计数后,运行收件人验证函数 validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。注意这个函数是异步的。在验证输入文件和输出文件都是CSV格式后,我写入一个表头行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( //如果输入文件和输出文件都是.csv格式 extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //每个API调用的计数器 email_count++; //行计数返回#行 - 1,这是为纠正行数 //启动计时器 const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //输出文件 output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //在输出文件中写入表头
以下脚本实际上是程序的主体,因此我将其拆分并解释发生的事情。对于输入文件的每一行:
异步获取该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //对于从输入文件读取的每一行,调用SparkPost收件人验证API
然后,在响应中
将电子邮件添加到JSON中(以便能够在CSV中打印出电子邮件)
验证原因是否为null,如果是,则填充一个空值(这是为了确保CSV格式一致,因为在某些情况下,响应中会给出原因)
设置json2csv模块的选项和键。
将JSON转换为CSV并输出(使用json2csv)
在终端中写入进度
最后,如果文件中的电子邮件数量=完成的验证,则停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); //将电子邮件作为值/键对添加到用于输出的响应JSON中 response.data.results.reason ? null : (response.data.results.reason = ""); //如果原因为null,则设置为空,以确保CSV统一 //利用json-2-csv将JSON转换为CSV格式并输出 let options = { prependHeader: false, //禁用JSON值在每行作为表头被添加 键: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //设置键的顺序 }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //增加API计数器 process.stdout.write(`完成 ${completed} / ${email_count}\r`); //在控制台输出完成/总数的状态而不换行 //如果所有电子邮件完成验证 if (completed == email_count) { const stop = window.performance.now(); //停止计时器 console.log( `所有电子邮件在 ${ (stop - start) / 1000 } 秒内成功验证` ); } })
我发现的最后一个问题是,虽然在Mac上非常顺利,但在Windows上使用时,我在大约10,000次验证后遇到了以下错误:
错误:connect ENOBUFS XX.XX.XXX.XXX:443 – 本地(未定义:未定义)与电子邮件 XXXXXXX@XXXXXXXXXX.XXX
经过进一步研究,这似乎是NodeJS HTTP客户端连接池不重用连接的问题。我在这个Stackoverflow文章上发现了这个问题,并经过进一步挖掘,找到一个很好的默认配置来解决此问题。我仍然不确定为什么此问题只发生在Windows而不发生在Mac上。
下一步
对于寻找一个简单快速的程序,接收CSV,调用收件人验证API并输出CSV的人来说,这个程序将很合适。
对该程序的一些补充如下:
构建一个前端或更易于使用的用户界面
更好的错误和重试处理,因为如果出于某种原因API抛出错误,当前程序不会重试调用
我也很好奇是否可以通过其他语言(例如Golang或Erlang/Elixir)实现更快的结果。
请随时给我提供任何反馈或建议以扩展这个项目。