我正在尝试使用Apache Beam2.16.0构建一个处理大量XML文件的管道。平均计数为每24小时7000万次,在高峰负载时可以达到5亿次。文件大小从~1kb到200kb不等(有时可能更大,例如30mb)
文件经过各种转换,最终目标是BigQuery表,以便进一步分析。因此,我首先读取xml文件,然后反序列化为POJO (在Jackson的帮助下),然后应用所有需要的转换。转换的速度非常快,在我的机器上,根据文件大小的不同,我每秒可以得到大约40000次转换。
我主要关心的是文件读取速度。我有一种感觉,所有的阅读都是通过一个工作者完成的,我不明白这怎么能与之相提并论。我在10k测试文件数据集上进行了测试。
在我的本地机器(MacBook pro 2018:固态硬盘,16 gb内存和6核i7 cpu)上的批处理作业可以解析大约750个文件/秒。如果我在DataFlow上使用n1-standard-4机器运行它,我每秒只能获得大约75个文件。它通常不会扩展,但即使它扩展了(有时高达15个工作进程),我也只能获得大约350个文件/秒。
更有趣的是流媒体作业。它立即从6-7个工作进程开始,在UI上我可以看到1200-1500个元素/秒,但通常它不会显示速度,如果我选择页面上的最后一项,它显示它已经处理了10000个元素。
批处理作业和流作业之间的唯一区别是FileIO的以下选项:
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
为什么这会在处理速度上造成如此大的差异?
运行参数:
--runner=DataflowRunner
--project=<...>
--inputFilePattern=gs://java/log_entry/*.xml
--workerMachineType=n1-standard-4
--tempLocation=gs://java/temp
--maxNumWorkers=100Run地域和bucket地域相同。
管道:
pipeline.apply(
FileIO.match()
.withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
.filepattern(options.getInputFilePattern())
.continuously(Duration.standardSeconds(10), Watch.Growth.never()))
.apply("xml to POJO", ParDo.of(new XmlToPojoDoFn()));xml文件示例:
<LogEntry><EntryId>0</EntryId>
<LogValue>Test</LogValue>
<LogTime>12-12-2019</LogTime>
<LogProperty>1</LogProperty>
<LogProperty>2</LogProperty>
<LogProperty>3</LogProperty>
<LogProperty>4</LogProperty>
<LogProperty>5</LogProperty>
</LogEntry>现实生活中的文件和项目要复杂得多,有许多嵌套的节点和大量的转换规则。
GitHub:https://github.com/costello-art/dataflow-file-io上的简化代码它只包含“瓶颈”部分读取文件和反序列化为POJO。
如果我可以在我的机器上处理大约750个文件/秒(这是一个强大的工作进程),那么我预计在数据流中类似的10个工作进程上将有大约7500个文件/秒。
发布于 2020-01-14 23:44:16
我试图编写一个具有某些功能的测试代码,以检查FileIO.match的行为和workers的数量1。
在这段代码中,我将值numWorkers设置为50,但您可以设置所需的值。我可以看到的是,FileIO.match方法将查找与这些模式匹配的所有链接,但在此之后,您必须分别处理每个文件的内容。
例如,在我的示例中,我创建了一个接收每个文件的方法,然后将内容除以"new_line (\n)“字符(但在这里,您可以随心所欲地处理它,它还取决于文件的类型,csv,xml,...)。
因此,我将每一行转换为TableRow,格式化BigQuery理解的格式,并分别返回每个值(out.output(tab)),这样,数据流将根据管道的工作负载来处理不同工作进程中的行,例如,3个不同工作进程中的3000行,每个工作进程有1000行。
最后,由于它是批处理过程,数据流将等待处理所有行,然后将其插入到BigQuery中。
我希望这个测试代码能帮助你完成你的任务。
https://stackoverflow.com/questions/59382913
复制相似问题