ETL之增量抽取方式,同步两张表的数据小技巧

有些场景下,需要隔离不同的DB,彼此DB之间不能互相访问,但实际的业务场景又需要从A
DB访问B DB的情形,这时怎么办?我认为有如下常规的三种方案:

1、触发器方式
触发器方式是普遍采取的一种增量抽取机制。该方式是根据抽取要求,在要被抽取的源表上建立插入、修改、删除3个触发器,每当源表中的数据发生变化,就被相应的触发器将变化的数据写入一个增量日志表,ETL的增量抽取则是从增量日志表中而不是直接在源表中抽取数据,同时增量日志表中抽取过的数据要及时被标记或删除。为了简单起见,增量日志表一般不存储增量数据的所有字段信息,而只是存储源表名称、更新的关键字值和更新操作类型(KNSEN、UPDATE或DELETE),ETL增量抽取进程首先根据源表名称和更新的关键字值,从源表中提取对应的完整记录,再根据更新操作类型,对目标表进行相应的处理。

1.双方提供RESET
API,需要访问不同DB数据时,可以通过API来获取指定数据;

例如,对于源表为ORACLE类型的数据库,采用触发器方式进行增量数据捕获的过程如下:

这种方案优点是隔离性、定制性强,统一出入口,只能通过指定的API访问指定的数据;缺点与优点是对立的,也就是定制性太强,导致每次业务发生变更,需要访问不同数据的时候,需要双方更改API的入参或返参,降低了开发效率;而且无法使用表JOIN,这样在某些情况下也会导致查询数据效率变低。目前主流的方案都是建议使用API方案

这样,对表T的所有DML操作就记录在增量日志表DML_LOG中,注意增量日志表中并没有完全记录增量数据本身,只是记录了增量数据的来源。进行增量ETL时,只需要根据增量日志表中的记录情况,反查源表得到真正的增量数据。
SQL代码
(1)创建增量日志表DML_LOG:
CREATE TABLE DML_LOG(
ID NUMBER PRIMARY KEY, //自增主键
TABLE NAME VARCHAR2(200). //源表名称
RECORD ID NUMBER, //源表增量记录的主键值
DML TYPE CH根(1)。∥增量类型,I表示新增:U表示更新;D表示删除
EXECUTE DATE DATE //发生时间
);

2.利用DB的同步技术(如:SQL
SERVER的订阅复制、MYSQL的主从复制脚本等)来实现不同DB的数据同步共享

(2)为DML_LOG创建一个序列SEQ_DML_LOG上,以便触发器写增量日志表时生成ID值。
(3)针对要监听的每一张表,创建一个触发器,例如对表TEST创建触发器如下:
CREATE OR REPLACE TRIGGER T BEFORE INSERT OR UPDATE
OR DELETE ON T FOR EACH ROW
DECLARE 1 DML TYPE VARCHAR2(1);
BEGIN
IF INSERTING THEN L_DML TYPE:= I’;
ELSIF UPDATING THEN I_DML_TYPE:=。TY;
ELSIF DELETING THEN L_DML_TYPE:= D’;
ENDIF;

这种方案优点是可以在同一个DB访问到另一个DB中所需表的数据,可以直接JOIN,把原来的跨DB访问变成了同一个DB的事情;缺点是依赖DB的同步技术,而且两台DB服务器的网络必需互通,没有完全的隔离,且往往同步过来的表不允许直接修改,或需修改仍然需要跨DB修改或使用方案1的API来进行修改。

IF DELETING THEN
INSERT INTO DML_LOG(ID,TABLE_NAME,RECORD—
ID,EXECUTE_DATE,DMLJYPE)
VALUES(SEQ_DML_LOG.NEXTVAL,’TEST ,:OLD.ID,SYSDATE,
L_DML_TYPE);
ELSE
INSERT INTO DML_LOG(ID,TABLE_NAME,RECORD_
ID,EXECUTE_DATE,DMLJYPE)
VALUES(SEQ_DML_LOG.NEXTVAL,。TEST ,:NEW.ID,SYSDATE,L
TIROL_TYPE);
ENDIF;
END;

3.通过程序代码实现两个DB的数据同步(增、删、改、查),如:可以定时轮询源DB的A表,然后获取变更的记录(一般是:增、删、改的记录),再通过程序代码把源DB的A表的变更记录批量更新(若是新增、则是插入,若是修改,则是更新,若是删除,则是删除)到目的DB的A表中。

2、时间戳方式

这种方案的优点是:可以根据实际情况灵活定制同步的表数据,不局限于某一张表或某一个DB,可以保证不同DB间同步表的数据一致性,让本来跨DB操作表变成了同一个DB的事情,而且可以增、删、改、查,功能不受限;缺点是灵活性太强,程序代码实现可靠的跨DB的实时同步逻辑的实现复杂度较高,对于开发人员的要求较高,如果写的同步逻辑无法保证实时、可靠、高可用,那对于业务来讲是灾难性的。

