ELK 实时日志分析

本贴最后更新于 2184 天前,其中的信息可能已经斗转星移

前言

线上系统有没有报错,如果你不看日志是不知道的。但是一般你线上的应用会很多,如果你每个应用看过来,那也是不太合理的,而且这个也只能开发或者运维来看,对于产品、前端开发等都是比较困难的,那些日志结构也是比较难看懂。还有就是他的延迟会比较高,一般都是用户反馈了,我们才会去看报错日志,这样对我们都很被动。这个是比较传统的做法。

那么有没有办法能实时对日志进行分析,发现错误立刻发出通知呢?
答案是肯定的,这里介绍 ELK 来实时分析日志,那么什么是 ELK,它是 Elasticsearch+Logstash+Kibana 这三个的缩写。
Elasticsearch:提供存储、快速搜索功能;
Logstash:日志收集,并传输到 Elasticsearch 里;
Kibana:在 Elasticsearch 中的数据进行可视化;

Elasticsearc 安装及配置

1、安装

linux 安装
1)下载地址

curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz

2)解压

tar -xvf elasticsearch-6.5.4.tar.gz

windows 安装
1)下载 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.msi
2)安装
根据提示一步步进行安装就可以了。

2、启动

linux

cd elasticsearch-6.5.4/bin
./elasticsearch

windows

cd elasticsearch-6.5.4/bin
./elasticsearch.bat

Logstash 安装

1、安装

windows 安装
https://www.elastic.co/cn/downloads/logstash
下载你需要的包

linux 安装
1)创建 repo
在/etc/yum.repos.d/目录下,创建 logstash.repo,文件内容如下

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

2)安装

sudo yum install logstash

2、配置

在 config 文件夹下,新建 my-logstash.conf 文件

input {
  tcp {
    mode => "server"
    host => "127.0.0.1"
    port => 4000
	tags => ["es_tags"]
	codec => "json_lines"
  }
}

output {
  elasticsearch {
    action => "index"
    hosts  => "localhost:9200"
    index  => "applog-%{YYYYMMDD}"
	document_type => "log"
  }
}

3、运行

windows

cd bin
logstash.bat -f ../config/my-logstash.conf

linux

cd bin
logstash -f ../config/my-logstash.conf

在 logstash-6.5.4\logs 下查看日志输出

注意:这里启动的时候,需要看 logstash 是否启动正常

[ERROR][logstash.config.sourceloader] No configuration found in the configured sources.

说明启动没有成功。是因为配置文件找不到。
No config files found in path {:path=>"E:/worksoft/elk/logstash-6.5.4/bin/my-logstash.conf"}
修改启动命令,把配置文件修改到正确的地址,启动完之后的界面是
imagepng
鼠标是一直停留在界面的,不会跳出来。

页面访问
imagepng

Kibana 安装

1、安装

linux 安装
1)安装

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz
shasum -a 512 kibana-6.5.4-linux-x86_64.tar.gz
tar -xzf kibana-6.5.4-linux-x86_64.tar.gz
cd kibana-6.5.4-linux-x86_64/

2)运行

./bin/kibana

windows 安装
1)解压
2)运行

.\bin\kibana.bat

2、配置

如果你的 elasticsearch 设置了用户名和密码,那么这里也需要设置
elasticsearch.username:登录用户名
elasticsearch.password:登录密码
server.port: 5601
server.host: "localhost"

3、访问地址

http://localhost:5601

整体设计

imagepng
没有一个设计是完美的,那么这个设计会存在什么问题呢?
1、logstash 的吞吐量会成为瓶颈
2、与应用层耦合
3、数据丢失问题

基于 kafka 设计

imagepng

kafka 启动

cd bin
windows\kafka-server-start.bat ..\config\server.properties

遇到问题

Error while fetching metadata with correlation id

解决
server.properties 文件里修改这个配置
imagepng

logstash 下新增 kafka-logstash.conf 文件

input {
  kafka {
	topics => ["test"]
	bootstrap_servers => "localhost:9092"
	type => "kafka-logs"
	consumer_threads =>5
	decorate_events => true
  }
}

output {
  elasticsearch {
    action => "index"
    hosts  => "localhost:9200"
    index  => "kafka-log-%{YYYYMMDD}"
  }
}

java 代码

把日志输送到 logstash 里面,创建一个项目并配置到日志
pom 引入

<dependency>
	<groupId>net.logstash.logback</groupId>
	<artifactId>logstash-logback-encoder</artifactId>
	<version>5.2</version>
</dependency>
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-api</artifactId>
	<version>1.7.25</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.1.0</version>
</dependency>

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="true" scanPeriod="30 second">

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%date [%-5level] [%thread] %logger{80} - %msg%X%n</pattern>
        </encoder>
    </appender>
	  
	  <!--<appender name="logstash"-->
              <!--class="net.logstash.logback.appender.LogstashTcpSocketAppender">-->
        <!--&lt;!&ndash; destination:这里的这个地址要跟logstash那边相同 &ndash;&gt;-->
        <!--<destination>localhost:4000</destination>-->
        <!--<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder"/>-->
    <!--</appender>-->
	
    <appender name="kafkaAppender"
              class="com.cimu.elk.custom.KafkaAppender">
        <topic>test</topic>
        <brokerServer>127.0.0.1:9092</brokerServer>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <!-- 简单的日志传输 -->
        <!-- <appender-ref ref="logstash"/> -->
        <!-- 基于kafka的日志传输 -->
        <appender-ref ref="kafkaAppender"/>
    </root>

</configuration>

Controller 类

@Controller
@Slf4j
public class TestController {
    @GetMapping("/test")
    public String test(){
        log.info("我的测试日志");
 return "success";
  }
}

KafkaAppender 类:

package com.cimu.elk.custom;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Title: KafkaAppender * Copyright: Copyright (c) 2017 * 
  * date 2019年01月09日 12:22
 */public class KafkaAppender extends UnsynchronizedAppenderBase {
    private String topic;
 private String brokerServer;
  //kafka生产者
  private Producer, String> producer;

  @Override
  public void start() {
        super.start();
  Properties props = new Properties();
  props.put("bootstrap.servers", this.brokerServer);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producer = new KafkaProducer<>(props);
  }

    @Override
  public void stop() {
        super.stop();
  producer.close();
  }

    @Override
  protected void append(ILoggingEvent event) {
        String msg = event.getMessage();
  //拼接消息内容
  ProducerRecord, String> producerRecord = new ProducerRecord, String>(
                this.topic, 0, "key", msg);
  System.out.println("[推送数据]:" + producerRecord);
  //发送kafka的消息
  producer.send(producerRecord, new Callback() {
            @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
                //监听发送结果
  if (exception != null) {
                    exception.printStackTrace();
  } else {
                    System.out.println("[推送数据到kafka成功]:" + metadata);
  }
            }
        });
  }

    public String getTopic() {
        return topic;
  }

    public void setTopic(String topic) {
        this.topic = topic;
  }

    public String getBrokerServer() {
        return brokerServer;
  }

    public void setBrokerServer(String brokerServer) {
        this.brokerServer = brokerServer;
  }

    public Producer, String> getProducer() {
        return producer;
  }

    public void setProducer(Producer, String> producer) {
        this.producer = producer;
  }
}

源码地址

项目启动之后,访问接口,模拟日志输出,在 kibana 里面配置 index pattern,就可以查询日志了。
添加 index pattern
imagepng
查询日志
imagepng

也可以关注我的公众号:程序之声
关注公众号,回复:下载
获取百度下载神器:
imagepng

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3454 回帖 • 187 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    143 引用 • 442 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...