我试图创建一个数据管道,Logstash插件每5分钟通过SQL查询获取一些数据,ElasticSearch输出插件将输入插件中的数据放入ElasticSearch服务器。我希望这个输出插件能够部分更新ElasticSearch服务器中现有的文档。我的Logstash配置文件如下所示:
input {
jdbc {
jdbc_driver_library => "/Users/hello/logstash-2.3.2/lib/mysql-connector-java-5.1.34.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:13306/mysqlDB”
jdbc_user => “root”
jdbc_password => “1234”
last_run_metadata_path => "/Users/hello/.logstash_last_run_display"
statement => "SELECT * FROM checkout WHERE checkout_no between :sql_last_value + 1 and :sql_last_value + 5 ORDER BY checkout_no ASC"
schedule => “*/5 * * * *"
use_column_value => true
tracking_column => “checkout_no”
}
}
output {
stdout { codec => json_lines }
elasticsearch {
action => "update"
index => "ecs"
document_type => “checkout”
document_id => “%{checkout_no}"
hosts => ["localhost:9200"]
}
}问题是,ElasticSearch输出插件似乎没有调用部分更新API,例如/{index}/{type}/{id}/_update。手册只列出了诸如index、delete、create、update等操作,但没有提到REST的每个操作调用,即:update操作调用/{index}/{type}/{id}/_update还是/{index}/{type}/{id} API (向上插入)。我想调用部分更新api从弹性搜索输出插件?有可能吗?
发布于 2016-07-13 04:09:46
在我的生产脚本中设置doc_as_upsert => true和action => "update"工作。
output {
elasticsearch {
hosts => ["es_host"]
document_id => "%{id}" # !!! the id here MUST be the same
index => "logstash-my-index"
timeout => 30
workers => 1
doc_as_upsert => true
action => "update"
}
}发布于 2016-07-12 09:28:16
这是可能的。Elasticsearch输出插件有一系列upsert选项,它们对应于Elasticsearch中的选项:
upsert本身:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-upsertscripted_upsert:upsertdoc_as_upsert:upserthttps://stackoverflow.com/questions/38323392
复制相似问题