前言
线上系统有没有报错,如果你不看日志是不知道的。但是一般你线上的应用会很多,如果你每个应用看过来,那也是不太合理的,而且这个也只能开发或者运维来看,对于产品、前端开发等都是比较困难的,那些日志结构也是比较难看懂。还有就是他的延迟会比较高,一般都是用户反馈了,我们才会去看报错日志,这样对我们都很被动。这个是比较传统的做法。
那么有没有办法能实时对日志进行分析,发现错误立刻发出通知呢?
答案是肯定的,这里介绍 ELK 来实时分析日志,那么什么是 ELK,它是 Elasticsearch+Logstash+Kibana 这三个的缩写。
Elasticsearch:提供存储、快速搜索功能;
Logstash:日志收集,并传输到 Elasticsearch 里;
Kibana:在 Elasticsearch 中的数据进行可视化;
Elasticsearc 安装及配置
1、安装
linux 安装
1)下载地址
curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz
2)解压
tar -xvf elasticsearch-6.5.4.tar.gz
windows 安装
1)下载 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.msi
2)安装
根据提示一步步进行安装就可以了。
2、启动
linux
cd elasticsearch-6.5.4/bin
./elasticsearch
windows
cd elasticsearch-6.5.4/bin
./elasticsearch.bat
Logstash 安装
1、安装
windows 安装
https://www.elastic.co/cn/downloads/logstash
下载你需要的包
linux 安装
1)创建 repo
在/etc/yum.repos.d/目录下,创建 logstash.repo,文件内容如下
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
2)安装
sudo yum install logstash
2、配置
在 config 文件夹下,新建 my-logstash.conf 文件
input {
tcp {
mode => "server"
host => "127.0.0.1"
port => 4000
tags => ["es_tags"]
codec => "json_lines"
}
}
output {
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "applog-%{YYYYMMDD}"
document_type => "log"
}
}
3、运行
windows
cd bin
logstash.bat -f ../config/my-logstash.conf
linux
cd bin
logstash -f ../config/my-logstash.conf
在 logstash-6.5.4\logs 下查看日志输出
注意:这里启动的时候,需要看 logstash 是否启动正常
[ERROR][logstash.config.sourceloader] No configuration found in the configured sources.
说明启动没有成功。是因为配置文件找不到。
No config files found in path {:path=>"E:/worksoft/elk/logstash-6.5.4/bin/my-logstash.conf"}
修改启动命令,把配置文件修改到正确的地址,启动完之后的界面是
鼠标是一直停留在界面的,不会跳出来。
页面访问
Kibana 安装
1、安装
linux 安装
1)安装
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz
shasum -a 512 kibana-6.5.4-linux-x86_64.tar.gz
tar -xzf kibana-6.5.4-linux-x86_64.tar.gz
cd kibana-6.5.4-linux-x86_64/
2)运行
./bin/kibana
windows 安装
1)解压
2)运行
.\bin\kibana.bat
2、配置
如果你的 elasticsearch 设置了用户名和密码,那么这里也需要设置
elasticsearch.username:登录用户名
elasticsearch.password:登录密码
server.port: 5601
server.host: "localhost"
3、访问地址
http://localhost:5601
整体设计
没有一个设计是完美的,那么这个设计会存在什么问题呢?
1、logstash 的吞吐量会成为瓶颈
2、与应用层耦合
3、数据丢失问题
基于 kafka 设计
kafka 启动
cd bin
windows\kafka-server-start.bat ..\config\server.properties
遇到问题
Error while fetching metadata with correlation id
解决
server.properties 文件里修改这个配置
logstash 下新增 kafka-logstash.conf 文件
input {
kafka {
topics => ["test"]
bootstrap_servers => "localhost:9092"
type => "kafka-logs"
consumer_threads =>5
decorate_events => true
}
}
output {
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "kafka-log-%{YYYYMMDD}"
}
}
java 代码
把日志输送到 logstash 里面,创建一个项目并配置到日志
pom 引入
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="30 second">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date [%-5level] [%thread] %logger{80} - %msg%X%n</pattern>
</encoder>
</appender>
<!--<appender name="logstash"-->
<!--class="net.logstash.logback.appender.LogstashTcpSocketAppender">-->
<!--<!– destination:这里的这个地址要跟logstash那边相同 –>-->
<!--<destination>localhost:4000</destination>-->
<!--<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>-->
<!--</appender>-->
<appender name="kafkaAppender"
class="com.cimu.elk.custom.KafkaAppender">
<topic>test</topic>
<brokerServer>127.0.0.1:9092</brokerServer>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<!-- 简单的日志传输 -->
<!-- <appender-ref ref="logstash"/> -->
<!-- 基于kafka的日志传输 -->
<appender-ref ref="kafkaAppender"/>
</root>
</configuration>
Controller 类
@Controller
@Slf4j
public class TestController {
@GetMapping("/test")
public String test(){
log.info("我的测试日志");
return "success";
}
}
KafkaAppender 类:
package com.cimu.elk.custom;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/**
* Title: KafkaAppender * Copyright: Copyright (c) 2017 *
* date 2019年01月09日 12:22
*/public class KafkaAppender extends UnsynchronizedAppenderBase {
private String topic;
private String brokerServer;
//kafka生产者
private Producer, String> producer;
@Override
public void start() {
super.start();
Properties props = new Properties();
props.put("bootstrap.servers", this.brokerServer);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
}
@Override
public void stop() {
super.stop();
producer.close();
}
@Override
protected void append(ILoggingEvent event) {
String msg = event.getMessage();
//拼接消息内容
ProducerRecord, String> producerRecord = new ProducerRecord, String>(
this.topic, 0, "key", msg);
System.out.println("[推送数据]:" + producerRecord);
//发送kafka的消息
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//监听发送结果
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("[推送数据到kafka成功]:" + metadata);
}
}
});
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getBrokerServer() {
return brokerServer;
}
public void setBrokerServer(String brokerServer) {
this.brokerServer = brokerServer;
}
public Producer, String> getProducer() {
return producer;
}
public void setProducer(Producer, String> producer) {
this.producer = producer;
}
}
项目启动之后,访问接口,模拟日志输出,在 kibana 里面配置 index pattern,就可以查询日志了。
添加 index pattern
查询日志
也可以关注我的公众号:程序之声
关注公众号,回复:下载
获取百度下载神器:
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于