构建一个批量异步鸟类接收者验证工具

对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。

作者

扎卡里·萨缪尔斯

类别

电子邮件

构建一个批量异步鸟类接收者验证工具

对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。

作者

扎卡里·萨缪尔斯

类别

电子邮件

构建一个批量异步鸟类接收者验证工具

对于那些寻找一个简单快速的程序的人,该程序可以接受csv,调用收件人验证API,并输出CSV,这个程序就适合你。

作者

扎卡里·萨缪尔斯

类别

电子邮件

我们偶尔收到的问题之一是,我如何通过收件人验证批量验证电子邮件列表?这里有两个选项,一个是通过SparkPost用户界面上传文件进行验证,另一个是对每个电子邮件进行单独的API调用(因为该API是单邮箱验证)。


第一个选项效果很好,但限制为20Mb(大约500,000个地址)。如果有人有一个包含数百万个地址的电子邮件列表怎么办?这可能意味着将其拆分为1,000个CSV文件上传。


由于上传数千个CSV文件似乎有些不可行,我开始思考我能多快让API运行。在这篇博文中,我将解释我尝试的内容,以及我最终如何得到了一个能在55秒内完成约100,000次验证的程序(而在用户界面我大约在1分钟10秒内完成了约100,000次验证)。虽然这样仍然需要大约100个小时才能完成约654百万次验证,但这个脚本可以在后台运行,节省大量时间。


这个程序的最终版本可以在这里找到。


我的第一个错误:使用Python

Python是我最喜欢的编程语言之一。它在许多领域表现出色,并且非常直接。然而,它不擅长的一个领域是并发进程。虽然Python确实有能力运行异步函数,但它有一个被称为Python全局解释器锁(GIL)的问题。


“Python全局解释器锁或GIL,简单来说,是一个互斥锁(或锁),仅允许一个线程控制Python解释器。


这意味着在任何时候只有一个线程可以处于执行状态。对于运行单线程程序的开发人员而言,GIL的影响并不明显,但在CPU密集型和多线程代码中,它可能成为性能瓶颈。


由于GIL只允许一个线程在多线程架构中执行,即使在有多个CPU核心的情况下,GIL也因此获得了Python的“臭名昭著”的特性。”(https://realpython.com/python-gil/


起初,我并不了解GIL,因此开始用Python编程。最终,尽管我的程序是异步的,但它陷入了阻塞,无论我添加多少线程,我每秒仍然只能得到约12-15次迭代。


异步函数的主要部分可以在下面看到:

async def validateRecipients(f, fh, apiKey, snooze, count): h = {'Authorization': apiKey, 'Accept': 'application/json'} with tqdm(total=count) as pbar: async with aiohttp.ClientSession() as session: for address in f: for i in address: thisReq = requests.compat.urljoin(url, i) async with session.get(thisReq,headers=h, ssl=False) as resp: content = await resp.json() row = content['results'] row['email'] = i fh.writerow(row) pbar.update(1)

 

所以我放弃使用Python,回到起点…


我选择使用NodeJS,因为它能够极其出色地执行非阻塞的I/O操作。我对NodeJS的编程也相当熟悉。


利用NodeJS的异步特性,这个方案最终运行良好。有关NodeJS中异步编程的更多细节,请参见https://blog.risingstack.com/node-hero-async-programming-in-node-js/


我的第二个错误:尝试将文件读取到内存中

我最初的想法如下:



首先,导入一个CSV电子邮件列表。其次,将电子邮件加载到数组中并检查它们的格式是否正确。第三,异步调用收件人验证API。第四,等待结果并将其加载到一个变量中。最后,将这个变量输出到CSV文件中。


这对于较小的文件效果很好。问题在于当我尝试处理100,000个电子邮件时。程序在大约12,000次验证时停滞。在我们的前端开发人员的帮助下,我发现问题在于将所有结果加载到一个变量中(因此很快内存用完)。如果您想查看该程序的第一次迭代,我在此链接中提供了:版本1(不推荐)



首先,导入一个CSV电子邮件列表。其次,统计文件中的电子邮件数量以便报告。第三,在每一行被异步读取时,调用收件人验证API,并将结果输出到CSV文件中。


因此,对于每一行读取,我调用API并异步地写出结果,从而不在长期内存中保留任何这些数据。在与收件人验证团队交谈后,我还去掉了电子邮件语法检查,因为他们告诉我收件人验证已经内置检查以验证电子邮件是否有效。


最后代码的分解

在读取和验证终端参数后,我运行以下代码。首先,我读取电子邮件的CSV文件并统计每一行。这个函数有两个目的,1)允许我准确报告文件进度[稍后我们会看到],2)在文件中的电子邮件数量等于完成验证时停止计时器。我添加了一个计时器,以便运行基准测试并确保获得良好的结果。


