天天看点

【项目实战】——历史数据归档

迁移目标

        按季度(每个租户自定义季度日期且各不相同)划分,有明显的冷热数据区分,目标将冷数据分隔,减少单表过大,提供SQL等业务处理能力,期待预期按租户自定义时间迁移,且迁移过程实现自动化,无需人工干预。

归档方案

        按各租户自定义中季度日历进行迁移,热数据保留最近四个季度的数据,其他数据,以日历中设定的春暑秋寒四个季度为年,进行年维度的历史数据归档。

执行方案

定时任务,每日执行定时任务,判断各个租户当前所属季度,判断上一年的该季度数据是否已经完成迁移,无迁移记录,则进行数据迁移。

1、动态的创建归档数据表,此过程需要可复用,以满足多个数据表迁移归档的需求

2、分校存在一个季度有两种日历的情况,需要数据全部迁移不可遗留

3、迁移的数据明细需要被记录,以此保证幂等,避免数据被重复迁移

核心逻辑过程

1、判断目标表是否存在,不存在则创建

select count(*) from information_schema.TABLES where table_name = #{tableName}      

返回数据为0,则证明数据表不存在,创建目标表

利用mybatis中${},将表名称和数据结构传入,进行创建

<update id="createTable">
    create table ${tableName} ${content}
</update>      

2、批量数据迁移提升效率

1).INSERT INTO SELECT语句

      语句形式为:Insert into Table2(field1,field2,...) select value1,value2,... from Table1

      要求目标表Table2必须存在,由于目标表Table2已经存在,所以我们除了插入源表Table1的字段外,还可以插入常量。

 2).SELECT INTO FROM语句

      语句形式为:SELECT vale1, value2 into Table2 from Table1

      要求目标表Table2不存在,因为在插入时会自动创建表Table2,并将Table1中指定字段数据复制到Table2中

数据迁移以年为单位,目标表Table2在迁移的时候一定是存在,故选用INSERT INTO SELECT方案,替代Insert INTO table(field1,field2,...) values(value1,value2,...)提升迁移效率,避免大量数据查询再插入带来应用服务器压力

3、记录已经迁移的数据(可复用)

CREATE TABLE `tb_move_data` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `biz_id` varchar(32) NOT NULL COMMENT '业务id',
  `city_code` varchar(8) NOT NULL COMMENT '租户代码',
  `table_name` varchar(32) NOT NULL COMMENT '表名',
  `num` int(11) NOT NULL COMMENT '迁移条数',
  `year` int(8) NOT NULL COMMENT '年',
  `term` int(8) NOT NULL COMMENT '季度',
  `start_time` datetime NOT NULL COMMENT '开始日期',
  `end_time` datetime NOT NULL COMMENT '截止日期',
  `deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='数据迁移记录';      

利用insert into select 可以获取到迁移数据条数,将其他业务信息拼接插入即可。

4、迁移完成后,移除原表中的已迁移数据,定期清理数据碎片。

public class PlanHintStrategy implements HintShardingAlgorithm<String> {
/**
*
* @param actualTables 物理表
* @param hintShardingValue 分片键,指定的表
* @return
*/
@Override
public Collection<String> doSharding(Collection actualTables, HintShardingValue hintShardingValue) {
List<String> shardingResult = new ArrayList<>();
actualTables.forEach(table -> {
String tableName = (String) table;
String suffix = tableName.substring(tableName.lastIndexOf("_") + 1);
Collection<String> tableNames = hintShardingValue.getValues();
if (tableNames.contains(suffix)) {
shardingResult.add(tableName);
}
});
return shardingResult;
}
}      
// Hint分片策略必须要使用 HintManager工具类
HintManager hintManager = HintManager.getInstance();
//查询数据库,可以不配置
hintManager.addDatabaseShardingValue("tb_history_plan", 0);
//查询分片表,可以配置多个
hintManager.addTableShardingValue("tb_history_plan", "2019");
hintManager.addTableShardingValue("tb_history_plan", "2020")
// 直接指定对应具体的数据库
hintManager.setDatabaseShardingValue(1);