什么是 flume
Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。flume 常用场景:log-->flume-->[hdfs,hbase,kafka],收集日志并落地到各种不同的存储,以供不同需求的计算。
flume 源码结构
主要模块介绍
-
flume-ng-core
flume 的整个核心框架,包含了各个模块的接口以及逻辑关系实现。core 下大部分代码都是 source,channle,sink 中
-
flume-ng-channels
里面包含了 fileChannel,jdbcChannel,kafkaChannel,spillableMemoryChannel 等通道实现
-
flume-ng-sinks
各种 sink 的实现,包括但不限于:hdfsSink,hiveSink,esSink,kafkaSink
-
flume-ng-sources
各种 source 的实现,包括但不限于: jms,kafka,scirbe,twitter.其他 source 则在 flume-ng-core 模块中
-
flume-ng-node
实现 flume 的一些基本类。包括 agent 的 main(Application).这也是我们的分析代码的入口类
flume 逻辑结构
一个 agent 包含三个基本组件
- sourace
- channel
- sink
flume 启动脚本 flume-ng 分析
###################################################################### # constants flume常量的设定,不通环境执行不同的类 ###################################################################### FLUME_AGENT_CLASS="org.apache.flume.node.Application" FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient" FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo" FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
###################################################################### #真正启动flume,具体由$FLUME_APPLICATON_CLASS指定 ###################################################################### run_flume() { local FLUME_APPLICATION_CLASS if [ "$#" -gt 0 ]; then FLUME_APPLICATION_CLASS=$1 shift else error "Must specify flume application class" 1 fi if [ ${CLEAN_FLAG} -ne 0 ]; then set -x fi $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \ -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $* }
################################################## # main 启动过程中用到的变量,都可以在启动的时指定 # 如果不设置java堆空间大小,默认大小为20M,可以在flume.sh # 中进行设置 ################################################## # set default params FLUME_CLASSPATH="" FLUME_JAVA_LIBRARY_PATH="" JAVA_OPTS="-Xmx20m" LD_LIBRARY_PATH="" opt_conf="" opt_classpath="" opt_plugins_dirs="" arr_java_props=() arr_java_props_ct=0 opt_dryrun="" mode=$1 shift
################################################## #最后根据不同参数启动不同的类,可以看到启动agent时, #执行的是flume-ng-node中Applicaton.java # finally, invoke the appropriate command ################################################## if [ -n "$opt_agent" ] ; then run_flume $FLUME_AGENT_CLASS $args elif [ -n "$opt_avro_client" ] ; then run_flume $FLUME_AVRO_CLIENT_CLASS $args elif [ -n "${opt_version}" ] ; then run_flume $FLUME_VERSION_CLASS $args elif [ -n "${opt_tool}" ] ; then run_flume $FLUME_TOOLS_CLASS $args else error "This message should never appear" 1 fi
agent 的启动分析 Application.java
从上面的分析可以知道当我们启动一个 Agent 时,执行的是 org.apache.flume.node.Application.
看 main 函数的源码
Options options = new Options(); Option option = new Option("n", "name", true, "the name of this agent"); option.setRequired(true); options.addOption(option); option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)"); option.setRequired(false); options.addOption(option); option = new Option(null, "no-reload-conf", false, "do not reload config file if changed"); options.addOption(option); // Options for Zookeeper option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)"); option.setRequired(false); options.addOption(option); option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs"); option.setRequired(false); options.addOption(option); option = new Option("h", "help", false, "display help text"); options.addOption(option); CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); if (commandLine.hasOption('h')) { new HelpFormatter().printHelp("flume-ng agent", options, true); return; } String agentName = commandLine.getOptionValue('n'); boolean reload = !commandLine.hasOption("no-reload-conf");
主要是对名利行参数的校验和解析
在我们启动 Agent 时,会指定,-n -f 等一些参数
继续往下看
//是否包含zk配置 if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) { isZkConfigured = true; } Application application = null; if (isZkConfigured) { // get options String zkConnectionStr = commandLine.getOptionValue('z'); String baseZkPath = commandLine.getOptionValue('p'); if (reload) {//如果是需要重新加载(配置文件改变时) EventBus eventBus = new EventBus(agentName + "-event-bus"); List<LifecycleAware> components = Lists.newArrayList(); PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider = new PollingZooKeeperConfigurationProvider( agentName, zkConnectionStr, baseZkPath, eventBus); components.add(zookeeperConfigurationProvider); application = new Application(components); eventBus.register(application); } else {//不需要检车配置文件的变更 StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider = new StaticZooKeeperConfigurationProvider( agentName, zkConnectionStr, baseZkPath); application = new Application(); application.handleConfigurationEvent(zookeeperConfigurationProvider .getConfiguration()); } }
从以上代码我们可以看出,当配置文件是配置的是 zk 上的路径时,如果需要 reload,则会启动 PollingZooKeeperConfigurationProvider,该类里面会监听 zk 的变化,再通过 guava 的 EventBus(类似于观察者模式,EventBus),传递消息.
注意
此时只是将 PollingZooKeeperConfigurationProvider 加入 components 中,并没有正真的启动
private final List<LifecycleAware> components;
PollingZooKeeperConfigurationProvider 部分关键代码
try { agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName()); agentNodeCache.start(); agentNodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { refreshConfiguration(); } }); } catch (Exception e) { client.close(); throw e; }
在 zk node 上设置 listener,如果 zk node 有任何的变化则会触发 refreshConfiguration 方法
private void refreshConfiguration() throws IOException { LOGGER.info("Refreshing configuration from ZooKeeper"); byte[] data = null; ChildData childData = agentNodeCache.getCurrentData(); if (childData != null) { data = childData.getData(); } flumeConfiguration = configFromBytes(data); //发送时间消息,所有注册到该eventBus上的handler都会收到该事件 eventBus.post(getConfiguration()); }
好了我们继续分析 Application 的代码。上面讲到了利用 zk 来做 flume 配置文件的代码。当然 flume 也支持本地文件的方式。代码如下:
File configurationFile = new File(commandLine.getOptionValue('f')); /* * The following is to ensure that by default the agent will fail on * startup if the file does not exist. */ if (!configurationFile.exists()) { // If command line invocation, then need to fail fast if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) { String path = configurationFile.getPath(); try { path = configurationFile.getCanonicalPath(); } catch (IOException ex) { logger.error("Failed to read canonical path for file: " + path, ex); } throw new ParseException( "The specified configuration file does not exist: " + path); } } List<LifecycleAware> components = Lists.newArrayList(); if (reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider( agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider( agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider .getConfiguration()); } }
如果-f 指定的配置文件不存在,那么将快速失败,抛出异常。
再判断配置文件发生改变时是否需要重新 reload,套路和用 zk 保存配置文件一个道理
如果需要动态加载配置文件,那么启动 PollingPropertiesFileConfigurationProvider,每三十秒
加载一次配置文件
之后执行 application.start()方法。让我们继续看 start()方法
private final LifecycleSupervisor supervisor; public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
在 start 方法中遍历 compents 执行 supervisor.suervise()方法.
在继续分析之前我们先看一下 LifecycleSupervisor,PollingPropertiesFileConfigurationProvider 的类结构
从以上两图中可以看出它们都实现了 LifecycleAware 接口。这个接口定义了 flume 组件的生命周期。LifecycleSupervisor 提供了实现。
LifecycleAware.java
/** * <p> * Starts a service or component. * </p> * <p> * Implementations should determine the result of any start logic and effect * the return value of {@link #getLifecycleState()} accordingly. * </p> * * @throws LifecycleException * @throws InterruptedException */ public void start(); /** * <p> * Stops a service or component. * </p> * <p> * Implementations should determine the result of any stop logic and effect * the return value of {@link #getLifecycleState()} accordingly. * </p> * * @throws LifecycleException * @throws InterruptedException */ public void stop(); /** * <p> * Return the current state of the service or component. * </p> */ public LifecycleState getLifecycleState();
让我们继续分析 LifecycleSupervisor.supervise()方法
public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) { if(this.monitorService.isShutdown() || this.monitorService.isTerminated() || this.monitorService.isTerminating()){ throw new FlumeException("Supervise called on " + lifecycleAware + " " + "after shutdown has been initiated. " + lifecycleAware + " will not" + " be started"); } Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware), "Refusing to supervise " + lifecycleAware + " more than once"); if (logger.isDebugEnabled()) { logger.debug("Supervising service:{} policy:{} desiredState:{}", new Object[] { lifecycleAware, policy, desiredState }); } Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future); }
在上面的代码中创建了一个 MonitorRunnable 对象,通过 jdk 的 scheduleWithFixedDelay 进行定时调用,每次执行完成延迟 3 秒调度。
再看 monitorRunable 中的内容
run 方法中部分内容
if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { logger.debug("Want to transition {} from {} to {} (failures:{})", new Object[] { lifecycleAware, supervisoree.status.lastSeenState, supervisoree.status.desiredState, supervisoree.status.failures }); switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start(); } catch (Throwable e) { logger.error("Unable to start " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { // This component can never recover, shut it down. supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); logger.warn("Component {} stopped, since it could not be" + "successfully started due to missing dependencies", lifecycleAware); } catch (Throwable e1) { logger.error("Unsuccessful attempt to " + "shutdown component: {} due to missing dependencies." + " Please shutdown the agent" + "or disable this component, or the agent will be" + "in an undefined state.", e1); supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } // Set the state to stop, so that the conf poller can // proceed. } } supervisoree.status.failures++; } break; case STOP: try { lifecycleAware.stop(); } catch (Throwable e) { logger.error("Unable to stop " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); } if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) { logger.error( "Policy {} of {} has been violated - supervisor should exit!", supervisoree.policy, lifecycleAware); } }
首先因为 monitorRunnbale 对象时重复调用的,所以在 run 方法中作了一个状态判断,当该组件的状态不等于期望的状态时继续往下执行,否则什么都不做。这样避免重复启动。当组件第一次被启动的时候,组件本身的状态是 IDEL,而 desired state 是 START,此时就会执行组件的 start 方法。
总结一下启动的时序图
比如启动 PollingPropertiesFileConfigurationProvider 组件,这个组件的作用就是定时去获取 flume 的配置。那么会调用 PollingPropertiesFileConfigurationProvider 的 start 方法。
下面以 PollingPropertiesFileConfigurationProvider 为列 分析 flume 的配置时如何动态载入的。
配置载入分析
从上面分析得知,启动 PollingPropertiesFileConfigurationProvider ,则执行该组件的 start 方法。查看 start 方法如下
@Override public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); }
在 start 方法中单独启动一个线程,执行 FileWatcherRunnable,并设置状态为 START
继续看 fileWatcher
public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } }
在 fileWatcher 中通过对文件修改时间来判断配置文件是否发生变化。如果配置文件发生变化
调用 eventBus.post(getConfiguration()); 将配置文件的内容发布。
在 Application.java 中有如下代码
@Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
此方法订阅了 eventBus 的消息。当一有消息将会触发该方法,此方法的功能相当于重启 flume 组件。还记得上面分析的代码吗?要是用户配置 no-reload-conf 那么将会直接调用该方法。
那么 getConfiguration()方法是如何实现的呢?
protected abstract FlumeConfiguration getFlumeConfiguration(); public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for(String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap. get(channelName); if(channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache. get(channelComponent.channel.getClass()); if(nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }
getConfiguration()中调用了 getFlumeConfiguration()方法;getFlumeConfiguration() 是一个抽象方法,以 PollingPropertiesFileConfigurationProvider 实现为列 。该实现在父类中。
@Override public FlumeConfiguration getFlumeConfiguration() { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(file)); Properties properties = new Properties(); properties.load(reader); return new FlumeConfiguration(toMap(properties)); } catch (IOException ex) { LOGGER.error("Unable to load file:" + file + " (I/O failure) - Exception follows.", ex); } finally { if (reader != null) { try { reader.close(); } catch (IOException ex) { LOGGER.warn( "Unable to close file reader for file: " + file, ex); } } } return new FlumeConfiguration(new HashMap<String, String>()); }
该方法通过基本的流加载方法返回 FlumeConfigruation 对象。该对象封装一个 Map 对象
。在 FlumeConfigruation 的构造函数中将会遍历这个 Map 对象,调用 addRawProperty 方法
该方法首先会进行一些合法性的检查,并且该方法会创建一个 AgentConfiguration 对象的 aoconf
该方法最后调用 aconf.addProperty 方法
在 aconf.addProperty 方法中会区分 source,channel,sink ,sinkgroup。将对应的配置信息放在
sourceContextMap,channelContextMap,sinkContextMap,sinkGroupContextMap。这些信息封装在 AgentConfiguration,AgentConfiguration 封装在 FlumeConfiguration 中,key 是 agentName。使用时可以通过 getConfigurationFor(String hostname) 来获取。
##flume 如何获自定义的 key
在上面的分析中 addProperty 方法中,调用了 parseConfigKey 方法
cnck = parseConfigKey(key, BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);
具体实现如下:
private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) { // key must start with prefix if (!key.startsWith(prefix)) { return null; } // key must have a component name part after the prefix of the format: // <prefix><component-name>.<config-key> int index = key.indexOf('.', prefix.length() + 1); if (index == -1) { return null; } String name = key.substring(prefix.length(), index); String configKey = key.substring(prefix.length() + name.length() + 1); // name and config key must be non-empty if (name.length() == 0 || configKey.length() == 0) { return null; } return new ComponentNameAndConfigKey(name, configKey); }
上面代码中 prefix 为定义的常量如下:
public final class BasicConfigurationConstants { public static final String CONFIG_SOURCES = "sources"; public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + "."; public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector."; public static final String CONFIG_SINKS = "sinks"; public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + "."; public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor."; public static final String CONFIG_SINKGROUPS = "sinkgroups"; public static final String CONFIG_SINKGROUPS_PREFIX = CONFIG_SINKGROUPS + "."; public static final String CONFIG_CHANNEL = "channel"; public static final String CONFIG_CHANNELS = "channels"; public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + "."; public static final String CONFIG_CONFIG = "config"; public static final String CONFIG_TYPE = "type"; private BasicConfigurationConstants() { // disable explicit object creation } }
- 比如我们配置的格式是 agent1.sources.source1.type=avro(注意在后面 parse 时,agent1.已经被截取掉)
- 在上面的 parseKey 方法中首先会判断 prefix 的后面有多少个字符
- 解析出 name 。source1 就是 name
- 解析出 configKey 。type 就是 configKey
- 封装为 ComponentNameAndConfigKey
- 然后有上面的分析把 sources、channel、sink 配置信息,分别存放到 sourceContextMap、channelConfigMap、sinkConfigMap 三个 HashMap,这些信息封装在 AgentConfiguration,AgentConfiguration 封装在 FlumeConfiguration 中,key 是 agentName。使用时可以通过 getConfigurationFor(String hostname) 来获取
总结
以上分析了 flume 启动 agent 的流程。部分源码没有贴出来,可以自行阅读;以及 flume 中如何解析
用户自定义的 source,channel,sink;以及 flume 如何用 zk listener 和 fileWatcher 实现配置文件的动态加载。下篇主要讲解 flume 整体架构--常用架构篇
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于