MongoDB documents can contain $oid references in two shapes:

Type 1:
//MongoDB
city_id : "5fe3206428bf745876649fd3"
//Kafka Message
city_id : {
    "$oid": "5fe3206428bf745876649fd3"
}

Type 2:
//MongoDB
city_ids : ["5fe3206428bf745876649fd3","5fe3206428bf745876649fd3","5fe3206428bf745876649fd3"]
//Kafka Message
city_ids : [
    { "$oid": "5fe3206428bf745876649fd3" },
    { "$oid": "5fe3206428bf745876649fd3" },
    { "$oid": "5fe3206428bf745876649fd3" }
]

How can both shapes be handled in Logstash so that the data arrives in Elasticsearch with exactly the structure it has in MongoDB?
input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","cities"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }
    json {
        source => "[json_payload][payload]"
        target => "payload"
    }
    mutate {
        rename => { "[payload]" => "document" }
        remove_field => ["message","json_payload","payload"]
        add_field => {
            "[es_index]" => "%{[@metadata][kafka][topic]}"
            "[mongo_id]" => "%{[document][_id][$oid]}"
        }
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}

This is a follow-up to a previous question.
Posted on 2020-12-30 06:25:59
The code below does this dynamically for every field that contains $oid entries. It makes strong assumptions about the structure: if a field contains $oid entries, only those entries are kept.
ruby {
    code => '
        event.to_hash.each { |k, v|
            # Type 1: a single { "$oid" => "..." } hash is unwrapped to its string value
            if v.is_a?(Hash) && v["$oid"]
                event.set(k, v["$oid"])
            end
            # Type 2: an array of { "$oid" => "..." } hashes is mapped to an array of strings.
            # The extra guards avoid a NoMethodError on empty arrays or arrays of scalars.
            if v.is_a?(Array) && v[0].is_a?(Hash) && v[0]["$oid"]
                event.set(k, v.map { |x| x["$oid"] })
            end
        }
    '
}
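The filter's logic can be checked outside Logstash in plain Ruby by replacing the Event API with an ordinary Hash. This is just a sketch: the field names come from the question, and the sample values stand in for real ObjectIds.

```ruby
# Simulated event data, with $oid wrappers in both the single-value
# and the array form, plus one field that should pass through untouched.
event = {
  "city_id"  => { "$oid" => "5fe3206428bf745876649fd3" },
  "city_ids" => [
    { "$oid" => "5fe3206428bf745876649fd3" },
    { "$oid" => "5fe3206428bf745876649fd3" }
  ],
  "name" => "unchanged"
}

event.each do |k, v|
  if v.is_a?(Hash) && v["$oid"]
    event[k] = v["$oid"]                # Type 1: unwrap a single $oid hash
  elsif v.is_a?(Array) && v[0].is_a?(Hash) && v[0]["$oid"]
    event[k] = v.map { |x| x["$oid"] }  # Type 2: unwrap an array of $oid hashes
  end
end

puts event.inspect
```

After the loop, "city_id" is a plain string, "city_ids" is an array of strings, and "name" is unchanged, which matches the structure the documents have in MongoDB.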
}

https://stackoverflow.com/questions/65463897