MySql使用存储过程进行单表数据迁移的实现
目录
- 前景
- 设计思路
- 迁移脚本
- 迁移示例
- 总结
前景
近期在进行业务解耦,对冗余在一起切又属于不同业务的代码进行分离,同时也将数据库进行分离存储,那么这时候就涉及到多个表的数据要进行迁移,这里我们就来总结一下如何使用存储过程进行数据高效迁移。
设计思路
跨数据库实例表数据迁移,无非就是把一个表完完整整的复制到另一个数据库实例当中,但是怎么做才能简单易懂又高效呢?
首先我们写一个脚本肯定也希望能够多次使用,否则何必浪费时间去研究写大批量处理的脚本呢!我们先分析数据迁移的一些主要步骤:目标实例创建表、数据分批处理、数据迁移记录、数据迁移入库
- 目标实例创建表:我们需要根据个人所需,明确是否强制重新创建表,通常情况既然是全表迁移那都是要强制重新建表的(无论是否已有表数据)。
- 数据分批处理:要对数据进行分批处理,首先我们需要对数据进行排序,那么排序最好我们是以自增主键id进行排序,这样方便我们进行分批数据获取。
- 数据迁移记录:这里我们最好有个临时表用来做实时数据迁移记录,以免大数据迁移我们都不知道迁移到哪了,同时临时表也有助于数据分批处理。
- 数据迁移入库:对数据进行排好序后,我们根据主键id对数据进行过滤,避免在limit后面进行分页操作,limit只需要确认迁移数据量即可。
下面我们就具体分析一下完整的迁移脚本。
迁移脚本
首先我们需要明确数据要迁移的目标数据库,最好要把这点写在脚本里,方式跑错数据库实例:
USE `test_db`;
临时表创建:主要辅助记录需要迁移的数据量、实时更新已迁移的数据量、迁移表名、迁移的数据最大主键id用来进行数据过滤。
DROP TABLE IF EXISTS tmp_migrate_table_record; CREATE TABLE tmp_miandroidgrate_table_record( `id` INT PRIMARY KEY NOT NULL AUTO_INCREMENT COMMENT '主键ID', `table_name` VARCHAR(50) COMMENT '迁移表名', `source_table_count` INT COMMENT '源表记录数', `target_table_count` INT COMMENT '源表记录数', `max_id` BIGINT COMMENT '编程客栈已迁移最大主键id' );
全表迁移存储过程实现,主要包含以下几个参数:
- sourceSchema:数据源schema名称
- tableName:需要迁移的表名(若需要迁移的目标表名与源表名不一致,可自行添加参数做修改)
- primaryKey:表主键名,用于数据排序、过滤、分批处理
- BATchSize:大数据迁移分批次大小
DROP PROCEDURE IF EXISTS p_migrate_full_table_data; DELIMITER $$ CREATE PROCEDURE p_migrate_full_table_data(IN sourceSchema VARCHAR(50), IN tableName VARCHAR(50), IN primaryKey VARCHAR(50), IN batchSize INT) BEGIN 编程 -- 判断旧表是否存在,若存在则删除旧表(强制重新建表) SET @dropTableSql = CONCAT('DROP TABLE IF EXISTS ', tableName, ';'); PREPARE dropTableSql FROM @dropTableSql; EXECUTE dropTableSql; -- 依赖源数据,创建新表(这里可以根据需要更换表名) SET @craeteTable = CONCAT('CREATE TABLE ', tableName, ' LIKhttp://www.devze.comE `', sourceSchema, '`.', tableName, ';'); PREPARE craeteTable FROM @craeteTable; EXECUTE craeteTable; -- 清除当前表迁移数据记录数,防止有旧数据影响 SET @deleteCountSql = CONCAT('DELETE FROM tmp_migrate_table_record WHERE table_name=''', tableName, ''';'); PREPARE deleteCountSql FROM @deleteCountSql; EXECUTE deleteCountSql; -- 初始记录临时表正在进行数据迁移的表信息 SET @countSql = CONCAT('INSERT INTO tmp_migrate_table_record(table_name, source_table_count, target_table_count) SELECT ''', tableName, ''', COUNT(*), 0 FROM `', sourceSchema, '`.', tableName, ';'); PREPARE countSql FROM @countSql; EXECUTE countSql; -- 用于查看预编译后的SQL脚本,若有需要可以打开注释查看 -- SELECT @dropTableSql, @craeteTable, @deleteCountSql, @countSql; -- 数据源表记录数 SET @sourceCount = 0; -- 目标表记录数 SET @targetCount = 0; -- 已导入最大id SET @maxId = 0; SELECT source_table_count, target_table_count, IFNULL(max_id, 0) INTO @sourceCount, @targetCount, @maxId FROM tmp_migrate_table_record WHERE table_name=tableName; -- 循环分批迁移数据,根据已迁移数量与需要迁移数量进行比较 WHILE @sourceCount <> @targetCount DO -- 开启事务 START TRANSACTION; -- 执行数据分批迁移 SET @insertSql = CONCAT('INSERT INTO ', tableName, ' SELECT * FROM `', sourceSchema, '`.', tableName, ' WHERE ', primaryKey, ' > ', @maxId, ' ORDER BY ', primaryKey, ' ASC ', 'LIMIT ', batchSize, ';'); PREPARE insertSql FROM @insertSql; EXECUTE insertSql; -- 实时更新临时表已迁移记录数 SET @updateCountSql = CONCAT('UPDATE tmp_migrate_table_record SET target_table_count = (SELECT COUNT(*) FROM ', tableName, ') WHERE table_name=''', tableName, ''';'); PREPARE updateCountSql FROM @updateCountSql; EXECUTE updateCountSql; -- 跟新临时表以迁移数据最大主键id SET @updateMaxIdSql = CONCAT('UPDATE tmp_migrate_table_record SET max_id = (SELECT IFNULL(MAX(', primaryKey,'), 0) FROM ', tableName, ') WHERE table_name=''', tableName, ''';'); PREPARE updateMaxIdSql FROM @updateMaxIdSql; EXECUTE updateMaxIdSql; -- 刷新变量 SELECT target_table_count, max_id INTO @targetCount, @maxId FROM tmp_migrate_table_record WHERE table_name=tableName; -- 查看上面拼装后的sql,需要排查问题可打开查看 SELECT @insertSql, @updateCountSql, @updateMaxIdSql, @sourceCounhttp://www.devze.comt, @targetCount; -- 提交事务 COMMIT; END WHILE; END $$ DELIMITER ;
存储过程完成后,接下来就是执行存储过程进行数据迁移了:
- CALL p_migrate_full_table_data('test_source_db', 't_student', 'student_id', 50000);
- CALL p_migrate_full_table_data('test_source_db', 't_student', 'student_id', 50000);
- CALL p_migrate_full_table_data('test_source_db', 't_student', 'student_id', 50000);
确认数据没有问题后,可以对临时表进行清理:
DROP TABLE IF EXISTS tmp_migrate_table_record;
迁移示例
创建测试表
DROP TABLE IF EXISTS t_student; CREATE TABLE t_student( id BIGINT, `name` VARCHAR(50), age INT(3), state CHAR(1), PRIMARY KEY (id) ); DROP TABLE IF EXISTS t_course; CREATE TABLE t_course( id BIGINT, `name` VARCHAR(50) );
使用存储过程初始化数据
DROP PROCEDURE IF EXISTS init_student; DELIMITER $$ CREATE PROCEDURE init_student() BEGIN DELETE FROM t_student; SET @p = 1; -- 测试数据数量自己定 WHILE @p < 234567 DO INSERT INTO t_student VALUES(@p, CONCAT('user', @p * 1000000), 18, 'A'); SET @p = @p + 1; END WHILE; END $$ DELIMITER ; CALL init_student();
执行迁移脚本
CALL p_migrate_full_table_data('test_source_db', 't_student', 'id', 50000); CALL p_migrate_full_table_data('test_source_db', 't_course', 'id', 50000);
总结
数据迁移的方式有很多,如果是大量的表都要迁移的情况,建议直接整个库迁移,再删掉不需要的表效果会更好,再大的困难都不是问题,关键是找对方法很重要。
到此这篇关于mysql使用存储过程进行单表数据迁移的实现的文章就介绍到这了,更多相关MySql 单表数据迁移内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!
精彩评论