下载地址
运行
./bin/logstash -f logstash.conf
新增一个配置文件 first-pipeline.conf
配置说明 从 filebeat 中收集日志 然后输出到标准输出和 elasticcsearch
input{ beats{ port => "5044" } } filter { # grok { # match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} % {NUMBER:bytes} %{NUMBER:duration}" } # } json { source => "message" #要解析的字段名 target => "msg_json" #解析后的存储字段,默认和message同级别 } } output { stdout{ codec => rubydebug } elasticsearch { hosts => [ "localhost:9200" ] } }
检查并启动
./bin/logstash -f config/first-pipeline.conf --config.test_and_exit
自动加载配置
./bin/logstash -f config/first-pipeline.conf --config.reload.automatic
grok 插件 适合系统日志
日志示例
55.3.244.1 GET /index.html 15824 0.043 #filter grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }
json 插件
json { source => "message" #要解析的字段名 target => "msg_json" #解析后的存储字段,默认和message同级别 }
nginx 的过滤器
grok { match => { "message" => "%{IP:client} - - \[%{HTTPDATE:logdate}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"%{NOTSPACE:ref}\" \"%{DATA:user_agent}\"" } } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } kv { source => "request" field_split => "&?" value_split => "=" } urldecode { all_fields => true }
start_position
从哪个位置读取文件数据,默认从尾部,值为:end
如果要导入历史数据则设置成:beginning
input { file { path => ["/opt/flights2.csv”] start_position => “beginning” } }
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于