java 多线程分批处理数据工具类

本贴最后更新于 1779 天前,其中的信息可能已经时移世易

数据量比较大,需要使用多线程来分批处理,提高处理效率和能力,于是就写了一个通用的多线程处理工具,只需要实现自己的业务逻辑就可以正常使用,现在记录一下

主要是针对大数据量 list,将 list 划分多个线程处理

ResultBean 类: 返回结果统一 bean

package com.ts.common.model;

import java.io.Serializable;

import com.alibaba.fastjson.JSON;


/**
 * 返回结果统一bean
 * 
 * ResultBean<BR>
 * 创建人:wangbeidou <BR>
 * 时间:2018年4月12日-下午3:49:46 <BR>
 * @version 2.0
 *
 */
public class ResultBean<T> implements Serializable {

    private static final long serialVersionUID = 1L;
    // 成功状态
    public static final int SUCCESS = 1;
    // 处理中状态
    public static final int PROCESSING = 0;
    // 失败状态
    public static final int FAIL = -1;

    // 描述
    private String msg = "success";
    // 状态默认成功
    private int code = SUCCESS;
    // 备注
    private String remark;
    // 返回数据
    private T data;

    public ResultBean() {
        super();
    }

    public ResultBean(T data) {
        super();
        this.data = data;
    }

    /**
     * 使用异常创建结果
     */
    public ResultBean(Throwable e) {
        super();
        this.msg = e.toString();
        this.code = FAIL;
    }
    
    /**
     * 
     * 实例化结果默认成功状态<BR>
     * 方法名:newInstance<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:51:26 <BR>
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance() {
        ResultBean<T> instance = new ResultBean<T>();
        //默认返回信息
        instance.code = SUCCESS;
        instance.msg = "success";
        return instance;
    }
    
    /**
     * 
     * 实例化结果默认成功状态和数据<BR>
     * 方法名:newInstance<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年5月10日-下午2:13:16 <BR>
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(T data) {
        ResultBean<T> instance = new ResultBean<T>();
        //默认返回信息
        instance.code = SUCCESS;
        instance.msg = "success";
        instance.data = data;
        return instance;
    }
    
    /**
     * 
     * 实例化返回结果<BR>
     * 方法名:newInstance<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午4:00:53 <BR>
     * @param code
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(int code, String msg) {
        ResultBean<T> instance = new ResultBean<T>();
        //默认返回信息
        instance.code = code;
        instance.msg = msg;
        return instance;
    }
    
    /**
     * 
     * 实例化返回结果<BR>
     * 方法名:newInstance<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午4:00:35 <BR>
     * @param code
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public static <T> ResultBean<T> newInstance(int code, String msg, T data) {
        ResultBean<T> instance = new ResultBean<T>();
        //默认返回信息
        instance.code = code;
        instance.msg = msg;
        instance.data = data;
        return instance;
    }
    
    /**
     * 
     * 设置返回数据<BR>
     * 方法名:setData<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:52:01 <BR>
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setData(T data){
        this.data = data;
        return this;
    }
    
    /**
     * 
     * 设置结果描述<BR>
     * 方法名:setMsg<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:52:34 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setMsg(String msg){
        this.msg = msg;
        return this;
    }
    
    /**
     * 
     * 设置状态<BR>
     * 方法名:setCode<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午4:17:56 <BR>
     * @param code
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setCode(int code){
        this.code = code;
        return this;
    }
    
    /**
     * 
     * 设置备注)<BR>
     * 方法名:setRemark<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午5:47:29 <BR>
     * @param remark
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> setRemark(String remark){
        this.remark = remark;
        return this;
    }
    
    /**
     * 
     * 设置成功描述和返回数据<BR>
     * 方法名:success<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:52:58 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> success(String msg, T data){ 
        this.code = SUCCESS;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 设置成功返回结果描述<BR>
     * 方法名:success<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:53:31 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> success(String msg){ 
        this.code = SUCCESS;
        this.msg = msg;
        return this;  
    }
    
    /**
     * 
     * 设置处理中描述和返回数据<BR>
     * 方法名:success<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:52:58 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> processing(String msg, T data){ 
        this.code = PROCESSING;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 设置处理中返回结果描述<BR>
     * 方法名:success<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:53:31 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> processing(String msg){ 
        this.code = PROCESSING;
        this.msg = msg;
        return this;  
    }
    
    /**
     * 
     * 设置失败返回描述和返回数据<BR>
     * 方法名:fail<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:54:04 <BR>
     * @param msg
     * @param data
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> fail(String msg, T data){ 
        this.code = FAIL;
        this.data = data;
        this.msg = msg;
        return this;  
    } 
    
    /**
     * 
     * 设置失败返回描述<BR>
     * 方法名:fail<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午3:54:32 <BR>
     * @param msg
     * @return ResultBean<T><BR>
     * @exception <BR>
     * @since  2.0
     */
    public ResultBean<T> fail(String msg){ 
        this.code = FAIL;
        this.msg = msg;
        return this;  
    }
    
    public T getData() {  
        return data;  
    }  
    public String getMsg() {  
        return msg;  
    }  
    public int getCode() {  
        return code;  
    }  
    public String getRemark() {  
        return remark;  
    }  
    
    /**
     * 
     * 生成json字符串<BR>
     * 方法名:json<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年4月12日-下午4:42:28 <BR>
     * @return String<BR>
     * @exception <BR>
     * @since  2.0
     */
    public String json(){
        return JSON.toJSONString(this);
    }
}

ITask 接口: 实现自己的业务

package com.ts.common.multi.execute;

import java.util.Map;

/**
 * 任务处理接口
 * 具体业务逻辑可实现该接口
 *  T 返回值类型
 *  E 传入值类型
 * ITask<BR>
 * 创建人:wangbeidou <BR>
 * 时间:2018年8月4日-下午6:12:32 <BR>
 * @version 2.0
 *
 */
public interface ITask<T, E> {
    
    /**
     * 
     * 任务执行方法接口<BR>
     * 方法名:execute<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年8月4日-下午6:13:44 <BR>
     * @param e 传入对象
     * @param params 其他辅助参数
     * @return T<BR> 返回值类型
     * @exception <BR>
     * @since  2.0
     */
    T execute(E e, Map<String, Object> params);
}

HandleCallable 类: 实现 Callable 接口,来处理任务

package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ts.common.model.ResultBean;

/**
 * 
 * 
 * HandleCallable<BR>
 * 创建人:wangbeidou <BR>
 * 时间:2018年8月4日-上午11:55:41 <BR>
 * 
 * @version 2.0
 *
 */
@SuppressWarnings("rawtypes")
public class HandleCallable<E> implements Callable<ResultBean> {
    private static Logger logger = LoggerFactory.getLogger(HandleCallable.class);
    // 线程名称 
    private String threadName = "";
    // 需要处理的数据
    private List<E> data;
    // 辅助参数
    private Map<String, Object> params;
    // 具体执行任务
    private ITask<ResultBean<String>, E> task;

    public HandleCallable(String threadName, List<E> data, Map<String, Object> params,
            ITask<ResultBean<String>, E> task) {
        this.threadName = threadName;
        this.data = data;
        this.params = params;
        this.task = task;
    }

    @Override
    public ResultBean<List<ResultBean<String>>> call() throws Exception {
        // 该线程中所有数据处理返回结果
        ResultBean<List<ResultBean<String>>> resultBean = ResultBean.newInstance();
        if (data != null && data.size() > 0) {
            logger.info("线程:{},共处理:{}个数据,开始处理......", threadName, data.size());
            // 返回结果集
            List<ResultBean<String>> resultList = new ArrayList<>();
            // 循环处理每个数据
            for (int i = 0; i < data.size(); i++) {
                // 需要执行的数据
                E e = data.get(i);
                // 将数据执行结果加入到结果集中
                resultList.add(task.execute(e, params));
                logger.info("线程:{},第{}个数据,处理完成", threadName, (i + 1));
            }
            logger.info("线程:{},共处理:{}个数据,处理完成......", threadName, data.size());
            resultBean.setData(resultList);
        }
        return resultBean;
    }

}

MultiThreadUtils 类:多线程工具类


package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ts.common.model.ResultBean;

/**
 * 
 * 
 * MultiThreadUtils<BR>
 * 创建人:wangbeidou <BR>
 * 时间:2018年8月8日-下午8:20:42 <BR>
 * @version 2.0
 *
 */
public class MultiThreadUtils<T> {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadUtils.class);

    // 线程个数,如不赋值,默认为5
    private int threadCount = 5;
    // 具体业务任务
    private ITask<ResultBean<String>, T> task;
    // 线程池管理器
    private CompletionService<ResultBean> pool = null;

    /**
     * 
     * 初始化线程池和线程个数<BR>
     * 方法名:newInstance<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年8月8日-下午8:22:00 <BR>
     * @param threadCount
     * @return MultiThreadUtils<BR>
     * @exception <BR>
     * @since  2.0
     */
    public static MultiThreadUtils newInstance(int threadCount) {
        MultiThreadUtils instance = new MultiThreadUtils();
        threadCount = threadCount;
        instance.setThreadCount(threadCount);

        return instance;
    }

    /**
     * 
     * 多线程分批执行list中的任务<BR>
     * 方法名:execute<BR>
     * 创建人:wangbeidou <BR>
     * 时间:2018年8月8日-下午8:22:31 <BR>
     * @param data  线程处理的大数据量list
     * @param params    处理数据是辅助参数传递
     * @param task        具体执行业务的任务接口
     * @return ResultBean<BR>    
     * @exception <BR>
     * @since  2.0
     */
    @SuppressWarnings("rawtypes")
    public ResultBean execute(List<T> data, Map<String, Object> params, ITask<ResultBean<String>, T> task) {
        // 创建线程池
        ExecutorService threadpool = Executors.newFixedThreadPool(threadCount);
        // 根据线程池初始化线程池管理器
        pool = new ExecutorCompletionService<ResultBean>(threadpool);
        // 开始时间(ms)
        long l = System.currentTimeMillis();
        // 数据量大小
        int length = data.size();
        // 每个线程处理的数据个数
        int taskCount = length / threadCount;
        // 划分每个线程调用的数据
        for (int i = 0; i < threadCount; i++) {
            // 每个线程任务数据list
            List<T> subData = null;
            if (i == (threadCount - 1)) {
                subData = data.subList(i * taskCount, length);
            } else {
                subData = data.subList(i * taskCount, (i + 1) * taskCount);
            }
            // 将数据分配给各个线程
            HandleCallable execute = new HandleCallable<T>(String.valueOf(i), subData, params, task);
            // 将线程加入到线程池
            pool.submit(execute);
        }
        
        // 总的返回结果集
        List<ResultBean<String>> result = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            // 每个线程处理结果集
            ResultBean<List<ResultBean<String>>> threadResult;
            try {
                threadResult = pool.take().get();
                result.addAll(threadResult.getData());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        // 关闭线程池
        threadpool.shutdownNow();
        // 执行结束时间
        long end_l = System.currentTimeMillis();
        logger.info("总耗时:{}ms", (end_l - l));
        return ResultBean.newInstance().setData(result);
    }

    public int getThreadCount() {
        return threadCount;
    }

    public void setThreadCount(int threadCount) {
        this.threadCount = threadCount;
    }

}

