震惊!原来 threadLocal 还能这么用!

本贴最后更新于 2083 天前,其中的信息可能已经东海扬尘

--------------------------------------------------------------------------------
关于 threadLocal 如果你想了解更多,希望这边文章对你有所帮助。
--------------------------------------------------------------------------------

我想对于 threadLocal 大家都不会陌生,我们经常用他存储一些全局类型的变量,然后方便在整个调用链的各个地方使用,类似这样就可以。

private static ThreadLocal<Object> threadLocal = new ThreadLocal<>(); public static Object getThreadLocalValue() { return threadLocal.get(); } public static void setThreadLocalValue(Object object) { threadLocal.set(object); }

我相信看过 threadLocal 的同学对于类似这样的使用方式都不会陌生,很简单的初始化 set()进去之后,在其他地方 get()方法获取就可以了。当然这篇文章的目的不是为了这个,首先第一个问题,new ThreadLocal<>()能不能在子线程中使用,如果我想在子线程中使用线程变量的副本怎么办?

屏幕快照 20191128 上午 12.30.20.png
果然,这个问题还是难不到你,我太天真了。
用 ThreadLocal 的子类 InheritableThreadLocal 啊,InheritableThreadLocal 在 threadLocal 的基础上,解决了和线程相关的副本从父线程向子线程传递的问题。他的实现原理是这样的 <erwog@!ff13dfslso%$#@dfsdl;>。你娓娓道来,胸前的红领巾都不自觉地鲜艳了起来。

看着你的陈述,那么的自信,我邪魅一笑,很好,你成功吸引了我的注意!
看来,不拿点压箱底的东西是镇不住你的了,问题真的来了,如果我想从线程池中拿到主线程的全局变量副本可以吗?
哼,不知道了吧,看我给你一一解释。什么??! 你知道?好吧,我们先假装你不会,不然这篇文章都写不下去了, 逃 : ) 求饶命 ~

先说结论,alibaba 提供了一种解决线程池中线程使用主线程中副本的办法 ----- TransmittableThreadLocal
解释 TransmittableThreadLocal 实现原理之前我们先回顾下,InheritableThreadLocal 为什么能保证子线程可以从父线程中拿到副本呢?

先看下我们在父线程中 new Thread()的时候都干了些啥:

public Thread(Runnable target) { init(null, target, "Thread-" + nextThreadNum(), 0); }

我们再看下 init 这个初始化方法。

private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc) { Thread parent = currentThread(); <省略一些这篇文章不太关注的逻辑, 想要全面了解thread的初始化逻辑可翻看源码> if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); }

ok,豁然开朗,原来在我们平时 new Thread()的时候已经把当前主线程也就是 currentThread()里面的 inheritableThreadLocal 副本给子线程拷贝了一份啊,自然地,子线程也就可以获得主线程变量的副本了。
那么为什么线程池会是什么结果呢?我们看个栗子。

public class Test { private static ThreadLocal<Map<String,String>> holder = new InheritableThreadLocal<>(); public void testTtl(){ // 构建线程池 Executor executor = Executors.newFixedThreadPool(1); executor.execute(()-> System.out.println("init")); // 初始化ThreadLocal HashMap<String,String> map = new HashMap<>(); map.put("1","2"); holder.set(map); // 判断线程池中能够拿到主线程threadLocal副本 executor.execute(()->{ System.out.println(holder.get()); //结果为:null }); } public static void main(String[] args) { new Test().testTtl(); } }

到现在,我们知道了,因为线程池中线程的复用,所以这个 inheritableThreadLocal 只能维持在这个线程创建时候的状态。
那么接下来,就是讲解为什么 alibaba 提供的 TransmittableThreadLocal 能够实现线程池中副本的传递。
TransmittableThreadLocal 继承了 InheritableThreadLocal,重载了 get 和 set 方法。

@Override public final T get() { T value = super.get(); if (null != value) addValue(); return value; } @Override public final void set(T value) { super.set(value); // may set null to remove value if (null == value) removeValue(); else addValue(); }

可以看到在调用父类的逻辑上,新增了 addValue 和 removeValue 的逻辑,这个就是缓存的逻辑,会把当前这个 threadlocal 缓存到 holder 上面。

private void addValue() { if (!holder.get().containsKey(this)) { holder.get().put(this, null); // WeakHashMap supports null value. } } private void removeValue() { holder.get().remove(this); }

下面介绍下这个很关键的 holder。

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() { @Override protected Map<TransmittableThreadLocal<?>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(); } @Override protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue); } };

首先这个 holder 本身是 InheritableThreadLocal 类型的,所以它也是和线程相关联的。可以在父子线程间传递,但是对于线程池内已经创建的线程肯定是传递不进去的。所以在初始化 wrapper 类(包装类)的时候,那个时候还是父线程,在 wrapper 类构造的时候,要把这些 threadlocal 捕获出来,这个捕获相关逻辑见下一个 Transmitter 的分析。

Transmitter 内有 3 个核心方法,ttl 表示 TransmittableThreadLocal。

  • capture:捕获父线程的 ttl
  • replay:重放父线程 ttl
  • restore:恢复之前子线程的 ttl

capture 用于捕获父线程的 ttl,捕获操作要在父线程执行。

public static Object capture() { return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } return ttl2Value; } private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() { final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); final TtlCopier<Object> copier = entry.getValue(); threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get())); } return threadLocal2Value; }

