记录:一次数据迁移

本贴最后更新于 2097 天前,其中的信息可能已经斗转星移

迁移前的情况

  • 数据量:80万+

  • 数据表字段数量:48个字段

  • 数据库:阿里云RDS数据库

  • 迁移环境:本地计算机

  • 情况说明:公司业务中有一个API导入导出的功能,也就是数据要从上游导进来和导出给下游,但是由于种种原因,当前项目数据库中很多字段都是用字符串来存储,在对接了一段时间后,出现问题,上游导进来的数据字符串多种多样,不规范字符随处可见,甚至还有乱码;鉴于这种情况,公司决定对数据库做规范,全部用数字代码替换字符串,但同时不能影响线上版本的使用;数据迁移就是这个需求中的其中一环。

数据迁移

迁移思路

从开始设计到实际迁移经历了3个版本优化,下面会一个一个的说明

  1. Version1.0

    从数据库取出数据,对数据做处理,然后存储到新表中;

    问题:转化速度很慢,当时做了一个实验,估算完成时间大概需要15天左右,原因就是各流程的效率是不一样的,这个时间太长显然行不通

    这个阶段的代码很初级,就不附了(ps:我不会告诉你是因为我懒)

  2. Version1.1

    从上面的测试中发现两个问题:

     处理的速度慢于写,写远远慢于读,处理与写的效率大概是1:5的比例(按完成时间计算出的);
     整体的效率很低,读写一次算作一个流程的话,每个流程要连接数据库两次;并且读完一条数据后就猫在一边等着了,处理和写操作都完成才读下一条,处理和写又是耗时最长的操作;
    

    这个过程做个比喻就像是一条月饼流水生产线,A准备原料,B做月饼,C包装,现在的情况是A准备一份的量,就在等着BC,B做完了,就等着C,C做好之后A才准备下一份,此时BC又在等着A,这个速度可想而知。

    所以这个版本尝试把流程分离,A 读出全部数据,B 处理数据的同时,C 负责不断的写,这样做需要一个盘子(中间件),B 处理好数据之后放到盘子里,C 不停从盘子里拿;(当然三个流程可以各自独立,但读耗费的时间对整体时间的影响我个人觉得稍微优化下可以接受,所以只把 BC 流程分离)

    那么问题就来了:这个盘子应该用什么来做?

    这个盘子应该是双向的,一端进,一端出,并且进出的时候不能出异常,也就是线程安全!想想应该是队列,而且是线程安全的队列,即:ConcurrentLinkedQueue(需要注意的是使用这个队列,非空判断的时候一定要避免用 size,原因嘛请百度)

    到这里,应该基本的结构就出来了:

    两个线程,一个负责处理数据,处理之后把数据放进队列,另一个线程负责写,从队列中拿数据写到新表里去;注意写操作的时候不光要对队列做非空判断,还要判断处理线程是否在进行(也就是队列中还会不会有新数据进来)

    照例,不附代码!

  3. Version2.0

    按照版本 1.1 来做还是满足不了需求,时间确实减少了很多,但还是很长,那接下来该怎么优化?

    想想实际生活,这种情况明显就是人手不足了,人手不足能怎么办?当然是招人了!所以程序的思路也是这样,既然BC两个人已经不够用了,那每个岗位就再招他个七八个人,总该够了吧!

    多线程操作,这个时候代码的结构开始有所体现了,所以果断 OOP,new 两个岗位类,一个负责处理数据,一个负责写数据;这两个类我的命名是“MigrateProcessor”(处理类)和“MigrateWriter”(写入类)

public class MigrateWriter {

    /**
     * 实例化对象
     * @param queue
     */
    public  void  getInstance(ConcurrentLinkedQueue queue,String threadName,MigrateManage migrateService,PageData c){
        this.init(queue,threadName,migrateService,c);
        this.destroy();
    }

    /**
     * 初始化
     */
    private void init(ConcurrentLinkedQueue queue,String threadName,MigrateManage migrateService,PageData c){
        boolean flag = true;
        while(flag){
            if(queue.isEmpty()){
                if(Integer.parseInt(c.get("c").toString())>0){
                    continue;
                }
            }
            try{
                PageData ml = (PageData)queue.poll();
                this.execute(ml,threadName,migrateService);
            }catch (Exception e){
                e.printStackTrace();
                System.out.println(threadName+"---插入数据库失败");
                continue;
            }
            if(queue.isEmpty() && (Integer.parseInt(c.get("c").toString())<=0) ){
                flag = false;
            }
        }

    }

    /**
     * 执行操作
     */
    private void execute(PageData ml,String threadName,MigrateManage migrateService) throws Exception {
        if(null!=ml){
            long begin = System.currentTimeMillis();

            //更新到数据库
            migrateService.updateLoanCopy(ml);

            long end = System.currentTimeMillis();
            System.out.println(threadName+"---插入数据成功---id:"+ml.get("id").toString()+"---需要"+(end-begin)+"ms");
        }
    }

