flume-ng 源码分析 - 核心组件分析

本贴最后更新于 2300 天前,其中的信息可能已经时移世异

从第一篇分析可知,flume 中所有的组件都会实现 LifecycleAware 接口。该接口定义如下:

public interface LifecycleAware { public void start(); public void stop(); public LifecycleState getLifecycleState(); }

在组件启动的时候会调用 start 方法,当有异常时调用 stop 方法。getLifecycleState 方法返回该组件的状态。包含 ** IDLE, START, STOP, ERROR; **

当在组件开发中需要配置一些属性的时候可以实现 ** Configurable ** 接口

public interface Configurable { public void configure(Context context); }

下面开始分析 Agent 中各个组件的实现

source 实现

source 定义

public interface Source extends LifecycleAware, NamedComponent { public void setChannelProcessor(ChannelProcessor channelProcessor); public ChannelProcessor getChannelProcessor(); }

可以看到 Source 继承了 LifecycleAware 接口,并且提供了 ** ChannelProcessor ** 的 getter 和 setter 方法,channelProcessor 在** 常用架构篇中 **降到提供了日志过滤链,和 channel 选择的功能。所以 Source 的逻辑应该都在 LifecycleAware 中的 start,stop 方法中.

source 创建

启动篇中,我们讲到了 flume 是如何启动的。大致流程就是读取配置文件,生成 flume 的各种组件,执行各个组件的 start()方法。在** getConfiguration() 方法中调用了 loadSources() **方法。

可以看到在 loadSources 方法中如何创建 Source 的

