ThreadPoolExecutor 使用详解

本贴最后更新于 1594 天前,其中的信息可能已经沧海桑田

为了保证每一个线程都成功,需要重写拒绝机制,阻塞提交线程
默认的提交线程用的是
offer方法,是非阻塞的提交线程,当队列满时,不会执行
put 方法,是阻塞的提交线程,保证每个线程都执行

源码如下

/**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

参数名 作用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过 corePoolSize 数目的空闲线程最大存活时间;可以 allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime 时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过 maxmumPoolSize+workQueue 之和时,任务会交给 RejectedExecutionHandler 来处理

重点讲解:
其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue 之间关系。

1.当线程池小于 corePoolSize 时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到 corePoolSize 时,新提交任务将被放入 workQueue 中,等待线程池中任务调度执行
3.当 workQueue 已满,且 maximumPoolSize>corePoolSize 时,新提交任务会创建新线程执行任务
4.当提交任务数超过 maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理
5.当线程池中超过 corePoolSize 线程,空闲时间达到 keepAliveTime 时,关闭空闲线程
6.当设置 allowCoreThreadTimeOut(true)时,线程池中 corePoolSize 线程空闲时间达到 keepAliveTime 也将关闭

总结:

  1. 用 ThreadPoolExecutor 自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止 OOM
  2. 如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务
  3. 最大线程数一般设为 2N+1 最好,N 是 CPU 核数
  4. 核心线程数,看应用,如果是任务,一天跑一次,设置为 0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数
  5. 如果要获取任务执行结果,用 CompletionService,但是注意,获取任务的结果的要重新开一个线程获取,如果在主线程获取,就要等任务都提交后才获取,就会阻塞大量任务结果,队列过大 OOM,所以最好异步开个线程获取结果

可选择的阻塞队列 BlockingQueue 详解
在重复一下新任务进入时线程池的执行策略:
如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于 corePoolSize,则任务根本不会存入 queue 中,而是直接运行)
如果运行的线程大于等于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

主要有 3 种类型的 BlockingQueue:

无界队列

队列大小无限制,常用的为无界的 LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致 OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当 QPS 很高,发送数据很大,大量的任务被添加到这个无界 LinkedBlockingQueue 中,导致 cpu 和内存飙升服务器挂掉。

有界队列

常用的有两类,一类是遵循 FIFO 原则的队列如 ArrayBlockingQueue 与有界的 LinkedBlockingQueue,另一类是优先级队列如 PriorityBlockingQueue。PriorityBlockingQueue 中的优先级由任务的 Comparator 决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低 cpu 使用率和上下文切换,但是可能会限制系统吞吐量。

在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。

同步移交队列

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用 SynchronousQueue 作为等待队列。SynchronousQueue 不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

  • 线程
    122 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
hymn
乘风破浪,任重道远。

推荐标签 标签

  • React

    React 是 Facebook 开源的一个用于构建 UI 的 JavaScript 库。

    192 引用 • 291 回帖 • 402 关注
  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    131 引用 • 1114 回帖 • 131 关注
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    247 引用 • 1347 回帖
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    34 引用 • 467 回帖 • 740 关注
  • 正则表达式

    正则表达式(Regular Expression)使用单个字符串来描述、匹配一系列遵循某个句法规则的字符串。

    31 引用 • 94 回帖 • 1 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    190 引用 • 1057 回帖 • 1 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 561 关注
  • 面试

    面试造航母,上班拧螺丝。多面试,少加班。

    325 引用 • 1395 回帖
  • MyBatis

    MyBatis 本是 Apache 软件基金会 的一个开源项目 iBatis,2010 年这个项目由 Apache 软件基金会迁移到了 google code,并且改名为 MyBatis ,2013 年 11 月再次迁移到了 GitHub。

    170 引用 • 414 回帖 • 383 关注
  • Postman

    Postman 是一款简单好用的 HTTP API 调试工具。

    4 引用 • 3 回帖 • 2 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 344 回帖
  • JetBrains

    JetBrains 是一家捷克的软件开发公司,该公司位于捷克的布拉格,并在俄国的圣彼得堡及美国麻州波士顿都设有办公室,该公司最为人所熟知的产品是 Java 编程语言开发撰写时所用的集成开发环境:IntelliJ IDEA

    18 引用 • 54 回帖
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 478 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 15 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 17 关注
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 505 关注
  • OnlyOffice
    4 引用 • 1 关注
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 18 关注
  • 996
    13 引用 • 200 回帖 • 2 关注
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    728 引用 • 1326 回帖
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖 • 4 关注
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 335 关注
  • 友情链接

    确认过眼神后的灵魂连接,站在链在!

    24 引用 • 373 回帖
  • 音乐

    你听到信仰的声音了么?

    60 引用 • 511 回帖 • 1 关注
  • Ruby

    Ruby 是一种开源的面向对象程序设计的服务器端脚本语言,在 20 世纪 90 年代中期由日本的松本行弘(まつもとゆきひろ/Yukihiro Matsumoto)设计并开发。在 Ruby 社区,松本也被称为马茨(Matz)。

    7 引用 • 31 回帖 • 212 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    58 引用 • 22 回帖 • 4 关注