【Nacos 源码 三】TaskManager 任务管理的使用

本贴最后更新于 1903 天前,其中的信息可能已经水流花落

<font face="黑体" color=green size=2> 版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa
版权协议,转载请附上原文出处链接和本声明。
本文链接: http://blog.shiyi.online/articles/2019/08/22/1566435405649.html
首发地址: https://mp.weixin.qq.com/s/CmWMVUmceek6SXUeuP3NSQ

任务管理类

因为 Nacos 中有很多地方使用了这个 TaskManager,所以我们得先了解一下这个类是干啥用的,方便后面阅读源码时候不会吃力;

先说结论:
TaskManager 可以看成是一个待执行的任务集合,用于处理一定要执行成功的任务 单线程的方式处理任务,保证任务一定被成功处理; 如果执行失败了,任务会被重新放入集合中等待下一次被消费;

AbstractTask

AbstractTask 是个抽象类,所有的需要被执行的任务都继续这个类; 这个类主要提供执行任务所需要的数据和方法;例如

   /* 一个任务两次处理的间隔,单位是毫秒*/
    private long taskInterval;
    /*任务上次被处理的时间,用毫秒表示*/
    private long lastProcessTime;
/* TaskManager 判断当前是否需要处理这个Task,子类可以Override这个函数实现自己的逻辑
     */
    public boolean shouldProcess() {
        return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
    }

TaskProcessor 任务处理器

TaskProcessor 是任务处理器接口,它有个方法

boolean process(String taskType, AbstractTask task);

用于执行对应的 AbstractTask 任务类; 不同的任务类型,可以实现自己的执行任务逻辑;

TaskManager 任务管理类

TaskManager 是个任务管理类;
它里面有两个属性保存了待消费的任务 AbstractTask,和任务执行需要的 TaskProcessor;

/**待消费的任务AbstractTask**/
private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>();
/**任务AbstractTask对应的任务执行器TaskProcessor**/
 private final ConcurrentHashMap<String, TaskProcessor> taskProcessors =new ConcurrentHashMap<String, TaskProcessor>();

如果 taskProcessors 中没有找到对应的任务执行器,那么它里面有一个默认执行器会执行

 /**默认执行器**/
 private TaskProcessor defaultTaskProcessor;

使用用例

Nacos 配置中心模块很重要一个功能就是,在初始化的时候以及每隔一段时间就会去数据库中把所有数据 Dump 到磁盘中;Dump 就是一个任务类 AbstractTask; 我们上面说过
AbstractTask 就是一个信息承载对象,主要给 TaskProcessor 提供执行所需要的数据;我们看看 DumpTask;

DumpTask

image.png
DumpTask 定义了自己的一些属性; 再看看其他的例如 DumpAllTaskDumpAllBetaTask

image.png
这两个任务类只定义了 TASK_ID
既然有 DumpTask 任务类,那肯定就有对应的任务处理器类 DumpProcessor;

DumpProcessor

DumpProcessor 是 DumpTask 任务的执行器;执行器中的方法

public boolean process(String taskType, AbstractTask task)

代码太长就不在这里分析了,后面会专门开一篇文章写 Dump 的操作(Todo..);它里面主要做的操作就是 保存配置文件到本地磁盘中,并缓存 md5

对应 DumpAllTaskDumpAllBetaTask 任务的任务执行器有 DumpAllProcessorDumpAllBetaProcessor

DumpAllTask 任务触发执行的地方

上面是 DumpAllTask 的定义和 DumpAllTaskProcessor 执行器的定义;定义好了之后是怎么被触发的呢?

DumpService 初始化 Dump 配置信息

这个类就是专门 Dump 配置信息的服务类;上面提及的 DumpAll 就是在这里被调用的;我们来看下他主要方法;

    @PostConstruct
    public void init() {
        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
      /**在new这个TaskManager类的时候,专门执行任务的一个线程就已经开始启动了,这不过这个时候还没有任务Task添加进去**/
       dumpAllTaskMgr = new TaskManager( "com.alibaba.nacos.server.DumpAllTaskManager");
dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
      Runnable dumpAll = new Runnable() {
            @Override
            public void run() {
dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
            }
        };
            /**每10分钟执行一次DumpAll操作**/
      TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE,
                TimeUnit.MINUTES);
    }

DumpService 在初始化的时候回调用这个 init 方法;
1.先 new 了一个 DumpAllProcessor 执行器;
2.再 new 了一个 TaskManager 任务管理器;在 new 这个任务管理器的时候,就会启动一个线程专门去执行所有待执行的任务;只不过这个时候还没有添加任务;
3.将这个任务管理器的默认执行器设置为 DumpAllProcessor;
4.每十分钟执行一次往 TaskManager 中添加一个 DumpAllTask 的任务;一经添加就会被 TaskManager 中的线程 processingThread 执行 process 方法;

  • Nacos
    23 引用 • 4 回帖 • 1 关注
  • 代码
    466 引用 • 631 回帖 • 9 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...