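CDC stands for Change Data Capture. It is mainly used in data warehousing: changes are extracted from the source database and shipped to the data warehouse for analysis and reporting. CDC has a synchronous mode and several asynchronous modes:
1. Synchronous CDC:
Implemented with triggers.
2. Asynchronous CDC:
2.1 Asynchronous HotLog mode
2.2 Asynchronous Distributed HotLog mode
2.3 Asynchronous AutoLog mode
2.3.1 Asynchronous AutoLog mode, online redo log
2.3.2 Asynchronous AutoLog mode, archive log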
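Today we configure asynchronous AutoLog mode with online redo logs (an application database at Telstra uses this architecture). The advantage of this mode is that once the logs have been shipped to the standby machine (the stage site), the subsequent publish, subscribe, and capture operations all run on the stage site rather than on the production database (the source site), which greatly reduces the load on production.
The figure above shows online redo transport; there is also archive log transport:
The difference between the two: with online redo transport, the RFS process transfers redo into the standby redo logs, and the downstream capture process mines the standby redo logs; with archive log transport, the RFS process transfers archived logs, and the downstream capture process mines the archived logs.
Note: AutoLog CDC must be used together with downstream capture.
There are two kinds of CDC capture: local capture and downstream capture.
You can see which kind each capture process is with the following query:
SQL> SELECT CAPTURE_NAME,status, CAPTURE_TYPE from dba_capture;
CAPTURE_NAME STATUS CAPTURE_TY
------------------------------ -------- ----------
CDC$C_CS_USM ENABLED LOCAL
STRM_CAPTURE ENABLED DOWNSTREAM
SQL>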
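The capture process is responsible for creating LCRs (logical change records). CDC can "see" DDL LCRs but does not process them; it only processes DML LCRs.
Now let's start configuring CDC.
The production database for CDC is called the source site; the target machine, which serves the data warehouse, is called the stage site.
CDC requires both databases to be 10g or later, and the stage site's database version must be equal to or higher than the source site's.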
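One way to check the version requirement is to run the same query on both sites and compare the results. This is only a sketch; it assumes you can connect to each instance with a user that is allowed to read v$version.

```sql
-- Run on the source site and on the stage site, then compare the output.
-- The stage site release should be the same as or later than the source site release.
SELECT banner FROM v$version;
```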
1. Configure the source site:
1.1 Set the initialization parameters, logged in as sysdba:
SQL> alter system set global_names=TRUE scope=BOTH;
System altered.
SQL> alter system set streams_pool_size=200M scope=BOTH;
System altered.
SQL> alter system set undo_retention=3600 scope=BOTH;
System altered.
SQL> alter system set log_archive_dest_1 ="location=/oracle/app/oracle/arch/cdc01 mandatory reopen=2";
System altered.
SQL>
SQL>
SQL> alter system set log_archive_dest_2 = "service=cdc02 arch optional noregister reopen=2 template=/oracle/app/oracle/arch/stdbylogs/cdc_stby_%R_%T_%S.arc";
System altered.
SQL>
1.2 Restart the database and verify:
SQL> shutdown immediate;
Database closed.
Database dismounted.
ORACLE instance shut down.
SQL>
SQL> startup mount;
ORACLE instance started.
Total System Global Area 448790528 bytes
Fixed Size 1261332 bytes
Variable Size 339738860 bytes
Database Buffers 100663296 bytes
Redo Buffers 7127040 bytes
Database mounted.
SQL>
SQL> alter database archivelog;
Database altered.
SQL> alter database force logging;
Database altered.
SQL> alter database add supplemental log data;
Database altered.
SQL>
SQL> alter database open;
Database altered.
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /oracle/app/oracle/arch/cdc01
Oldest online log sequence 1
Next log sequence to archive 3
Current log sequence 3
SQL>
SQL> alter system switch logfile;
System altered.
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /oracle/app/oracle/arch/cdc01
Oldest online log sequence 2
Next log sequence to archive 3
Current log sequence 4
SQL>
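As an extra check that is not part of the original transcript, the state of the two archive destinations set in step 1.1 can be inspected through v$archive_dest. A minimal sketch, assuming dest 1 and dest 2 are the only destinations of interest:

```sql
-- Run on the source site as sysdba.
-- dest_id 1 is the local archive destination; dest_id 2 ships logs to the stage site.
SELECT dest_id, status, destination, error
  FROM v$archive_dest
 WHERE dest_id IN (1, 2);
```

Destination 2 may report an error at this point, because the stage site has not been configured yet; it is worth rerunning the query once step 5 is complete.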
SQL>
SQL>
SQL>
SQL> col log_min format a7
SQL> col log_pk format a6
SQL> col log_ui format a6
SQL> col log_fk format a6
SQL> col log_all format a7
SQL> col force_log format a9
SQL>
SQL> SELECT supplemental_log_data_min LOG_MIN, supplemental_log_data_pk LOG_PK, supplemental_log_data_ui LOG_UI, supplemental_log_data_fk LOG_FK,
2 supplemental_log_data_all LOG_ALL, force_logging FORCE_LOG
3 FROM v$database;
LOG_MIN LOG_PK LOG_UI LOG_FK LOG_ALL FORCE_LOG
------- ------ ------ ------ ------- ---------
YES NO NO NO NO YES
SQL>
SQL>
SQL>
SQL> SELECT tablespace_name, force_logging
2 FROM dba_tablespaces;
TABLESPACE_NAME FORCE_
------------------------------------------------------------ ------
SYSTEM NO
UNDOTBS1 NO
SYSAUX NO
TEMP NO
USERS NO
SQL> SELECT table_name
2 FROM dba_tables
3 WHERE owner = 'SYS'
4 AND table_name LIKE 'CDC%$';
TABLE_NAME
------------------------------------------------------------
CDC_SYSTEM$
CDC_SUBSCRIBERS$
CDC_SUBSCRIBED_TABLES$
CDC_SUBSCRIBED_COLUMNS$
CDC_PROPAGATIONS$
CDC_PROPAGATED_SETS$
CDC_CHANGE_TABLES$
CDC_CHANGE_SOURCES$
CDC_CHANGE_SETS$
CDC_CHANGE_COLUMNS$
10 rows selected.
SQL> desc cdc_system$
Name Null? Type
----------------------------------------- -------- ----------------------------
MAJOR_VERSION NOT NULL NUMBER
MINOR_VERSION NOT NULL NUMBER
SQL>
SQL> SELECT * FROM cdc_system$;
MAJOR_VERSION MINOR_VERSION
------------- -------------
1 0
SQL> SELECT * FROM global_name;
GLOBAL_NAME
--------------------------------------------------------------------------------
CDC01.REGRESS.RDBMS.DEV.US.ORACLE.COM
SQL>
SQL>
SQL> desc v$log
Name Null? Type
----------------------------------------- -------- ----------------------------
GROUP# NUMBER
THREAD# NUMBER
SEQUENCE# NUMBER
BYTES NUMBER
MEMBERS NUMBER
ARCHIVED VARCHAR2(3)
STATUS VARCHAR2(16)
FIRST_CHANGE# NUMBER
FIRST_TIME DATE
SQL>
SQL> SELECT group#, bytes
2 FROM v$log;
GROUP# BYTES
---------- ----------
1 52428800
2 52428800
3 52428800
SQL>
2. Configure the stage site.
2.1 Set the initialization parameters, logged in as sysdba:
SQL> alter system set global_names=TRUE scope=BOTH;
System altered.
SQL> alter system set streams_pool_size=200M scope=BOTH;
System altered.
SQL> alter system set undo_retention=3600 scope=BOTH;
System altered.
SQL> alter system set log_archive_dest_1 ="location=/oracle/app/oracle/arch/cdc01 mandatory reopen=2";
System altered.
SQL>
SQL> alter system set log_archive_dest_1="location=/oracle/app/oracle/arch/cdc02 mandatory reopen=2 valid_for=(online_logfile,primary_role)";
System altered.
SQL>
SQL> alter system set log_archive_dest_2="location=/oracle/app/oracle/arch/cdc02_dest2 mandatory valid_for=(standby_logfile,primary_role)";
System altered.
SQL>
SQL> alter system set log_archive_dest_state_1 = enable;
System altered.
SQL> alter system set log_archive_dest_state_2 = enable;
System altered.
SQL> alter system set log_archive_format="cdc02_%R_%T_%S.arc" scope=spfile;
System altered.
SQL>
2.2 Restart the database and verify.
SQL> shutdown immediate;
Database closed.
Database dismounted.
ORACLE instance shut down.
SQL>
SQL> startup mount;
ORACLE instance started.
Total System Global Area 448790528 bytes
Fixed Size 1261332 bytes
Variable Size 318767340 bytes
Database Buffers 121634816 bytes
Redo Buffers 7127040 bytes
Database mounted.
SQL>
SQL> alter database archivelog;
Database altered.
SQL> alter database open;
Database altered.
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /oracle/app/oracle/arch/cdc02_dest2
Oldest online log sequence 1
Next log sequence to archive 2
Current log sequence 2
SQL> alter system switch logfile;
System altered.
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /oracle/app/oracle/arch/cdc02_dest2
Oldest online log sequence 1
Next log sequence to archive 2
Current log sequence 3
SQL>
SQL> SELECT table_name
2 FROM dba_tables
3 WHERE owner = 'SYS'
4 AND table_name LIKE 'CDC%$';
TABLE_NAME
------------------------------------------------------------
CDC_SYSTEM$
CDC_SUBSCRIBERS$
CDC_SUBSCRIBED_TABLES$
CDC_SUBSCRIBED_COLUMNS$
CDC_PROPAGATIONS$
CDC_PROPAGATED_SETS$
CDC_CHANGE_TABLES$
CDC_CHANGE_SOURCES$
CDC_CHANGE_SETS$
CDC_CHANGE_COLUMNS$
10 rows selected.
SQL> desc cdc_system$
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
MAJOR_VERSION NOT NULL NUMBER
MINOR_VERSION NOT NULL NUMBER
SQL> SELECT * FROM cdc_system$;
MAJOR_VERSION MINOR_VERSION
------------- -------------
1 0
SQL>
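3. Create the Streams administrator (run on the stage site). As the first figure at the top of this article shows, once the logs (online or archived) reach the stage site, the downstream capture process mines them. So we first create the Streams administrator on the stage site.
Downstream capture produces the change source on the stage site (note that, except in asynchronous HotLog mode, the change source lives on the stage site in every other mode), the stage change set, and the stage change table.
Note: in this example the change source exists only on the stage site; change sets exist on both the source site and the stage site; the change table is on the stage site, and its counterpart on the source site is called the source table.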
SQL> CREATE TABLESPACE cdc_tbsp
2 datafile '/oracle/app/oracle/oradata/cdc02/cdctbsp01.dbf' SIZE 50M
3 AUTOEXTEND OFF
4 BLOCKSIZE 8192
5 EXTENT MANAGEMENT LOCAL UNIFORM SIZE 64K;
Tablespace created.
SQL>
SQL>
SQL>
SQL>
SQL>
SQL> CREATE USER cdcadmin
2 IDENTIFIED by cdcadmin
3 DEFAULT TABLESPACE cdc_tbsp
4 TEMPORARY TABLESPACE temp
5 QUOTA UNLIMITED ON cdc_tbsp;
User created.
SQL>
SQL>
SQL> GRANT CREATE SESSION TO cdcadmin;
Grant succeeded.
SQL> GRANT CREATE SEQUENCE TO cdcadmin;
Grant succeeded.
SQL> GRANT CREATE TABLE TO cdcadmin;
Grant succeeded.
SQL>
SQL>
SQL> GRANT SELECT_CATALOG_ROLE TO cdcadmin;
Grant succeeded.
SQL> GRANT EXECUTE_CATALOG_ROLE TO cdcadmin;
Grant succeeded.
SQL> GRANT execute ON dbms_cdc_publish TO cdcadmin;
Grant succeeded.
SQL> exec dbms_streams_auth.grant_admin_privilege('CDCADMIN');
PL/SQL procedure successfully completed.
SQL>
SQL> grant dba to cdcadmin;
Grant succeeded.
SQL>
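4. Prepare the table to be replicated by CDC (the source table; this is done on the source site).
The source database instance is named cdc01 and contains a user called app_user. We pick that user's cdc_demo3 table as the test source table. To keep the demo simple we use only this one table, so the source change set contains just one table; in effect, the table is the source change set. (As noted above, change sets exist on both the source site and the stage site. To tell them apart, I call the one on the source site the source change set and the one on the stage site the stage change set.)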
SQL> conn app_user/app_user
Connected.
SQL> SELECT table_name, reason
2 FROM all_streams_unsupported
3 WHERE owner = 'APP_USER'
4 ORDER BY 1;
no rows selected
SQL>
SQL> select OBJECT_NAME,OBJECT_TYPE,OBJECT_ID from t1
2 /
OBJECT_NAME OBJECT_TYPE OBJECT_ID
-------------------- -------------------------------------- ----------
T1 TABLE 51414
SQL> CREATE TABLE cdc_demo3 AS
2 SELECT * FROM T1;
Table created.
SQL>
SQL> ALTER TABLE cdc_demo3
2 ADD CONSTRAINT pk_cdc_demo3
3 PRIMARY KEY (OBJECT_ID)
4 USING INDEX;
Table altered.
SQL>
SQL> ALTER TABLE app_user.cdc_demo3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Table altered.
SQL>
SQL> SELECT * FROM user_log_groups;
OWNER LOG_GROUP_NAME TABLE_NAME LOG_GROUP_TYPE ALWAYS GENERATED
---------- -------------------- --------------- ------------------------------ ---------------------- ----------------------------
APP_USER SYS_C005144 CDC_DEMO3 ALL COLUMN LOGGING ALWAYS GENERATED NAME
SQL>
5. Create the standby redo logs.
5.1 Create the directory on the stage site; it must match the path used in log_archive_dest_2 on the source site.
$ mkdir -p /oracle/app/oracle/arch/stdbylogs
5.2 Create the standby logs on the stage site:
SQL> ALTER DATABASE ADD STANDBY LOGFILE GROUP 4
2 ('/oracle/app/oracle/arch/stdbylogs/slog04.log') SIZE 50M;
Database altered.
SQL> ALTER DATABASE ADD STANDBY LOGFILE GROUP 5
2 ('/oracle/app/oracle/arch/stdbylogs/slog05.log') SIZE 50M;
Database altered.
SQL> ALTER DATABASE ADD STANDBY LOGFILE GROUP 6
2 ('/oracle/app/oracle/arch/stdbylogs/slog06.log') SIZE 50M;
Database altered.
SQL> ALTER DATABASE ADD STANDBY LOGFILE GROUP 7
2 ('/oracle/app/oracle/arch/stdbylogs/slog07.log') SIZE 50M;
Database altered.
SQL> -- Note: the number of standby groups must be one more than the number of online redo log groups on the source site. The source has 3 groups, so we create 4 standby redo log groups.
SQL>
SQL> SELECT group#, bytes, status
2 FROM v$standby_log;
GROUP# BYTES STATUS
---------- ---------- --------------------
4 52428800 UNASSIGNED
5 52428800 UNASSIGNED
6 52428800 UNASSIGNED
7 52428800 UNASSIGNED
SQL>
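The "one more group than the source" rule can be read straight off the source site's v$log. The query below is a sketch for a single-instance source like the one in this walkthrough; the column aliases are made up for illustration.

```sql
-- Run on the source site.
-- Standby groups needed on the stage site: one more than the online groups here.
-- Each standby log should be at least as large as the largest online log.
SELECT COUNT(*) + 1 AS standby_groups_needed,
       MAX(bytes)   AS min_standby_log_bytes
  FROM v$log;
```

With the v$log contents shown in step 1.2 (three groups of 52428800 bytes), this gives 4 groups of at least 52428800 bytes, which matches the four 50M standby logs created above.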
6. Instantiate the source site's data dictionary.
We capture the source site's SCN here because it is needed later when the LogMiner data dictionary is built.
SQL> DECLARE
2 f_scn NUMBER;
3 BEGIN
4 dbms_capture_adm.build ( f_scn );
5 DBMS_OUTPUT.PUT_LINE('The first_scn value is: '|| f_scn);
6 END;
7 /
The first_scn value is: 524561
PL/SQL procedure successfully completed.
SQL>
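Make a note of this SCN: 524561
7. Prepare the source table for instantiation.
Note that every source table on the source site must be prepared for instantiation. If it is not, capture on the stage site runs into problems and cannot capture changes to the source table.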
SQL> conn / as sysdba
Connected.
SQL> desc dba_capture_prepared_tables
Name Null? Type
----------------------------------------- -------- ----------------------------
TABLE_OWNER NOT NULL VARCHAR2(30)
TABLE_NAME NOT NULL VARCHAR2(30)
SCN NOT NULL NUMBER
TIMESTAMP DATE
SUPPLEMENTAL_LOG_DATA_PK VARCHAR2(8)
SUPPLEMENTAL_LOG_DATA_UI VARCHAR2(8)
SUPPLEMENTAL_LOG_DATA_FK VARCHAR2(8)
SUPPLEMENTAL_LOG_DATA_ALL VARCHAR2(8)
SQL>
SQL>
SQL> SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
2 supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
3 FROM dba_capture_prepared_tables;
no rows selected
SQL>
SQL> exec dbms_capture_adm.prepare_table_instantiation('app_user.t1');
PL/SQL procedure successfully completed.
SQL>
SQL>
SQL>
SQL> SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
2 supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
3* FROM dba_capture_prepared_tables
SQL> /
TABLE_NAME SCN PK UI FK ALL
---------- ---------- ---------------- ---------------- ---------------- ----------------
T1 525556 IMPLICIT IMPLICIT IMPLICIT NO
SQL>
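The transcript above prepares app_user.t1, while the source table chosen in step 4, and the one the change table in step 11 is built on, is app_user.cdc_demo3. Since every source table has to be prepared, the same call would presumably be needed for cdc_demo3 as well. A sketch of that, not taken from the original session:

```sql
-- Run on the source site as sysdba.
-- Prepare the actual source table used by the change table in step 11.
exec dbms_capture_adm.prepare_table_instantiation('app_user.cdc_demo3');

-- Confirm that it now appears among the prepared tables.
SELECT table_owner, table_name, scn
  FROM dba_capture_prepared_tables
 WHERE table_owner = 'APP_USER';
```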
8. Identify each source database and create the change source.
In our example there is only one source database, cdc01. The change source, to repeat, lives on the stage site. We name the change source CS_DEMO3.
On the stage site:
SQL> exec dbms_cdc_publish.create_autolog_change_source('CS_DEMO3', 'AutoLog Demo', 'CDC01', 524561, 'Y');
PL/SQL procedure successfully completed.
SQL> -- Note: for asynchronous AutoLog mode with archive logs, the last parameter can be 'N'.
9. Create the stage change set. Here we name it OMEGA_CSET.
Note that when CDC creates a change set, the Streams capture and apply processes are created along with it, but they are only created, not yet started.
On the stage site:
SQL> exec dbms_cdc_publish.create_change_set('OMEGA_CSET', 'change set info', 'CS_DEMO3', 'Y');
PL/SQL procedure successfully completed.
SQL>
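10. Decide which user will query the data, i.e. the subscriber. We assume this user, app_dev, does not exist yet, so we create it.
Note: once the change table has been created, this user must be granted read access to it.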
SQL> create user app_dev identified by app_dev default tablespace USERS;
User created.
SQL> grant connect,resource to app_dev;
Grant succeeded.
SQL>
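11. Create the change table on the stage site.
Creating the change table is the last thing the publisher does. The flow from source to final target is: source table -> source change set -> change source -> stage change set -> change table. Along the way, certain columns or tables can be filtered out.
We now create the change table, on the stage site: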
SQL> conn cdcadmin/cdcadmin
Connected.
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER BY 2,1;
no rows selected
SQL>
SQL> exec dbms_cdc_publish.create_change_table('CDCADMIN', 'T1_CTAB', 'OMEGA_CSET', 'APP_USER', 'CDC_DEMO3', 'OBJECT_NAME VARCHAR2(128), SUBOBJECT_NAME VARCHAR2(30), OBJECT_ID NUMBER, DATA_OBJECT_ID NUMBER, OBJECT_TYPE VARCHAR2(19), CREATED DATE, LAST_DDL_TIME DATE, TIMESTAMP VARCHAR2(19), STATUS VARCHAR2(7), TEMPORARY VARCHAR2(1), GENERATED VARCHAR2(1), SECONDARY VARCHAR2(1)', 'BOTH', 'Y', 'N', 'N', 'N', 'N', 'N', 'Y', 'TABLESPACE CDC_TBSP');
PL/SQL procedure successfully completed.
SQL> SELECT object_name, object_type
2 FROM user_objects
3 ORDER BY 2,1;
OBJECT_NAME OBJECT_TYPE
-------------------- --------------------------------------
T1_CTAB TABLE
T1_CTAB TABLE PARTITION
SQL>
SQL> SELECT table_name, composite, partition_name, high_value
2* FROM user_tab_partitions
SQL> /
TABLE_NAME COMPOS PARTITION_NAME HIGH_VALUE
-------------------- ------ ------------------------------------------------------------ ---------------
T1_CTAB NO P1 281474976710656
SQL>
SQL> conn cdcadmin/cdcadmin
Connected.
SQL> GRANT select ON t1_ctab TO app_dev;
Grant succeeded.
SQL>
SQL> conn / as sysdba
Connected.
SQL>
SQL> desc cdc_change_tables$
Name Null? Type
----------------------------------------------------- -------- ------------------------------------
OBJ# NOT NULL NUMBER
CHANGE_SET_NAME NOT NULL VARCHAR2(30)
SOURCE_SCHEMA_NAME NOT NULL VARCHAR2(30)
SOURCE_TABLE_NAME NOT NULL VARCHAR2(30)
CHANGE_TABLE_SCHEMA NOT NULL VARCHAR2(30)
CHANGE_TABLE_NAME NOT NULL VARCHAR2(30)
CREATED NOT NULL DATE
CREATED_SCN NUMBER
MVL_FLAG NUMBER
CAPTURED_VALUES NOT NULL VARCHAR2(1)
MVL_TEMP_LOG VARCHAR2(30)
MVL_V7TRIGGER VARCHAR2(30)
LAST_ALTERED DATE
LOWEST_SCN NOT NULL NUMBER
MVL_OLDEST_RID NUMBER
MVL_OLDEST_PK NUMBER
MVL_OLDEST_SEQ NUMBER
MVL_OLDEST_OID NUMBER
MVL_OLDEST_NEW NUMBER
MVL_OLDEST_RID_TIME DATE
MVL_OLDEST_PK_TIME DATE
MVL_OLDEST_SEQ_TIME DATE
MVL_OLDEST_OID_TIME DATE
MVL_OLDEST_NEW_TIME DATE
MVL_BACKCOMPAT_VIEW VARCHAR2(30)
MVL_PHYSMVL VARCHAR2(30)
HIGHEST_SCN NUMBER
HIGHEST_TIMESTAMP DATE
CHANGE_TABLE_TYPE NOT NULL NUMBER
MAJOR_VERSION NOT NULL NUMBER
MINOR_VERSION NOT NULL NUMBER
SOURCE_TABLE_OBJ# NUMBER
SOURCE_TABLE_VER NUMBER
SQL>
SQL> l
1 SELECT change_set_name, source_schema_name, source_table_name
2* FROM cdc_change_tables$
SQL> /
CHANGE_SET_NAME SOURCE_SCHEMA_NAME SOURCE_TABLE_NAME
-------------------- -------------------- ------------------------------------------------------------
OMEGA_CSET APP_USER CDC_DEMO3
SQL>
12. Enable the change set:
Note that when the change set is enabled, the Streams capture and apply processes are started.
SQL> conn / as sysdba
Connected.
SQL> SELECT set_name, change_source_name, capture_enabled
2 FROM cdc_change_sets$;
SET_NAME CHANGE_SOURCE_NAME CA
------------------------------------------------------------ ------------------------------------------------------------ --
SYNC_SET SYNC_SOURCE Y
OMEGA_CSET CS_DEMO3 N
SQL>
SQL> conn cdcadmin/cdcadmin
Connected.
SQL>
SQL> exec dbms_cdc_publish.alter_change_set('OMEGA_CSET', enable_capture => 'Y');
PL/SQL procedure successfully completed.
Note: if, during later maintenance, enabling the change set fails with:
ERROR at line 1:
ORA-31514: change set <change_set_name> disabled due to capture error
ORA-06512: at "SYS.DBMS_CDC_PUBLISH", line 589
ORA-06512: at "SYS.DBMS_CDC_PUBLISH", line 780
ORA-06512: at line 1
you can add the recover_after_error => 'Y' parameter, so that the call becomes:
exec dbms_cdc_publish.alter_change_set(change_set_name =>'OMEGA_CSET', enable_capture => 'Y', recover_after_error => 'Y');
OK, let's continue:
SQL>
SQL> conn / as sysdba
Connected.
SQL> SELECT set_name, change_source_name, capture_enabled
2 FROM cdc_change_sets$;
SET_NAME CHANGE_SOURCE_NAME CA
------------------------------------------------------------ ------------------------------------------------------------ --
SYNC_SET SYNC_SOURCE Y
OMEGA_CSET CS_DEMO3 Y
SQL>
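Because enabling the change set is what starts the Streams processes, it can be useful to look at them directly as well. The queries below are a sketch that was not part of the original session; they use the dba_capture and dba_apply dictionary views.

```sql
-- Run on the stage site as sysdba.
-- Capture process state, plus any error recorded against it.
SELECT capture_name, status, capture_type, error_number, error_message
  FROM dba_capture;

-- Apply process state.
SELECT apply_name, status, error_number, error_message
  FROM dba_apply;
```

A status other than ENABLED, or a non-null error_message, is a hint that the recover_after_error route described above may be needed.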
13. Switch the log file on the source site.
SQL> conn / as sysdba
Connected.
SQL> alter system switch logfile;
System altered.
SQL>
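That completes the publisher configuration. We still need to configure the subscriber.
14. Check that the subscriber has access to the source table:
You only need to query ALL_SOURCE_TABLES on the stage site; there is no need to go to the source site: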
SQL> SELECT * FROM ALL_SOURCE_TABLES;
SOURCE_SCHEMA_NAME SOURCE_TABLE_NAME
-------------------- --------------------
APP_USER CDC_DEMO3
SQL>
15. Find the names of the change sets the subscriber can access, and each of their columns:
On the stage site:
SQL> SELECT UNIQUE CHANGE_SET_NAME, COLUMN_NAME, PUB_ID
2 FROM ALL_PUBLISHED_COLUMNS
3 WHERE SOURCE_SCHEMA_NAME ='APP_USER' AND SOURCE_TABLE_NAME = 'CDC_DEMO3';
CHANGE_SET_NAME COLUMN_NAME PUB_ID
-------------------- -------------------- ----------
OMEGA_CSET CREATED 51473
OMEGA_CSET DATA_OBJECT_ID 51473
OMEGA_CSET GENERATED 51473
OMEGA_CSET LAST_DDL_TIME 51473
OMEGA_CSET OBJECT_ID 51473
OMEGA_CSET OBJECT_NAME 51473
OMEGA_CSET OBJECT_TYPE 51473
OMEGA_CSET SECONDARY 51473
OMEGA_CSET STATUS 51473
OMEGA_CSET SUBOBJECT_NAME 51473
OMEGA_CSET TEMPORARY 51473
OMEGA_CSET TIMESTAMP 51473
12 rows selected.
SQL>
16. Create the subscription.
On the stage site:
SQL> BEGIN
2 DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(
3 change_set_name => 'OMEGA_CSET',
4 description => 'Change data for cdc_demo3',
5 subscription_name => 'cdc_demo3_sub');
6 END;
7 /
PL/SQL procedure successfully completed.
SQL>
17. Subscribe to the source table and its columns. You can subscribe to all columns or pick just a few:
SQL> BEGIN
2 DBMS_CDC_SUBSCRIBE.SUBSCRIBE(
3 subscription_name => 'cdc_demo3_sub',
4 source_schema => 'APP_USER',
5 source_table => 'CDC_DEMO3',
6 column_list => 'OBJECT_NAME, OBJECT_ID, STATUS',
7 subscriber_view => 'CDC_DEMO3_SUB_VIEW');
8 END;
9 /
PL/SQL procedure successfully completed.
SQL>
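Note: if the pub_id values found in step 15 differ from column to column, you must specify the matching pub_id for each set of columns when subscribing. See the Database Data Warehousing Guide in the online documentation for details.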
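For reference, DBMS_CDC_SUBSCRIBE.SUBSCRIBE also has a form that takes a publication_id instead of the source schema and table. The sketch below shows what that call could look like with the PUB_ID from step 15; it is an alternative to the call in step 17, not something to run in addition to it.

```sql
-- Alternative to the SUBSCRIBE call in step 17, run on the stage site.
-- 51473 is the PUB_ID returned by the query in step 15.
BEGIN
  DBMS_CDC_SUBSCRIBE.SUBSCRIBE(
    subscription_name => 'cdc_demo3_sub',
    publication_id    => 51473,
    column_list       => 'OBJECT_NAME, OBJECT_ID, STATUS',
    subscriber_view   => 'CDC_DEMO3_SUB_VIEW');
END;
/
```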
18. Activate the subscription:
SQL> BEGIN
2 DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(
3 subscription_name => 'cdc_demo3_sub');
4 END;
5 /
PL/SQL procedure successfully completed.
SQL>
19. Get the next available set of change data.
SQL> BEGIN
2 DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(
3 subscription_name => 'cdc_demo3_sub');
4 END;
5 /
PL/SQL procedure successfully completed.
SQL>
20. Done. You can now query the data in CDC_DEMO3_SUB_VIEW.
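A typical consume cycle might look like the sketch below: read the change rows through the subscriber view, then purge the window and extend it to pick up the next batch. It assumes the view exposes the usual CDC control columns (OPERATION$, CSCN$, COMMIT_TIMESTAMP$) alongside the three subscribed columns; check the view definition on your system before relying on it.

```sql
-- Run on the stage site in the session that owns the subscription.
-- OPERATION$, CSCN$ and COMMIT_TIMESTAMP$ are CDC control columns (assumed to be present).
SELECT operation$, cscn$, commit_timestamp$, object_id, object_name, status
  FROM cdc_demo3_sub_view
 ORDER BY cscn$;

-- After the rows have been processed, release them and move the window on.
BEGIN
  DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(
    subscription_name => 'cdc_demo3_sub');
  DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(
    subscription_name => 'cdc_demo3_sub');
END;
/
```

Purging only after the rows have been processed keeps the window from advancing past data that has not been consumed yet.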
References:
Metalink: Doc ID 972876.1
Oracle Streams and Change Data Capture (CDC) 10gR2
Online documentation: Database Data Warehousing Guide
Oracle Change Data Capture Asynchronous Autolog Demo