ELK 环境搭建之 logstash

本贴最后更新于 2152 天前,其中的信息可能已经沧海桑田

下载地址

运行

./bin/logstash -f logstash.conf

新增一个配置文件 first-pipeline.conf

配置说明 从 filebeat 中收集日志 然后输出到标准输出和 elasticcsearch

input{     beats{         port => "5044"     } } filter {     # grok {     # match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %    {NUMBER:bytes} %{NUMBER:duration}" }     # }     json {         source => "message" #要解析的字段名         target => "msg_json" #解析后的存储字段,默认和message同级别     } } output {     stdout{         codec => rubydebug     }     elasticsearch {         hosts => [ "localhost:9200" ]     } }

检查并启动

./bin/logstash -f config/first-pipeline.conf --config.test_and_exit

自动加载配置

./bin/logstash -f config/first-pipeline.conf --config.reload.automatic

grok 插件  适合系统日志

日志示例

55.3.244.1 GET /index.html 15824 0.043 #filter grok {  match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }

json 插件

json {     source => "message" #要解析的字段名     target => "msg_json" #解析后的存储字段,默认和message同级别 }

nginx 的过滤器

grok { match => { "message" => "%{IP:client} - - \[%{HTTPDATE:logdate}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"%{NOTSPACE:ref}\" \"%{DATA:user_agent}\"" } } date { match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } kv { source => "request" field_split => "&?" value_split => "=" } urldecode { all_fields => true }

start_position

从哪个位置读取文件数据,默认从尾部,值为:end
如果要导入历史数据则设置成:beginning 

input {     file {         path => ["/opt/flights2.csv”]         start_position => “beginning”      } }
  • ELK
    23 引用 • 18 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...