Flume 读取日志文件数据存入到 Mysql

本贴最后更新于 2300 天前,其中的信息可能已经水流花落

需求是用 flume 监控日志文件,然后将文件的内容存放在 mysql 数据库中。

本文结构(注意:环境都是在 centos 7(192.168.5.105)下

  1. flume1.7.0 的安装和配置

  2. mysql 表设计

  3. MysqlSink 插件的编写

  4. 连接服务器远程调试

  5. 打包更新到服务器


flume1.7.0 的安装和配置

官网下载 flume1.7.0,我这里都放到自己的共享盘里面,方便自己也方面别人:

百度网盘 flume 文件夹下获取

上传到 centos 7 指定目录:

01.png

解压文件:

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /home/common/

cd ../common/

mv apache-flume-1.7.0-bin/ flume-1.7.0

cd flume-1.7.0/conf

cp flume-conf.properties.template flume-conf.properties

vi flume-conf.properties

配置如下:

#配置 Agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#配置 Source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/data/XXX/logback.log
a1.sources.r1.deserializer.outputCharset = UTF-8

#配置 Sink
a1.sinks.k1.type = XXX.XXX.logs.sink.XXXXX
a1.sinks.k1.hostname=192.168.5.111
a1.sinks.k1.port=3306
a1.sinks.k1.databaseName=database
a1.sinks.k1.tableName=tables
a1.sinks.k1.user=root
a1.sinks.k1.password=password
a1.sinks.k1.channel = c1

#配置 Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

注意:记得配置 flume 运行 Java 的环境

cp flume-env.sh.template flume-env.sh

vi flume-env.sh 修改JAVA_HOME路径(这里写你自己的Java 环境)

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64

02.png

Java 方面的编写和 mysql 表的创建

公司项目不方便贴出源码,我也是参考这个来写的

注意下这个 jar 包放置的点,我是用下面的方式:

官方建议在 flume 的 plugins.d (plugins.d 目录需要自己创建)目录下创建 一个自己定义的目录,在自定义的目录下新建 lib 和 libext 文件夹,lib 文件夹为放自定义组件的 jar 包,libext 文件夹下放 自定义组件的依赖包。

​   flume-1.7.0/plugins.d/
    flume-1.7.0/plugins.d/project/
    flume-1.7.0/plugins.d/project/lib/XXXXX.jar
    flume-1.7.0/plugins.d/project/libext/mysql-connector-java-6.0.5.jar

坑点 1:WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+


​	DriverManager.getConnection("jdbc:mysql://localhost:3306/logdb","root","123456");

​	改成

​	DriverManager.getConnection("jdbc:mysql://localhost:3306/logdb?useSSL=false","root","123456");

最后,启动 flume

./flume-ng agent -c /home/common/flume-1.7.0/conf -f /home/common/flume-1.7.0/conf/flume-conf.properties -n agnet1 -Dflume.root.logger=INFO,console

030dfe0.png


使用过程中大问题总汇:

flume 的 memeryChannel 中 transactionCapacity 和 sink 的 batchsize

运行时候报下面错误:
16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

参考的解决方案:

transactionCapacity 和 sink 的 batchsize 需要注意事项

原因是:

flume 的实时日志收集,用 flume 默认的配置后,发现不是完全实时的,于是看了一下,原来是 memeryChannel 的 transactionCapacity 在作怪,因为他默认是 100,也就是说收集端的 sink 会在收集到了 100 条以后再去提交事务(即发送到下一个目的地)。

flume 收集 java exception 错误日志的问题

flume 在收集到java throw Exception 异常日志信息的时候既然不是整条ERROR 异常错误,如下:

04.png

它是将上面的异常每条都传输到后台处理,像这种应该合并成一个Event里面

参考的解决方案:

Flume< 自定义 Source 和拦截器实现抓取异常多行日志 >

我采用的是自定义 Source 的方式,可以将错误的信息合并后传输:

其核心思想是:

如果当前有两行记录,如果两行记录都是“正常”的日志信息,比如:

2016-07-08 09:33:53-[Analysis-WC] INFO [main] FormulaContextUtil.init(68) | into FormulaContextUtil.init method begin ....
2016-07-08 09:33:53-[Analysis-WC] INFO [main] FormulaContextUtil.init(133) | FormulaContextUtil.init method end ....

则单独将每一行当做一个 Event,传递;
当连续的两条记录诸如这样的形式:

2016-07-28 15:49:05-[Analysis-WC] ERROR [http-8080-2] ControllerProxy.afterMethod(43) | java.lang.NullPointerException
java.lang.NullPointerException
    at com.ap.alt.system.web.LoginMgrController.getSumMXYJTC(LoginMgrController.java:304)

则将下一行日志合并到上一个 Event 中。

  • MySQL

    MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一。

    691 引用 • 535 回帖
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 630 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...