首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用SparkContext.submitJob调用REST

如何使用SparkContext.submitJob调用REST
EN

Stack Overflow用户
提问于 2020-03-12 11:41:04
回答 1查看 543关注 0票数 1

请有人提供一个submitJob方法调用的例子。

在这里找到参考资料:How to execute async operations (i.e. returning a Future) from map/filter/etc.?

我相信我可以为我的用例实现它。

在我当前的实现中,我使用paritions调用并行调用,但是在调用下一个调用之前,它们正在等待响应。

代码语言:javascript
复制
Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
            val response = callApi(row)
            parse(response)
    })

但是,由于API结束时存在延迟,我在解析之前等待10秒的响应,然后再进行下一个调用。我有一个100 TPS,但是当前的逻辑只看到4-7 TPS。

如果有人使用过SparkContext.submitJob,请提供一个例子,因为我是新的火花和scala。

我想在不等待响应的情况下调用这些调用,确保100个TPS,然后一旦收到响应,我就想在响应的基础上解析和创建Dataframe。

我以前尝试过收集行并从主节点调用API调用,似乎受到创建大型线程池的硬件的限制。

submitJobT, U, R:SimpleFutureActionR

Rdd - rdd从我的数据

分析-我的rdd已经被分区了,我是否在rdd中为No.of.partitions提供了范围0?

processPartition -是我的callApi()吗?

resultHandler -不确定这里该做什么

resultFunc -我相信这将是在分析我的反应

如何在SimpleFutureAction之后创建数据帧

有人能帮忙吗?

EN

回答 1

Stack Overflow用户

发布于 2020-03-12 13:53:20

submitJob不会自动使API调用更快。它是Spark的并行处理的低级别实现的一部分--将火花操作分解成作业,然后将它们提交给任何已经到位的集群调度器。调用submitJob就像启动一个Java线程--作业将异步运行,但不会比简单地调用dataframe/RDD上的操作更快。

IMHO您最好的选择是使用mapPartitions,它允许您在每个分区的上下文中运行一个函数。您已经对数据进行了分区,因此为了确保最大的并发性,只需确保您有足够的Spark执行器来实际同时运行这些分区:

代码语言:javascript
复制
df.rdd.repartition(#concurrent API calls)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)

mapPartitions需要一个将Iterator[T] (单个分区中的所有数据)映射到Iterator[U] (转换分区)并返回RDD[U]的函数。转换回dataframe是用适当的列名链接对toDF()的调用的问题。

您可能希望在callApi中实现某种类型的每线程速率限制,以确保没有单个执行器每秒触发大量请求。请记住,执行者可以在单独的线程和/或单独的JVM中运行。

当然,只打电话给mapPartitions就什么都没有了。您需要在结果的dataframe上触发一个操作,以使API调用真正触发。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60653409

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档