从第一篇分析可知,flume 中所有的组件都会实现 LifecycleAware 接口。该接口定义如下:
public interface LifecycleAware {
public void start();
public void stop();
public LifecycleState getLifecycleState();
}
在组件启动的时候会调用 start 方法,当有异常时调用 stop 方法。getLifecycleState 方法返回该组件的状态。包含 ** IDLE, START, STOP, ERROR; **
当在组件开发中需要配置一些属性的时候可以实现 ** Configurable ** 接口
public interface Configurable {
public void configure(Context context);
}
下面开始分析 Agent 中各个组件的实现
source 实现
source 定义
public interface Source extends LifecycleAware, NamedComponent {
public void setChannelProcessor(ChannelProcessor channelProcessor);
public ChannelProcessor getChannelProcessor();
}
可以看到 Source 继承了 LifecycleAware 接口,并且提供了 ** ChannelProcessor ** 的 getter 和 setter 方法,channelProcessor 在** 常用架构篇中 **降到提供了日志过滤链,和 channel 选择的功能。所以 Source 的逻辑应该都在 LifecycleAware 中的 start,stop 方法中.
source 创建
在启动篇中,我们讲到了 flume 是如何启动的。大致流程就是读取配置文件,生成 flume 的各种组件,执行各个组件的 start()方法。在** getConfiguration() 方法中调用了 loadSources() **方法。
可以看到在 loadSources 方法中如何创建 Source 的
private void loadSources(AgentConfiguration agentConf,
Map<String, ChannelComponent> channelComponentMap,
Map<String, SourceRunner> sourceRunnerMap)
throws InstantiationException {
Set<String> sourceNames = agentConf.getSourceSet();//获取所有的Source
Map<String, ComponentConfiguration> compMap =agentConf.getSourceConfigMap(); //获取所有Source对应的配置
for (String sourceName : sourceNames) {
ComponentConfiguration comp = compMap.get(sourceName);//获取该source对应的配置
if(comp != null) {
SourceConfiguration config = (SourceConfiguration) comp; //转化为source配置
Source source = sourceFactory.create(comp.getComponentName(),
comp.getType());//通过sourceFactory 创建source
try {
Configurables.configure(source, config);//配置组件的其他属性
Set<String> channelNames = config.getChannels()//获取该source的所有channel名称
List<Channel> sourceChannels = new ArrayList<Channel>();
for (String chName : channelNames) {//遍历所有的额channle,如果该channel已经实例化过了并且在channelComponentMap中已经存储了,那么将该channel放入sourceChannels
ChannelComponent channelComponent = channelComponentMap.get(chName);
if(channelComponent != null) {
sourceChannels.add(channelComponent.channel);
}
}
if(sourceChannels.isEmpty()) {//如果这个source没有关联到任何channel那么直接抛出异常
String msg = String.format("Source %s is not connected to a " +
"channel", sourceName);
throw new IllegalStateException(msg);
}
//以下创建出ChannelProcessor并且配置其他属性
ChannelSelectorConfiguration selectorConfig =
config.getSelectorConfiguration();
ChannelSelector selector = ChannelSelectorFactory.create(
sourceChannels, selectorConfig);
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
source.setChannelProcessor(channelProcessor);//设置channelSelector
sourceRunnerMap.put(comp.getComponentName(),
SourceRunner.forSource(source));//将改source,以及对应的SourceRunner放入SourceRunnerMap中
for(Channel channel : sourceChannels) {//遍历改source所有的channel并且将改source添加到该channel的Component中
ChannelComponent channelComponent = Preconditions.
checkNotNull(channelComponentMap.get(channel.getName()),
String.format("Channel %s", channel.getName()));
channelComponent.components.add(sourceName);
}
} catch (Exception e) {
String msg = String.format("Source %s has been removed due to an " +
"error during configuration", sourceName);
LOGGER.error(msg, e);
}
}
}
......
从上面的分析中可以看出,Source 是后 SourceFactory 创建的,创建之后绑定到 SourceRunner 中,并且在 SourceRunner 中启动了 Source。
SourceFactory 只有一个实现 DefaultSourceFactory。创建 Source 过程如下:
public Source create(String name, String type) throws FlumeException {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(type, "type");
logger.info("Creating instance of source {}, type {}", name, type);
Class<? extends Source> sourceClass = getClass(type);//通过对应的类型找到对应的class
try {
Source source = sourceClass.newInstance();//直接创建实例
source.setName(name);
return source;
} catch (Exception ex) {
throw new FlumeException("Unable to create source: " + name
+", type: " + type + ", class: " + sourceClass.getName(), ex);
}
}
在创建重,通过 type 来或者 source 类的 class。在 getClass 方法中,首先会去找 type 对应类型的 class。在 SourceType 中定义的。如果没有找到,则直接获得配置的类全路径。最后通过 Class.forName(String)获取 class 对象。
source 提供了两种方式类获取数据:轮训拉去和事件驱动
PollableSource 提供的默认实现如下:
比如 KafkaSource 利用 Kafka 的 ConsumerApi,主动去拉去数据。
EventDrivenSource 提供的默认实现如下
如 HttpSource,NetcatSource 就是事件驱动的,所谓事件驱动也就是被动等待。在 HttpSource 中内置了一个 Jetty server,并且设置 FlumeHTTPServlet 作为 handler 去处理数据。
source 的启动
从上面的分析中知道,在启动 flume 读取配置文件时,会将所有的组件封装好,然后再启动。对于 Source 而言,封装成了 SourceRunner,通过 SourceRunner 间接启动 Source。
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
if (source instanceof PollableSource) {//判断该source是否为PollableSource
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {//判断该source是否为EventDrivenSource
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {//否则抛出异常
throw new IllegalArgumentException("No known runner type for source "
+ source);
}
return runner;
}
从上面可以看出 SourceRunner 默认提供两种实现,PollableSourceRunner,EventDrivenSource.分别对应 PollableSource 和 EventDrivenSource。
查看 PollableSourceRunner 是如何启动的
@Override
public void start() {
PollableSource source = (PollableSource) getSource();
ChannelProcessor cp = source.getChannelProcessor();
cp.initialize();//初始化ChannelProcessor
source.start();//启动source组件
runner = new PollingRunner();//单独启动一个线程去轮询source
runner.source = source;
runner.counterGroup = counterGroup;
runner.shouldStop = shouldStop;
runnerThread = new Thread(runner);
runnerThread.setName(getClass().getSimpleName() + "-" +
source.getClass().getSimpleName() + "-" + source.getName());
runnerThread.start();
lifecycleState = LifecycleState.START;//设置状态为START
}
在 PollableSourceRunner 中我们看到单独启动一个线程去执行 PollingRunner,这个线程的作用就是不断的去轮询。查看 PollingRunner 的实现
@Override
public void run() {
logger.debug("Polling runner starting. Source:{}", source);
while (!shouldStop.get()) {//没有停止,那就继续吧
counterGroup.incrementAndGet("runner.polls");
try {
//真正的拉去逻辑在source process()方法中,调用该方法进行拉去数据,并且
//判断返回状态是否为BACKOFF(失败补偿),如果是那么等待時間超时之后就会重试
if (source.process().equals(PollableSource.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.info("Source runner interrupted. Exiting");
counterGroup.incrementAndGet("runner.interruptions");
} ......
}
logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}
比如 KafkaSource ,它的逻辑就在 process 方法中,
// get next message
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
kafkaMessage = messageAndMetadata.message();
kafkaKey = messageAndMetadata.key();
// Add headers to event (topic, timestamp, and key)
headers = new HashMap<String, String>();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
EventDrivenSourceRunner
@Override
public void start() {
Source source = getSource();//获取source
ChannelProcessor cp = source.getChannelProcessor();//获取source对应的ChannelProcessor
cp.initialize();//初始化channelprocessor
source.start();//启动source
lifecycleState = LifecycleState.START;//标记状态为START
}
可以看到 EventDrivenSourceRunner 和 PollableSourceRunnner 启动流程大致相同,只是 PollableSourceRunner 会额外启动一个线程去轮询 source。
channel 的实现
source 获取到数据后,会交给 channelProcessor 处理,发送到 channel。最后由 sink 消费掉。
所以 channel 是 source,sink 实现异步化的关键。
channelProcessor 中两格重要的成员
private final ChannelSelector selector;//channel选择器
private final InterceptorChain interceptorChain; //过滤链
InterceptorChain 是有多个 Interceptor 组成,并且实现了 Interceptor 接口
public class InterceptorChain implements Interceptor {
private List<Interceptor> interceptors;
}
Interceptor.java
public interface Interceptor {
public void initialize();// 做一些处理话工作
public Event intercept(Event event);//拦截单个event并且返回
public List<Event> intercept(List<Event> events);//批量拦截event
public void close();
public interface Builder extends Configurable {
public Interceptor build();
}//用来创建特定的Interceptor
}
Interceptor 定义了一些处理 Event 的接口,再 Event 处理之后都会返回改 Envent
从 source 的分析中我们可以知道,如果是 PollableSourceRunner 会调用 source 中的 process()方法。如果是 EventDrivenSourceRunner,就会用特定的方法来获取 source,比如 httpSource 利用 FlumeHTTPServlet 来接受消息
try {
events = handler.getEvents(request);//从请求中获取event
...
try {
getChannelProcessor().processEventBatch(events);//通过ChanneProcess进行的processEventBatch方法进行批量处理
} catch (ChannelException ex) {
比如 KafkaSource 是 PollableSourceRunner 那么会调用 KafkaSource 中的 process()方法。
public Status process() throws EventDeliveryException {
...
// get next message
MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
kafkaMessage = messageAndMetadata.message();
kafkaKey = messageAndMetadata.key();
// Add headers to event (topic, timestamp, and key)
headers = new HashMap<String, String>();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
if (kafkaKey != null) {
headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
}
......
event = EventBuilder.withBody(kafkaMessage, headers);
eventList.add(event);
......
if()......
getChannelProcessor().processEventBatch(eventList);//交给channelProcessor处理
counter.addToEventAcceptedCount(eventList.size());
...
}
}
从以上分析不管 source 是轮询还是事件驱动的,都会触发 ChannelProcessor 中的 processEvent 或者 ProcesEventBatch 方法
public void processEventBatch(List<Event> events) {
events = interceptorChain.intercept(events);//调用Interceptor处理events
List<Channel> reqChannels = selector.getRequiredChannels(event);//获取必须成功处理的Channel ,写失败了必须回滚source
List<Channel> optChannels = selector.getOptionalChannels(event);//获取非必须成功处理的channel,写失败了就忽略
// 這裡分析處理必須成功channel的情況。非必須的channel處理情況一樣
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();//获取该channel上的事务
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin();//开始事务
List<Event> batch = reqChannelQueue.get(reqChannel);//获取events
for (Event event : batch) {
reqChannel.put(event);//处理Channel
}
tx.commit();//提交事务
} catch (Throwable t) {
tx.rollback();//发生异常回滚
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " +
reqChannel, t);
throw (Error) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();//关闭事务
}
}
}
}
最后就是 ChannelSelector ,flume 默认提供两种实现多路复用和复制。多路复用选择器可以根据 header 中的值而选择不同的 channel,复制就会把 event 复制到多个 channel 中。flume 默认是复制选择器。
同样 Selector 的创建也是通过 ChannelSelectorFactory 创建的.
public static ChannelSelector create(List<Channel> channels,
ChannelSelectorConfiguration conf) {
String type = ChannelSelectorType.REPLICATING.toString();
if (conf != null){
type = conf.getType();
}
ChannelSelector selector = getSelectorForType(type);
selector.setChannels(channels);
Configurables.configure(selector, conf);
return selector;
}
默认提供复制选择器,如果配置文件中配置了选择器那么就从配置文件中获取。
上面看到在 processEventBatch 方法中调用 channel 的 put 方法。channel 中提供了基本的
put 和 take 方法来实现 Event 的流转。
public interface Channel extends LifecycleAware, NamedComponent {
public void put(Event event) throws ChannelException;//向channel中存放
public Event take() throws ChannelException;//消费event
public Transaction getTransaction();//获取事务
}
flume 提供的默认 channel 如下图所示:
sink 的实现
sink 定义:
public interface Sink extends LifecycleAware, NamedComponent {
public void setChannel(Channel channel);
public Channel getChannel();
public Status process() throws EventDeliveryException;
public static enum Status {
READY, BACKOFF
}
}
提供了 channel 的 setter,getter 方法。process 方法用来消费。并返回状态 READY,BACKOFF
sink 的创建
SinkConfiguration config = (SinkConfiguration) comp;
Sink sink = sinkFactory.create(comp.getComponentName(),
comp.getType());
sink 的创建也是通过 sinkFactory
public Sink create(String name, String type) throws FlumeException {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(type, "type");
logger.info("Creating instance of sink: {}, type: {}", name, type);
Class<? extends Sink> sinkClass = getClass(type);//获取sink对应的类型的Class
try {
Sink sink = sinkClass.newInstance();//创建实例
sink.setName(name);
return sink;
} catch (Exception ex) {
throw new FlumeException("Unable to create sink: " + name
+ ", type: " + type + ", class: " + sinkClass.getName(), ex);
}
}
通过传入的 type 找到对应的 Class 要是没有找到则直接通过 Class.forNamae(String name)来创建
sink 还提供了分组功能。该功能由 SinkGroup 实现。在 SinkGroup 内部如何调度多个 Sink,则交给 SinkProcessor 完成。
sink 的启动
和 Source 一样,flume 也为 Sink 提供了 SinkRunner 来流转 Sink
在 sinkRunner 中
public void start() {
SinkProcessor policy = getPolicy();
policy.start();//启动SinkProcessor
runner = new PollingRunner();//单独启动一个线程,从channel中消费数据
runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start();
lifecycleState = LifecycleState.START;
}
sinkRunner 中通过启动 SinkProcessor 间接启动 Sink,并且单独启动一个线程,不停地调用 process()方法从 channel 中消费数据
在 SinkProcessor 中,如果是 DefaultSinkProcessor 那么直接调用 sink.start()方法启动 sink。如果是 LoadBalancingSinkProcessor,FailoverSinkProcessor 由于这两种处理器中包含多个 Sink,所以会依次遍历 sink 调用 start()方法启动
public void run() {
logger.debug("Polling sink runner starting");
while (!shouldStop.get()) {//判断是否停止
try {
if (policy.process().equals(Sink.Status.BACKOFF)) {//调用SinkProcessor的proces()方法进行处理
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
}
}
}
该线程会不停的执行 SinkProcessor 的 process()方法,而 SinkProcessor 的 process()方法会调用对应的 Sink 的 process()方法。然后判断处理状态如果是失败补偿,那么等待超时时间后重试
SinkGroup
public class SinkGroup implements Configurable, ConfigurableComponent {
List<Sink> sinks;
SinkProcessor processor;
SinkGroupConfiguration conf;
......
}
SinkGroup 中包含多个 Sink,并且提供一个 SinkProcessor 来处理 SinkGroup 内部调度
SinkProcessor
SinkProcessor 默认提供三种实现。DefaultSinkProcessor,LoadBalancingSinkProcessor,FailoverSinkProcessor
DefaultSinkProcessor:默认实现,适用于单个 sink
LoadBalancingSinkProcessor:提供负载均衡
FailoverSinkProcessor:提供故障转移
DefaultSinkProcessor
public class DefaultSinkProcessor implements SinkProcessor,
ConfigurableComponent {
private Sink sink;
private LifecycleState lifecycleState;
@Override
public void start() {
Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
sink.start();//启动sink
lifecycleState = LifecycleState.START;
}
@Override
public Status process() throws EventDeliveryException {
return sink.process();
}
@Override
public void setSinks(List<Sink> sinks) {
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
}
}
从上面可以看出 DefaultSinkProcessor 只能处理一个 Sink。在 process 方法中调用 sink 的方法。具体到某个具体的 Sink,比如 HDFSEventSink,那么就执行该 sink 的 process 方法
接下来分析 SinkProcessor 中负载均衡和故障转移 是如何具体实现的。
FailOverSinkProcessor 实现分析
** FailOverSinkProcessor ** 中 process()方法实现如下:
@Override
public Status process() throws EventDeliveryException {
Long now = System.currentTimeMillis();
while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {//检查失败队列是否有sink,并且队列中第一个sink过了失败补偿时间
FailedSink cur = failedSinks.poll();//从失败队列中获取第一个sink,并且在队列中删除
Status s;
try {
s = cur.getSink().process();//调用sink的process()方法进行处理
if (s == Status.READY) {//如果状态是就绪
liveSinks.put(cur.getPriority(), cur.getSink());//将该sink放入存活队列
activeSink = liveSinks.get(liveSinks.lastKey());//重新赋值给activeSink
logger.debug("Sink {} was recovered from the fail list",
cur.getSink().getName());
} else {//sink 处理失败
failedSinks.add(cur);//加入失败队列
}
return s;
} catch (Exception e) {
cur.incFails();//发生异常,增加失败次数
failedSinks.add(cur);//放入失败队列
}
}
//如果失败队列为空,或者失败队列中所有的sink都没有达到失败补偿时间,那么交给activeSink进行处理,
Status ret = null;
while(activeSink != null) {
try {
ret = activeSink.process();//交给activeSink 处理
return ret;
} catch (Exception e) {
logger.warn("Sink {} failed and has been sent to failover list",
activeSink.getName(), e);
activeSink = moveActiveToDeadAndGetNext();//如果activeSink处理失败,则把activeSink从存活队列中移动到失败队列中
}
}
throw new EventDeliveryException("All sinks failed to process, " +
"nothing left to failover to");
}
- 存活队列是一个 SortMap<Key,Value> 其中 key 是 sink 的优先级。activeSink 默认取存活队列中的最后一个,存活队列是根据配置的 sink 优先级来排序的
- 失败队列是一个优先队列,按照 FailSink 的 refresh 属性进行排序
@Override
public int compareTo(FailedSink arg0) {
return refresh.compareTo(arg0.refresh);
}
refresh 属性,在 FailSink 创建时和 sink 处理发生异常时 会触发调整
refresh 调整策略 如下:
private void adjustRefresh() {
refresh = System.currentTimeMillis()
+ Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY);
}
refresh 等于系统当前的毫秒加上最大等待时间(默认 30s)和失败次数指数级增长值中最小的一个。FAILURE_PENALTY 等 1s;(1 << sequentialFailures) * FAILURE_PENALTY)用于实现根据失败次数等待时间指数级递增。
一个配置的 failOver 具体的例子:
host1.sinkgroups = group1
host1.sinkgroups.group1.sinks = sink1 sink2
host1.sinkgroups.group1.processor.type = failover
host1.sinkgroups.group1.processor.priority.sink1 = 5
host1.sinkgroups.group1.processor.priority.sink2 = 10
host1.sinkgroups.group1.processor.maxpenalty = 10000
LoadBalancingSinkProcessor 实现分析
loadBalaneingSinkProcessor 用于实现 sink 的负载均衡,其功能通过 SinkSelector 实现。类似于 ChannelSelector 和 Channel 的关系
SinkSelector 中模式有三种实现
1.固定顺序
2.轮询
3.随机
LoadBalancingSinkProcessor 中使用均衡负载的方式
@Override
public Status process() throws EventDeliveryException {
Status status = null;
Iterator<Sink> sinkIterator = selector.createSinkIterator();//使用sinkSelector创建Sink迭代器。三种方式有各自不同的实现
while (sinkIterator.hasNext()) {//遍历Sink
Sink sink = sinkIterator.next();//获取sink
try {
status = sink.process();//调用sink处理
break;//如果处理成功那么本次负载均衡就算完成
} catch (Exception ex) {
selector.informSinkFailed(sink);//如果发生异常则通知SinkSelector,采用相应的补偿算法进行处理
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
}
if (status == null) {
throw new EventDeliveryException("All configured sinks have failed");
}
return status;
}
在上面的解释中,最大的两个疑惑就是
- 这个 Sink 迭代器也就是 createSinkIterator() 是如何实现的
- 发生异常后 SinkSelector 的处理是如何实现的
先来看 createSinkIterator 的实现。首先看 RoundRobinSinkSelector 的实现
如上图所示 RoundRobinSinkSelector 内部包含一个 OrderSelector 的属性。
private OrderSelector<Sink> selector;
RoundRobinSinkSelector(boolean backoff){
selector = new RoundRobinOrderSelector<Sink>(backoff);
}
@Override
public Iterator<Sink> createSinkIterator() {
return selector.createIterator();
}
内部通过一个 RoundRobinOrderSelector 来实现。查看起 createIterator 实现
@Override
public Iterator<T> createIterator() {
List<Integer> activeIndices = getIndexList();//获取存活sink的索引
int size = activeIndices.size();//存活sink的個數
//如果下一個sink的位置超過了存活sin的個數,重新指向头
if (nextHead >= size) {
nextHead = 0;
}
int begin = nextHead++; //获取起始位置
if (nextHead == activeIndices.size()) {//检查是否超过范围,超过了从头开始
nextHead = 0;
}
int[] indexOrder = new int[size];//创建一个数组,来存放访问的顺序
for (int i = 0; i < size; i++) {
indexOrder[i] = activeIndices.get((begin + i) % size);//用取模的方法实现轮询,每次都从上一个sink的下一个sink 索引开始,由begin控制
}
//indexOrder 是访问顺序,getObjects返回相关所有的sink
return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
接下来看一下 getIndexList 的实现
protected List<Integer> getIndexList() {
long now = System.currentTimeMillis();//当前时间
List<Integer> indexList = new ArrayList<Integer>();//用来存放sink的索引
int i = 0;
for (T obj : stateMap.keySet()) {//获取所有sink
if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
//如果没有开启退避,或者该sink 到失败补偿的时间,那么将改sink的索引放入IndexList
indexList.add(i);
}
i++;
}
return indexList;
}
stateMap 是一个 LinkedHashMap<T,FailState> 其中 T 在这里指的是 Sink。
如果没有开启了退避算法,那么会认为每个 sink 都是存活的,所有的 sink 都加到 IndexList。否则等到了失败补偿时间才会加入到 IndexList。可以通过 processor.backoff = true 配置开启
最后分析一下当 sink 处理失败 SinkSelector 是如何处理的
public void informFailure(T failedObject) {
if (!shouldBackOff) {//如果没有开启退避算法,当然就不做任何处理
return;
}
FailureState state = stateMap.get(failedObject);//获取当前失败sink的状态对象
long now = System.currentTimeMillis();//当前时间
long delta = now - state.lastFail;//自从上次失败的经过的时间
long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//计算上一次退避等待的时间
long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
if (allowableDiff > delta) {//如果上次失败到现在最后退避时间后的一个小时内,并且是失败次数小于期望的退避次数限制,那么就增加state.sequentialFails 实际上就增加了退避的等待时间
if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) {
state.sequentialFails++;
}
} else {
state.sequentialFails = 1;//否则就不再增加退避等待时间
}
state.lastFail = now;//更新最后失败时间
state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));//更新退避等待时间
}
CONSIDER_SEQUENTIAL_RANGE 是一个常量 只为 1 小时,EXP_BACKOFF_COUNTER_LIMIT 为期望最大的退避次数 值为 16.如果上次失败到现在的是哪在上次退避等待时间超过一个小时后 或者 退避次数超过了 EXP_BACKOFF_COUNTER_LIMIT 那么退避的等待时间将不再增加。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于