【迁移脱敏】迁移程序简化版 Draft1


【撰文目的】

将历时三个月开发的迁移脱敏程序的核心部分总结记录下来。

【注意】

下面的程序为简化版:省略了脱敏方法、LRU、并行齿轮等部分,也省略了验证、建表等前置预处理。

【代码】

核心迁移类:

package com.hy.lab.migrater;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 迁移器类,将来源表的数据完整迁移到去向表
 */
public class Migrater {
    // 成员变量:来源连接
    private Connection srcConn;

    // 成员变量:来源表
    private String srcTb;

    // 成员变量:去向连接
    private Connection dstConn;

    // 成员变量:去向表
    private String dstTb;

    // 批处理大小,可灵活调整
    private final int BATCH_SIZE=10000;

    /**
     * 构造函数
     *
     * @param srcConn 来源连接
     * @param srcTb   来源表
     * @param dstConn 去向连接
     * @param dstTb   去向表
     */
    public Migrater(Connection srcConn, String srcTb, Connection dstConn, String dstTb) {
        this.srcConn=srcConn;
        this.srcTb=srcTb;
        this.dstConn=dstConn;
        this.dstTb=dstTb;
    }

    /**
     * 迁移
     */
    public void migrate(){
        final int threadCnt=getSrcRecordCount()/BATCH_SIZE;
        CountDownLatch cdl=new CountDownLatch(threadCnt);

        final String sql=String.format("select * from %s",this.srcTb);
        try(PreparedStatement pstmt=this.srcConn.prepareStatement(sql);
            ResultSet rs=pstmt.executeQuery();){

            final int colCnt=10;// 假定列数为10,实际上要用Metadata取
            List rowList=new ArrayList<>(BATCH_SIZE);

            int count=0;
            while(rs.next()){
                String[] arr=new String[colCnt];

                for(int i=0;i){
                    arr[i]=rs.getString(i+1);
                }

                rowList.add(arr);

                count++;
                if(count==BATCH_SIZE){
                    // 分线程去填充
                    new Worker(this,cdl,rowList).start();

                    count=0;
                    rowList=new ArrayList<>(BATCH_SIZE);
                }
            }

            // 等最后一个线程完工
            cdl.await();

            // 收尾
            if(rowList.size()>0){
                transfer(rowList);
            }

            System.out.println("迁移结束");
        }catch(Exception ex){
            ex.printStackTrace();
        }
    }

