迁移前的情况
-
数据量:
80万+
-
数据表字段数量:
48个字段
-
数据库:
阿里云RDS数据库
-
迁移环境:
本地计算机
-
情况说明:
公司业务中有一个API导入导出的功能,也就是数据要从上游导进来和导出给下游,但是由于种种原因,当前项目数据库中很多字段都是用字符串来存储,在对接了一段时间后,出现问题,上游导进来的数据字符串多种多样,不规范字符随处可见,甚至还有乱码;鉴于这种情况,公司决定对数据库做规范,全部用数字代码替换字符串,但同时不能影响线上版本的使用;数据迁移就是这个需求中的其中一环。
迁移思路
从开始设计到实际迁移经历了3个版本优化,下面会一个一个的说明
-
Version1.0
从数据库取出数据,对数据做处理,然后存储到新表中;
问题:
转化速度很慢,当时做了一个实验,估算完成时间大概需要15天左右,原因就是各流程的效率是不一样的,这个时间太长显然行不通
这个阶段的代码很初级,就不附了(ps:我不会告诉你是因为我懒)
-
Version1.1
从上面的测试中发现两个问题:
处理的速度慢于写,写远远慢于读,处理与写的效率大概是1:5的比例(按完成时间计算出的); 整体的效率很低,读写一次算作一个流程的话,每个流程要连接数据库两次;并且读完一条数据后就猫在一边等着了,处理和写操作都完成才读下一条,处理和写又是耗时最长的操作;
这个过程做个比喻就像是一条月饼流水生产线,A准备原料,B做月饼,C包装,现在的情况是A准备一份的量,就在等着BC,B做完了,就等着C,C做好之后A才准备下一份,此时BC又在等着A,这个速度可想而知。
所以这个版本尝试把流程分离,A 读出全部数据,B 处理数据的同时,C 负责不断的写,这样做需要一个盘子(中间件),B 处理好数据之后放到盘子里,C 不停从盘子里拿;(当然三个流程可以各自独立,但读耗费的时间对整体时间的影响我个人觉得稍微优化下可以接受,所以只把 BC 流程分离)
那么问题就来了:
这个盘子应该用什么来做?
这个盘子应该是双向的,一端进,一端出,并且进出的时候不能出异常,也就是线程安全!想想应该是队列,而且是线程安全的队列,即:ConcurrentLinkedQueue(需要注意的是使用这个队列,非空判断的时候一定要避免用 size,原因嘛请百度)
到这里,应该基本的结构就出来了:
两个线程,一个负责处理数据,处理之后把数据放进队列,另一个线程负责写,从队列中拿数据写到新表里去;注意写操作的时候不光要对队列做非空判断,还要判断处理线程是否在进行(也就是队列中还会不会有新数据进来)
照例,不附代码!
-
Version2.0
按照版本 1.1 来做还是满足不了需求,时间确实减少了很多,但还是很长,那接下来该怎么优化?
想想实际生活,这种情况明显就是人手不足了,人手不足能怎么办?当然是招人了!所以程序的思路也是这样,既然BC两个人已经不够用了,那每个岗位就再招他个七八个人,总该够了吧!
多线程操作,这个时候代码的结构开始有所体现了,所以果断 OOP,new 两个岗位类,一个负责处理数据,一个负责写数据;这两个类我的命名是“MigrateProcessor”(处理类)和“MigrateWriter”(写入类)
public class MigrateWriter {
/**
* 实例化对象
* @param queue
*/
public void getInstance(ConcurrentLinkedQueue queue,String threadName,MigrateManage migrateService,PageData c){
this.init(queue,threadName,migrateService,c);
this.destroy();
}
/**
* 初始化
*/
private void init(ConcurrentLinkedQueue queue,String threadName,MigrateManage migrateService,PageData c){
boolean flag = true;
while(flag){
if(queue.isEmpty()){
if(Integer.parseInt(c.get("c").toString())>0){
continue;
}
}
try{
PageData ml = (PageData)queue.poll();
this.execute(ml,threadName,migrateService);
}catch (Exception e){
e.printStackTrace();
System.out.println(threadName+"---插入数据库失败");
continue;
}
if(queue.isEmpty() && (Integer.parseInt(c.get("c").toString())<=0) ){
flag = false;
}
}
}
/**
* 执行操作
*/
private void execute(PageData ml,String threadName,MigrateManage migrateService) throws Exception {
if(null!=ml){
long begin = System.currentTimeMillis();
//更新到数据库
migrateService.updateLoanCopy(ml);
long end = System.currentTimeMillis();
System.out.println(threadName+"---插入数据成功---id:"+ml.get("id").toString()+"---需要"+(end-begin)+"ms");
}
}
/**
* 销毁
*/
public void destroy(){
System.out.println(Thread.currentThread().getName()+"结束!");
while (!Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
}
}
public class MigrateProcessor {
/**
* 实例化对象
* @param queue
*/
public void getInstance(ConcurrentLinkedQueue queue, List<PageData> migrateLoanCityInfo, String threadName, MigrateManage migrateService,PageData c) {
this.init(queue, migrateLoanCityInfo, threadName, migrateService,c);
int cou = Integer.parseInt(c.get("c").toString());
c.put("c",cou--);
this.destroy();
}
/**
* 初始化
*/
private void init(ConcurrentLinkedQueue queue, List<PageData> migrateLoanCityInfo, String threadName, MigrateManage migrateService,PageData c) {
for (PageData ml : migrateLoanCityInfo) {
long begin = System.currentTimeMillis();
try {
this.execute(queue, ml, threadName, migrateService);
} catch (ArithmeticException s) {
continue;
} catch (Exception e) {
e.printStackTrace();
System.out.println(threadName + "处理数据发生错误:" + ml.get("id").toString());
continue;
}
queue.offer(ml);
long end = System.currentTimeMillis();
System.out.println(threadName + "处理一条数据需要" + (end - begin) + "ms");
}
}
/**
* 执行操作(这里只转化了一个字段,做个演示用,源数据很杂,需要多次转化,这里业务逻辑可以忽略掉,只要知道这里是用来处理各个字段的就行)
*/
private void execute(ConcurrentLinkedQueue queue, PageData ml, String threadName, MigrateManage migrateService) throws Exception {
if ((!NumberUtils.isNumber(ml.getString("loan_oldcity"))) && null != ml.get("loan_oldcity") && (!"".equals(ml.get("loan_oldcity")))) {
ml.put("city_name", ml.getString("loan_oldcity"));
PageData codeInfo = migrateService.getCityCodeByCityName(ml);
if (null != codeInfo && !"".equals(codeInfo)) {
ml.put("loan_oldcity", codeInfo.getString("city_code"));
} else {
PageData cityTranscate = migrateService.getCityTranscate(ml); //转换城市
if (null != cityTranscate && !"".equals(cityTranscate)) {
//再次查询是否有code值
ml.put("city_name", cityTranscate.get("unified_location").toString());
codeInfo = migrateService.getCityCodeByCityName(ml);
if (null != codeInfo && !"".equals(codeInfo)) {
ml.put("loan_oldcity", codeInfo.getString("city_code"));
} else {
System.out.println(ml.get("id").toString() + "---loan_oldcity转化后查找不到---" + ml.getString("loan_oldcity"));
}
} else {
System.out.println(ml.get("id").toString() + "---loan_oldcity无法转化---" + ml.getString("loan_oldcity"));
}
}
} else if (NumberUtils.isNumber(ml.getString("loan_oldcity"))) {
throw new ArithmeticException();
}
}
/**
* 销毁
*/
private void destroy() {
System.out.println(Thread.currentThread().getName()+"结束!");
while (!Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
}
}
}
主流程(看个人的电脑配置,10 个线程我的电脑已到极限的极限):
对读操作做些优化,限制单次查询数量,耗时大约50s(为什么要限制,你也可以试试一次读出来,可能会有惊喜)
处理数据,5个线程对我来说足够,处理的同时放入队列
插入数据,4个线程
List<PageData> migrateLoanCityInfo = migrateService.getMigrateLoanCityInfo(pd);
System.out.println("总数据量:"+migrateLoanCityInfo.size());
int size = 5;
PageData c = new PageData();
c.put("c",size);
List<List<PageData>> listArr=new ArrayList<List<PageData>>();
int remaider=migrateLoanCityInfo.size()%size; //(先计算出余数)
int number=migrateLoanCityInfo.size()/size; //然后是商
int offset=0;//偏移量
for(int i=0;i<size;i++){
List<PageData> value=null;
if(remaider>0){
value=migrateLoanCityInfo.subList(i*number+offset, (i+1)*number+offset+1);
remaider--;
offset++;
}else{
value=migrateLoanCityInfo.subList(i*number+offset, (i+1)*number+offset);
}
listArr.add(value);
}
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); //用来存储数据的队列
//processor处理字段,并存入ConcurrentLinkedQueue中
Thread mp_1 = new Thread(new Runnable() {
@Override
public void run() {
MigrateProcessor mp = new MigrateProcessor();
mp.getInstance(queue,listArr.get(0),"mp_1",migrateService,c);
}
});
Thread mp_2 = new Thread(new Runnable() {
@Override
public void run() {
MigrateProcessor mp = new MigrateProcessor();
mp.getInstance(queue,listArr.get(1),"mp_2",migrateService,c);
}
});
Thread mp_3 = new Thread(new Runnable() {
@Override
public void run() {
MigrateProcessor mp = new MigrateProcessor();
mp.getInstance(queue,listArr.get(2),"mp_3",migrateService,c);
}
});
Thread mp_4 = new Thread(new Runnable() {
@Override
public void run() {
MigrateProcessor mp = new MigrateProcessor();
mp.getInstance(queue,listArr.get(3),"mp_4",migrateService,c);
}
});
Thread mp_5 = new Thread(new Runnable() {
@Override
public void run() {
MigrateProcessor mp = new MigrateProcessor();
mp.getInstance(queue,listArr.get(4),"mp_5",migrateService,c);
}
});
mp_1.start();
mp_2.start();
mp_3.start();
mp_4.start();
mp_5.start();
Thread mv_t1 = new Thread(new Runnable() {
@Override
public void run() {
MigrateWriter mw = new MigrateWriter();
mw.getInstance(queue,"mv_t1",migrateService,c);
}
});
Thread mv_t2 = new Thread(new Runnable() {
@Override
public void run() {
MigrateWriter mw = new MigrateWriter();
mw.getInstance(queue,"mv_t2",migrateService,c);
}
});
Thread mv_t3 = new Thread(new Runnable() {
@Override
public void run() {
MigrateWriter mw = new MigrateWriter();
mw.getInstance(queue,"mv_t3",migrateService,c);
}
});
Thread mv_t4 = new Thread(new Runnable() {
@Override
public void run() {
MigrateWriter mw = new MigrateWriter();
mw.getInstance(queue,"mv_t4",migrateService,c);
}
});
mv_t1.start();
mv_t2.start();
mv_t3.start();
mv_t4.start();
这样做迁移处理完所有数据需要 4-5 个小时左右,当前的耗时可以在我项目进度之内,所以接下来就没有再另外做优化,那另外还有其他的优化方式吗?答案是肯定有的!
最后说明两点:
代码中还有很多生硬的部分,比如线程间的通信协调,如果有大牛看到了,希望指点指点~
这些代码只是演示说明用,并不是全部,直接用会出bug的哦
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于