现在的位置: 首页 > 综合 > 正文

Advanced Queue Remote

2018年04月17日 ⁄ 综合 ⁄ 共 4054字 ⁄ 字号 评论关闭
Diagram

 

queue_between_database

From the diagram, we can see the following two points:

  1. queue1 and queue2 should both be enabled for enqueue and dequeue operations.
  2. when dequeuing from queue2@db2, the consumer name should be the same as the subscriber name registered on queue1@db1.
Implementation script

DB1

-- Connect to DB1 with SYSDBA privileges. In SQL*Plus the connect identifier
-- must come before the AS SYSDBA clause; SQL*Plus prompts for the SYS password.
conn sys@db1 as sysdba

-- Role for queue administrators: bundles CONNECT, RESOURCE and the AQ
-- administrator role so it can be granted to a user in one step.
create role my_aq_adm_role ;
grant connect,resource, AQ_ADMINISTRATOR_ROLE to my_aq_adm_role;

-- Role for queue users: bundles CONNECT, RESOURCE and the AQ user role,
-- plus the system privileges to enqueue to / dequeue from any queue.
create role my_aq_user_role;
grant connect, resource,aq_user_role to my_aq_user_role;
exec dbms_aqadm.grant_system_privilege(privilege=>'ENQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);
exec dbms_aqadm.grant_system_privilege(privilege=>'DEQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);

Scenario 1:
1. create user aqadm, grant it the queue admin privileges, and create the payload type
-- Create the queue administrator account on DB1 (run with SYSDBA privileges).
-- In SQL*Plus the connect identifier must come before the AS SYSDBA clause.
conn sys@db1 as sysdba
create user aqadm identified by aqadm;
grant my_aq_adm_role to aqadm;
-- aqadm receives RESOURCE only indirectly through a role, so it has no
-- tablespace quota of its own; grant one explicitly, as the DB2 setup does.
-- NOTE(review): assumes aqadm's default tablespace is SYSTEM, mirroring the
-- DB2 section -- substitute the real default tablespace if it differs.
alter user aqadm quota unlimited on system;

-- As aqadm: create the message payload object type and let queue users use it.
conn aqadm/aqadm@db1
create type my_payload is object(msg varchar2(100));
/
grant execute on my_payload to my_aq_user_role;

2. create user aquser and grant queue user privilege
-- Create the queue user account on DB1 (run with SYSDBA privileges).
-- In SQL*Plus the connect identifier must come before the AS SYSDBA clause.
conn sys@db1 as sysdba
create user aquser identified by aquser;
grant my_aq_user_role to aquser;

3. use aqadm create queue table and queue(msg_queue, payload=aqadm.my_payload)
-- As aqadm on DB1: build the source queue and schedule propagation from it.
-- NOTE(review): the database link FF_OMS_Q.WALMART.COM referenced below is not
-- created anywhere in this script; it presumably has to exist beforehand.
conn aqadm/aqadm@db1

-- Multi-consumer queue table whose messages are aqadm.my_payload objects.
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'MSG_QUEUE_TAB',
        queue_payload_type => 'aqadm.my_payload',
        multiple_consumers => true);
end;
/

-- The queue itself, stored in the table created above.
begin
    dbms_aqadm.create_queue(
        queue_name  => 'MSG_QUEUE',
        queue_table => 'MSG_QUEUE_TAB');
end;
/

-- Enable both enqueue and dequeue on the queue.
begin
    dbms_aqadm.start_queue(
        queue_name => 'MSG_QUEUE',
        enqueue    => true,
        dequeue    => true);
end;
/

-- Register subscriber 'OMS' whose address is the remote queue reached
-- through the FF_OMS_Q.WALMART.COM database link.
begin
    dbms_aqadm.add_subscriber(
        queue_name => 'MSG_QUEUE',
        subscriber => sys.aq$_agent('OMS', 'aqadm.FF_msg_queue@FF_OMS_Q.WALMART.COM', null));
end;
/

-- Schedule propagation of MSG_QUEUE messages to that destination.
begin
    dbms_aqadm.schedule_propagation(
        queue_name  => 'MSG_QUEUE',
        destination => 'FF_OMS_Q.WALMART.COM');
end;
/

4. use aquser to enqueue a message into the local queue aqadm.msg_queue; the scheduled propagation then delivers it to aqadm.FF_msg_queue@FF_OMS_Q.WALMART.COM

-- As aquser on DB1: enqueue one my_payload message into aqadm.msg_queue
-- using default enqueue options and message properties, then commit.
conn aquser/aquser@db1
DECLARE
    l_enq_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msgid      RAW(16);             -- receives the id of the enqueued message
    l_payload    aqadm.my_payload;
BEGIN
    l_payload := aqadm.my_payload('From aquser@oms');

    DBMS_AQ.ENQUEUE(
        queue_name         => 'aqadm.msg_queue',
        enqueue_options    => l_enq_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msgid);

    COMMIT;
END;
/

DB2

-- Connect to DB2 with SYSDBA privileges. In SQL*Plus the connect identifier
-- must come before the AS SYSDBA clause; SQL*Plus prompts for the SYS password.
conn sys@db2 as sysdba

-- The roles granted below are only created on DB1 earlier in this script, and
-- roles are local to each database, so create them on DB2 as well.
create role my_aq_adm_role;
grant connect, resource, aq_administrator_role to my_aq_adm_role;

create role my_aq_user_role;
grant connect, resource, aq_user_role to my_aq_user_role;
exec dbms_aqadm.grant_system_privilege(privilege=>'ENQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);
exec dbms_aqadm.grant_system_privilege(privilege=>'DEQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);

-- Create the queue administrator account on DB2 and give it tablespace quota.
create user aqadm identified by aqadm;
grant my_aq_adm_role to aqadm;
alter user aqadm quota unlimited on system;

-- As aqadm: create the same payload object type as on DB1 and let queue users use it.
conn aqadm/aqadm@db2
create type my_payload is object(msg varchar2(100));
/
grant execute on my_payload to my_aq_user_role;

2. create user aquser and grant queue user privilege
-- Create the queue user account on DB2 (run with SYSDBA privileges).
-- In SQL*Plus the connect identifier must come before the AS SYSDBA clause.
conn sys@db2 as sysdba
create user aquser identified by aquser;
grant my_aq_user_role to aquser;

3. use aqadm create queue table and queue(msg_queue, payload=aqadm.my_payload)
-- As aqadm on DB2: build the destination queue that DB1 propagates into.
conn aqadm/aqadm@db2

-- Multi-consumer queue table whose messages are aqadm.my_payload objects.
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'FF_MSG_QUEUE_TAB',
        queue_payload_type => 'aqadm.my_payload',
        multiple_consumers => true);
end;
/

-- The destination queue, stored in the table created above.
begin
    dbms_aqadm.create_queue(
        queue_name  => 'FF_MSG_QUEUE',
        queue_table => 'FF_MSG_QUEUE_TAB');
end;
/

-- Enable both enqueue and dequeue on the queue.
begin
    dbms_aqadm.start_queue(
        queue_name => 'FF_MSG_QUEUE',
        enqueue    => true,
        dequeue    => true);
end;
/

-- Local subscriber 'OMS': the same name as the subscriber registered on the
-- DB1 queue. The alternative subscriber name 'FF' is left disabled:
--exec dbms_aqadm.add_subscriber(queue_name => 'FF_MSG_QUEUE',subscriber => SYS.AQ$_AGENT('FF',null,NULL));
begin
    dbms_aqadm.add_subscriber(
        queue_name => 'FF_MSG_QUEUE',
        subscriber => sys.aq$_agent('OMS', null, null));
end;
/

-- As aquser on DB2: dequeue one message from aqadm.ff_msg_queue as consumer
-- 'OMS', commit the dequeue, and print the payload text.
-- NOTE(review): no wait time is set, so the call presumably uses the DBMS_AQ
-- default and blocks until a message is available -- confirm if that matters.
conn aquser/aquser@db2
SET SERVEROUTPUT ON;
DECLARE
    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_id          RAW(16);   -- same size as the msgid in the enqueue block
    my_message          aqadm.my_payload;
BEGIN
    -- Must match the subscriber name added to the queue ('OMS').
    queue_options.consumer_name := 'OMS';
    -- Start from the first message available to this consumer.
    queue_options.navigation := dbms_aq.first_message;
    DBMS_AQ.DEQUEUE(
        queue_name => 'aqadm.ff_msg_queue',
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('This message is '||my_message.msg);
END;
/

Troubleshooting

When an error occurs while enqueuing or dequeuing, there are some views and tables you may need to refer to.

  • user_queue_schedules: information about the propagation schedules and the propagation currently in progress.
  • queue tables (msg_queue_tab, ff_msg_queue_tab@ff_oms_q.walmart.com): when a message is enqueued into msg_queue, the propagation job immediately dequeues it from the source queue (msg_queue) and enqueues it into the destination queue over the database link. The real consumer can then dequeue the message at the destination.

抱歉!评论已关闭.