请有人提供一个submitJob方法调用的例子。
在这里找到参考资料:How to execute async operations (i.e. returning a Future) from map/filter/etc.?
我相信我可以为我的用例实现它。
在我当前的实现中,我使用paritions调用并行调用,但是在调用下一个调用之前,它们正在等待响应。
Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
val response = callApi(row)
parse(response)
})但是,由于API结束时存在延迟,我在解析之前等待10秒的响应,然后再进行下一个调用。我有一个100 TPS,但是当前的逻辑只看到4-7 TPS。
如果有人使用过SparkContext.submitJob,请提供一个例子,因为我是新的火花和scala。
我想在不等待响应的情况下调用这些调用,确保100个TPS,然后一旦收到响应,我就想在响应的基础上解析和创建Dataframe。
我以前尝试过收集行并从主节点调用API调用,似乎受到创建大型线程池的硬件的限制。
submitJobT, U, R:SimpleFutureActionR
Rdd - rdd从我的数据
分析-我的rdd已经被分区了,我是否在rdd中为No.of.partitions提供了范围0?
processPartition -是我的callApi()吗?
resultHandler -不确定这里该做什么
resultFunc -我相信这将是在分析我的反应
如何在SimpleFutureAction之后创建数据帧
有人能帮忙吗?
发布于 2020-03-12 13:53:20
submitJob不会自动使API调用更快。它是Spark的并行处理的低级别实现的一部分--将火花操作分解成作业,然后将它们提交给任何已经到位的集群调度器。调用submitJob就像启动一个Java线程--作业将异步运行,但不会比简单地调用dataframe/RDD上的操作更快。
IMHO您最好的选择是使用mapPartitions,它允许您在每个分区的上下文中运行一个函数。您已经对数据进行了分区,因此为了确保最大的并发性,只需确保您有足够的Spark执行器来实际同时运行这些分区:
df.rdd.repartition(#concurrent API calls)
.mapPartitions(partition => {
partition.map(row => {
val response = callApi(row)
parse(response)
})
})
.toDF("col1", "col2", ...)mapPartitions需要一个将Iterator[T] (单个分区中的所有数据)映射到Iterator[U] (转换分区)并返回RDD[U]的函数。转换回dataframe是用适当的列名链接对toDF()的调用的问题。
您可能希望在callApi中实现某种类型的每线程速率限制,以确保没有单个执行器每秒触发大量请求。请记住,执行者可以在单独的线程和/或单独的JVM中运行。
当然,只打电话给mapPartitions就什么都没有了。您需要在结果的dataframe上触发一个操作,以使API调用真正触发。
https://stackoverflow.com/questions/60653409
复制相似问题