
对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。
Business in a box.
探索我们的解决方案。
与我们的销售团队交谈
我们偶尔收到的一个问题是,如何使用收件人验证批量验证邮件列表?这里有两个选项,一是通过SparkPost UI上传文件进行验证,另一个是为每封邮件调用API(因为API是单个邮件验证)。
第一个选项效果很好,但有20Mb的限制(大约500,000个地址)。如果某人有一个包含数百万地址的邮件列表怎么办?这可能意味着将其拆分为1,000个的CSV文件上传。
由于上传数千个CSV文件似乎有点牵强,我考虑了这个用例并开始思考如何加快API的运行速度。在这篇博客文章中,我将解释我尝试了什么以及如何最终编写出一个能够在55秒内进行约100,000次验证的程序(而在UI中,我大约在1分钟10秒内进行了100,000次验证)。虽然这仍然需要大约100小时才能完成约654百万次验证,但此脚本可以在后台运行,节省大量时间。
此程序的最终版本可以在 这里找到。
My first mistake: 使用 Python
Python 是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常简单。然而,它在并发处理方面并不出色。尽管 python 确实具有运行异步函数的能力,但它有一个众所周知的 Python 全局解释器锁(GIL)。
“Python 全局解释器锁或 GIL 简单来说,是一个互斥锁(锁定开关),仅允许一个线程控制 Python 解释器。
这意味着在任何时候,只有一个线程可以处于执行状态。对于执行单线程程序的开发人员来说,GIL 的影响并不明显,但在 CPU 密集型和多线程代码中,它可能是一个性能瓶颈。
由于 GIL 即使在具有多个 CPU 核心的多线程架构中也只允许一个线程执行,因此 GIL 被誉为 Python 的一个“臭名昭著”的特性。” (https://realpython.com/python-gil/)”
起初,我不知道 GIL,所以我开始用 python 编程。最后,即使我的程序是异步的,它也被锁住了,无论我添加多少个线程,我每秒仍然只能获得大约 12-15 次迭代。
Python 中异步函数的主要部分如下所示:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃使用 Python,并回到了绘图板……
我决定使用 NodeJS,因为它能够非常好地执行非阻塞 i/o 操作。我对 NodeJS 编程也很熟悉。
利用 NodeJS 的异步特性,这最终工作得很好。关于 NodeJS 中异步编程的详细信息,请参阅 https://blog.risingstack.com/node-hero-async-programming-in-node-js/
我的第二个错误:尝试将文件读入内存
分解最终代码
在阅读并验证终端参数之后,我运行以下代码。首先,我读入电子邮件的CSV文件并计算每一行。有两个目的:1)它让我可以准确地报告文件进度[如我们稍后所见],2)当文件中的电子邮件数量等于已完成的验证时,它让我可以停止计时器。我添加了一个计时器,以便我可以运行基准测试并确保我获得良好的结果。
let count = 0; //Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //Reads the infile and increases the count for each line .on("close", function () { //At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。请注意,此函数是异步的。在验证输入文件和输出文件是CSV格式之后,我写入一个标题行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( //If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //Counter for each API call email_count++; //Line counter returns #lines - 1, this is done to correct the number of lines //Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //Write the headers in the outfile
以下脚本实际上是程序的主体,所以我将其分解并解释发生了什么。对于输入文件中的每一行:
异步获取该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //For each row read in from the infile, call the SparkPost Recipient Validation API
然后,在响应中
将电子邮件添加到JSON中(以便能够在CSV中打印出电子邮件)
验证如果原因为空值,如果是这样,则填充值(这样CSV格式一致,因为在某些情况下响应中会给出原因)
为json2csv模块设置选项和键。
将JSON转换为CSV并输出(利用json2csv)
在终端上写入进度
最后,如果文件中的电子邮件数量=完成的验证,停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); //Adds the email as a value/key pair to the response JSON to be used for output response.data.results.reason ? null : (response.data.results.reason = ""); //If reason is null, set it to blank so the CSV is uniform //Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, //Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); //Output status of Completed / Total to the console without showing new lines //If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); //Stop the timer console.log( `所有电子邮件已成功验证,共耗时 ${ (stop - start) / 1000 } 秒` ); } })
我发现的最后一个问题是,虽然这在Mac上运行良好,但在使用Windows进行大约10,000次验证后遇到了以下错误:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX
经过进一步研究,似乎是NodeJS HTTP客户端连接池不重新使用连接的问题。我在这个问题上找到了一个Stackoverflow文章,并经过进一步挖掘,找到了一个好的axios库的默认配置,解决了这个问题。我仍然不确定为什么这个问题只在Windows上发生而不在Mac上发生。
下一步
对于那些寻找简单快速程序的人,这个程序正对您,它接收一个CSV文件,调用收件人验证API,并输出一个CSV文件。
对这个程序的一些添加建议如下:
构建一个前端或更易用的用户界面
更好的错误和重试处理,因为如果API因某种原因抛出错误,程序目前不会重试调用
我也很好奇是否可以通过使用其他语言(例如Golang或Erlang/Elixir)获得更快的结果。
请随时向我提供任何反馈或建议,以便扩展这个项目。