let count = 0; //行计数 require("fs") .createReadStream(myArgs[1]) .on("data", function (chunk) { for (let i = 0; i < chunk.length; ++i) if (chunk[i] == 10) count++; }) //读取输入文件并为每一行增加计数 .on("close", function () { //在输入文件结束时,所有行都被计数后,运行收件人验证函数 validateRecipients.validateRecipients(count, myArgs); });

 

然后我调用validateRecipients函数。注意这个函数是异步的。在验证输入文件和输出文件都是CSV格式后,我写入一个表头行,并使用JSDOM库启动程序计时器。


async function validateRecipients(email_count, myArgs) { if ( //如果输入文件和输出文件都是.csv格式 extname(myArgs[1]).toLowerCase() == ".csv" && extname(myArgs[3]).toLowerCase() == ".csv" ) { let completed = 0; //每个API调用的计数器 email_count++; //行计数返回#行 - 1,这是为纠正行数 //启动计时器 const { window } = new JSDOM(); const start = window.performance.now(); const output = fs.createWriteStream(myArgs[3]); //输出文件 output.write( "Email,Valid,Result,Reason,Is_Role,Is_Disposable,Is_Free,Delivery_Confidence\n" ); //在输出文件中写入表头

 

以下脚本实际上是程序的主体,因此我将其拆分并解释发生的事情。对于输入文件的每一行:


fs.createReadStream(myArgs[1]) .pipe(csv.parse({ headers: false })) .on("data", async (email) => { let url = SPARKPOST_HOST + "/api/v1/recipient-validation/single/" + email; await axios .get(url, { headers: { Authorization: SPARKPOST_API_KEY, }, }) //对于从输入文件读取的每一行,调用SparkPost收件人验证API

 

然后,在响应中

  • 将电子邮件添加到JSON中(以便能够在CSV中打印出电子邮件)

  • 验证原因是否为null,如果是,则填充一个空值(这是为了确保CSV格式一致,因为在某些情况下,响应中会给出原因)

  • 设置json2csv模块的选项和键。

  • 将JSON转换为CSV并输出(使用json2csv)

  • 在终端中写入进度

  • 最后,如果文件中的电子邮件数量=完成的验证,则停止计时器并打印出结果


.then(function (response) { response.data.results.email = String(email); //将电子邮件作为值/键对添加到用于输出的响应JSON中 response.data.results.reason ? null : (response.data.results.reason = ""); //如果原因为null,则设置为空,以确保CSV统一 //利用json-2-csv将JSON转换为CSV格式并输出 let options = { prependHeader: false, //禁用JSON值在每行作为表头被添加 键: [ "results.email", "results.valid", "results.result", "results.reason", "results.is_role", "results.is_disposable", "results.is_free", "results.delivery_confidence", ], //设置键的顺序 }; let json2csvCallback = function (err, csv) { if (err) throw err; output.write(`${csv}\n`); }; converter.json2csv(response.data, json2csvCallback, options); completed++; //增加API计数器 process.stdout.write(`完成 ${completed} / ${email_count}\r`); //在控制台输出完成/总数的状态而不换行 //如果所有电子邮件完成验证 if (completed == email_count) { const stop = window.performance.now(); //停止计时器 console.log( `所有电子邮件在 ${ (stop - start) / 1000 } 秒内成功验证` ); } })

 

我发现的最后一个问题是,虽然在Mac上非常顺利,但在Windows上使用时,我在大约10,000次验证后遇到了以下错误:


错误:connect ENOBUFS XX.XX.XXX.XXX:443 – 本地(未定义:未定义)与电子邮件 XXXXXXX@XXXXXXXXXX.XXX


经过进一步研究,这似乎是NodeJS HTTP客户端连接池不重用连接的问题。我在这个Stackoverflow文章上发现了这个问题,并经过进一步挖掘,找到一个很好的默认配置来解决此问题。我仍然不确定为什么此问题只发生在Windows而不发生在Mac上。


下一步

对于寻找一个简单快速的程序,接收CSV,调用收件人验证API并输出CSV的人来说,这个程序将很合适。


对该程序的一些补充如下:

  • 构建一个前端或更易于使用的用户界面

  • 更好的错误和重试处理,因为如果出于某种原因API抛出错误,当前程序不会重试调用


我也很好奇是否可以通过其他语言(例如Golang或Erlang/Elixir)实现更快的结果。


请随时给我提供任何反馈或建议以扩展这个项目。

Sign up

为营销、支持和财务提供的人工智能驱动平台

点击 "获取演示" 即表示您同意 Bird's

Sign up

为营销、支持和财务提供的人工智能驱动平台

点击 "获取演示" 即表示您同意 Bird's

Sign up

为营销、支持和财务提供的人工智能驱动平台

点击 "获取演示" 即表示您同意 Bird's

Channels

Grow

Engage

Automate

APIs

Resources

Company

Socials

生长

管理

自动化

生长

管理

自动化