首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Google Dataflow批处理文件处理性能差

Google Dataflow批处理文件处理性能差
EN

Stack Overflow用户
提问于 2019-12-18 06:18:22
回答 1查看 512关注 0票数 0

我正在尝试使用Apache Beam2.16.0构建一个处理大量XML文件的管道。平均计数为每24小时7000万次,在高峰负载时可以达到5亿次。文件大小从~1kb到200kb不等(有时可能更大,例如30mb)

文件经过各种转换,最终目标是BigQuery表,以便进一步分析。因此,我首先读取xml文件,然后反序列化为POJO (在Jackson的帮助下),然后应用所有需要的转换。转换的速度非常快,在我的机器上,根据文件大小的不同,我每秒可以得到大约40000次转换。

我主要关心的是文件读取速度。我有一种感觉,所有的阅读都是通过一个工作者完成的,我不明白这怎么能与之相提并论。我在10k测试文件数据集上进行了测试。

在我的本地机器(MacBook pro 2018:固态硬盘,16 gb内存和6核i7 cpu)上的批处理作业可以解析大约750个文件/秒。如果我在DataFlow上使用n1-standard-4机器运行它,我每秒只能获得大约75个文件。它通常不会扩展,但即使它扩展了(有时高达15个工作进程),我也只能获得大约350个文件/秒。

更有趣的是流媒体作业。它立即从6-7个工作进程开始,在UI上我可以看到1200-1500个元素/秒,但通常它不会显示速度,如果我选择页面上的最后一项,它显示它已经处理了10000个元素。

批处理作业和流作业之间的唯一区别是FileIO的以下选项:

.continuously(Duration.standardSeconds(10), Watch.Growth.never()))

为什么这会在处理速度上造成如此大的差异?

运行参数:

代码语言:javascript
复制
--runner=DataflowRunner
--project=<...>
--inputFilePattern=gs://java/log_entry/*.xml
--workerMachineType=n1-standard-4
--tempLocation=gs://java/temp
--maxNumWorkers=100

Run地域和bucket地域相同。

管道:

代码语言:javascript
复制
pipeline.apply(
  FileIO.match()
    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)
    .filepattern(options.getInputFilePattern())
    .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
  .apply("xml to POJO", ParDo.of(new XmlToPojoDoFn()));

xml文件示例:

代码语言:javascript
复制
<LogEntry><EntryId>0</EntryId>
    <LogValue>Test</LogValue>
    <LogTime>12-12-2019</LogTime>
    <LogProperty>1</LogProperty>
    <LogProperty>2</LogProperty>
    <LogProperty>3</LogProperty>
    <LogProperty>4</LogProperty>
    <LogProperty>5</LogProperty>
</LogEntry>

现实生活中的文件和项目要复杂得多,有许多嵌套的节点和大量的转换规则。

GitHub:https://github.com/costello-art/dataflow-file-io上的简化代码它只包含“瓶颈”部分读取文件和反序列化为POJO。

如果我可以在我的机器上处理大约750个文件/秒(这是一个强大的工作进程),那么我预计在数据流中类似的10个工作进程上将有大约7500个文件/秒。

EN

回答 1

Stack Overflow用户

发布于 2020-01-14 23:44:16

我试图编写一个具有某些功能的测试代码,以检查FileIO.match的行为和workers的数量1。

在这段代码中,我将值numWorkers设置为50,但您可以设置所需的值。我可以看到的是,FileIO.match方法将查找与这些模式匹配的所有链接,但在此之后,您必须分别处理每个文件的内容。

例如,在我的示例中,我创建了一个接收每个文件的方法,然后将内容除以"new_line (\n)“字符(但在这里,您可以随心所欲地处理它,它还取决于文件的类型,csv,xml,...)。

因此,我将每一行转换为TableRow,格式化BigQuery理解的格式,并分别返回每个值(out.output(tab)),这样,数据流将根据管道的工作负载来处理不同工作进程中的行,例如,3个不同工作进程中的3000行,每个工作进程有1000行。

最后,由于它是批处理过程,数据流将等待处理所有行,然后将其插入到BigQuery中。

我希望这个测试代码能帮助你完成你的任务。

[1] https://github.com/GonzaloPF/dataflow-pipeline/blob/master/java/randomDataToBQ/src/main/fromListFilestoBQ.java

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59382913

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档