
对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。
在构建电子邮件应用程序时,开发人员通常需要集成多个服务和API。了解云基础设施中的电子邮件API基础为构建像我们将在本指南中创建的大量验证系统这样的强大工具提供了基础。
我们偶尔会收到的问题之一是,我如何使用收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost用户界面上传文件进行验证,另一个是对每个电子邮件单独调用API(因为API是单个电子邮件验证)。
第一个选项效果很好,但有20MB(约500,000个地址)的限制。如果某人的电子邮件列表包含数百万个地址怎么办?这可能意味着要将其拆分为数千个CSV文件上传。
由于上传数千个CSV文件似乎有点牵强,我考虑了这种使用情况,并开始想知道我能让API运行得有多快。在这篇博客文章中,我将解释我尝试了什么以及如何最终成为一个可以在55秒内完成大约100,000次验证的程序(而在用户界面中,我大约用了1分10秒完成了100,000次验证)。尽管这仍然需要大约100小时才能完成约6.54亿次验证,但这个脚本可以在后台运行,显著节省时间。
该程序的最终版本可以在这里找到。
My first mistake: 使用 Python
Python 是我最喜欢的编程语言之一。它在许多领域表现出色,而且非常直观。然而,它在并发处理方面的表现不是很好。虽然 Python 确实具有运行异步函数的能力,但它有一个称为 Python 全局解释器锁 (GIL) 的机制。
“Python 全局解释器锁或 GIL,简单来说,是一个互斥锁(或锁),只允许一个线程控制 Python 解释器。
这意味着在任何时候只有一个线程可以处于执行状态。对于执行单线程程序的开发者来说,GIL 的影响不明显,但在 CPU 密集型和多线程代码中它可能是一个性能瓶颈。
由于 GIL 即使在拥有多个 CPU 核心的多线程架构中也只允许一个线程执行,因此 GIL 被誉为 Python 的“臭名昭著”的功能。” (https://realpython.com/python-gil/)”
一开始,我对 GIL 并不了解,所以我开始在 Python 中编程。最终,尽管我的程序是异步的,它仍然被锁住,无论我添加多少线程,每秒也只有大约 12-15 次迭代。
Python 中异步函数的主要部分如下所示:
async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)
所以我放弃了使用 Python,然后重新开始……
我决定采用 NodeJS,因为它能够非常好地执行非阻塞 i/o 操作。处理异步 API 处理的另一个优选方案是使用 Azure Functions 构建无服务器 webhook 消费者,可以有效处理可变工作负载。我也非常熟悉 NodeJS 编程。
利用 NodeJS 的异步特性,这最终效果很好。有关 NodeJS 中异步编程的更多详情,请参阅 https://blog.risingstack.com/node-hero-async-programming-in-node-js/
我的第二个错误:尝试将文件读入内存
分解最终代码
在阅读并验证终端参数之后,我运行以下代码。首先,我读入电子邮件的CSV文件并计算每一行。有两个目的:1)它让我可以准确地报告文件进度[如我们稍后所见],2)当文件中的电子邮件数量等于已完成的验证时,它让我可以停止计时器。我添加了一个计时器,以便我可以运行基准测试并确保我获得良好的结果。
let count = 0; //Line count require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //Reads the infile and increases the count for each line .on("close", function () { //At the end of the infile, after all lines have been counted, run the recipient validation function validateRecipients.validateRecipients(count, myArgs); });
然后我调用validateRecipients函数。请注意,此函数是异步的。在验证输入文件和输出文件是CSV格式之后,我写入一个标题行,并使用JSDOM库启动程序计时器。
async function validateRecipients(email_count, myArgs) { if ( //If both the infile and outfile are in .csv format extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //Counter for each API call email_count++; //Line counter returns #lines - 1, this is done to correct the number of lines //Start a timer const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //Outfile output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //Write the headers in the outfile
以下脚本实际上是程序的主体,所以我将其分解并解释发生了什么。对于输入文件中的每一行:
异步获取该行并调用收件人验证API。
fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //For each row read in from the infile, call the SparkPost Recipient Validation API
然后,在响应中
将电子邮件添加到JSON中(以便能够在CSV中打印出电子邮件)
验证如果原因为空值,如果是这样,则填充值(这样CSV格式一致,因为在某些情况下响应中会给出原因)
为json2csv模块设置选项和键。
将JSON转换为CSV并输出(利用json2csv)
在终端上写入进度
最后,如果文件中的电子邮件数量=完成的验证,停止计时器并打印出结果
.then(function (response) { response.data.results.email = String(email); //Adds the email as a value/key pair to the response JSON to be used for output response.data.results.reason ? null : (response.data.results.reason = ""); //If reason is null, set it to blank so the CSV is uniform //Utilizes json-2-csv to convert the JSON to CSV format and output let options = { prependHeader: false, //Disables JSON values from being added as header rows for every line keys: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //Sets the order of keys }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //Increase the API counter process.stdout.write(`Done with ${completed} / ${email_count}\r`); //Output status of Completed / Total to the console without showing new lines //If all emails have completed validation if (completed == email_count) { const stop = window.performance.now(); //Stop the timer console.log( `所有电子邮件已成功验证,共耗时 ${ (stop - start) / 1000 } 秒` ); } })
我发现的最后一个问题是,虽然这在Mac上运行良好,但在使用Windows进行大约10,000次验证后遇到了以下错误:
Error: connect ENOBUFS XX.XX.XXX.XXX:443 – Local (undefined:undefined) with email XXXXXXX@XXXXXXXXXX.XXX
经过进一步研究,似乎是NodeJS HTTP客户端连接池不重新使用连接的问题。我在这个问题上找到了一个Stackoverflow文章,并经过进一步挖掘,找到了一个好的axios库的默认配置,解决了这个问题。我仍然不确定为什么这个问题只在Windows上发生而不在Mac上发生。
下一步
对于正在寻找一个简单快速的程序的人,该程序接收CSV,调用收件人验证API,并输出一个CSV,这个程序就是为你准备的。
对此程序的一些补充功能包括以下内容:
构建一个前端或更简单的用户界面以供使用
更好的错误处理和重试机制,因为如果由于某种原因API抛出错误,当前程序不会重试调用
考虑将其实现为无服务器Azure Function,以实现自动扩展和减少基础设施管理
我也很想知道是否可以通过其他语言(比如Golang或Erlang/Elixir)实现更快的结果。除了语言选择,基础设施限制也可能影响性能——我们亲身体验到了这一点,当时我们在AWS遇到了未记录的DNS限制,这影响了我们的高容量电子邮件处理系统。
对于对将API处理与视觉工作流程工具结合的开发人员,请查看如何整合Flow Builder与Google Cloud Functions进行无代码自动化工作流程。
请随时向我提供任何反馈或建议,以扩展此项目。