replay 传入 capture 方法捕获的 ttl,然后在子线程重放,也就是调用 ttl 的 set 方法,会设置到当前的线程中去,最后会把子线程之前存在的 ttl 返回。

public static Object replay(@NonNull Object captured) { final Snapshot capturedSnapshot = (Snapshot) captured; return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value)); } @NonNull private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL values that is not in captured // avoid the extra TTL values after replay when run task if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set TTL values to captured setTtlValuesTo(captured); // call beforeExecute callback doExecuteCallback(true); return backup; } private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) { final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>(); for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) { final ThreadLocal<Object> threadLocal = entry.getKey(); backup.put(threadLocal, threadLocal.get()); final Object value = entry.getValue(); if (value == threadLocalClearMark) threadLocal.remove(); else threadLocal.set(value); } return backup; }

最后就是执行结束,restore 之前的上下文,用到 replay 返回的 back。

public static void restore(@NonNull Object backup) { final Snapshot backupSnapshot = (Snapshot) backup; restoreTtlValues(backupSnapshot.ttl2Value); restoreThreadLocalValues(backupSnapshot.threadLocal2Value); }

要把 capture,repaly 和 restore 的逻辑串起来,那么就需要看下面的 TtlRunnable 类,这个就是我一直说的包装类。

private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { //捕获父线程ttl this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }

在构造函数,也就是父线程,会通过 capture 捕获父线程的 ttl,然后保存在 capturedRef 中
在 run 方法中,replay,restore 逻辑一目了然。

@Override public void run() { Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } }

至此,关于如何在线程池中实现 TransmittableThreadLocal 副本的传递方案阐述完毕。
哈哈哈哈哈哈.jpg

那么我们有什么方便简单的方式使用这个 TransmittableThreadLocal 实现线程池中的线程变量副本传递吗?
当然有,alibaba 在提供了 TransmittableThreadLocal 的同时,提供了一个非常好用的使用方式 -- TtlExecutors.
TtlExecutors 使用起来非常的方便,在我们构建的线程池上面做个封装就好了。

Executor executor = Executors.newFixedThreadPool(10); Executor ttlExecutor = TtlExecutors.getTtlExecutor(executor);

我们看一眼 TtlExecutors.getTtlExecutor()做了什么。

public static Executor getTtlExecutor(@Nullable Executor executor) { if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) { return executor; } return new ExecutorTtlWrapper(executor); }

其实就构建了一个包装类 ExecutorTtlWrapper,我们再跟进去看一眼,发现 ExecutorTtlWrapper 很简单,他实现 Executor,覆盖了 execute 方法。

@Override public void execute(@NonNull Runnable command) { executor.execute(TtlRunnable.get(command)); }

看到这我们发现原来这个 execute 方法根据我们传进来的 Runnable 构建了 TtlRunnable,TtlRunnable 我们在上面已经介绍,就是在这个类的 run()方法里面我们实现了线程池变量副本的传递。
当然除了 TtlExecutors 之外,还有通过 TtlAgent 使用 TransmittableThreadLocal 的办法,这里不做过多介绍了。
详细信息可以参考官方:https://github.com/alibaba/transmittable-thread-local

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3203 引用 • 8217 回帖
  • 线程
    123 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    948 引用 • 1460 回帖 • 4 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 367 回帖
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    10 引用 • 15 回帖 • 4 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    173 引用 • 541 回帖
  • OpenCV
    15 引用 • 36 回帖
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 611 关注
  • GitLab

    GitLab 是利用 Ruby 一个开源的版本管理系统,实现一个自托管的 Git 项目仓库,可通过 Web 界面操作公开或私有项目。

    46 引用 • 72 回帖 • 2 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    29 引用 • 202 回帖 • 37 关注
  • CSDN

    CSDN (Chinese Software Developer Network) 创立于 1999 年,是中国的 IT 社区和服务平台,为中国的软件开发者和 IT 从业者提供知识传播、职业发展、软件开发等全生命周期服务,满足他们在职业发展中学习及共享知识和信息、建立职业发展社交圈、通过软件开发实现技术商业化等刚性需求。

    14 引用 • 155 回帖
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    43 引用 • 44 回帖
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖 • 2 关注
  • 生活

    生活是指人类生存过程中的各项活动的总和,范畴较广,一般指为幸福的意义而存在。生活实际上是对人生的一种诠释。生活包括人类在社会中与自己息息相关的日常活动和心理影射。

    230 引用 • 1432 回帖
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖 • 15 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖 • 1 关注
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 512 关注
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖 • 2 关注
  • Visio
    1 引用 • 2 回帖
  • Excel
    31 引用 • 28 回帖
  • 音乐

    你听到信仰的声音了么?

    62 引用 • 512 回帖
  • IPFS

    IPFS(InterPlanetary File System,星际文件系统)是永久的、去中心化保存和共享文件的方法,这是一种内容可寻址、版本化、点对点超媒体的分布式协议。请浏览 IPFS 入门笔记了解更多细节。

    20 引用 • 245 回帖 • 241 关注
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    56 引用 • 85 回帖 • 3 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    54 引用 • 37 回帖 • 2 关注
  • Scala

    Scala 是一门多范式的编程语言,集成面向对象编程和函数式编程的各种特性。

    13 引用 • 11 回帖 • 165 关注
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 642 关注