    /**
     * 销毁
     */
    public void destroy(){
        System.out.println(Thread.currentThread().getName()+"结束!");
        while (!Thread.currentThread().isInterrupted()) {
            Thread.currentThread().interrupt();
        }
    }

}
public class MigrateProcessor {

    /**
     * 实例化对象
     * @param queue
     */
    public void getInstance(ConcurrentLinkedQueue queue, List<PageData> migrateLoanCityInfo, String threadName, MigrateManage migrateService,PageData c) {
        this.init(queue, migrateLoanCityInfo, threadName, migrateService,c);
        int cou = Integer.parseInt(c.get("c").toString());
        c.put("c",cou--);
        this.destroy();
    }

    /**
     * 初始化
     */
    private void init(ConcurrentLinkedQueue queue, List<PageData> migrateLoanCityInfo, String threadName, MigrateManage migrateService,PageData c) {
        for (PageData ml : migrateLoanCityInfo) {
            long begin = System.currentTimeMillis();
            try {
                this.execute(queue, ml, threadName, migrateService);
            } catch (ArithmeticException s) {
                continue;
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(threadName + "处理数据发生错误:" + ml.get("id").toString());
                continue;
            }
            queue.offer(ml);
            long end = System.currentTimeMillis();
            System.out.println(threadName + "处理一条数据需要" + (end - begin) + "ms");
        }

    }

    /**
     * 执行操作(这里只转化了一个字段,做个演示用,源数据很杂,需要多次转化,这里业务逻辑可以忽略掉,只要知道这里是用来处理各个字段的就行)
     */
    private void execute(ConcurrentLinkedQueue queue, PageData ml, String threadName, MigrateManage migrateService) throws Exception {
        if ((!NumberUtils.isNumber(ml.getString("loan_oldcity"))) && null != ml.get("loan_oldcity") && (!"".equals(ml.get("loan_oldcity")))) {
            ml.put("city_name", ml.getString("loan_oldcity"));
            PageData codeInfo = migrateService.getCityCodeByCityName(ml);
            if (null != codeInfo && !"".equals(codeInfo)) {
                ml.put("loan_oldcity", codeInfo.getString("city_code"));
            } else {
                PageData cityTranscate = migrateService.getCityTranscate(ml);       //转换城市
                if (null != cityTranscate && !"".equals(cityTranscate)) {
                    //再次查询是否有code值
                    ml.put("city_name", cityTranscate.get("unified_location").toString());
                    codeInfo = migrateService.getCityCodeByCityName(ml);
                    if (null != codeInfo && !"".equals(codeInfo)) {
                        ml.put("loan_oldcity", codeInfo.getString("city_code"));
                    } else {
                        System.out.println(ml.get("id").toString() + "---loan_oldcity转化后查找不到---" + ml.getString("loan_oldcity"));
                    }
                } else {
                    System.out.println(ml.get("id").toString() + "---loan_oldcity无法转化---" + ml.getString("loan_oldcity"));
                }
            }
        } else if (NumberUtils.isNumber(ml.getString("loan_oldcity"))) {
            throw new ArithmeticException();
        }
    }

    /**
     * 销毁
     */
    private void destroy() {
        System.out.println(Thread.currentThread().getName()+"结束!");
        while (!Thread.currentThread().isInterrupted()) {
            Thread.currentThread().interrupt();
        }
    }

}

主流程(看个人的电脑配置,10 个线程我的电脑已到极限的极限):

对读操作做些优化,限制单次查询数量,耗时大约50s(为什么要限制,你也可以试试一次读出来,可能会有惊喜)
处理数据,5个线程对我来说足够,处理的同时放入队列
插入数据,4个线程
        List<PageData> migrateLoanCityInfo = migrateService.getMigrateLoanCityInfo(pd);
        System.out.println("总数据量:"+migrateLoanCityInfo.size());

        int size = 5;
        PageData c = new PageData();
        c.put("c",size);


        List<List<PageData>> listArr=new ArrayList<List<PageData>>();
        int remaider=migrateLoanCityInfo.size()%size;  //(先计算出余数)
        int number=migrateLoanCityInfo.size()/size;  //然后是商
        int offset=0;//偏移量
        for(int i=0;i<size;i++){
            List<PageData> value=null;
            if(remaider>0){
                value=migrateLoanCityInfo.subList(i*number+offset, (i+1)*number+offset+1);
                remaider--;
                offset++;
            }else{
                value=migrateLoanCityInfo.subList(i*number+offset, (i+1)*number+offset);
            }
            listArr.add(value);
        }

        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();      //用来存储数据的队列

        //processor处理字段,并存入ConcurrentLinkedQueue中
        Thread mp_1 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateProcessor mp = new MigrateProcessor();
                mp.getInstance(queue,listArr.get(0),"mp_1",migrateService,c);
            }
        });
        Thread mp_2 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateProcessor mp = new MigrateProcessor();
                mp.getInstance(queue,listArr.get(1),"mp_2",migrateService,c);
            }
        });
        Thread mp_3 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateProcessor mp = new MigrateProcessor();
                mp.getInstance(queue,listArr.get(2),"mp_3",migrateService,c);
            }
        });
        Thread mp_4 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateProcessor mp = new MigrateProcessor();
                mp.getInstance(queue,listArr.get(3),"mp_4",migrateService,c);
            }
        });
        Thread mp_5 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateProcessor mp = new MigrateProcessor();
                mp.getInstance(queue,listArr.get(4),"mp_5",migrateService,c);
            }
        });

        mp_1.start();
        mp_2.start();
        mp_3.start();
        mp_4.start();
        mp_5.start();

        Thread mv_t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateWriter mw = new MigrateWriter();
                mw.getInstance(queue,"mv_t1",migrateService,c);
            }
        });
        Thread mv_t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateWriter mw = new MigrateWriter();
                mw.getInstance(queue,"mv_t2",migrateService,c);
            }
        });
        Thread mv_t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateWriter mw = new MigrateWriter();
                mw.getInstance(queue,"mv_t3",migrateService,c);
            }
        });
        Thread mv_t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                MigrateWriter mw = new MigrateWriter();
                mw.getInstance(queue,"mv_t4",migrateService,c);
            }
        });

        mv_t1.start();
        mv_t2.start();
        mv_t3.start();
        mv_t4.start();

