What is Flume
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. Flume lets you plug custom data senders into a logging pipeline to collect data, and it can perform simple processing before writing to a variety of (customizable) data receivers. A typical Flume scenario is log --> flume --> [hdfs, hbase, kafka]: logs are collected and landed in different stores to serve different downstream computations.
Flume source code structure
Main modules
- flume-ng-core: Flume's core framework, containing the interfaces of every module and the logic connecting them. Most of the code under core is source, channel, and sink code.
- flume-ng-channels: channel implementations such as FileChannel, JdbcChannel, KafkaChannel, and SpillableMemoryChannel.
- flume-ng-sinks: sink implementations, including but not limited to HDFSSink, HiveSink, ElasticSearchSink, and KafkaSink.
- flume-ng-sources: source implementations, including but not limited to JMS, Kafka, Scribe, and Twitter; the remaining sources live in the flume-ng-core module.
- flume-ng-node: the basic classes for running Flume, including the agent's main class (Application). This is the entry point for our analysis.
Flume logical structure
An agent consists of three basic components:
- source
- channel
- sink
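As a concrete illustration (not taken from the text above; the agent name, port, and component names here are all made up), a minimal configuration wiring one source to one sink through one channel looks like this:

```properties
# name the components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# a netcat source listening on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# an in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# a logger sink, writing events to the agent's log
a1.sinks.k1.type = logger

# wire them together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

An agent using this file would be started with something like bin/flume-ng agent -n a1 -f conf/example.conf, which is exactly the flume-ng script whose internals are analyzed next.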
Analysis of the startup script flume-ng
######################################################################
# constants: a different main class is run depending on the invocation mode
######################################################################
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
######################################################################
#actually start Flume; the class to run is given by $FLUME_APPLICATION_CLASS
######################################################################
run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
##################################################
# main: the variables used during startup, all of
# which can be overridden at launch time. If the Java
# heap size is not set, it defaults to 20 MB; it can
# be configured in flume-env.sh.
##################################################
# set default params
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""
opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""
mode=$1
shift
##################################################
# finally, invoke the appropriate command: a different
# class is launched depending on the arguments. When
# starting an agent, it is Application.java from
# flume-ng-node that runs.
##################################################
if [ -n "$opt_agent" ] ; then
run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
Agent startup analysis: Application.java
From the script analysis above we know that starting an agent runs org.apache.flume.node.Application.
Look at the source of its main method:
Options options = new Options();

Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

option = new Option("f", "conf-file", true,
    "specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);

option = new Option(null, "no-reload-conf", false,
    "do not reload config file if changed");
options.addOption(option);

// Options for Zookeeper
option = new Option("z", "zkConnString", true,
    "specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);

option = new Option("p", "zkBasePath", true,
    "specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);

option = new Option("h", "help", false, "display help text");
options.addOption(option);

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

if (commandLine.hasOption('h')) {
  new HelpFormatter().printHelp("flume-ng agent", options, true);
  return;
}

String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
This validates and parses the command-line arguments; when we start an agent we pass options such as -n and -f. Continuing:
// is a ZooKeeper configuration present?
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  isZkConfigured = true;
}

Application application = null;
if (isZkConfigured) {
  // get options
  String zkConnectionStr = commandLine.getOptionValue('z');
  String baseZkPath = commandLine.getOptionValue('p');

  if (reload) { // reload the configuration whenever it changes
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    List<LifecycleAware> components = Lists.newArrayList();
    PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new PollingZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath, eventBus);
    components.add(zookeeperConfigurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else { // no need to watch the configuration for changes
    StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
        new StaticZooKeeperConfigurationProvider(
            agentName, zkConnectionStr, baseZkPath);
    application = new Application();
    application.handleConfigurationEvent(zookeeperConfigurationProvider
        .getConfiguration());
  }
}
From this code we can see that when the configuration is stored at a path in ZooKeeper and reloading is required, a PollingZooKeeperConfigurationProvider is started; it watches ZooKeeper for changes and relays them through Guava's EventBus (which behaves like the observer pattern).
Note
At this point the PollingZooKeeperConfigurationProvider has only been added to components; nothing has actually been started yet.
private final List<LifecycleAware> components;
Part of the key code of PollingZooKeeperConfigurationProvider:
try {
  agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
  agentNodeCache.start();
  agentNodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
      refreshConfiguration();
    }
  });
} catch (Exception e) {
  client.close();
  throw e;
}
A listener is registered on the ZooKeeper node; any change to the node triggers the refreshConfiguration method:
private void refreshConfiguration() throws IOException {
  LOGGER.info("Refreshing configuration from ZooKeeper");
  byte[] data = null;
  ChildData childData = agentNodeCache.getCurrentData();
  if (childData != null) {
    data = childData.getData();
  }
  flumeConfiguration = configFromBytes(data);
  // post the event; every handler registered on this eventBus receives it
  eventBus.post(getConfiguration());
}
Now back to the Application code. The above covered loading Flume's configuration from ZooKeeper; Flume of course also supports plain local files:
File configurationFile = new File(commandLine.getOptionValue('f'));

/*
 * The following is to ensure that by default the agent will fail on
 * startup if the file does not exist.
 */
if (!configurationFile.exists()) {
  // If command line invocation, then need to fail fast
  if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
    String path = configurationFile.getPath();
    try {
      path = configurationFile.getCanonicalPath();
    } catch (IOException ex) {
      logger.error("Failed to read canonical path for file: " + path, ex);
    }
    throw new ParseException(
        "The specified configuration file does not exist: " + path);
  }
}

List<LifecycleAware> components = Lists.newArrayList();

if (reload) {
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(
          agentName, configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  eventBus.register(application);
} else {
  PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(agentName, configurationFile);
  application = new Application();
  application.handleConfigurationEvent(configurationProvider
      .getConfiguration());
}
If the configuration file given by -f does not exist, the agent fails fast and throws an exception. Then, just as in the ZooKeeper case, reload decides whether changes to the configuration file should be picked up: if dynamic reloading is wanted, a PollingPropertiesFileConfigurationProvider is started that reloads the configuration file every 30 seconds.
After that, application.start() is executed. Let's look at start():
private final LifecycleSupervisor supervisor;

public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}
The start method iterates over components and calls supervisor.supervise() on each.
Before continuing, look at the class hierarchies of LifecycleSupervisor and PollingPropertiesFileConfigurationProvider: both implement the LifecycleAware interface, which defines the life cycle of a Flume component, and LifecycleSupervisor is what drives components through that life cycle.
LifecycleAware.java
/**
 * <p>
 * Starts a service or component.
 * </p>
 * <p>
 * Implementations should determine the result of any start logic and effect
 * the return value of {@link #getLifecycleState()} accordingly.
 * </p>
 *
 * @throws LifecycleException
 * @throws InterruptedException
 */
public void start();

/**
 * <p>
 * Stops a service or component.
 * </p>
 * <p>
 * Implementations should determine the result of any stop logic and effect
 * the return value of {@link #getLifecycleState()} accordingly.
 * </p>
 *
 * @throws LifecycleException
 * @throws InterruptedException
 */
public void stop();

/**
 * <p>
 * Return the current state of the service or component.
 * </p>
 */
public LifecycleState getLifecycleState();
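To make the contract concrete, here is a minimal hypothetical component that follows this interface's shape (the state enum mirrors Flume's LifecycleState; the class itself is mine, for illustration only):

```java
// A trivial LifecycleAware-style component: it begins in IDLE, and start()
// / stop() move it to START / STOP. This is exactly the state that the
// supervisor's desired-state comparison (below) reads back.
public class DummyComponent {
  enum LifecycleState { IDLE, START, STOP }

  private LifecycleState lifecycleState = LifecycleState.IDLE;

  public void start() { lifecycleState = LifecycleState.START; }

  public void stop() { lifecycleState = LifecycleState.STOP; }

  public LifecycleState getLifecycleState() { return lifecycleState; }

  public static void main(String[] args) {
    DummyComponent c = new DummyComponent();
    System.out.println(c.getLifecycleState()); // IDLE
    c.start();
    System.out.println(c.getLifecycleState()); // START
  }
}
```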
Let's continue with LifecycleSupervisor.supervise():
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
  if (this.monitorService.isShutdown()
      || this.monitorService.isTerminated()
      || this.monitorService.isTerminating()) {
    throw new FlumeException("Supervise called on " + lifecycleAware + " " +
        "after shutdown has been initiated. " + lifecycleAware + " will not" +
        " be started");
  }

  Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
      "Refusing to supervise " + lifecycleAware + " more than once");

  if (logger.isDebugEnabled()) {
    logger.debug("Supervising service:{} policy:{} desiredState:{}",
        new Object[] { lifecycleAware, policy, desiredState });
  }

  Supervisoree process = new Supervisoree();
  process.status = new Status();

  process.policy = policy;
  process.status.desiredState = desiredState;
  process.status.error = false;

  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);

  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}
The code above creates a MonitorRunnable and schedules it with the JDK's scheduleWithFixedDelay: each run is followed by a 3-second delay before the next one. Now let's look inside MonitorRunnable, at part of its run method:
if (!lifecycleAware.getLifecycleState().equals(
    supervisoree.status.desiredState)) {

  logger.debug("Want to transition {} from {} to {} (failures:{})",
      new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
          supervisoree.status.desiredState,
          supervisoree.status.failures });

  switch (supervisoree.status.desiredState) {
    case START:
      try {
        lifecycleAware.start();
      } catch (Throwable e) {
        logger.error("Unable to start " + lifecycleAware
            + " - Exception follows.", e);
        if (e instanceof Error) {
          // This component can never recover, shut it down.
          supervisoree.status.desiredState = LifecycleState.STOP;
          try {
            lifecycleAware.stop();
            logger.warn("Component {} stopped, since it could not be"
                + "successfully started due to missing dependencies",
                lifecycleAware);
          } catch (Throwable e1) {
            logger.error("Unsuccessful attempt to "
                + "shutdown component: {} due to missing dependencies."
                + " Please shutdown the agent"
                + "or disable this component, or the agent will be"
                + "in an undefined state.", e1);
            supervisoree.status.error = true;
            if (e1 instanceof Error) {
              throw (Error) e1;
            }
            // Set the state to stop, so that the conf poller can
            // proceed.
          }
        }
        supervisoree.status.failures++;
      }
      break;
    case STOP:
      try {
        lifecycleAware.stop();
      } catch (Throwable e) {
        logger.error("Unable to stop " + lifecycleAware
            + " - Exception follows.", e);
        if (e instanceof Error) {
          throw (Error) e;
        }
        supervisoree.status.failures++;
      }
      break;
    default:
      logger.warn("I refuse to acknowledge {} as a desired state",
          supervisoree.status.desiredState);
  }

  if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
    logger.error(
        "Policy {} of {} has been violated - supervisor should exit!",
        supervisoree.policy, lifecycleAware);
  }
}
Because the MonitorRunnable is invoked repeatedly, run first checks the component's state: it only proceeds when the component's current state differs from the desired state, and otherwise does nothing, which prevents a component from being started twice. When a component is started for the first time, its own state is IDLE while the desired state is START, so its start method gets called.
To summarize the startup sequence: take the PollingPropertiesFileConfigurationProvider component, whose job is to periodically fetch Flume's configuration. Supervising it with a desired state of START ends up calling its start method.
Below, using PollingPropertiesFileConfigurationProvider as the example, we analyze how Flume loads its configuration dynamically.
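The supervision loop described above boils down to: on every tick, compare the component's actual state with the desired state and call start()/stop() to reconcile, doing nothing when they already match. A condensed, self-contained sketch of that idea using the JDK scheduler (all class and method names here are mine, not Flume's; the 3-second delay is shortened so the demo finishes quickly):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ToySupervisor {
  enum State { IDLE, START, STOP }

  // A stand-in for a LifecycleAware component.
  static class Component {
    State state = State.IDLE;
    void start() { state = State.START; }
    void stop()  { state = State.STOP; }
  }

  // One supervision tick: only act when actual != desired - the same guard
  // at the top of MonitorRunnable.run that prevents double starts.
  static void reconcile(Component c, State desired) {
    if (c.state == desired) {
      return; // nothing to do
    }
    if (desired == State.START) {
      c.start();
    } else if (desired == State.STOP) {
      c.stop();
    }
  }

  public static void main(String[] args) throws Exception {
    Component c = new Component();
    ScheduledExecutorService monitorService =
        Executors.newSingleThreadScheduledExecutor();
    // like LifecycleSupervisor: fixed delay between the end of one run and
    // the start of the next (50 ms here instead of 3 s)
    monitorService.scheduleWithFixedDelay(
        () -> reconcile(c, State.START), 0, 50, TimeUnit.MILLISECONDS);
    Thread.sleep(200);
    monitorService.shutdown();
    System.out.println(c.state); // START
  }
}
```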
Configuration loading analysis
From the analysis above, starting PollingPropertiesFileConfigurationProvider means executing the component's start method, which looks like this:
@Override
public void start() {
  LOGGER.info("Configuration provider starting");

  Preconditions.checkState(file != null,
      "The parameter file must not be null");

  executorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
          .build());

  FileWatcherRunnable fileWatcherRunnable =
      new FileWatcherRunnable(file, counterGroup);

  executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
      TimeUnit.SECONDS);

  lifecycleState = LifecycleState.START;

  LOGGER.debug("Configuration provider started");
}
start spins up a dedicated scheduler thread running FileWatcherRunnable and sets the state to START. Continuing into FileWatcherRunnable:
public void run() {
  LOGGER.debug("Checking file:{} for changes", file);

  counterGroup.incrementAndGet("file.checks");

  long lastModified = file.lastModified();

  if (lastModified > lastChange) {
    LOGGER.info("Reloading configuration file:{}", file);

    counterGroup.incrementAndGet("file.loads");

    lastChange = lastModified;

    try {
      eventBus.post(getConfiguration());
    } catch (Exception e) {
      LOGGER.error("Failed to load configuration data. Exception follows.",
          e);
    } catch (NoClassDefFoundError e) {
      LOGGER.error("Failed to start agent because dependencies were not " +
          "found in classpath. Error follows.", e);
    } catch (Throwable t) {
      // caught because the caller does not handle or log Throwables
      LOGGER.error("Unhandled error", t);
    }
  }
}
FileWatcherRunnable decides whether the configuration file has changed by comparing its last-modified time against the previous check. If it has changed, eventBus.post(getConfiguration()) publishes the new configuration.
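The change detection is nothing more than polling File.lastModified() and remembering the last value seen. A self-contained sketch of the same idea (class name and structure are mine):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ToyFileWatcher {
  private final File file;
  private long lastChange = 0L;

  ToyFileWatcher(File file) { this.file = file; }

  // One polling pass: returns true (and remembers the new timestamp) iff the
  // file was modified since the last pass - the same check
  // FileWatcherRunnable.run performs before posting the configuration.
  boolean checkForChange() {
    long lastModified = file.lastModified();
    if (lastModified > lastChange) {
      lastChange = lastModified;
      return true;
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    Path conf = Files.createTempFile("flume", ".conf");
    ToyFileWatcher watcher = new ToyFileWatcher(conf.toFile());
    System.out.println(watcher.checkForChange()); // true: first sighting
    System.out.println(watcher.checkForChange()); // false: unchanged
    // simulate an edit by bumping the modification time
    conf.toFile().setLastModified(System.currentTimeMillis() + 1000);
    System.out.println(watcher.checkForChange()); // true: modified
    Files.delete(conf);
  }
}
```

Note that this detects modifications only at polling granularity (30 seconds in Flume's case), which is why a saved config change is not picked up instantly.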
Application.java contains the corresponding subscriber:
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
}
This method is subscribed to the eventBus; whenever a configuration event arrives it runs, which effectively restarts the Flume components. And remember the code analyzed earlier: if the user passes no-reload-conf, this same method is simply called once, directly, at startup.
So how is getConfiguration() implemented?
protected abstract FlumeConfiguration getFlumeConfiguration();

public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames =
          new HashSet<String>(channelComponentMap.keySet());
      for (String channelName : channelNames) {
        ChannelComponent channelComponent =
            channelComponentMap.get(channelName);
        if (channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap =
              channelCache.get(channelComponent.channel.getClass());
          if (nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}
getConfiguration() calls getFlumeConfiguration(), an abstract method; taking PollingPropertiesFileConfigurationProvider as the example, the implementation lives in its parent class:
@Override
public FlumeConfiguration getFlumeConfiguration() {
  BufferedReader reader = null;
  try {
    reader = new BufferedReader(new FileReader(file));
    Properties properties = new Properties();
    properties.load(reader);
    return new FlumeConfiguration(toMap(properties));
  } catch (IOException ex) {
    LOGGER.error("Unable to load file:" + file
        + " (I/O failure) - Exception follows.", ex);
  } finally {
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException ex) {
        LOGGER.warn("Unable to close file reader for file: " + file, ex);
      }
    }
  }
  return new FlumeConfiguration(new HashMap<String, String>());
}
This method loads the file with ordinary stream I/O and returns a FlumeConfiguration object, which wraps a Map. The FlumeConfiguration constructor iterates over that Map, calling addRawProperty for each entry. addRawProperty first performs some validity checks and obtains an AgentConfiguration object, aconf, then finally calls aconf.addProperty. addProperty distinguishes sources, channels, sinks, and sink groups, storing the corresponding settings in sourceContextMap, channelContextMap, sinkContextMap, and sinkGroupContextMap. All of this lives in the AgentConfiguration, which in turn lives in the FlumeConfiguration keyed by agentName and can be retrieved with getConfigurationFor(String hostname).
How Flume parses user-defined config keys
In the addProperty method analyzed above, parseConfigKey is called:
cnck = parseConfigKey(key,
    BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);
Its implementation:
private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
  // key must start with prefix
  if (!key.startsWith(prefix)) {
    return null;
  }
  // key must have a component name part after the prefix of the format:
  // <prefix><component-name>.<config-key>
  int index = key.indexOf('.', prefix.length() + 1);
  if (index == -1) {
    return null;
  }
  String name = key.substring(prefix.length(), index);
  String configKey = key.substring(prefix.length() + name.length() + 1);
  // name and config key must be non-empty
  if (name.length() == 0 || configKey.length() == 0) {
    return null;
  }
  return new ComponentNameAndConfigKey(name, configKey);
}
The prefix argument above is one of these constants:
public final class BasicConfigurationConstants {

  public static final String CONFIG_SOURCES = "sources";
  public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
  public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";
  public static final String CONFIG_SINKS = "sinks";
  public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
  public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";
  public static final String CONFIG_SINKGROUPS = "sinkgroups";
  public static final String CONFIG_SINKGROUPS_PREFIX = CONFIG_SINKGROUPS + ".";
  public static final String CONFIG_CHANNEL = "channel";
  public static final String CONFIG_CHANNELS = "channels";
  public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";
  public static final String CONFIG_CONFIG = "config";
  public static final String CONFIG_TYPE = "type";

  private BasicConfigurationConstants() {
    // disable explicit object creation
  }
}
- For example, a configured key has the form agent1.sources.source1.type=avro (note that by the time it is parsed, the leading agent1. has already been stripped off).
- parseConfigKey first checks that the key starts with the prefix, then looks for the next '.' after it.
- It extracts the component name: source1.
- It extracts the config key: type.
- The two are wrapped in a ComponentNameAndConfigKey.
- Then, as analyzed above, the source, channel, and sink settings are stored in the sourceContextMap, channelContextMap, and sinkContextMap HashMaps; these live in the AgentConfiguration, which lives in the FlumeConfiguration keyed by agentName and is retrieved via getConfigurationFor(String hostname).
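The parsing steps above can be exercised directly. This sketch reproduces parseConfigKey's logic in a runnable form (the wrapper class and value holder are mine; the method body follows the source shown above):

```java
public class ConfigKeyParser {
  // Holds the two parts parseConfigKey extracts from a key.
  static class ComponentNameAndConfigKey {
    final String componentName;
    final String configKey;
    ComponentNameAndConfigKey(String name, String key) {
      this.componentName = name;
      this.configKey = key;
    }
  }

  // Same logic as Flume's parseConfigKey:
  // <prefix><component-name>.<config-key>
  static ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
    if (!key.startsWith(prefix)) {
      return null; // key must start with the prefix
    }
    // find the '.' separating the component name from the config key
    int index = key.indexOf('.', prefix.length() + 1);
    if (index == -1) {
      return null;
    }
    String name = key.substring(prefix.length(), index);
    String configKey = key.substring(prefix.length() + name.length() + 1);
    if (name.length() == 0 || configKey.length() == 0) {
      return null; // both parts must be non-empty
    }
    return new ComponentNameAndConfigKey(name, configKey);
  }

  public static void main(String[] args) {
    // "agent1." has already been stripped by the time addProperty runs
    ComponentNameAndConfigKey cnck =
        parseConfigKey("sources.source1.type", "sources.");
    System.out.println(cnck.componentName + " / " + cnck.configKey);
    // prints: source1 / type
  }
}
```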
Summary
This article analyzed how Flume starts an agent (some source code was omitted; it is worth reading in full), how Flume parses user-defined source, channel, and sink configuration, and how Flume uses a ZooKeeper listener and a file watcher to reload its configuration dynamically. The next article will cover Flume's overall architecture and common topologies.