logstash重复消费写入两次es问题

现象:logstash消费kafka数据写入到es中,但是es中所有数据都有双份,确认没有多个消费程序同时消费。经过查找发现是logstash多配置文件造成的重复写入


背景

假设现在给Logstash的pipeline配置了2个conf,也就是2个输入源。如果不做任何处理,那么所有的Filter和Output都会同时触发,如下图:

图片.png

这显然跟我们期望的不同,我们希望Logstash按以下的方式来处理,也就是各自区分,独立处理:

图片.png

解决方案

官方提供有 type 和tags 配置项进行区分,type 和 tags 是 logstash 事件中两个特殊的字段。

通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而tags 则是在数据处理过程中,由具体的插件来添加或者删除的。

如下所示:

input {
        jdbc {
                jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
                        jdbc_user => "es_igper"
                        jdbc_password => "password"
                        jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
                        jdbc_driver_class => "com.mysql.jdbc.Driver"
                        jdbc_paging_enabled => "true"
                        jdbc_page_size => "10000"
                        plugin_timezone => "local"

                        clean_run => false
                        use_column_value => true
                        tracking_column => update_time
                        tracking_column_type => "timestamp"
                        record_last_run => true
                        last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"

                        statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"

                        schedule => "* * * * *"

                        type => "live"
        }
}
filter {
        if [type]=="live" {
                mutate {
                        remove_field => ["username","passwd","wx_openid"]
                }
        }
}
output {
        if [type]=="live" {
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "live_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
}

以上是通过type区分,传入的数据中带有type字段/tags字段,会覆盖logstash配置文件设置的type属性值,这是个大坑,注意

input {
        jdbc {
                jdbc_connection_string => "jdbc:mysql://localhost:3306/igp?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull"
                        jdbc_user => "es_igper"
                        jdbc_password => "password"
                        jdbc_driver_library => "/usr/local/logstash/tools/mysql-connector-java-8.0.23.jar"
                        jdbc_driver_class => "com.mysql.jdbc.Driver"
                        jdbc_paging_enabled => "true"
                        jdbc_page_size => "10000"
                        plugin_timezone => "local"

                        clean_run => false
                        use_column_value => true
                        tracking_column => update_time
                        tracking_column_type => "timestamp"
                        record_last_run => true
                        last_run_metadata_path => "/usr/local/logstash/config/igp/run/live_v4_run"

                        statement => "select live.* from live where live.update_time>:sql_last_value order by live.update_time asc,live.id asc"

                        schedule => "* * * * *"

                        tags=> ["live"]
        }
}
filter {
        if "live" in [tags]{
                mutate {
                        remove_field => ["username","passwd","wx_openid"]
                }
        }
}
output {
       if "live" in [tags]{
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "live_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
         if "user" in [tags]{
                elasticsearch {
                        hosts => ["localhost:9200"]
                                index => "user_v4"
                                document_id => "%{id}"
                                user => "elastic"
                                password => "password"
                }
                stdout {
                        codec => json_lines
                }
        }
}

通过以上方式配置,就能将pipeline各自区分开。大家可以试下,如果把type参数去掉,最后的elasticsearch数据会混或者数据重复等奇怪问题


原因


logstash启动后会把多个配置文件自动合并成一个,比如有两个conf文件,每个都有 filter、ouput,写入一条数据,会执行两次filter和output,重复写入

 

给TA打赏
共{{data.count}}人
人已打赏
运维文档

supervisord启动es报错

2022-6-19 20:23:44

运维文档

python获取k8s中指定pod添加至consul

2022-7-13 10:33:39

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索