这样做迁移处理完所有数据需要 4-5 个小时左右,当前的耗时可以在我项目进度之内,所以接下来就没有再另外做优化,那另外还有其他的优化方式吗?答案是肯定有的!

最后说明两点:

代码中还有很多生硬的部分,比如线程间的通信协调,如果有大牛看到了,希望指点指点~
这些代码只是演示说明用,并不是全部,直接用会出bug的哦
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1083 引用 • 3461 回帖 • 256 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3169 引用 • 8208 回帖
  • MySQL

    MySQL 是一个关系型数据库管理系统,由瑞典 MySQL AB 公司开发,目前属于 Oracle 公司。MySQL 是最流行的关系型数据库管理系统之一。

    675 引用 • 535 回帖
  • 线程
    120 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 650 关注
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    402 引用 • 3522 回帖
  • 人工智能

    人工智能(Artificial Intelligence)是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门技术科学。

    77 引用 • 159 回帖 • 1 关注
  • WordPress

    WordPress 是一个使用 PHP 语言开发的博客平台,用户可以在支持 PHP 和 MySQL 数据库的服务器上架设自己的博客。也可以把 WordPress 当作一个内容管理系统(CMS)来使用。WordPress 是一个免费的开源项目,在 GNU 通用公共许可证(GPLv2)下授权发布。

    45 引用 • 113 回帖 • 273 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 20 关注
  • Firefox

    Mozilla Firefox 中文俗称“火狐”(正式缩写为 Fx 或 fx,非正式缩写为 FF),是一个开源的网页浏览器,使用 Gecko 排版引擎,支持多种操作系统,如 Windows、OSX 及 Linux 等。

    7 引用 • 30 回帖 • 429 关注
  • ZeroNet

    ZeroNet 是一个基于比特币加密技术和 BT 网络技术的去中心化的、开放开源的网络和交流系统。

    1 引用 • 21 回帖 • 607 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    49 引用 • 192 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    106 引用 • 152 回帖 • 1 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 53 关注
  • 笔记

    好记性不如烂笔头。

    306 引用 • 782 回帖 • 1 关注
  • 创业

    你比 99% 的人都优秀么?

    83 引用 • 1398 回帖
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    41 引用 • 40 回帖
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    109 引用 • 54 回帖 • 1 关注
  • 支付宝

    支付宝是全球领先的独立第三方支付平台,致力于为广大用户提供安全快速的电子支付/网上支付/安全支付/手机支付体验,及转账收款/水电煤缴费/信用卡还款/AA 收款等生活服务应用。

    29 引用 • 347 回帖 • 5 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 158 关注
  • Typecho

    Typecho 是一款博客程序,它在 GPLv2 许可证下发行,基于 PHP 构建,可以运行在各种平台上,支持多种数据库(MySQL、PostgreSQL、SQLite)。

    12 引用 • 60 回帖 • 464 关注
  • 房星科技

    房星网,我们不和没有钱的程序员谈理想,我们要让程序员又有理想又有钱。我们有雄厚的房地产行业线下资源,遍布昆明全城的 100 家门店、四千地产经纪人是我们坚实的后盾。

    6 引用 • 141 回帖 • 567 关注
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 690 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    536 引用 • 672 回帖
  • DevOps

    DevOps(Development 和 Operations 的组合词)是一组过程、方法与系统的统称,用于促进开发(应用程序/软件工程)、技术运营和质量保障(QA)部门之间的沟通、协作与整合。

    45 引用 • 25 回帖
  • PostgreSQL

    PostgreSQL 是一款功能强大的企业级数据库系统,在 BSD 开源许可证下发布。

    22 引用 • 22 回帖
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖 • 1 关注
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • Maven

    Maven 是基于项目对象模型(POM)、通过一小段描述信息来管理项目的构建、报告和文档的软件项目管理工具。

    186 引用 • 318 回帖 • 332 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    76 引用 • 429 回帖 • 3 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖 • 2 关注