java 多线程分批处理数据工具类

本贴最后更新于 2079 天前,其中的信息可能已经时移世易

数据量比较大,需要使用多线程来分批处理,提高处理效率和能力,于是就写了一个通用的多线程处理工具,只需要实现自己的业务逻辑就可以正常使用,现在记录一下

主要是针对大数据量 list,将 list 划分多个线程处理

ResultBean 类: 返回结果统一 bean

package com.ts.common.model; import java.io.Serializable; import com.alibaba.fastjson.JSON; /** * 返回结果统一bean * * ResultBean<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:49:46 <BR> * @version 2.0 * */ public class ResultBean<T> implements Serializable { private static final long serialVersionUID = 1L; // 成功状态 public static final int SUCCESS = 1; // 处理中状态 public static final int PROCESSING = 0; // 失败状态 public static final int FAIL = -1; // 描述 private String msg = "success"; // 状态默认成功 private int code = SUCCESS; // 备注 private String remark; // 返回数据 private T data; public ResultBean() { super(); } public ResultBean(T data) { super(); this.data = data; } /** * 使用异常创建结果 */ public ResultBean(Throwable e) { super(); this.msg = e.toString(); this.code = FAIL; } /** * * 实例化结果默认成功状态<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:51:26 <BR> * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance() { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = SUCCESS; instance.msg = "success"; return instance; } /** * * 实例化结果默认成功状态和数据<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年5月10日-下午2:13:16 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(T data) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = SUCCESS; instance.msg = "success"; instance.data = data; return instance; } /** * * 实例化返回结果<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:00:53 <BR> * @param code * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = code; instance.msg = msg; return instance; } /** * * 实例化返回结果<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:00:35 <BR> * @param code * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public static <T> ResultBean<T> newInstance(int code, String msg, T data) { ResultBean<T> instance = new ResultBean<T>(); //默认返回信息 instance.code = code; instance.msg = msg; instance.data = data; return instance; } /** * * 设置返回数据<BR> * 方法名:setData<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:01 <BR> * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setData(T data){ this.data = data; return this; } /** * * 设置结果描述<BR> * 方法名:setMsg<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:34 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setMsg(String msg){ this.msg = msg; return this; } /** * * 设置状态<BR> * 方法名:setCode<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:17:56 <BR> * @param code * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setCode(int code){ this.code = code; return this; } /** * * 设置备注)<BR> * 方法名:setRemark<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午5:47:29 <BR> * @param remark * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> setRemark(String remark){ this.remark = remark; return this; } /** * * 设置成功描述和返回数据<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg, T data){ this.code = SUCCESS; this.data = data; this.msg = msg; return this; } /** * * 设置成功返回结果描述<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> success(String msg){ this.code = SUCCESS; this.msg = msg; return this; } /** * * 设置处理中描述和返回数据<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:52:58 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg, T data){ this.code = PROCESSING; this.data = data; this.msg = msg; return this; } /** * * 设置处理中返回结果描述<BR> * 方法名:success<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:53:31 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> processing(String msg){ this.code = PROCESSING; this.msg = msg; return this; } /** * * 设置失败返回描述和返回数据<BR> * 方法名:fail<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:54:04 <BR> * @param msg * @param data * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg, T data){ this.code = FAIL; this.data = data; this.msg = msg; return this; } /** * * 设置失败返回描述<BR> * 方法名:fail<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午3:54:32 <BR> * @param msg * @return ResultBean<T><BR> * @exception <BR> * @since 2.0 */ public ResultBean<T> fail(String msg){ this.code = FAIL; this.msg = msg; return this; } public T getData() { return data; } public String getMsg() { return msg; } public int getCode() { return code; } public String getRemark() { return remark; } /** * * 生成json字符串<BR> * 方法名:json<BR> * 创建人:wangbeidou <BR> * 时间:2018年4月12日-下午4:42:28 <BR> * @return String<BR> * @exception <BR> * @since 2.0 */ public String json(){ return JSON.toJSONString(this); } }

ITask 接口: 实现自己的业务

package com.ts.common.multi.execute; import java.util.Map; /** * 任务处理接口 * 具体业务逻辑可实现该接口 * T 返回值类型 * E 传入值类型 * ITask<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-下午6:12:32 <BR> * @version 2.0 * */ public interface ITask<T, E> { /** * * 任务执行方法接口<BR> * 方法名:execute<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-下午6:13:44 <BR> * @param e 传入对象 * @param params 其他辅助参数 * @return T<BR> 返回值类型 * @exception <BR> * @since 2.0 */ T execute(E e, Map<String, Object> params); }

HandleCallable 类: 实现 Callable 接口,来处理任务

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * HandleCallable<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月4日-上午11:55:41 <BR> * * @version 2.0 * */ @SuppressWarnings("rawtypes") public class HandleCallable<E> implements Callable<ResultBean> { private static Logger logger = LoggerFactory.getLogger(HandleCallable.class); // 线程名称 private String threadName = ""; // 需要处理的数据 private List<E> data; // 辅助参数 private Map<String, Object> params; // 具体执行任务 private ITask<ResultBean<String>, E> task; public HandleCallable(String threadName, List<E> data, Map<String, Object> params, ITask<ResultBean<String>, E> task) { this.threadName = threadName; this.data = data; this.params = params; this.task = task; } @Override public ResultBean<List<ResultBean<String>>> call() throws Exception { // 该线程中所有数据处理返回结果 ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance(); if (data != null && data.size() > 0) { logger.info("线程:{},共处理:{}个数据,开始处理......", threadName, data.size()); // 返回结果集 List<ResultBean<String>> resultList = new ArrayList<>(); // 循环处理每个数据 for (int i = 0; i < data.size(); i++) { // 需要执行的数据 E e = data.get(i); // 将数据执行结果加入到结果集中 resultList.add(task.execute(e, params)); logger.info("线程:{},第{}个数据,处理完成", threadName, (i + 1)); } logger.info("线程:{},共处理:{}个数据,处理完成......", threadName, data.size()); resultBean.setData(resultList); } return resultBean; } }

MultiThreadUtils 类:多线程工具类

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ts.common.model.ResultBean; /** * * * MultiThreadUtils<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:20:42 <BR> * @version 2.0 * */ public class MultiThreadUtils<T> { private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class); // 线程个数,如不赋值,默认为5 private int threadCount = 5; // 具体业务任务 private ITask<ResultBean<String>, T> task; // 线程池管理器 private CompletionService<ResultBean> pool = null; /** * * 初始化线程池和线程个数<BR> * 方法名:newInstance<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:22:00 <BR> * @param threadCount * @return MultiThreadUtils<BR> * @exception <BR> * @since 2.0 */ public static MultiThreadUtils newInstance(int threadCount) { MultiThreadUtils instance = new MultiThreadUtils(); threadCount = threadCount; instance.setThreadCount(threadCount); return instance; } /** * * 多线程分批执行list中的任务<BR> * 方法名:execute<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:22:31 <BR> * @param data 线程处理的大数据量list * @param params 处理数据是辅助参数传递 * @param task 具体执行业务的任务接口 * @return ResultBean<BR> * @exception <BR> * @since 2.0 */ @SuppressWarnings("rawtypes") public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) { // 创建线程池 ExecutorService threadpool = Executors.newFixedThreadPool(threadCount); // 根据线程池初始化线程池管理器 pool = new ExecutorCompletionService<ResultBean>(threadpool); // 开始时间(ms) long l = System.currentTimeMillis(); // 数据量大小 int length = data.size(); // 每个线程处理的数据个数 int taskCount = length / threadCount; // 划分每个线程调用的数据 for (int i = 0; i < threadCount; i++) { // 每个线程任务数据list List<T> subData = null; if (i == (threadCount - 1)) { subData = data.subList(i * taskCount, length); } else { subData = data.subList(i * taskCount, (i + 1) * taskCount); } // 将数据分配给各个线程 HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task); // 将线程加入到线程池 pool.submit(execute); } // 总的返回结果集 List<ResultBean<String>> result = new ArrayList<>(); for (int i = 0; i < threadCount; i++) { // 每个线程处理结果集 ResultBean<List<ResultBean<String>>> threadResult; try { threadResult = pool.take().get(); result.addAll(threadResult.getData()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 关闭线程池 threadpool.shutdownNow(); // 执行结束时间 long end_l = System.currentTimeMillis(); logger.info("总耗时:{}ms", (end_l - l)); return ResultBean.newInstance().setData(result); } public int getThreadCount() { return threadCount; } public void setThreadCount(int threadCount) { this.threadCount = threadCount; } }

测试类 TestTask

package com.ts.common.multi.execute; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.ts.common.model.ResultBean; /** * * 具体执行业务任务 需要 实现ITask接口 在execute中重写业务逻辑 * TestTask<BR> * 创建人:wangbeidou <BR> * 时间:2018年8月8日-下午8:40:32 <BR> * @version 2.0 * */ public class TestTask implements ITask<ResultBean<String>, Integer> { @Override public ResultBean execute(Integer e, Map<String, Object> params) { /** * 具体业务逻辑:将list中的元素加上辅助参数中的数据返回 */ int addNum = Integer.valueOf(String.valueOf(params.get("addNum"))); e = e + addNum; ResultBean<String> resultBean = ResultBean.newInstance(); resultBean.setData(e.toString()); return resultBean; } public static void main(String[] args) { // 需要多线程处理的大量数据list List<Integer> data = new ArrayList<>(10000); for(int i = 0; i < 10000; i ++){ data.add(i + 1); } // 创建多线程处理任务 MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5); ITask<ResultBean<String>, Integer> task = new TestTask(); // 辅助参数 加数 Map<String, Object> params = new HashMap<>(); params.put("addNum", 4); // 执行多线程处理,并返回处理结果 ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task); } }
  • 线程
    123 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 248 回帖
  • CAP

    CAP 指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可兼得。

    12 引用 • 5 回帖 • 636 关注
  • 服务

    提供一个服务绝不仅仅是简单的把硬件和软件累加在一起,它包括了服务的可靠性、服务的标准化、以及对服务的监控、维护、技术支持等。

    41 引用 • 24 回帖
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    268 引用 • 666 回帖 • 1 关注
  • JWT

    JWT(JSON Web Token)是一种用于双方之间传递信息的简洁的、安全的表述性声明规范。JWT 作为一个开放的标准(RFC 7519),定义了一种简洁的,自包含的方法用于通信双方之间以 JSON 的形式安全的传递信息。

    20 引用 • 15 回帖 • 26 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 615 关注
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    291 引用 • 4495 回帖 • 661 关注
  • Sphinx

    Sphinx 是一个基于 SQL 的全文检索引擎,可以结合 MySQL、PostgreSQL 做全文搜索,它可以提供比数据库本身更专业的搜索功能,使得应用程序更容易实现专业化的全文检索。

    1 引用 • 229 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖
  • 阿里巴巴

    阿里巴巴网络技术有限公司(简称:阿里巴巴集团)是以曾担任英语教师的马云为首的 18 人,于 1999 年在中国杭州创立,他们相信互联网能够创造公平的竞争环境,让小企业通过创新与科技扩展业务,并在参与国内或全球市场竞争时处于更有利的位置。

    43 引用 • 221 回帖 • 58 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖
  • OneDrive
    2 引用 • 3 关注
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 7 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    81 引用 • 409 回帖
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    17 引用 • 7 回帖
  • 黑曜石

    黑曜石是一款强大的知识库工具,支持本地 Markdown 文件编辑,支持双向链接和关系图。

    A second brain, for you, forever.

    24 引用 • 242 回帖 • 1 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    497 引用 • 934 回帖 • 2 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 120 关注
  • Bug

    Bug 本意是指臭虫、缺陷、损坏、犯贫、窃听器、小虫等。现在人们把在程序中一些缺陷或问题统称为 bug(漏洞)。

    76 引用 • 1742 回帖 • 3 关注
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3455 回帖 • 148 关注
  • abitmean

    有点意思就行了

    34 关注
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖 • 2 关注
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 436 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    209 引用 • 2040 回帖
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖 • 3 关注
  • Elasticsearch

    Elasticsearch 是一个基于 Lucene 的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于 RESTful 接口。Elasticsearch 是用 Java 开发的,并作为 Apache 许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

    117 引用 • 99 回帖 • 194 关注