ELK 实时日志分析

本贴最后更新于 2316 天前,其中的信息可能已经斗转星移

前言

线上系统有没有报错,如果你不看日志是不知道的。但是一般你线上的应用会很多,如果你每个应用看过来,那也是不太合理的,而且这个也只能开发或者运维来看,对于产品、前端开发等都是比较困难的,那些日志结构也是比较难看懂。还有就是他的延迟会比较高,一般都是用户反馈了,我们才会去看报错日志,这样对我们都很被动。这个是比较传统的做法。

那么有没有办法能实时对日志进行分析,发现错误立刻发出通知呢?
答案是肯定的,这里介绍 ELK 来实时分析日志,那么什么是 ELK,它是 Elasticsearch+Logstash+Kibana 这三个的缩写。
Elasticsearch:提供存储、快速搜索功能;
Logstash:日志收集,并传输到 Elasticsearch 里;
Kibana:在 Elasticsearch 中的数据进行可视化;

Elasticsearc 安装及配置

1、安装

linux 安装
1)下载地址

curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz

2)解压

tar -xvf elasticsearch-6.5.4.tar.gz

windows 安装
1)下载 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.msi
2)安装
根据提示一步步进行安装就可以了。

2、启动

linux

cd elasticsearch-6.5.4/bin ./elasticsearch

windows

cd elasticsearch-6.5.4/bin ./elasticsearch.bat

Logstash 安装

1、安装

windows 安装
https://www.elastic.co/cn/downloads/logstash
下载你需要的包

linux 安装
1)创建 repo
在/etc/yum.repos.d/目录下,创建 logstash.repo,文件内容如下

[logstash-6.x] name=Elastic repository for 6.x packages baseurl=https://artifacts.elastic.co/packages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md

2)安装

sudo yum install logstash

2、配置

在 config 文件夹下,新建 my-logstash.conf 文件

input { tcp { mode => "server" host => "127.0.0.1" port => 4000 tags => ["es_tags"] codec => "json_lines" } } output { elasticsearch { action => "index" hosts => "localhost:9200" index => "applog-%{YYYYMMDD}" document_type => "log" } }

3、运行

windows

cd bin logstash.bat -f ../config/my-logstash.conf

linux

cd bin logstash -f ../config/my-logstash.conf

在 logstash-6.5.4\logs 下查看日志输出

注意:这里启动的时候,需要看 logstash 是否启动正常

[ERROR][logstash.config.sourceloader] No configuration found in the configured sources.

说明启动没有成功。是因为配置文件找不到。
No config files found in path {:path=>"E:/worksoft/elk/logstash-6.5.4/bin/my-logstash.conf"}
修改启动命令,把配置文件修改到正确的地址,启动完之后的界面是
imagepng
鼠标是一直停留在界面的,不会跳出来。

页面访问
imagepng

Kibana 安装

1、安装

linux 安装
1)安装

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz shasum -a 512 kibana-6.5.4-linux-x86_64.tar.gz tar -xzf kibana-6.5.4-linux-x86_64.tar.gz cd kibana-6.5.4-linux-x86_64/

2)运行

./bin/kibana

windows 安装
1)解压
2)运行

.\bin\kibana.bat

2、配置

如果你的 elasticsearch 设置了用户名和密码,那么这里也需要设置
elasticsearch.username:登录用户名
elasticsearch.password:登录密码
server.port: 5601
server.host: "localhost"

3、访问地址

http://localhost:5601

整体设计

imagepng
没有一个设计是完美的,那么这个设计会存在什么问题呢?
1、logstash 的吞吐量会成为瓶颈
2、与应用层耦合
3、数据丢失问题

基于 kafka 设计

imagepng

kafka 启动

cd bin windows\kafka-server-start.bat ..\config\server.properties

遇到问题

Error while fetching metadata with correlation id

解决
server.properties 文件里修改这个配置
imagepng

logstash 下新增 kafka-logstash.conf 文件

input { kafka { topics => ["test"] bootstrap_servers => "localhost:9092" type => "kafka-logs" consumer_threads =>5 decorate_events => true } } output { elasticsearch { action => "index" hosts => "localhost:9200" index => "kafka-log-%{YYYYMMDD}" } }

java 代码

把日志输送到 logstash 里面,创建一个项目并配置到日志
pom 引入

<dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>5.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>

logback.xml

<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false" scan="true" scanPeriod="30 second"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%date [%-5level] [%thread] %logger{80} - %msg%X%n</pattern> </encoder> </appender> <!--<appender name="logstash"--> <!--class="net.logstash.logback.appender.LogstashTcpSocketAppender">--> <!--&lt;!&ndash; destination:这里的这个地址要跟logstash那边相同 &ndash;&gt;--> <!--<destination>localhost:4000</destination>--> <!--<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>--> <!--</appender>--> <appender name="kafkaAppender" class="com.cimu.elk.custom.KafkaAppender"> <topic>test</topic> <brokerServer>127.0.0.1:9092</brokerServer> </appender> <root level="INFO"> <appender-ref ref="STDOUT"/> <!-- 简单的日志传输 --> <!-- <appender-ref ref="logstash"/> --> <!-- 基于kafka的日志传输 --> <appender-ref ref="kafkaAppender"/> </root> </configuration>

Controller 类

@Controller @Slf4j public class TestController { @GetMapping("/test") public String test(){ log.info("我的测试日志"); return "success"; } }

KafkaAppender 类:

package com.cimu.elk.custom; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.UnsynchronizedAppenderBase; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; /** * Title: KafkaAppender * Copyright: Copyright (c) 2017 * * date 2019年01月09日 12:22 */public class KafkaAppender extends UnsynchronizedAppenderBase { private String topic; private String brokerServer; //kafka生产者 private Producer, String> producer; @Override public void start() { super.start(); Properties props = new Properties(); props.put("bootstrap.servers", this.brokerServer); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); } @Override public void stop() { super.stop(); producer.close(); } @Override protected void append(ILoggingEvent event) { String msg = event.getMessage(); //拼接消息内容 ProducerRecord, String> producerRecord = new ProducerRecord, String>( this.topic, 0, "key", msg); System.out.println("[推送数据]:" + producerRecord); //发送kafka的消息 producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //监听发送结果 if (exception != null) { exception.printStackTrace(); } else { System.out.println("[推送数据到kafka成功]:" + metadata); } } }); } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getBrokerServer() { return brokerServer; } public void setBrokerServer(String brokerServer) { this.brokerServer = brokerServer; } public Producer, String> getProducer() { return producer; } public void setProducer(Producer, String> producer) { this.producer = producer; } }

源码地址

项目启动之后,访问接口,模拟日志输出,在 kibana 里面配置 index pattern,就可以查询日志了。
添加 index pattern
imagepng
查询日志
imagepng

也可以关注我的公众号:程序之声
关注公众号,回复:下载
获取百度下载神器:
imagepng

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3455 回帖 • 159 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    143 引用 • 442 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...