首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在云函数中实现文件的串行处理?

如何在云函数中实现文件的串行处理?
EN

Stack Overflow用户
提问于 2020-07-15 18:46:09
回答 2查看 547关注 0票数 0

我写了一个基于云函数的云存储触发器。我有10-15个文件在云存储桶中以5秒的间隔登陆,它将数据加载到bigquery表中(截断并加载)。

当存储桶中有10个文件时,我希望云函数以顺序的方式处理它们,即一次处理一个文件,因为所有文件都访问相同的表进行操作。

目前云函数一次触发多个文件,BIgquery操作失败,因为多个文件尝试访问同一个表。

有没有办法在云函数中配置??

提前感谢!

EN

回答 2

Stack Overflow用户

发布于 2020-07-15 20:59:19

您可以通过使用pubsub和云函数上的最大实例参数来实现这一点。

  • 然后,使用max instance set to 1创建一个HTTP函数(如果要应用过滤器,则需要使用http函数)。这样一来,一次只能执行一个函数。因此,没有concurrency!
  • Finally,在主题上创建PubSub订阅,无论是否带有过滤器,以便在HTTP中调用您的函数。

编辑

多亏了你的代码,我明白了发生了什么。实际上,BigQuery是一个声明性系统。当您执行请求或加载作业时,将创建一个作业并在后台运行。

在python中,您可以显式地等待作业的结束,但是在pandas中,我不知道如何完成!!

我刚找到了一个Google Cloud page to explain how to migrate from pandas to BigQuery client library。如您所见,在结尾处有一行

代码语言:javascript
复制
# Wait for the load job to complete.
job.result()

也不愿等到工作结束。

您在_insert_into_bigquery_dwh函数中做得很好,但在staging _insert_into_bigquery_staging one中并非如此。这可能会导致两个问题:

  • dwh函数处理旧数据,因为当您触发此作业时分段尚未完成
  • 如果分段花费10秒并在“后台”中运行(您不需要在代码中显式等待结束),而dwh花费1秒,则在dwh函数结束时处理下一个文件,即使分段文件继续在后台运行。这就导致了你的问题。
票数 0
EN

Stack Overflow用户

发布于 2020-07-17 19:47:30

您描述的体系结构与您linked文档中的体系结构不同。请注意,在流程图和代码样本中,存储事件将触发云函数,云函数将直接将数据流式传输到目标表。由于BigQuery允许多个流插入作业,因此可以同时执行几个函数,而不会出现问题。在您的用例中,用于加载write-truncate进行数据清理的中间表有很大的不同,因为每次执行都需要前一个执行完成,因此需要顺序处理方法。

我想指出的是,PubSub不允许配置发送消息的速率,如果10条消息到达主题,它们都将被发送到订阅者,即使一次处理一条。由于上述原因,将函数限制为一个实例可能会导致开销,并可能增加延迟。也就是说,由于预期的工作负载是每天15-30个文件,上述可能不是什么大问题。

如果您希望并行执行,可以尝试为每条消息创建一个新表,并使用table.expires(exp_datetime)设置方法为其设置一个较短的过期期限,这样多次执行就不会相互冲突。下面是相关的库reference。否则,来自Guillaume的伟大答案将完全完成这项工作。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62913146

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档