我有一个应用程序,在这个应用程序中,我读取csv文件并进行一些转换,然后将它们推送到火花本身的弹性搜索中。像这样
input.write.format("org.elasticsearch.spark.sql")
.mode(SaveMode.Append)
.option("es.resource", "{date}/" + type).save()我有几个节点,在每个节点中,我运行5-6个spark-submit命令,这些命令推送到elasticsearch。
我经常犯错误
Could not write all entries [13/128] (Maybe ES was overloaded?). Error sample (first [5] error messages):
rejected execution of org.elasticsearch.transport.TransportService$7@32e6f8f8 on EsThreadPoolExecutor[bulk, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@4448a084[Running, pool size = 4, active threads = 4, queued tasks = 200, completed tasks = 451515]]我的Elasticsearch集群有以下数据-
Nodes - 9 (1TB space,
Ram >= 15GB ) More than 8 cores per node我修改了下列弹性参数
spark.es.batch.size.bytes=5000000
spark.es.batch.size.entries=5000
spark.es.batch.write.refresh=false谁能建议,我能修正什么来消除这些错误呢?
发布于 2018-03-28 03:16:54
这是因为大容量请求的传入速率大于elasticsearch集群所能处理的速率,并且大容量请求队列已满。
默认的大容量队列大小为200。
最好在客户端处理这个问题:
1)通过减少并发运行的星火提交命令的数量
( 2)在拒绝的情况下,通过调整es.batch.write.retry.count和es.batch.write.retry.wait来重试
示例:
es.batch.write.retry.wait = "60s"
es.batch.write.retry.count = 6在elasticsearch集群端:
1)检查每个索引中是否有太多的碎片,并尝试减少它。
这个博客很好地讨论了调整碎片数量的标准。
( 2)作为最后手段,增加大小
检查此博客,并对批量拒绝进行广泛讨论。
发布于 2018-03-27 23:44:36
ES集群中的大容量队列正在达到其容量(200)。试着增加它。有关如何更改大容量队列容量,请参阅此页面。
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-threadpool.html
也请检查另一个,所以回答OP有一个非常相似的问题,并通过增加大容量池大小来修复。
https://stackoverflow.com/questions/49377717
复制相似问题