Symptom: Logstash consumes data from Kafka and writes it into Elasticsearch, but every document in ES appears twice. We confirmed that no other consumer program was reading the topic at the same time. Investigation showed the duplicate writes were caused by Logstash's handling of multiple configuration files.
Background
Suppose the Logstash pipeline is given 2 conf files, i.e. 2 input sources. Without any special handling, every event triggers all of the Filters and all of the Outputs from both files, as the sketch below illustrates:
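To make this concrete, here is a minimal sketch of the situation; the file names, topics, and index names are made up for illustration. Two conf files sit in the same pipeline directory, each one looking self-contained:

# live.conf -- hypothetical first input source
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["live"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "live_v4"
  }
}

# user.conf -- hypothetical second input source
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["user"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "user_v4"
  }
}

# Actual behavior: an event read from EITHER topic is sent to BOTH
# elasticsearch outputs, so it lands in live_v4 AND user_v4.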
This is clearly not what we want. We would rather have Logstash keep the sources apart and process each one independently, with each input flowing only through its own filter and output.
Solution
Logstash officially provides the type and tags settings for exactly this purpose; type and tags are two special fields on a Logstash event.
Typically we mark the event type with type in the input section, since we know in advance what kind of event each source produces. tags, by contrast, are added or removed by individual plugins while the data is being processed.
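For example, a filter can attach or strip tags in flight via the standard add_tag / remove_tag options that all filter plugins accept (a minimal sketch; the tag names here are made up):

filter {
  mutate {
    add_tag => ["processed"]   # attach a tag during processing
    remove_tag => ["raw"]      # strip a tag that was set earlier
  }
}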
The full configuration, using type to separate the sources, looks like this:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "es_igper"
    jdbc_password => "password"
    jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    plugin_timezone => "local"
    clean_run => false
    # Incremental sync: track update_time and persist the last seen value between runs
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"
    statement => "select live.* from live where live.update_time > :sql_last_value order by live.update_time asc, live.id asc"
    schedule => "* * * * *"
    # Mark every event from this input so downstream stages can match on it
    type => "live"
  }
}

filter {
  if [type] == "live" {
    mutate {
      remove_field => ["username", "passwd", "wx_openid"]
    }
  }
}

output {
  if [type] == "live" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "live_v4"
      document_id => "%{id}"
      user => "elastic"
      password => "password"
    }
    stdout { codec => json_lines }
  }
}
The configuration above distinguishes events by type. One big pitfall to watch out for: if the incoming data already carries a type field (or a tags field), that value takes precedence over the type set in the Logstash config file.
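To illustrate the pitfall (a hedged sketch; the topic name and field values are made up): suppose the Kafka messages are JSON and already contain a type field, e.g. {"id": 1, "type": "user"}. The type set on the input is then ignored for those events:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["live"]
    codec => "json"
    type => "live"   # has no effect if the decoded event already carries a type
  }
}
# Events arriving with "type": "user" keep that value, so a
# conditional like `if [type] == "live"` never matches them.

With that caveat in mind, here is the equivalent configuration using tags instead: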
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
    jdbc_user => "es_igper"
    jdbc_password => "password"
    jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    plugin_timezone => "local"
    clean_run => false
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"
    statement => "select live.* from live where live.update_time > :sql_last_value order by live.update_time asc, live.id asc"
    schedule => "* * * * *"
    # Tag every event from this input instead of setting type
    tags => ["live"]
  }
}

filter {
  if "live" in [tags] {
    mutate {
      remove_field => ["username", "passwd", "wx_openid"]
    }
  }
}

output {
  if "live" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "live_v4"
      document_id => "%{id}"
      user => "elastic"
      password => "password"
    }
    stdout { codec => json_lines }
  }
  # Branch for events tagged by another conf file's input (e.g. a "user" source)
  if "user" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "user_v4"
      document_id => "%{id}"
      user => "elastic"
      password => "password"
    }
    stdout { codec => json_lines }
  }
}
With either of these configurations, the pipelines are properly kept apart. You can try it yourself: remove the type conditionals and you will see strange problems in the resulting Elasticsearch data, such as records mixed across indices or duplicated.
Cause
When Logstash starts, it automatically merges multiple config files into a single pipeline. If, for example, there are two conf files, each with its own filter and output, then every incoming event runs through both filters and both outputs, writing it twice.
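As a sketch, the two hypothetical files from the Background section effectively become this single pipeline after the merge:

input {
  kafka { bootstrap_servers => "localhost:9092" topics => ["live"] }
  kafka { bootstrap_servers => "localhost:9092" topics => ["user"] }
}
output {
  # No conditionals: both outputs run for every event from either input,
  # so each document is written to live_v4 and to user_v4.
  elasticsearch { hosts => ["localhost:9200"] index => "live_v4" }
  elasticsearch { hosts => ["localhost:9200"] index => "user_v4" }
}

The type/tags conditionals shown above work around this merge; alternatively, declaring each conf file as its own pipeline in pipelines.yml avoids the merge entirely.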