下载地址
运行
./bin/logstash -f logstash.conf
新增一个配置文件 first-pipeline.conf
配置说明 从 filebeat 中收集日志 然后输出到标准输出和 elasticcsearch
input{
beats{
port => "5044"
}
}
filter {
# grok {
# match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} % {NUMBER:bytes} %{NUMBER:duration}" }
# }
json {
source => "message" #要解析的字段名
target => "msg_json" #解析后的存储字段,默认和message同级别
}
}
output {
stdout{
codec => rubydebug
}
elasticsearch {
hosts => [ "localhost:9200" ]
}
}
检查并启动
./bin/logstash -f config/first-pipeline.conf --config.test_and_exit
自动加载配置
./bin/logstash -f config/first-pipeline.conf --config.reload.automatic
grok 插件 适合系统日志
日志示例
55.3.244.1 GET /index.html 15824 0.043
#filter
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
json 插件
json {
source => "message" #要解析的字段名
target => "msg_json" #解析后的存储字段,默认和message同级别
}
nginx 的过滤器
grok {
match => { "message" => "%{IP:client} - - \[%{HTTPDATE:logdate}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"%{NOTSPACE:ref}\" \"%{DATA:user_agent}\"" }
}
date {
match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
kv {
source => "request"
field_split => "&?"
value_split => "="
}
urldecode {
all_fields => true
}
start_position
从哪个位置读取文件数据,默认从尾部,值为:end
如果要导入历史数据则设置成:beginning
input {
file {
path => ["/opt/flights2.csv”]
start_position => “beginning”
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于