测试类 TestTask

package com.ts.common.multi.execute;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.ts.common.model.ResultBean;

/**
 * 
 * 具体执行业务任务 需要 实现ITask接口  在execute中重写业务逻辑
 * TestTask<BR>
 * 创建人:wangbeidou <BR>
 * 时间:2018年8月8日-下午8:40:32 <BR>
 * @version 2.0
 *
 */
public class TestTask implements ITask<ResultBean<String>, Integer> {

    @Override
    public ResultBean execute(Integer e, Map<String, Object> params) {
        /**
         * 具体业务逻辑:将list中的元素加上辅助参数中的数据返回
         */
        int addNum = Integer.valueOf(String.valueOf(params.get("addNum")));
        e = e + addNum;
        ResultBean<String> resultBean = ResultBean.newInstance();
        resultBean.setData(e.toString());
        return resultBean;
    }
    
    public static void main(String[] args) {
        // 需要多线程处理的大量数据list
        List<Integer> data = new ArrayList<>(10000);
        for(int i = 0; i < 10000; i ++){
            data.add(i + 1);
        }
        
        // 创建多线程处理任务
        MultiThreadUtils<Integer> threadUtils = MultiThreadUtils.newInstance(5);
        ITask<ResultBean<String>, Integer> task = new TestTask();
        // 辅助参数  加数
        Map<String, Object> params = new HashMap<>();
        params.put("addNum", 4);
        // 执行多线程处理,并返回处理结果
        ResultBean<List<ResultBean<String>>> resultBean = threadUtils.execute(data, params, task);
    }
}
  • 线程
    120 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    131 引用 • 1114 回帖 • 136 关注
  • HHKB

    HHKB 是富士通的 Happy Hacking 系列电容键盘。电容键盘即无接点静电电容式键盘(Capacitive Keyboard)。

    5 引用 • 74 回帖 • 445 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    84 引用 • 139 回帖 • 2 关注
  • 倾城之链
    23 引用 • 66 回帖 • 125 关注
  • Log4j

    Log4j 是 Apache 开源的一款使用广泛的 Java 日志组件。

    20 引用 • 18 回帖 • 26 关注
  • wolai

    我来 wolai:不仅仅是未来的云端笔记!

    2 引用 • 14 回帖 • 4 关注
  • 黑曜石

    黑曜石是一款强大的知识库工具,支持本地 Markdown 文件编辑,支持双向链接和关系图。

    A second brain, for you, forever.

    10 引用 • 88 回帖
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    54 引用 • 85 回帖
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖 • 1 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    52 引用 • 37 回帖 • 2 关注
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 387 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    52 引用 • 190 回帖 • 1 关注
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 721 关注
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 48 关注
  • 电影

    这是一个不能说的秘密。

    120 引用 • 598 回帖
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 698 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 562 关注
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 619 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖 • 1 关注
  • 服务

    提供一个服务绝不仅仅是简单的把硬件和软件累加在一起,它包括了服务的可靠性、服务的标准化、以及对服务的监控、维护、技术支持等。

    41 引用 • 24 回帖 • 1 关注
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    28 引用 • 108 回帖
  • 面试

    面试造航母,上班拧螺丝。多面试,少加班。

    324 引用 • 1395 回帖
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 148 关注
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    5 引用 • 62 回帖 • 8 关注
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    58 引用 • 113 回帖 • 273 关注
  • Facebook

    Facebook 是一个联系朋友的社交工具。大家可以通过它和朋友、同事、同学以及周围的人保持互动交流,分享无限上传的图片,发布链接和视频,更可以增进对朋友的了解。

    4 引用 • 15 回帖 • 460 关注