前言
线上系统有没有报错,如果你不看日志是不知道的。但是一般你线上的应用会很多,如果你每个应用看过来,那也是不太合理的,而且这个也只能开发或者运维来看,对于产品、前端开发等都是比较困难的,那些日志结构也是比较难看懂。还有就是他的延迟会比较高,一般都是用户反馈了,我们才会去看报错日志,这样对我们都很被动。这个是比较传统的做法。
那么有没有办法能实时对日志进行分析,发现错误立刻发出通知呢?
答案是肯定的,这里介绍 ELK 来实时分析日志,那么什么是 ELK,它是 Elasticsearch+Logstash+Kibana 这三个的缩写。
Elasticsearch:提供存储、快速搜索功能;
Logstash:日志收集,并传输到 Elasticsearch 里;
Kibana:在 Elasticsearch 中的数据进行可视化;
Elasticsearc 安装及配置
1、安装
linux 安装
1)下载地址
curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz
2)解压
tar -xvf elasticsearch-6.5.4.tar.gz
windows 安装
1)下载 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.msi
2)安装
根据提示一步步进行安装就可以了。
2、启动
linux
cd elasticsearch-6.5.4/bin ./elasticsearch
windows
cd elasticsearch-6.5.4/bin ./elasticsearch.bat
Logstash 安装
1、安装
windows 安装
https://www.elastic.co/cn/downloads/logstash
下载你需要的包
linux 安装
1)创建 repo
在/etc/yum.repos.d/目录下,创建 logstash.repo,文件内容如下
[logstash-6.x] name=Elastic repository for 6.x packages baseurl=https://artifacts.elastic.co/packages/6.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
2)安装
sudo yum install logstash
2、配置
在 config 文件夹下,新建 my-logstash.conf 文件
input { tcp { mode => "server" host => "127.0.0.1" port => 4000 tags => ["es_tags"] codec => "json_lines" } } output { elasticsearch { action => "index" hosts => "localhost:9200" index => "applog-%{YYYYMMDD}" document_type => "log" } }
3、运行
windows
cd bin logstash.bat -f ../config/my-logstash.conf
linux
cd bin logstash -f ../config/my-logstash.conf
在 logstash-6.5.4\logs 下查看日志输出
注意:这里启动的时候,需要看 logstash 是否启动正常
[ERROR][logstash.config.sourceloader] No configuration found in the configured sources.
说明启动没有成功。是因为配置文件找不到。
No config files found in path {:path=>"E:/worksoft/elk/logstash-6.5.4/bin/my-logstash.conf"}
修改启动命令,把配置文件修改到正确的地址,启动完之后的界面是
鼠标是一直停留在界面的,不会跳出来。
页面访问
Kibana 安装
1、安装
linux 安装
1)安装
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz shasum -a 512 kibana-6.5.4-linux-x86_64.tar.gz tar -xzf kibana-6.5.4-linux-x86_64.tar.gz cd kibana-6.5.4-linux-x86_64/
2)运行
./bin/kibana
windows 安装
1)解压
2)运行
.\bin\kibana.bat
2、配置
如果你的 elasticsearch 设置了用户名和密码,那么这里也需要设置
elasticsearch.username:登录用户名
elasticsearch.password:登录密码
server.port: 5601
server.host: "localhost"
3、访问地址
http://localhost:5601
整体设计
没有一个设计是完美的,那么这个设计会存在什么问题呢?
1、logstash 的吞吐量会成为瓶颈
2、与应用层耦合
3、数据丢失问题
基于 kafka 设计
kafka 启动
cd bin windows\kafka-server-start.bat ..\config\server.properties
遇到问题
Error while fetching metadata with correlation id
解决
server.properties 文件里修改这个配置
logstash 下新增 kafka-logstash.conf 文件
input { kafka { topics => ["test"] bootstrap_servers => "localhost:9092" type => "kafka-logs" consumer_threads =>5 decorate_events => true } } output { elasticsearch { action => "index" hosts => "localhost:9200" index => "kafka-log-%{YYYYMMDD}" } }
java 代码
把日志输送到 logstash 里面,创建一个项目并配置到日志
pom 引入
<dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder</artifactId> <version>5.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.1.0</version> </dependency>
logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration debug="false" scan="true" scanPeriod="30 second"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%date [%-5level] [%thread] %logger{80} - %msg%X%n</pattern> </encoder> </appender> <!--<appender name="logstash"--> <!--class="net.logstash.logback.appender.LogstashTcpSocketAppender">--> <!--<!– destination:这里的这个地址要跟logstash那边相同 –>--> <!--<destination>localhost:4000</destination>--> <!--<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>--> <!--</appender>--> <appender name="kafkaAppender" class="com.cimu.elk.custom.KafkaAppender"> <topic>test</topic> <brokerServer>127.0.0.1:9092</brokerServer> </appender> <root level="INFO"> <appender-ref ref="STDOUT"/> <!-- 简单的日志传输 --> <!-- <appender-ref ref="logstash"/> --> <!-- 基于kafka的日志传输 --> <appender-ref ref="kafkaAppender"/> </root> </configuration>
Controller 类
@Controller @Slf4j public class TestController { @GetMapping("/test") public String test(){ log.info("我的测试日志"); return "success"; } }
KafkaAppender 类:
package com.cimu.elk.custom; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.UnsynchronizedAppenderBase; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; /** * Title: KafkaAppender * Copyright: Copyright (c) 2017 * * date 2019年01月09日 12:22 */public class KafkaAppender extends UnsynchronizedAppenderBase { private String topic; private String brokerServer; //kafka生产者 private Producer, String> producer; @Override public void start() { super.start(); Properties props = new Properties(); props.put("bootstrap.servers", this.brokerServer); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); } @Override public void stop() { super.stop(); producer.close(); } @Override protected void append(ILoggingEvent event) { String msg = event.getMessage(); //拼接消息内容 ProducerRecord, String> producerRecord = new ProducerRecord, String>( this.topic, 0, "key", msg); System.out.println("[推送数据]:" + producerRecord); //发送kafka的消息 producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //监听发送结果 if (exception != null) { exception.printStackTrace(); } else { System.out.println("[推送数据到kafka成功]:" + metadata); } } }); } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getBrokerServer() { return brokerServer; } public void setBrokerServer(String brokerServer) { this.brokerServer = brokerServer; } public Producer, String> getProducer() { return producer; } public void setProducer(Producer, String> producer) { this.producer = producer; } }
项目启动之后,访问接口,模拟日志输出,在 kibana 里面配置 index pattern,就可以查询日志了。
添加 index pattern
查询日志
也可以关注我的公众号:程序之声
关注公众号,回复:下载
获取百度下载神器:
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于