java 多线程分批处理数据工具类

本贴最后更新于 2032 天前,其中的信息可能已经时移世易

数据量比较大,需要使用多线程来分批处理,提高处理效率和能力,于是就写了一个通用的多线程处理工具,只需要实现自己的业务逻辑就可以正常使用,现在记录一下

主要是针对大数据量 list,将 list 划分多个线程处理

ResultBean 类: 返回结果统一 bean

package com.ts.common.model; import java.io.Serializable; import com.alibaba.fastjson.JSON; /** * 返回结果统一bean * * ResultBean<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:49:46 <BR> * @version 2.0 * */ public class ResultBean<T> implements Serializable { private static final long serialVersionUID = 1L; // 成功状态 public static final int SUCCESS = 1; // 处理中状态 public static final int PROCESSING = 0; // 失败状态 public static final int FAIL = -1; // 描述 private String msg = "success"; // 状态默认成功 private int code = SUCCESS; // 备注 private String remark; // 返回数据 private T data; public ResultBean() { super(); } public ResultBean(T data) { super(); this.data = data; } /** * 使用异常创建结果 */ public ResultBean(Throwable e) { super(); this.msg = e.toString(); this.code = FAIL; } /** * * 实例化结果默认成功状态<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:51:26 <BR> * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance() { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = SUCCESS; instance.msg = "success"; return instance; } /** * * 实例化结果默认成功状态和数据<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年5月10日-下午2:13:16 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(T data) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = SUCCESS; instance.msg = "success"; instance.data = data; return instance; } /** * * 实例化返回结果<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:00:53 <BR> * @param code * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = code; instance.msg = msg; return instance; } /** * * 实例化返回结果<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:00:35 <BR> * @param code * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg, T data) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = code; instance.msg = msg; instance.data = data; return instance; } /** * * 设置返回数据<BR> * 方法名:setData<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:01 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setData(T data){ this.data = data; return this; } /** * * 设置结果描述<BR> * 方法名:setMsg<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:34 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setMsg(String msg){ this.msg = msg; return this; } /** * * 设置状态<BR> * 方法名:setCode<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:17:56 <BR> * @param code * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setCode(int code){ this.code = code; return this; } /** * * 设置备注)<BR> * 方法名:setRemark<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午5:47:29 <BR> * @param remark * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setRemark(String remark){ this.remark = remark; return this; } /** * * 设置成功描述和返回数据<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg, T data){ this.code = SUCCESS; this.data = data; this.msg = msg; return this; } /** * * 设置成功返回结果描述<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg){ this.code = SUCCESS; this.msg = msg; return this; } /** * * 设置处理中描述和返回数据<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING; this.data = data; this.msg = msg; return this; } /** * * 设置处理中返回结果描述<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg){ this.code = PROCESSING; this.msg = msg; return this; } /** * * 设置失败返回描述和返回数据<BR> * 方法名:fail<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:54:04 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg, T data){ this.code = FAIL; this.data = data; this.msg = msg; return this; } /** * * 设置失败返回描述<BR> * 方法名:fail<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:54:32 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg){ this.code = FAIL; this.msg = msg; return this; } public T getData() { return data; } public String getMsg() { return msg; } public int getCode() { return code; } public String getRemark() { return remark; } /** * * 生成json字符串<BR> * 方法名:json<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:42:28 <BR> * @return String<BR> * @exception <BR> * @since 2.0 */ public String json(){ return JSON.toJSONString(this); } }

ITask 接口: 实现自己的业务

package com.ts.common.multi.execute; import java.util.Map; /** * 任务处理接口 * 具体业务逻辑可实现该接口 * T 返回值类型 * E 传入值类型 * ITask<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-下午6:12:32 <BR> * @version 2.0 * */ public interface ITask<T, E> { /** * * 任务执行方法接口<BR> * 方法名:execute<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-下午6:13:44 <BR> * @param e 传入对象 * @param params 其他辅助参数 * @return T<BR> 返回值类型 * @exception <BR> * @since 2.0 */ T execute(E e, Map<String, Object> params); }

HandleCallable 类: 实现 Callable 接口,来处理任务

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * HandleCallable<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-上午11:55:41 <BR> * * @version 2.0 * */ @SuppressWarnings("rawtypes") public class HandleCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 线程名称 private String threadName = ""; // 需要处理的数据 private List<E> data; // 辅助参数 private Map<String, Object> params; // 具体执行任务 private ITask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 该线程中所有数据处理返回结果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("线程:{},共处理:{}个数据,开始处理......", threadName, data.size()); // 返回结果集 List<ResultBean<String>> resultList = new ArrayList<>(); // 循环处理每个数据 for (int i = 0; i < data.size(); i++) { // 需要执行的数据 E e = data.get(i); // 将数据执行结果加入到结果集中 resultList.add(task.execute(e, params)); logger.info("线程:{},第{}个数据,处理完成", threadName, (i + 1)); } logger.info("线程:{},共处理:{}个数据,处理完成......", threadName, data.size()); resultBean.setData(resultList); } return resultBean; } }

MultiThreadUtils 类:多线程工具类

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * MultiThreadUtils<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:20:42 <BR> * @version 2.0 * */ public class MultiThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 线程个数,如不赋值,默认为5 private int threadCount = 5; // 具体业务任务 private ITask<ResultBean<String>, T> task; // 线程池管理器 private CompletionService<ResultBean> pool = null; /** * * 初始化线程池和线程个数<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:22:00 <BR> * @param threadCount * @return MultiThreadUtils<BR> * @exception <BR> * @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多线程分批执行list中的任务<BR> * 方法名:execute<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:22:31 <BR> * @param data 线程处理的大数据量list * @param params 处理数据是辅助参数传递 * @param task 具体执行业务的任务接口 * @return ResultBean<BR> * @exception <BR> * @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 创建线程池 ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根据线程池初始化线程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 开始时间(ms) long l = System.currentTimeMillis(); // 数据量大小 int length = data.size(); // 每个线程处理的数据个数 int taskCount = length / threadCount; // 划分每个线程调用的数据 for (int i = 0; i < threadCount; i++) { // 每个线程任务数据list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 将数据分配给各个线程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 将线程加入到线程池 pool.submit(execute); } // 总的返回结果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每个线程处理结果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 关闭线程池 threadpool.shutdownNow(); // 执行结束时间 long end_l = System.currentTimeMillis(); logger.info("总耗时:{}ms", (end_l - l)); return ResultBean.newInstance().setData(result); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }

测试类 TestTask

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.ts.common.model.ResultBean; /** * * 具体执行业务任务 需要 实现ITask接口 在execute中重写业务逻辑 * TestTask<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:40:32 <BR> * @version 2.0 * */ public class TestTask implements ITask<ResultBean<String>, Integer> { @Override public ResultBean execute(Integer e, Map<String, Object> params) { /** * 具体业务逻辑:将list中的元素加上辅助参数中的数据返回 */ int addNum = Integer.valueOf(String.valueOf(params.get("addNum"))); e = e + addNum; ResultBean<String> resultBean = ResultBean.newInstance(); resultBean.setData(e.toString()); return resultBean; } public static void main(String[] args) { // 需要多线程处理的大量数据list List<Integer> data = new ArrayList<>(10000); for(int i = 0; i < 10000; i ++){ data.add(i + 1); } // 创建多线程处理任务 MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5); ITask<ResultBean<String>, Integer> task = new TestTask(); // 辅助参数 加数 Map<String, Object> params = new HashMap<>(); params.put("addNum", 4); // 执行多线程处理,并返回处理结果 ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task); } }
  • 线程
    123 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Android

    Android 是一种以 Linux 为基础的开放源码操作系统,主要使用于便携设备。2005 年由 Google 收购注资,并拉拢多家制造商组成开放手机联盟开发改良,逐渐扩展到到平板电脑及其他领域上。

    335 引用 • 324 回帖 • 4 关注
  • Follow
    4 引用 • 12 回帖 • 13 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    210 引用 • 2040 回帖
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    20 引用 • 7 回帖 • 1 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 116 关注
  • BND

    BND(Baidu Netdisk Downloader)是一款图形界面的百度网盘不限速下载器,支持 Windows、Linux 和 Mac,详细介绍请看这里

    107 引用 • 1281 回帖 • 34 关注
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖
  • Sandbox

    如果帖子标签含有 Sandbox ,则该帖子会被视为“测试帖”,主要用于测试社区功能,排查 bug 等,该标签下内容不定期进行清理。

    432 引用 • 1250 回帖 • 596 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    556 引用 • 675 回帖
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    494 引用 • 930 回帖
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖 • 1 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 503 关注
  • HTML

    HTML5 是 HTML 下一个的主要修订版本,现在仍处于发展阶段。广义论及 HTML5 时,实际指的是包括 HTML、CSS 和 JavaScript 在内的一套技术组合。

    108 引用 • 295 回帖
  • Chrome

    Chrome 又称 Google 浏览器,是一个由谷歌公司开发的网页浏览器。该浏览器是基于其他开源软件所编写,包括 WebKit,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。

    63 引用 • 289 回帖
  • 单点登录

    单点登录(Single Sign On)是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

    9 引用 • 25 回帖 • 2 关注
  • Linux

    Linux 是一套免费使用和自由传播的类 Unix 操作系统,是一个基于 POSIX 和 Unix 的多用户、多任务、支持多线程和多 CPU 的操作系统。它能运行主要的 Unix 工具软件、应用程序和网络协议,并支持 32 位和 64 位硬件。Linux 继承了 Unix 以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    952 引用 • 944 回帖
  • 又拍云

    又拍云是国内领先的 CDN 服务提供商,国家工信部认证通过的“可信云”,乌云众测平台认证的“安全云”,为移动时代的创业者提供新一代的 CDN 加速服务。

    20 引用 • 37 回帖 • 572 关注
  • Vim

    Vim 是类 UNIX 系统文本编辑器 Vi 的加强版本,加入了更多特性来帮助编辑源代码。Vim 的部分增强功能包括文件比较(vimdiff)、语法高亮、全面的帮助系统、本地脚本(Vimscript)和便于选择的可视化模式。

    29 引用 • 66 回帖
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    36 引用 • 37 回帖 • 546 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 118 关注
  • Sublime

    Sublime Text 是一款可以用来写代码、写文章的文本编辑器。支持代码高亮、自动完成,还支持通过插件进行扩展。

    10 引用 • 5 回帖 • 3 关注
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    239 引用 • 224 回帖 • 1 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖 • 2 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    188 引用 • 1057 回帖
  • danl
    164 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 127 关注
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    8 引用 • 26 回帖 • 2 关注