我有1-一个单一的CSV文件和2-一个直播的KAFKA流。KAFKA stream引入了直播流日志,CSV文件包含元数据记录,我需要在将它们发送到Elastic Search之前将它们与流日志连接起来。
Kafka流日志和CSV记录示例:
KAFKA log: MachineID: 2424, MachineType: 1, MessageType: 9
CSV record: MachineID: 2424, MachineOwner: JohnDuo在发送到ES之前,我需要在logstash中构建记录:
MachineID: 2424
MachineOwner: JohnDuo
MachineType: 1
MessageType: 9我想要一个解决方案,可以是Ruby或Logstash插件,或者其他任何东西,以便读取此CSV文件一次,并将它们放入Logstash conf文件中。我需要将CSV文件的内容保存在内存中,否则CSV在每个实时Kafka日志上的查找会杀死我的Logstash性能。
发布于 2020-11-01 13:08:41
尝试translate筛选器。
你需要这样的东西。
filter {
translate {
dictionary_path => "/path/to/your/csv/file.csv"
field => "[MachineId]"
destination => "[MachineOwner]"
fallback => "not found"
}
}然后你在你的file.csv中,你将拥有以下内容。
2424,JohnDuo
2425,AnotherUser对于每个具有字段MachineId的事件,此过滤器将在字典中查找此id,如果找到匹配值,它将创建一个名为MachineOwner的字段,如果没有找到匹配值,它将创建一个值为not found的字段MachineOwner,如果您不想在不匹配的情况下创建该字段,您可以删除fallback选项。
当logstash启动时,字典会加载到内存中,并且每隔300秒重新加载一次,您也可以更改该行为。
https://stackoverflow.com/questions/64628247
复制相似问题