Oracle全量同步与增量同步方式
目录
- 一、全量同步(T+1)
- (一)全量同步步骤
- 1.创建源表
- 2.创建数据同步的目标表
- 3.创建全量同步的存储过程并调用
- (二)全量同步练习
- (三)存储过程 & 动态SQL & 全量同步之间的区别
- 二、增量同步——MERGE INTO
- (一)相关概念介绍
- 1.什么是增量
- 2.什么是增量同步
- 3.全量同步与增量同步的区别
- 4.增量同步MERGE INTO语法结构
- (二)增量同步练习
- 1.练习1
- 2.练习2
- 总结
etl——Extract, Transform, Load,即“提取、转换、加载”
一、全量同步(T+1)
逻辑:用源表的数据直接覆盖目标表。
实现的逻辑:在往目标表中插入数据之前,【先清空目标表】,然后查询源表的数据,直接插入目标表。适用于数据量小的情况。
(一)全量同步步骤
1.创建源表
drop table emp_source; CREATE TABLE emp_source AS SELECT e.*, SYSDATE create_date, SYSDATE last_update_date FROM emp e WHERE 1 = 2; INSERT INTO emp_source SELECT e.*, SYSDATE create_date, SYSDATE last_update_date FROM emp e; commit ; select * from emp_source;
2.创建数据同步的目标表
drop TABLE emp_tar; CREATE TABLE emp_tar AS SELECT e.*, SYmDrTqSDATE etl_dt FROM emp e WHERE 1 = 2; select * from emp_tar;
3.创建全量同步的存储过程并调用
CREATE OR REPLACE PROCEDURE P_FULL IS BEGIN ----清空目标表数据 EXECUTE IMMEDIATE 'truncate table emp_tar'; ---插入数据到目标表 INSERT INTO emp_tar SELECT e.empno, e.ename, e.job, e.mgr, e.hiredate, e.sal, e.comm, e.deptno, SYSDATE etl_dt FROM emp_source e; COMMIT; ----写入异常 EXCEPTION WHEN OTHERS THEN dbms_output.put_line(SQLERRM); END; BEGIN P_FULL; END ; SELECT * FROM emp_tar;
目标表同步成功:
(二)全量同步练习
通过入参 P_JOB 工种,将非这个工种的数据,全量同步到 EMP_0318 这个表(全量用TRUNCATE 实现,使用动态SQL实现);
create table EMP_0318 as select * from emp where 1,2; create or replace procedure p_fullsync(P_JOB VARCHAR2) is begin EXECUTE IMMEDIATE 'TRUNCATE TABLE EMP_0318'; -- 清空目标表数据 -- 插入数据到目标表 insert into EMP_0318 select * from emp where job != P_JOB; commit; -- 写入异常 exception when others then rollback; end; begin p_fullsync(P_JOB => 'CLERK'); end; SELECT * FROM EMP_0318;
(三)存储过程 & 动态SQL & 全量同步之间的区别
1.存储过程:是封装一段 数据的 同步 &转换逻辑;
2.动态SQL:当SQL中存在不稳定因素,比如,表名不确定,筛选条件不确定,或者是 DDL语句(TRUNCATE / DROP)不能直接运行,这个时候需要 拼接一个变量的SQL语句字符串,然后用 EXECUTE IMMEDIATE SQL语句字符串 去动态执行。如果这个SQL是 SELECT 语句 一般后面还有 INTO 变量赋值。
3.全量同步:在插入目标表的时候,需要先清空目标表,这样才能保证目标表的数据不会重复。(否则 我们调用一次存储过程,目标表的数据就会重复一次)。
二、增量同步——MERGE INTO
(一)相关概念介绍
1.什么是增量
增量指的是那一天新增的或者发生修改的数据。
2.什么是增量同步
逻辑:用源表的数据 更新 目标表 ,如果这条数据在目标表中存在则更新,数据不存在,则插入。
实现的逻辑:首先判断 目标表中是否有源表中的数据:如果有,则用 源表的数据 更新目标表中对应的数据;如果没有,则查询源表的记录,直接插入目标表。
通常用 MERGE INTO 的方式来做增量同步数据。
3.全量同步与增量同步的区别
全量同步是同步整张表的数据,增量同步只同步增量数据(比如今天只同步昨天新增的或者修改的数据)
全量同步之前要清空目标表的数据,增量同步不用清空表,有则更新,无则插入;
4.增量同步MERGE INTO语法结构
MERGE INTO 目标表 USING (增量数据的查询结果集) --子查询 查询源表的增量结果集 ON (匹配字段) --用来判断增量结果集里的数据到底是更新,目标表里的数据还是插入到目标表中 WHEN MATCHED THEN UPDATE SET 目标表的字段 = 增量结果集字段 --UPDATE和SET之间不需要加表名 WHEN NOT MATCHED THEN INSERT(目标表字段) VALUEhttp://www.devze.comS(增量结果集字段) ; --INSERT和VALUES之间不需要加 INTO 表名
(二)增量同步练习
1.练习1
示例:假如在昨天公司里新增一个员工和 7788 这个员工的薪资发生了变化,用存储过程实现,将 EMP_SOURCE 表的数据增量同步到 mDrTqEMP_TAR
源表数据:
drop table EMP_TAR; drop table emp_source; create table emp_source as SELECT e.*,SYSDATE create_date,SYSDATE last_update_date from emp e; INSERT INTO emp_source(empno,ename,hiredate,create_date,last_update_date) VALUES(1111,'lisa',TRUNC(SYSDATE)-1,TRUNC(SYSDATE)-1,TRUNC(SYSDATE)-1); UPDATE emp_source SET sal=10000,last_update_date=TRUNC(SYSDATE)-1 WHERE empno=7788; commit; SELECT * FROM emp_source;
create table EMP_TAR as SELECT e.*,SYSDATE etl_dt from emp e; SELECT * FROM EMP_TAR;
目标表数据:
编写存储过程进行增量同步,并进行调用:
create or replace procedure p_emp_source(p_dt date) as begin merge into EMP_TAR a using (select * from emp_source where LAST_UPDATE_DATE = p_dt) b --子查询 查询源表的增量结果集 javascript on (a.empno = b.empno) --用来判断增量结果集里的数据到底是更新,目标表里的数据还是插入到目标表中 when matched then update set a.ename = b.ename, --主键不能update a.job = b.job, a.mgr = b.mgr, a.hiredate = b.hiredate, a.sal = b.sal, a.comm = b.comm, a.deptno = b.deptno, a.etl_dt = sysdate --UPDATE和SET之间不需要加表名 when not matched then insert (a.empno, a.ename, a.job, a.mgr, a.hiredate, a.sal, a.comm, a.deptno, javascript a.etl_dt) values (b.empno, b.ename, b.job, b.mgr, b.hiredate, b.sal, b.comm, b.deptno, sysdate); --INSERT和VALUES之间不需要加 INTO 表名 commit; end; begin p_emp_source(p_dt=>TRUNC(SYSDATE)-1); end;
目标表数据:
SELECT * FROM EMP_TAR;
2.练习2
--书 book表 drop table book; create table book(bno varchar2(20),--图书编号 bname varchar2(50),--图书名称 aid int,--作者 pid int,--出版社 tid varchar2(20),--种类 buy date,--进货日期 price number(7,2),--价格 buynum int); --数量 insert into book values('J0001','计算机基础',2001,1001,'J001',date '2016-1-5',12.5,5); insert into book values('J0002','oracle从入门到精通',2002,1004,'J001',date '2016-8-8',30,10); insert into book values('Y0001','常见病例及用药',2005,1003,'Y001',date '2016-2-4',20,20); insert into book values('W0001','平凡的世界',2006,1003,'W001',date '2016-5-15',35,30); insert into book values('W0002','悲惨世界',2007,1004,'W001',date '2016-4-9',31,22); insert into book values('J0003','SQL入门',2001,1004,'J001',date '2016-2-15',32,20); insert into book values('J0004','SQL基础课程',2002,1001,'J001',date '2016-6-6',28,10); COMMIT; SELECT * FROM book; --书(主表)
DROP TABLE BOOK_SOURCE; CREATE TABLE BOOK_SOURCE AS SELECT T.* ---假如昨天写入的这些数据 ,SYSDATE -1 AS CREATE_DATE ,SYSDATE -1 AS LAST_UPDATE_DATE FROM book T; SELECT * FROM BOOK_SOURCE;
---目标表 DROP TABLE BOOK_TARGET; create table BOOK_TARGET(bno varchar2(20) ,--图书编号 bname varchar2(50),--图书名称 aid int,--作者 pid int,--出版社 tid varchar2(20),--种类 buy date,--进货日期 price number(7,2),--价格 buynum int ,ETL_DATE DATE); SELECT * FROM BOOK_TARGET;
-- 创建存储过程,将源表数据同步到目标表中 create or replace procedure p_BOOK_SOURCE_TARGET(P_DT DATE) is v_rowcount number; begin merge into BOOK_TARGET a using (select * from BOOK_SOURCE where trunc(LAST_UPDATE_DATE) = P_DT) b on (a.bno = b.bno) when matched then update set a.bname = b.bname, a.aid = b.aid, a.pid = b.pid, a.tid = b.tid, a.buy = b.buy, a.price = b.price, a.buynum = b.buynum, a.etl_date = sysdate -- 这里必须是当天的日期 when not matched then insert (a.bno, a.bname, a.aid, a.pid, a.tid, a.buy, a.price, a.buynum, a.etl_date) values (b.bno, b.bname, b.aid, b.pid, b.tid, b.buy, b.price, b.buynum, sysdate); -- 记录MERGE影响的行数 v_rowcount := SQL%ROWCOUNT; DBMS_OUTPUT.PUT_LINE('MERGE影响的行数: ' || v_rowcount); commit; EXCEPTION WHEN OTHERS THEN ROLLBACK; end;
-- 没有匹配到数据,是新增数据 begin p_BOOK_SOURCE_TARGET(P_DT => trunc(SYSDATE) - 1); -- 传入昨天的日期 end; SELECT * FROM BOOK_TARGET;
源表数据成功INSERT到目标表中:
insert into BOOK_SOURCE SELECT T.* ---假如昨天写入的这些数据 ,SYSDATE AS CREATE_DATE ,SYSDATE AS LAST_UPDATE_DATE FROM book T; COMMIT; SELECT * FROM BOOK_SOURCE;
begin p_BOOK_SOURCE_TARGET(P_DT => trunc(SYSDATE)); -- 传入今天的日期 end; -- 这次匹配到了,是更新操作 SELECT * FROM BOOK_TARGET;
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论