日志收集系统&日志 ETL
[TOC]
我司的日志收集系统负责我司的所有业务日志的收集,并分别给 Hadoop 平台提供离线数据和实时平台提供实时数据流。17 作业的日志收集系统基于 Flume 设计和搭建而成。
flume 简单入门
Flume 的架构主要有一下几个核心概念:
Event:一个数据单元,带有一个可选的消息头
Flow:Event 从源点到达目的点的迁移的抽象
Client:操作位于源点处的 Event,将其发送到 Flume Agent
Agent:一个独立的 Flume 进程,包含组件 Source、Channel、Sink
Source:用来消费传递到该组件的 Event
Channel:中转 Event 的一个临时存储,保存有 Source 组件传递过来的 Event
Sink:从 Channel 中读取并移除 Event,将 Event 传递到 Flow Pipeline 中的下一个 Agent(如果有的话)
-
每个 Flume Agent 包含三个主要组件:Source、Channel、Sink
-
Source 可以监听一个或者多个网络端口,用于接收数据或者可以从本地文件系统读取数据。每个 Source 必须至少连接一个 Channel。基于一些标准,一个 Source 可以写入几个 Channel,复制事件到所有或某些 Channe
-
Channel 是被动组件(虽然它们可以为了清理或者垃圾回收运行自己的线程),缓冲 Agent 已经接收,但尚未写出到另一个 Agent 或者存储系统的数据。Channel 的行为像队列,Source 写入到它们,Sink 从它们中读取。多个 Source 可以安全地写入到相同的 Channel,并且多个 Sink 可以从相同的 Channel 进行读取。可是一个 Sink 只能从一个 Channel 读取。
-
Sink 连续轮询各自的 Channel 来读取和删除事件。Sink 将事件推送到下一阶段(RPCSink 的情况下),或到最终目的地。一旦在下一阶段或其目的地中数据是安全的,Sink 通过事务提交通知 Channel,可以从 Channel 中删除这些事件。
source channel sink 自由组合
日志收集系统简介
日志收集是大数据的基石
业务平台每天会产生大量的日志数据。收集业务日志数据,供离线和在线的分析和使用,正是日志系统要做的事情。高可用性,和高可靠性,可扩展性是日志系统具备的基本特征
目前常用的开源日志收集系统有 Flume, Scribe 等。Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,目前已经是 Apache 的一个子项目。Scribe 是 Facebook 开源的日志收集系统,它为日志的分布式收集,统一处理提供一个可扩展的,高容错的简单方案。
日志收集系统架构
-
整个系统分为三层:Agent 层,Collector 层和 Store 层。其中 agent 层即是线上业务方,通过调用接口发送数据;Collector 层部署在中心服务器上,负责接收 Agent 层发送的日志,并且将日志根据路由规则写到相应的 Store 层中;Store 层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。
-
Agent 到 Collector 使用 LoadBalance 策略,将所有的日志均衡地发到所有的 Collector 上,达到负载均衡的目标,同时并处理单个 Collector 失效的问题。
-
Collector 层的目标主要有两个:SinkHdfs, SinkKafka。分别提供离线的数据到 Hdfs,和提供实时的日志流到 Kafka。
-
对于 Store 来说,Hdfs 负责永久地存储所有日志;Kafka 存储最新的 3 天日志,并给 Storm,spark 等系统提供实时日志流;
下图是日志收集系统的模块分解图,详解 Agent, Collector 和 Bypass 中的 Source, Channel 和 Sink 的关系。
a. 模块命名规则:所有的 Source 以 src 开头,所有的 Channel 以 ch 开头,所有的 Sink 以 sink 开头;
b. Channel 统一使用 flume1.6 SPILLABLEMEMORY,
c. 模块之间内部通信统一使用 Avro 接口
架构考虑
可用性
- 对日志收集系统来说,可用性(availablity)指固定周期内系统无故障运行总时间。要想提高系统的可用性,就需要消除系统的单点,提高系统的冗余度。下面来看看 17 作业的日志收集系统在可用性方面的考虑。
Agent 死掉
- 机器死机。线上部署多台 agent 服务器,通过 ng 转发,只要保证有一台服务器可用,则服务可用。
- agent 进程进程死掉。原理同机器 down 掉一样,ng 会自动转发到服务可用的机器
Collector 死掉
- 采用多个 collector 服务,agent 通过轮询方式发送到 collector,当某一 collector 死掉之后,自动寻找下一个可用的 collector 服务。所以整体服务不受影响
Hdfs 正常停机或异常停机
- 采用 flume 最新 SPILLABLEMEMORY,当写入 hdfs 失败,达到重试次数。channel 中 event 数量累计到一定程度,flume 将会采用 FILECHANNEL,写入本地磁盘,保证数据不丢失。当 HDFS 恢复服务,collector 再将 FILECHANNEL 中数据写到 HDFS
Hdfs 变慢
-
当 Hadoop 上的任务较多且有大量的读写操作时,Hdfs 的读写数据往往变的很慢。由于每天,每周都有高峰使用期,所以这种情况非常普遍。
-
对于 Hdfs 变慢的问题,我们同样使用 SPILLABLEMEMORY 来解决。当 Hdfs 写入较快时,所有的 events 只经过 MemChannel 传递数据,减少磁盘 IO,获得较高性能。当 Hdfs 写入较慢时,超过 MEMEORYCHANNEL 容纳范围的 EVENTS 将会被写到 FILECHANNEL,有一个较大的数据缓存空间。
可靠性
对日志收集系统来说,可靠性(reliability)是指 Flume 在数据流的传输过程中,保证 events 的可靠传递。
- 对 Flume 来说,所有的 events 都被保存在 Agent 的 Channel 中,然后被发送到数据流中的下一个 Agent 或者最终的存储服务中。那么一个 Agent 的 Channel 中的 events 什么时候被删除呢?当且仅当它们被保存到下一个 Agent 的 Channel 中或者被保存到最终的存储服务中。这就是 Flume 提供数据流中点到点的可靠性保证的最基本的单跳消息传递语义。
那么 Flume 是如何做到上述最基本的消息传递语义呢?
- 首先,Agent 间的事务交换。Flume 使用事务的办法来保证 event 的可靠传递。Source 和 Sink 分别被封装在 事务中,这些事务由保存 event 的存储提供或者由 Channel 提供。这就保证了 event 在数据流的点对点传输中是可靠的。在多级数据流中,如下图,上一级的 Sink 和下一级的 Source 都被包含在事务中,保证数据可靠地从一个 Channel 到另一个 Channel 转移
- 其次,数据流中 Channel 的持久性。Flume 中 MemoryChannel 是可能丢失数据的(当 Agent 死掉时),而 FileChannel 是持久性的,提供类似 mysql 的日志机制,保证数据不丢失。而我们采用 FLUME 最新的 SPILLABLECHANNEL,该 CHANNEL 结合 MEMORYCHANNEL,FILECHANNEL 的优点,当 MEMEORYCHANNEL 中容纳不下,将会自动采用 FILECHANNEL,保证数据不丢失
可扩展性(scalability)
对日志收集系统来说,可扩展性(scalability)是指系统能够线性扩展。当日志量增大时,系统能够以简单的增加机器来达到线性扩容的目的
对于基于 Flume 的日志收集系统来说,需要在设计的每一层,都可以做到线性扩展地提供服务。下面将对每一层的可扩展性做相应的说明。
Agent 层
对于 Agent 这一层来说,以 HTTP 方式提供接口,可以水平扩展,不受限制。一个方面,Agent 收集日志的能力受限于机器的性能,正常情况下一个 Agent 可以为单机提供足够服务。另一方面,如果机器比较多,可能受限于后端 Collector 提供的服务,但 Agent 到 Collector 是有 Load Balance 机制,使得 Collector 可以线性扩展提高能力。
Collector 层
对于 Collector 这一层,Agent 到 Collector 是有 Load Balance 机制,并且 Collector 提供无差别服务,所以可以线性扩展。其性能主要受限于 Store 层提供的能力。
Store 层
对于 Store 这一层来说,Hdfs 和 Kafka 都是分布式系统,可以做到线性扩展。
在 FLUME 1.5 之前提供了 MEMEORYCHANNEL 和 FILECHANNEL,1.5 以及之后提供新的复合 CHANNEL,SPILLABLECHANNEL
- MemoryChannel: 所有的 events 被保存在内存中。优点是高吞吐。缺点是容量有限并且 Agent 死掉时会丢失内存中的数据。
- FileChannel: 所有的 events 被保存在文件中。优点是容量较大且死掉时数据可恢复。缺点是速度较慢。
上述两种 Channel,优缺点相反,分别有自己适合的场景。然而,对于大部分应用来说,我们希望 Channel 可以同提供高吞吐和大缓存。基于此,我们采用新的复合 SPILLABLECHANNEL。
当堆积在 Channel 中的 events 数小于阈值时,所有的 events 被保存在 MemoryChannel 中,Sink 从 MemoryChannel 中读取数据; 当堆积在 Channel 中的 events 数大于阈值时, 所有的 events 被自动存放在 FileChannel 中,Sink 从 FileChannel 中读取数据。这样当系统正常运行时,我们可以使用 MemoryChannel 的高吞吐特性;当系统有异常时,我们可以利用 FileChannel 的大缓存的特性。
提供实时流
我司部分业务,如实时监控等服务,需要处理实时的数据流。因此我们希望 Flume 能够导出一份实时流给 Kafka
一个非常重要的要求是实时数据流不应该受到其它 Sink 的速度影响,保证实时数据流的速度。这一点,我们是通过 Collector 中设置不同的 Channel 进行隔离。
系统监控
监控是必不可少的部分。设计合理的监控,可以对异常情况及时发现,只要有一部手机,就可以知道系统是否正常运作。对于 17 作业的日志收集系统,我们建立了多维度的监控,防止未知的异常发生
通过发送给 zabbix 的数据,我们可以绘制出发送数量、拥堵情况和写 Hdfs 速度的图表,对于超预期的拥堵,
flume 写 hfds 状态的监控
对于非常重要的日志,我们每个小时会运行程序去检查日志,当发生异常我们会通过邮件短信报警
日志大小异常监控
我们会每天监控重要日志量的大小,当日志量有较大的波动,能及时反馈出来
通过以上分析我司日志服务具备高可用性,高可靠性,可扩展等特性的分布式服务
日志 ETL
ETL 目的:清洗原始数据,让数据可靠,易用
日志 ETL 整体流程如下图:
实现关键:
public class GroovyServiceImpl implements IGroovyService {
private Logger logger = LoggerFactory.getLogger(GroovyServiceImpl.class);
@Override
Object execute(String groovyScript, Map params) {
Class<GroovyObject> groovyObjectClass = parseScript(groovyScript)
GroovyObject groovyObject
try {
groovyObject = groovyObjectClass.newInstance()
} catch (any) {
logger.error("error when execute groovyScript:{}", any)
throw new RuntimeException("error,when execute groovyScript", any)
}
ArrayList<String> list = Lists.newArrayList();
return groovyObject.invokeMethod("validate", [params, list] as Object[])
}
private Class<GroovyObject> parseScript(String script) {
GroovyClassLoader loader = new GroovyClassLoader();
Class<GroovyObject> clazz = loader.parseClass(script)
clazz
}
}
example:
class AdrNormal {
private static bldr
private static writer
static {
bldr = new groovy.json.JsonBuilder()
writer = new StringWriter()
}
def validate(Map dataMap) {
def otherField = ["ktwelve", "native_version"]
def returnList = []
List<String> keys = dataMap.get("keys")
Set<String> keySet = dataMap.keySet()
List<String> keyList = Lists.newArrayList(keySet)
List<String> diffList = keyList - keys
diffList.remove("keys")
for (key in keys) {
if (otherField.contains(key)) {
continue
}
String value = dataMap.get(key)
if (Strings.isNullOrEmpty(value)) {
value = "null";
}
returnList.add(value)
}
def map = [:]
for (key in diffList) {
map[key] = dataMap.get(key)
}
bldr.call(map)
returnList.add(bldr.toString())
for (key in otherField) {
String value = dataMap.get(key)
if (Strings.isNullOrEmpty(value)) {
value = "null";
}
returnList.add(value)
}
returnList
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于