    public void transfer(List rowList) throws Exception{
        Connection conn=this.dstConn;
        conn.setAutoCommit(false);

        final String insertSql=String.format("insert into %s(id,f1,f2,f3,f4,f5,f6,f7,f8,f9) values(?,?,?,?,?,?,?,?,?,?)",this.dstTb);

        try(PreparedStatement pstmt=this.srcConn.prepareStatement(insertSql)){
            for(String[] arr:rowList){
                for(int i=0;i){
                    String cellValue=arr[i];
                    pstmt.setString(i+1,cellValue);
                }

                pstmt.addBatch();
            }

            pstmt.executeBatch();
            conn.commit();
            pstmt.clearBatch();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 取得源表数量
     * @return
     */
    private int getSrcRecordCount(){
        final String sql=String.format("select count(*) from %s",this.srcTb);

        try(PreparedStatement pstmt=this.srcConn.prepareStatement(sql);
            ResultSet rs=pstmt.executeQuery();){

            while(rs.next()){
                return rs.getInt(1);
            }

            return -1;
        }catch(Exception ex){
            ex.printStackTrace();
            return -2;
        }
    }

    /**
     * 测试函数
     * @param args
     */
    public static void main(String[] args){
        long start=System.currentTimeMillis();

        try(Connection srcConn=DbUtil.getConn();
            Connection dstConn=DbUtil.getConn();){

            // 设定源表存在有数据,去向表存在无数据,两表表结构一致
            Migrater mgrt=new Migrater(srcConn,"emp625_from",dstConn,"emp625_to");
            mgrt.migrate();

            System.out.println("Completed");
        }catch(Exception ex){
            ex.printStackTrace();
        }

        long end=System.currentTimeMillis();
        System.out.println("Time elasped:"+ms2DHMS(start,end));
    }

    /**
     * change seconds to DayHourMinuteSecond format
     *
     * @param startMs
     * @param endMs
     * @return
     */
    public static String ms2DHMS(long startMs, long endMs) {
        String retval = null;
        long secondCount = (endMs - startMs) / 1000;
        String ms = (endMs - startMs) % 1000 + "ms";

        long days = secondCount / (60 * 60 * 24);
        long hours = (secondCount % (60 * 60 * 24)) / (60 * 60);
        long minutes = (secondCount % (60 * 60)) / 60;
        long seconds = secondCount % 60;

        if (days > 0) {
            retval = days + "d" + hours + "h" + minutes + "m" + seconds + "s";
        } else if (hours > 0) {
            retval = hours + "h" + minutes + "m" + seconds + "s";
        } else if (minutes > 0) {
            retval = minutes + "m" + seconds + "s";
        } else if(seconds > 0) {
            retval = seconds + "s";
        }else {
            return ms;
        }

        return retval + ms;
    }
}

为提速而采用的迁移工人类:

package com.hy.lab.migrater;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Filler worker: a thread that inserts one batch of rows through its commander's
 * {@code transfer} method and then counts down the shared latch.
 */
public class Worker extends Thread{
    // Migrater whose transfer() performs the insert
    private final Migrater commander;

    // Counted down exactly once when run() finishes, whether or not the insert succeeded
    private final CountDownLatch cdl;

    // Rows to insert; cleared when run() finishes so the batch can be garbage-collected
    private List<String[]> rowList;

    /**
     * Constructor.
     * @param commander migrater that performs the insert
     * @param cdl latch to count down when this worker finishes
     * @param rowList rows to insert, one String[] per row
     */
    public Worker(Migrater commander,CountDownLatch cdl,List<String[]> rowList){
        this.commander=commander;
        this.cdl=cdl;
        this.rowList=rowList;
    }

    /**
     * Inserts the batch. Failures are printed rather than propagated. The batch reference is
     * released and the latch counted down in every case, so a thread waiting on the latch is
     * not blocked by a failed worker.
     */
    @Override
    public void run(){
        try{
            this.commander.transfer(this.rowList);
        }catch(Exception ex){
            ex.printStackTrace();
        }finally {
            this.rowList=null;
            this.cdl.countDown();
        }
    }
}

提供DB连接的工具类:

package com.hy.lab.migrater;

import java.sql.Connection;
import java.sql.DriverManager;

/**
 * JDBC connection helper for the migration demo.
 *
 * <p>The URL, user and password default to the constants below. They can be overridden with
 * the JVM system properties {@code migrater.db.url}, {@code migrater.db.user} and
 * {@code migrater.db.password}, so credentials do not have to be edited into source.
 */
public class DbUtil {
    //-- Default values of the four parameters used to connect to the Oracle database
    private static final String DRIVER = "oracle.jdbc.OracleDriver";
    private static final String URL = "jdbc:oracle:thin:@127.0.0.1:1521/orcl";
    private static final String USER = "luna";
    private static final String PSWD = "1234";

    /**
     * Opens a new connection to the configured Oracle database. Every call returns a fresh
     * connection, which the caller must close.
     *
     * @return an open connection
     * @throws Exception if the driver class cannot be loaded or the connection cannot be opened
     */
    public static Connection getConn() throws Exception{
        Class.forName(DRIVER);
        Connection conn = DriverManager.getConnection(
                setting("migrater.db.url", URL),
                setting("migrater.db.user", USER),
                setting("migrater.db.password", PSWD));
        return conn;
    }

    /**
     * Returns the system property {@code key}, or {@code fallback} when it is unset or blank.
     *
     * @param key      system property name
     * @param fallback value to use when the property is absent
     * @return the effective setting
     */
    static String setting(String key, String fallback) {
        String value = System.getProperty(key);
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }
}

【执行效果】

程序输出:

迁移结束
Completed
Time elasped:18s289ms

emp625_to的情况:

SQL> select count(*) from emp625_to;

  COUNT(*)
----------
    500012

SQL> select * from emp625_to where rownum<3;

        ID F1                   F2                   F3
---------- -------------------- -------------------- --------------------
F4                   F5                   F6
-------------------- -------------------- --------------------
F7                   F8                   F9
-------------------- -------------------- --------------------
      7660 YDAS                 OMCPKM               PMKF
AJ                   UT                   XDQCBJRFEP
H                    XZ                   PTLCRECCGF

      7661 JIRI                 CTC                  YFSGFLUN
MJVVL                TYVCE                YLJB
K                    JDB                  CS

        ID F1                   F2                   F3
---------- -------------------- -------------------- --------------------
F4                   F5                   F6
-------------------- -------------------- --------------------
F7                   F8                   F9
-------------------- -------------------- --------------------


SQL>

【迁移效率说明】

每次运行结果稍有差别,传输效率在每秒2万至3万行之间,中值约为每秒2.5万行。

【建表及数据填充语句】

-- Source table: filled with test data by the insert below
create table emp625_from(
    id number(10),
    f1 nvarchar2(10),
    f2 nvarchar2(10),
    f3 nvarchar2(10),
    f4 nvarchar2(10),
    f5 nvarchar2(10),
    f6 nvarchar2(10),
    f7 nvarchar2(10),
    f8 nvarchar2(10),
    f9 nvarchar2(10),
    primary key(id)
);

-- Destination table: same structure as the source, starts empty
create table emp625_to(
    id number(10),
    f1 nvarchar2(10),
    f2 nvarchar2(10),
    f3 nvarchar2(10),
    f4 nvarchar2(10),
    f5 nvarchar2(10),
    f6 nvarchar2(10),
    f7 nvarchar2(10),
    f8 nvarchar2(10),
    f9 nvarchar2(10),
    primary key(id)
);

-- Generate 500012 rows (level 1..500012): id = rownum, f1..f9 = random strings
insert into emp625_from
select rownum,
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10)),
   dbms_random.string('*',dbms_random.value(1,10))
from dual
connect by level<500013;

-- Commit so the rows are visible to the migration program, which reads over its own JDBC sessions
commit;

END

相关