ELK 环境搭建之 logstash

本贴最后更新于 2025 天前,其中的信息可能已经沧海桑田

下载地址

运行

./bin/logstash -f logstash.conf

新增一个配置文件 first-pipeline.conf

配置说明 从 filebeat 中收集日志 然后输出到标准输出和 elasticcsearch

input{

    beats{

        port => "5044"

    }

}

filter {

    # grok {

    # match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %    {NUMBER:bytes} %{NUMBER:duration}" }

    # }

    json {

        source => "message" #要解析的字段名

        target => "msg_json" #解析后的存储字段,默认和message同级别

    }

}

output {

    stdout{

        codec => rubydebug

    }

    elasticsearch {

        hosts => [ "localhost:9200" ]

    }

}

检查并启动

./bin/logstash -f config/first-pipeline.conf --config.test_and_exit

自动加载配置

./bin/logstash -f config/first-pipeline.conf --config.reload.automatic

grok 插件  适合系统日志

日志示例

55.3.244.1 GET /index.html 15824 0.043

#filter

grok {

 match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

}

json 插件

json {

    source => "message" #要解析的字段名

    target => "msg_json" #解析后的存储字段,默认和message同级别

}

nginx 的过滤器

grok {

match => { "message" => "%{IP:client} - - \[%{HTTPDATE:logdate}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"%{NOTSPACE:ref}\" \"%{DATA:user_agent}\"" }

}

date {

match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]

target => "@timestamp"

}

kv {

source => "request"

field_split => "&?"

value_split => "="

}

urldecode {

all_fields => true

}

start_position

从哪个位置读取文件数据,默认从尾部,值为:end
如果要导入历史数据则设置成:beginning 

input {

    file {

        path => ["/opt/flights2.csv”]

        start_position => “beginning”

     }

}
  • ELK
    23 引用 • 18 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...