Oracle Streams: Adding a Table to Streams Replication

Feel old yet?
Don’t worry, we all know that there are still companies using Streams. So here is a quick manual for a routine task: adding a new table to replication using SYS.DBMS_STREAMS_ADM.

For this, assume the following:

STREAMS SETUP OWNER => streams_adm
CAPTURE PROCESS => str_capture
APPLY PROCESS => streams_apply
SOURCE DATABASE TABLE OWNER => TBLOWNER
TARGET DATABASE TABLE OWNER => TBLOWNER_DEST

The high-level steps to add a table to Streams are as follows:

1. Stop streams CAPTURE and APPLY
2. Create the table on the target database
3. Configure the CAPTURE process
4. Add a rule to the APPLY process
5. Instantiate the new object
6. Create constraints and indexes
7. Restart streams CAPTURE and APPLY
8. Monitor replication

Let’s detail them:

1. Stop streams CAPTURE and APPLY

exec dbms_capture_adm.stop_capture('str_capture');
exec dbms_apply_adm.stop_apply('streams_apply');
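
Before touching any rules, it can be worth confirming that both processes really are down. A minimal check, assuming the process names used in this post (Streams stores them in upper case unless they were created with quoted names):

```sql
-- Run with access to the DBA_* views on the database that hosts each process.
-- STATUS should read DISABLED once a process has stopped.
select capture_name, status
  from dba_capture
 where capture_name = 'STR_CAPTURE';

select apply_name, status
  from dba_apply
 where apply_name = 'STREAMS_APPLY';
```

If either process still shows ENABLED, give it a moment and query again before continuing.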

2. Create the table on the target database

– Extract the DDL for the table from the source database.
– Check for a primary key (or a unique key that can be used instead) and generate its DDL as well.
* If no unique key exists, determine whether it is possible to proceed.

Below is an example of the table creation statement:

CREATE TABLE TBLOWNER_DEST.MYTABLE
( MYID              RAW(16)       NOT NULL ENABLE,
  DESCRIPTION       VARCHAR2(200) NOT NULL ENABLE,
  INTERNALIDFK      RAW(16),
  OTHERINTERNALIDFK RAW(16),
  ISACTIVE          VARCHAR2(1),
  ETL_LAST_UPDATED  DATE
) TABLESPACE TBLOWNER_DESTTBS;

* Note that an extra column ETL_LAST_UPDATED is added to keep a timestamp of the data modification.
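
The DDL extraction and key check described above could be done along these lines. This is only a sketch, run on the source database in SQL*Plus, assuming the table and owner names used in this post:

```sql
-- Make SQL*Plus show the whole CLOB returned by DBMS_METADATA.
set long 100000
set pagesize 0

-- Table DDL as it exists on the source; edit the owner and tablespace
-- and add ETL_LAST_UPDATED before running it on the target.
select dbms_metadata.get_ddl('TABLE', 'MYTABLE', 'TBLOWNER') from dual;

-- Primary key (P) and unique (U) constraints defined on the source table.
select constraint_name, constraint_type, status
  from dba_constraints
 where owner = 'TBLOWNER'
   and table_name = 'MYTABLE'
   and constraint_type in ('P', 'U');
```

If the second query returns no rows, that is the "no unique key" case mentioned above.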

3. Configure the CAPTURE process

DECLARE
  v_dml_rule_name varchar2(60);
  v_ddl_rule_name varchar2(60);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name         => 'TBLOWNER.MYTABLE',
    streams_type       => 'capture',
    streams_name       => 'str_capture',
    queue_name         => 'streams_adm.streams_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => false,
    source_database    => '&source_db',
    dml_rule_name      => v_dml_rule_name,
    ddl_rule_name      => v_ddl_rule_name,
    inclusion_rule     => true);
END;
/
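
To double-check that the rule was created, the Streams dictionary views can be queried. A small sketch, assuming the names used in this post; the second query relies on ADD_TABLE_RULES preparing the table for instantiation when it adds a capture rule, so treat an empty result there as something to investigate rather than a definite error:

```sql
-- Rules that reference the table, for any Streams client (capture, apply, propagation).
select streams_name, streams_type, rule_type, rule_name
  from dba_streams_table_rules
 where table_owner = 'TBLOWNER'
   and table_name = 'MYTABLE';

-- Tables prepared for instantiation on the capture database.
select table_owner, table_name, scn, timestamp
  from dba_capture_prepared_tables
 where table_owner = 'TBLOWNER'
   and table_name = 'MYTABLE';
```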

4. Add a rule to the APPLY process

As transactional information is mined from archived redo and the CAPTURE rule does its work, the APPLY process gets hold of the changes and does a couple of things:
– Remaps the OWNER (TBLOWNER) of the table on the source database to the proper OWNER on the target (TBLOWNER_DEST). The call to DBMS_STREAMS_ADM.RENAME_SCHEMA provides this schema name change on the target database.

DECLARE
  v_dml_rule_name varchar2(60);
  v_ddl_rule_name varchar2(60);
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name         => 'TBLOWNER.MYTABLE',
    streams_type       => 'apply',
    streams_name       => 'streams_apply',
    queue_name         => 'streams_adm.streams_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => false,
    source_database    => '&source_db',
    dml_rule_name      => v_dml_rule_name,
    ddl_rule_name      => v_ddl_rule_name,
    inclusion_rule     => true);

  DBMS_STREAMS_ADM.RENAME_SCHEMA(
    rule_name        => v_dml_rule_name,
    from_schema_name => 'TBLOWNER',
    to_schema_name   => 'TBLOWNER_DEST',
    step_number      => 0,
    operation        => 'ADD');
END;
/

– Executes a handler procedure, registered as two DML handlers (one for INSERT and one for UPDATE), that populates ETL_LAST_UPDATED with SYSDATE.

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'TBLOWNER_DEST.MYTABLE',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'streams_adm.addtimestamp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'TBLOWNER_DEST.MYTABLE',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'streams_adm.addtimestamp_dml_handler',
    apply_database_link => NULL);
END;
/
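
The body of streams_adm.addtimestamp_dml_handler is not shown in this post. For readers who do not have one yet, here is a minimal sketch of what such a handler might look like. It is an assumption, not the actual procedure behind this setup: it unpacks the row LCR, adds ETL_LAST_UPDATED to the new values, and executes the change.

```sql
-- Hypothetical handler body; the real procedure used in this setup may differ.
create or replace procedure streams_adm.addtimestamp_dml_handler (in_any in anydata)
is
  lcr sys.lcr$_row_record;
  rc  pls_integer;
begin
  -- Unpack the row LCR handed over by the apply process.
  rc := in_any.getobject(lcr);

  -- The source table has no ETL_LAST_UPDATED column, so add it to the
  -- new values of the LCR with the current date on the target.
  lcr.add_column('new', 'ETL_LAST_UPDATED', anydata.convertdate(sysdate));

  -- Apply the modified LCR, keeping conflict resolution enabled.
  lcr.execute(true);
end;
/
```

The same procedure can serve both INSERT and UPDATE because in both cases the column is simply missing from the incoming LCR.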

5. Instantiate the new object

set serverout on size 1000000
DECLARE
  v_scn number;
BEGIN
  -- Capture an SCN to use as the instantiation point (queried over the link to the source database).
  select DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER() into v_scn from dual@&source_db;

  -- Copy the rows as they existed at that SCN, using a flashback query over the database link.
  execute immediate 'insert into TBLOWNER_DEST.MYTABLE ( MYID, DESCRIPTION, INTERNALIDFK, OTHERINTERNALIDFK, ISACTIVE, ETL_LAST_UPDATED )' ||
    ' select MYID, DESCRIPTION, INTERNALIDFK, OTHERINTERNALIDFK, ISACTIVE, sysdate
      from TBLOWNER.MYTABLE@&source_db as of scn ' || to_char(v_scn);

  -- Record the SCN so that the apply process only applies changes committed after it.
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name   => 'TBLOWNER.MYTABLE',
    source_database_name => '&source_db',
    instantiation_scn    => v_scn);
  dbms_output.put_line(v_scn);
END;
/
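
A quick sanity check after instantiation, run on the target database. This is a sketch assuming the names used in this post; the SCN reported should match the value printed by the block above.

```sql
-- Instantiation SCN recorded for the source table on the apply side.
select source_database, source_object_owner, source_object_name, instantiation_scn
  from dba_apply_instantiated_objects
 where source_object_owner = 'TBLOWNER'
   and source_object_name = 'MYTABLE';

-- Number of rows copied into the target table.
select count(*) from TBLOWNER_DEST.MYTABLE;
```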

6. Create constraints and indexes

alter table TBLOWNER_DEST.MYTABLE add constraint PK_MYTABLE primary key (MYID) using index;
create index IX_MYTABLE_1 on TBLOWNER_DEST.MYTABLE ( INTERNALIDFK );
create index IX_MYTABLE_2 on TBLOWNER_DEST.MYTABLE ( OTHERINTERNALIDFK );

7. Restart streams CAPTURE and APPLY

exec dbms_capture_adm.start_capture('str_capture');
exec dbms_apply_adm.start_apply('streams_apply');

8. Monitor replication

Monitor DBA_APPLY_ERROR and the V$STREAMS_* views (for example V$STREAMS_CAPTURE and V$STREAMS_APPLY_COORDINATOR) for a while after capture and apply are restarted, to catch any apply errors early.
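
As a starting point, queries along these lines can be rerun periodically. It is a sketch only, assuming the process names used in this post; add columns or filters as needed for your environment.

```sql
-- Apply errors on the target, oldest first; no rows is the good case.
select apply_name, source_database, local_transaction_id,
       error_number, error_message, error_creation_time
  from dba_apply_error
 order by error_creation_time;

-- Capture activity on the capture database.
select capture_name, state, total_messages_captured, total_messages_enqueued
  from v$streams_capture;

-- Apply coordinator activity on the target.
select apply_name, state, total_applied, total_errors
  from v$streams_apply_coordinator;
```

Counters that keep increasing with TOTAL_ERRORS staying flat suggest the new table is replicating cleanly.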

Hope it helps!
Cheers!