时间戳方式是指增量抽取时,抽取进程通过比较系统时间与抽取源表的时间戳字段的值来决定抽取哪些数据。这种方式需要在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。有的数据库(例如SQL
SERVER)的时间戳支持自动更新,即表的其它字段的数据发生改变时,时间戳字段的值会被自动更新为记录改变的时刻。在这种情下,进行ETL实施时就只需要在源表加上时间戳字段就可以了。对于不支持时间戳自动更新的数据库,这就要求业务系统在更新业务数据时,通过编程的方式手工更新时间戳字段。使用时间戳方式可以正常捕获源表的插入和更新操作,但对于删除操作则无能为力,需要结合其它机制才能完成。

上述三种方案,第1、2方案基本都是定制化的常规方案,我(梦在旅途,)今天要分享的是第3种方案:跨DB增量(增、改)同步两张表的数据,注意是增量同步,其中删除这个我没有说明,原因是如果DB表中记录是物理删除(即:真实的DELETE),那就无法简单的通过程序代码获取到删除的记录,除非在DB中加入DELETE触发器记录删除记录的主键到临时表或开启更改追踪(CHANGE_TRACKING)或DB日志分析,故本文讲的是不给表、DB增加额外负担的情况实时增量同步,至于删的同步这个我认为最好是逻辑标记删除(过期最后清理【真实删除】),而不要物理删除。

更新时间戳:

关于程序代码实现跨DB同步表数据方案,之前已有总结过,详见:https://www.cnblogs.com/zuowj/p/6264711.html 
—》4.利用BCP(sqlbulkcopy)来实现两个不同数据库之间进行数据差异传输(即:数据同步)

3、全表删除插入方式

 之前的文章同步主要是基于TranFlag标记字段
或触发器来实现同步,这种方式必需对表数据的增、删、改逻辑都有要求与规范,也就是增、改必需更改TranFlag=0,删必需记录表删除临进表中,这样才能实现同步逻辑,而今天是在这个同步基础上(BCP),不给表、DB增加额外负担的情况实时增量同步,对数据源的插入、改动没有要求。

全表删除插入方式是指每次抽取前先删除目标表数据,抽取时全新加载数据。该方式实际上将增量抽取等同于全量抽取。对于数据量不大,全量抽取的时间代价小于执行增量抽取的算法和条件代价时,可以采用该方式。

代码如下:(以下同步适用于SQL SERVER 不同DB的表增量同步)

4、全表比对方式

            try
            {
                SqlConnection obConnSrc = new SqlConnection(connLMSStr);
                SqlConnection obConnDest = new SqlConnection(mconnCCSStr);

                string lastTamp = ClsDatabase.gGetFieldValue(obConnSrc, "update TS_SyncUptime set UPTime=GETDATE() OUTPUT (deleted.LastUPstamp) as oldtamp FROM TS_CCSUptime WHERE TableName=N'tableNameA'", "oldtamp");


                string selectSql = @"SELECT id,aaa,bbb,ccc,ddd,eee,fff  
                                  FROM tableNameA WHERE 其它同步过滤查询条件 AND CONVERT(bigint,sys_tamp)>{0}";

                selectSql = string.Format(selectSql, lastTamp);

                master.TransferBulkCopy(selectSql, obConnSrc,
                                "tableNameA", obConnDest,
                                 (stable) =>
                                 {
                                     var colMaps = new Dictionary<string, string>();
                                     foreach (DataColumn col in stable.Columns)
                                     {
                                         colMaps.Add(col.ColumnName, col.ColumnName);
                                     }
                                     return colMaps;
                                 },
                                 (tempTableName, stable, destConn, srcConn) =>
                                 {
                                     StringBuilder saveSqlBuilder = new StringBuilder("begin tran" + Environment.NewLine);

                                     string IUSql = master.BuildInsertOrUpdateToDestTableSql("tableNameA", tempTableName, new[] { "id" }, stable.ExtendedProperties[master.MapDestColNames_String], 2);
                                     saveSqlBuilder.Append(IUSql);

                                     saveSqlBuilder.AppendLine("commit");

                                     ClsDatabase.gExecCommand(destConn, saveSqlBuilder.ToString());


                                     ClsDatabase.gExecCommand(srcConn, "update TS_SyncUptime set UPTime=GETDATE(),LastUPstamp=CONVERT(bigint,sys_tamp) FROM TS_SyncUptime WHERE TableName=N'tableNameA'");

                                     return false;
                                 });


            }
            catch (Exception ex)
            {
                writeLog(ex);//记错误日志
            }

全表比对即在增量抽取时,ETL进程逐条比较源表和目标表的记录,将新增和修改的记录读取出来。优化之后的全部比对方式是采用MD5校验码,需要事先为要抽取的表建立一个结构类似的MD5临时表,该临时表记录源表的主键值以及根据源表所有字段的数据计算出来的(BI)

 上述同步代码逻辑很简单,可以参照之前的文章,这里主要是说明几个重要点:

MD5校验码,每次进行数据抽取时,对源表和MD5临时表进行MD5校验码的比对,如有不同,进行UPDATE操作:如目标表没有存在该主键值,表示该记录还没有,则进行INSERT操作。

发表评论

电子邮件地址不会被公开。 必填项已用*标注