private void loadSources(AgentConfiguration agentConf, Map<String, ChannelComponent> channelComponentMap, Map<String, SourceRunner> sourceRunnerMap) throws InstantiationException { Set<String> sourceNames = agentConf.getSourceSet();//获取所有的Source Map<String, ComponentConfiguration> compMap =agentConf.getSourceConfigMap(); //获取所有Source对应的配置 for (String sourceName : sourceNames) { ComponentConfiguration comp = compMap.get(sourceName);//获取该source对应的配置 if(comp != null) { SourceConfiguration config = (SourceConfiguration) comp; //转化为source配置 Source source = sourceFactory.create(comp.getComponentName(), comp.getType());//通过sourceFactory 创建source try { Configurables.configure(source, config);//配置组件的其他属性 Set<String> channelNames = config.getChannels()//获取该source的所有channel名称 List<Channel> sourceChannels = new ArrayList<Channel>(); for (String chName : channelNames) {//遍历所有的额channle,如果该channel已经实例化过了并且在channelComponentMap中已经存储了,那么将该channel放入sourceChannels ChannelComponent channelComponent = channelComponentMap.get(chName); if(channelComponent != null) { sourceChannels.add(channelComponent.channel); } } if(sourceChannels.isEmpty()) {//如果这个source没有关联到任何channel那么直接抛出异常 String msg = String.format("Source %s is not connected to a " + "channel", sourceName); throw new IllegalStateException(msg); } //以下创建出ChannelProcessor并且配置其他属性 ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); ChannelSelector selector = ChannelSelectorFactory.create( sourceChannels, selectorConfig); ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, config); source.setChannelProcessor(channelProcessor);//设置channelSelector sourceRunnerMap.put(comp.getComponentName(), SourceRunner.forSource(source));//将改source,以及对应的SourceRunner放入SourceRunnerMap中 for(Channel channel : sourceChannels) {//遍历改source所有的channel并且将改source添加到该channel的Component中 ChannelComponent channelComponent = Preconditions. checkNotNull(channelComponentMap.get(channel.getName()), String.format("Channel %s", channel.getName())); channelComponent.components.add(sourceName); } } catch (Exception e) { String msg = String.format("Source %s has been removed due to an " + "error during configuration", sourceName); LOGGER.error(msg, e); } } } ......

从上面的分析中可以看出,Source 是后 SourceFactory 创建的,创建之后绑定到 SourceRunner 中,并且在 SourceRunner 中启动了 Source。
SourceFactory 只有一个实现 DefaultSourceFactory。创建 Source 过程如下:

public Source create(String name, String type) throws FlumeException { Preconditions.checkNotNull(name, "name"); Preconditions.checkNotNull(type, "type"); logger.info("Creating instance of source {}, type {}", name, type); Class<? extends Source> sourceClass = getClass(type);//通过对应的类型找到对应的class try { Source source = sourceClass.newInstance();//直接创建实例 source.setName(name); return source; } catch (Exception ex) { throw new FlumeException("Unable to create source: " + name +", type: " + type + ", class: " + sourceClass.getName(), ex); } }

在创建重,通过 type 来或者 source 类的 class。在 getClass 方法中,首先会去找 type 对应类型的 class。在 SourceType 中定义的。如果没有找到,则直接获得配置的类全路径。最后通过 Class.forName(String)获取 class 对象。

source 提供了两种方式类获取数据:轮训拉去和事件驱动

14961359396630.jpg

PollableSource 提供的默认实现如下:
14962066716627.jpg

比如 KafkaSource 利用 Kafka 的 ConsumerApi,主动去拉去数据。

EventDrivenSource 提供的默认实现如下

14962068696930.jpg

如 HttpSource,NetcatSource 就是事件驱动的,所谓事件驱动也就是被动等待。在 HttpSource 中内置了一个 Jetty server,并且设置 FlumeHTTPServlet 作为 handler 去处理数据。

source 的启动

从上面的分析中知道,在启动 flume 读取配置文件时,会将所有的组件封装好,然后再启动。对于 Source 而言,封装成了 SourceRunner,通过 SourceRunner 间接启动 Source。

public static SourceRunner forSource(Source source) { SourceRunner runner = null; if (source instanceof PollableSource) {//判断该source是否为PollableSource runner = new PollableSourceRunner(); ((PollableSourceRunner) runner).setSource((PollableSource) source); } else if (source instanceof EventDrivenSource) {//判断该source是否为EventDrivenSource runner = new EventDrivenSourceRunner(); ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); } else {//否则抛出异常 throw new IllegalArgumentException("No known runner type for source " + source); } return runner; }

从上面可以看出 SourceRunner 默认提供两种实现,PollableSourceRunner,EventDrivenSource.分别对应 PollableSource 和 EventDrivenSource。

查看 PollableSourceRunner 是如何启动的

@Override public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize();//初始化ChannelProcessor source.start();//启动source组件 runner = new PollingRunner();//单独启动一个线程去轮询source runner.source = source; runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START;//设置状态为START }

在 PollableSourceRunner 中我们看到单独启动一个线程去执行 PollingRunner,这个线程的作用就是不断的去轮询。查看 PollingRunner 的实现

@Override public void run() { logger.debug("Polling runner starting. Source:{}", source); while (!shouldStop.get()) {//没有停止,那就继续吧 counterGroup.incrementAndGet("runner.polls"); try { //真正的拉去逻辑在source process()方法中,调用该方法进行拉去数据,并且 //判断返回状态是否为BACKOFF(失败补偿),如果是那么等待時間超时之后就会重试 if (source.process().equals(PollableSource.Status.BACKOFF)) { counterGroup.incrementAndGet("runner.backoffs"); Thread.sleep(Math.min( counterGroup.incrementAndGet("runner.backoffs.consecutive") * backoffSleepIncrement, maxBackoffSleep)); } else { counterGroup.set("runner.backoffs.consecutive", 0L); } } catch (InterruptedException e) { logger.info("Source runner interrupted. Exiting"); counterGroup.incrementAndGet("runner.interruptions"); } ...... } logger.debug("Polling runner exiting. Metrics:{}", counterGroup); }

比如 KafkaSource ,它的逻辑就在 process 方法中,

// get next message MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); kafkaMessage = messageAndMetadata.message(); kafkaKey = messageAndMetadata.key(); // Add headers to event (topic, timestamp, and key) headers = new HashMap<String, String>(); headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic);

EventDrivenSourceRunner

@Override public void start() { Source source = getSource();//获取source ChannelProcessor cp = source.getChannelProcessor();//获取source对应的ChannelProcessor cp.initialize();//初始化channelprocessor source.start();//启动source lifecycleState = LifecycleState.START;//标记状态为START }

可以看到 EventDrivenSourceRunner 和 PollableSourceRunnner 启动流程大致相同,只是 PollableSourceRunner 会额外启动一个线程去轮询 source。

channel 的实现

source 获取到数据后,会交给 channelProcessor 处理,发送到 channel。最后由 sink 消费掉。
所以 channel 是 source,sink 实现异步化的关键。

channelProcessor 中两格重要的成员

private final ChannelSelector selector;//channel选择器 private final InterceptorChain interceptorChain; //过滤链

InterceptorChain 是有多个 Interceptor 组成,并且实现了 Interceptor 接口

public class InterceptorChain implements Interceptor { private List<Interceptor> interceptors; }

Interceptor.java

public interface Interceptor { public void initialize();// 做一些处理话工作 public Event intercept(Event event);//拦截单个event并且返回 public List<Event> intercept(List<Event> events);//批量拦截event public void close(); public interface Builder extends Configurable { public Interceptor build(); }//用来创建特定的Interceptor }

Interceptor 定义了一些处理 Event 的接口,再 Event 处理之后都会返回改 Envent

从 source 的分析中我们可以知道,如果是 PollableSourceRunner 会调用 source 中的 process()方法。如果是 EventDrivenSourceRunner,就会用特定的方法来获取 source,比如 httpSource 利用 FlumeHTTPServlet 来接受消息

try { events = handler.getEvents(request);//从请求中获取event ... try { getChannelProcessor().processEventBatch(events);//通过ChanneProcess进行的processEventBatch方法进行批量处理 } catch (ChannelException ex) {

比如 KafkaSource 是 PollableSourceRunner 那么会调用 KafkaSource 中的 process()方法。

public Status process() throws EventDeliveryException { ... // get next message MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); kafkaMessage = messageAndMetadata.message(); kafkaKey = messageAndMetadata.key(); // Add headers to event (topic, timestamp, and key) headers = new HashMap<String, String>(); headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic); if (kafkaKey != null) { headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); } ...... event = EventBuilder.withBody(kafkaMessage, headers); eventList.add(event); ...... if()...... getChannelProcessor().processEventBatch(eventList);//交给channelProcessor处理 counter.addToEventAcceptedCount(eventList.size()); ... } }

从以上分析不管 source 是轮询还是事件驱动的,都会触发 ChannelProcessor 中的 processEvent 或者 ProcesEventBatch 方法

public void processEventBatch(List<Event> events) { events = interceptorChain.intercept(events);//调用Interceptor处理events List<Channel> reqChannels = selector.getRequiredChannels(event);//获取必须成功处理的Channel ,写失败了必须回滚source List<Channel> optChannels = selector.getOptionalChannels(event);//获取非必须成功处理的channel,写失败了就忽略 // 這裡分析處理必須成功channel的情況。非必須的channel處理情況一樣 for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction();//获取该channel上的事务 Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin();//开始事务 List<Event> batch = reqChannelQueue.get(reqChannel);//获取events for (Event event : batch) { reqChannel.put(event);//处理Channel } tx.commit();//提交事务 } catch (Throwable t) { tx.rollback();//发生异常回滚 if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close();//关闭事务 } } } }

最后就是 ChannelSelector ,flume 默认提供两种实现多路复用和复制。多路复用选择器可以根据 header 中的值而选择不同的 channel,复制就会把 event 复制到多个 channel 中。flume 默认是复制选择器。
14962112541107.jpg

同样 Selector 的创建也是通过 ChannelSelectorFactory 创建的.

public static ChannelSelector create(List<Channel> channels, ChannelSelectorConfiguration conf) { String type = ChannelSelectorType.REPLICATING.toString(); if (conf != null){ type = conf.getType(); } ChannelSelector selector = getSelectorForType(type); selector.setChannels(channels); Configurables.configure(selector, conf); return selector; }

默认提供复制选择器,如果配置文件中配置了选择器那么就从配置文件中获取。

上面看到在 processEventBatch 方法中调用 channel 的 put 方法。channel 中提供了基本的
put 和 take 方法来实现 Event 的流转。

public interface Channel extends LifecycleAware, NamedComponent { public void put(Event event) throws ChannelException;//向channel中存放 public Event take() throws ChannelException;//消费event public Transaction getTransaction();//获取事务 }

flume 提供的默认 channel 如下图所示:

14962117376319.jpg

sink 的实现

sink 定义:

public interface Sink extends LifecycleAware, NamedComponent { public void setChannel(Channel channel); public Channel getChannel(); public Status process() throws EventDeliveryException; public static enum Status { READY, BACKOFF } }

提供了 channel 的 setter,getter 方法。process 方法用来消费。并返回状态 READY,BACKOFF

sink 的创建

SinkConfiguration config = (SinkConfiguration) comp; Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());

sink 的创建也是通过 sinkFactory

public Sink create(String name, String type) throws FlumeException { Preconditions.checkNotNull(name, "name"); Preconditions.checkNotNull(type, "type"); logger.info("Creating instance of sink: {}, type: {}", name, type); Class<? extends Sink> sinkClass = getClass(type);//获取sink对应的类型的Class try { Sink sink = sinkClass.newInstance();//创建实例 sink.setName(name); return sink; } catch (Exception ex) { throw new FlumeException("Unable to create sink: " + name + ", type: " + type + ", class: " + sinkClass.getName(), ex); } }

通过传入的 type 找到对应的 Class 要是没有找到则直接通过 Class.forNamae(String name)来创建

sink 还提供了分组功能。该功能由 SinkGroup 实现。在 SinkGroup 内部如何调度多个 Sink,则交给 SinkProcessor 完成。

sink 的启动

和 Source 一样,flume 也为 Sink 提供了 SinkRunner 来流转 Sink
在 sinkRunner 中

public void start() { SinkProcessor policy = getPolicy(); policy.start();//启动SinkProcessor runner = new PollingRunner();//单独启动一个线程,从channel中消费数据 runner.policy = policy; runner.counterGroup = counterGroup; runner.shouldStop = new AtomicBoolean(); runnerThread = new Thread(runner); runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName()); runnerThread.start(); lifecycleState = LifecycleState.START; }

sinkRunner 中通过启动 SinkProcessor 间接启动 Sink,并且单独启动一个线程,不停地调用 process()方法从 channel 中消费数据
在 SinkProcessor 中,如果是 DefaultSinkProcessor 那么直接调用 sink.start()方法启动 sink。如果是 LoadBalancingSinkProcessor,FailoverSinkProcessor 由于这两种处理器中包含多个 Sink,所以会依次遍历 sink 调用 start()方法启动

public void run() { logger.debug("Polling sink runner starting"); while (!shouldStop.get()) {//判断是否停止 try { if (policy.process().equals(Sink.Status.BACKOFF)) {//调用SinkProcessor的proces()方法进行处理 counterGroup.incrementAndGet("runner.backoffs"); Thread.sleep(Math.min( counterGroup.incrementAndGet("runner.backoffs.consecutive") * backoffSleepIncrement, maxBackoffSleep)); } else { counterGroup.set("runner.backoffs.consecutive", 0L); } } } }

该线程会不停的执行 SinkProcessor 的 process()方法,而 SinkProcessor 的 process()方法会调用对应的 Sink 的 process()方法。然后判断处理状态如果是失败补偿,那么等待超时时间后重试

SinkGroup

public class SinkGroup implements Configurable, ConfigurableComponent { List<Sink> sinks; SinkProcessor processor; SinkGroupConfiguration conf; ...... }

SinkGroup 中包含多个 Sink,并且提供一个 SinkProcessor 来处理 SinkGroup 内部调度

SinkProcessor

SinkProcessor 默认提供三种实现。DefaultSinkProcessor,LoadBalancingSinkProcessor,FailoverSinkProcessor

14962397506282.jpg

DefaultSinkProcessor:默认实现,适用于单个 sink
LoadBalancingSinkProcessor:提供负载均衡
FailoverSinkProcessor:提供故障转移

DefaultSinkProcessor

public class DefaultSinkProcessor implements SinkProcessor, ConfigurableComponent { private Sink sink; private LifecycleState lifecycleState; @Override public void start() { Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set"); sink.start();//启动sink lifecycleState = LifecycleState.START; } @Override public Status process() throws EventDeliveryException { return sink.process(); } @Override public void setSinks(List<Sink> sinks) { Preconditions.checkNotNull(sinks); Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can " + "only handle one sink, " + "try using a policy that supports multiple sinks"); sink = sinks.get(0); } }

从上面可以看出 DefaultSinkProcessor 只能处理一个 Sink。在 process 方法中调用 sink 的方法。具体到某个具体的 Sink,比如 HDFSEventSink,那么就执行该 sink 的 process 方法

接下来分析 SinkProcessor 中负载均衡和故障转移 是如何具体实现的。

FailOverSinkProcessor 实现分析

** FailOverSinkProcessor ** 中 process()方法实现如下:

@Override public Status process() throws EventDeliveryException { Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {//检查失败队列是否有sink,并且队列中第一个sink过了失败补偿时间 FailedSink cur = failedSinks.poll();//从失败队列中获取第一个sink,并且在队列中删除 Status s; try { s = cur.getSink().process();//调用sink的process()方法进行处理 if (s == Status.READY) {//如果状态是就绪 liveSinks.put(cur.getPriority(), cur.getSink());//将该sink放入存活队列 activeSink = liveSinks.get(liveSinks.lastKey());//重新赋值给activeSink logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else {//sink 处理失败 failedSinks.add(cur);//加入失败队列 } return s; } catch (Exception e) { cur.incFails();//发生异常,增加失败次数 failedSinks.add(cur);//放入失败队列 } } //如果失败队列为空,或者失败队列中所有的sink都没有达到失败补偿时间,那么交给activeSink进行处理, Status ret = null; while(activeSink != null) { try { ret = activeSink.process();//交给activeSink 处理 return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext();//如果activeSink处理失败,则把activeSink从存活队列中移动到失败队列中 } } throw new EventDeliveryException("All sinks failed to process, " + "nothing left to failover to"); }
  • 存活队列是一个 SortMap<Key,Value> 其中 key 是 sink 的优先级。activeSink 默认取存活队列中的最后一个,存活队列是根据配置的 sink 优先级来排序的
  • 失败队列是一个优先队列,按照 FailSink 的 refresh 属性进行排序
@Override public int compareTo(FailedSink arg0) { return refresh.compareTo(arg0.refresh); }

refresh 属性,在 FailSink 创建时和 sink 处理发生异常时 会触发调整
refresh 调整策略 如下:

private void adjustRefresh() { refresh = System.currentTimeMillis() + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); }

refresh 等于系统当前的毫秒加上最大等待时间(默认 30s)和失败次数指数级增长值中最小的一个。FAILURE_PENALTY 等 1s;(1 << sequentialFailures) * FAILURE_PENALTY)用于实现根据失败次数等待时间指数级递增。

一个配置的 failOver 具体的例子:

host1.sinkgroups = group1 host1.sinkgroups.group1.sinks = sink1 sink2 host1.sinkgroups.group1.processor.type = failover host1.sinkgroups.group1.processor.priority.sink1 = 5 host1.sinkgroups.group1.processor.priority.sink2 = 10 host1.sinkgroups.group1.processor.maxpenalty = 10000

LoadBalancingSinkProcessor 实现分析

loadBalaneingSinkProcessor 用于实现 sink 的负载均衡,其功能通过 SinkSelector 实现。类似于 ChannelSelector 和 Channel 的关系

14965813235880.jpg

14965813925723.jpg

SinkSelector 中模式有三种实现
1.固定顺序
2.轮询
3.随机

LoadBalancingSinkProcessor 中使用均衡负载的方式

@Override public Status process() throws EventDeliveryException { Status status = null; Iterator<Sink> sinkIterator = selector.createSinkIterator();//使用sinkSelector创建Sink迭代器。三种方式有各自不同的实现 while (sinkIterator.hasNext()) {//遍历Sink Sink sink = sinkIterator.next();//获取sink try { status = sink.process();//调用sink处理 break;//如果处理成功那么本次负载均衡就算完成 } catch (Exception ex) { selector.informSinkFailed(sink);//如果发生异常则通知SinkSelector,采用相应的补偿算法进行处理 LOGGER.warn("Sink failed to consume event. " + "Attempting next sink if available.", ex); } } if (status == null) { throw new EventDeliveryException("All configured sinks have failed"); } return status; }

在上面的解释中,最大的两个疑惑就是

  • 这个 Sink 迭代器也就是 createSinkIterator() 是如何实现的
  • 发生异常后 SinkSelector 的处理是如何实现的

先来看 createSinkIterator 的实现。首先看 RoundRobinSinkSelector 的实现

14965819912139.jpg

如上图所示 RoundRobinSinkSelector 内部包含一个 OrderSelector 的属性。

private OrderSelector<Sink> selector; RoundRobinSinkSelector(boolean backoff){ selector = new RoundRobinOrderSelector<Sink>(backoff); } @Override public Iterator<Sink> createSinkIterator() { return selector.createIterator(); }

内部通过一个 RoundRobinOrderSelector 来实现。查看起 createIterator 实现

@Override public Iterator<T> createIterator() { List<Integer> activeIndices = getIndexList();//获取存活sink的索引 int size = activeIndices.size();//存活sink的個數 //如果下一個sink的位置超過了存活sin的個數,重新指向头 if (nextHead >= size) { nextHead = 0; } int begin = nextHead++; //获取起始位置 if (nextHead == activeIndices.size()) {//检查是否超过范围,超过了从头开始 nextHead = 0; } int[] indexOrder = new int[size];//创建一个数组,来存放访问的顺序 for (int i = 0; i < size; i++) { indexOrder[i] = activeIndices.get((begin + i) % size);//用取模的方法实现轮询,每次都从上一个sink的下一个sink 索引开始,由begin控制 } //indexOrder 是访问顺序,getObjects返回相关所有的sink return new SpecificOrderIterator<T>(indexOrder, getObjects()); }

接下来看一下 getIndexList 的实现

protected List<Integer> getIndexList() { long now = System.currentTimeMillis();//当前时间 List<Integer> indexList = new ArrayList<Integer>();//用来存放sink的索引 int i = 0; for (T obj : stateMap.keySet()) {//获取所有sink if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) { //如果没有开启退避,或者该sink 到失败补偿的时间,那么将改sink的索引放入IndexList indexList.add(i); } i++; } return indexList; }

stateMap 是一个 LinkedHashMap<T,FailState> 其中 T 在这里指的是 Sink。
如果没有开启了退避算法,那么会认为每个 sink 都是存活的,所有的 sink 都加到 IndexList。否则等到了失败补偿时间才会加入到 IndexList。可以通过 processor.backoff = true 配置开启

最后分析一下当 sink 处理失败 SinkSelector 是如何处理的

public void informFailure(T failedObject) { if (!shouldBackOff) {//如果没有开启退避算法,当然就不做任何处理 return; } FailureState state = stateMap.get(failedObject);//获取当前失败sink的状态对象 long now = System.currentTimeMillis();//当前时间 long delta = now - state.lastFail;//自从上次失败的经过的时间 long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//计算上一次退避等待的时间 long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE; if (allowableDiff > delta) {//如果上次失败到现在最后退避时间后的一个小时内,并且是失败次数小于期望的退避次数限制,那么就增加state.sequentialFails 实际上就增加了退避的等待时间 if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) { state.sequentialFails++; } } else { state.sequentialFails = 1;//否则就不再增加退避等待时间 } state.lastFail = now;//更新最后失败时间 state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//更新退避等待时间 }

CONSIDER_SEQUENTIAL_RANGE 是一个常量 只为 1 小时,EXP_BACKOFF_COUNTER_LIMIT 为期望最大的退避次数 值为 16.如果上次失败到现在的是哪在上次退避等待时间超过一个小时后 或者 退避次数超过了 EXP_BACKOFF_COUNTER_LIMIT 那么退避的等待时间将不再增加。

  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 655 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...