我写了一个基于云函数的云存储触发器。我有10-15个文件在云存储桶中以5秒的间隔登陆,它将数据加载到bigquery表中(截断并加载)。
当存储桶中有10个文件时,我希望云函数以顺序的方式处理它们,即一次处理一个文件,因为所有文件都访问相同的表进行操作。
目前云函数一次触发多个文件,BIgquery操作失败,因为多个文件尝试访问同一个表。
有没有办法在云函数中配置??
提前感谢!
发布于 2020-07-15 20:59:19
您可以通过使用pubsub和云函数上的最大实例参数来实现这一点。
编辑
多亏了你的代码,我明白了发生了什么。实际上,BigQuery是一个声明性系统。当您执行请求或加载作业时,将创建一个作业并在后台运行。
在python中,您可以显式地等待作业的结束,但是在pandas中,我不知道如何完成!!
我刚找到了一个Google Cloud page to explain how to migrate from pandas to BigQuery client library。如您所见,在结尾处有一行
# Wait for the load job to complete.
job.result()也不愿等到工作结束。
您在_insert_into_bigquery_dwh函数中做得很好,但在staging _insert_into_bigquery_staging one中并非如此。这可能会导致两个问题:
发布于 2020-07-17 19:47:30
您描述的体系结构与您linked文档中的体系结构不同。请注意,在流程图和代码样本中,存储事件将触发云函数,云函数将直接将数据流式传输到目标表。由于BigQuery允许多个流插入作业,因此可以同时执行几个函数,而不会出现问题。在您的用例中,用于加载write-truncate进行数据清理的中间表有很大的不同,因为每次执行都需要前一个执行完成,因此需要顺序处理方法。
我想指出的是,PubSub不允许配置发送消息的速率,如果10条消息到达主题,它们都将被发送到订阅者,即使一次处理一条。由于上述原因,将函数限制为一个实例可能会导致开销,并可能增加延迟。也就是说,由于预期的工作负载是每天15-30个文件,上述可能不是什么大问题。
如果您希望并行执行,可以尝试为每条消息创建一个新表,并使用table.expires(exp_datetime)设置方法为其设置一个较短的过期期限,这样多次执行就不会相互冲突。下面是相关的库reference。否则,来自Guillaume的伟大答案将完全完成这项工作。
https://stackoverflow.com/questions/62913146
复制相似问题