现在的位置: 首页 > 综合 > 正文

Advanced Queue—Local

2018年04月17日 ⁄ 综合 ⁄ 共 5943字 ⁄ 字号 评论关闭

Preparation:
-- One-time preparation, run as SYSDBA: build two reusable roles, one for the
-- queue administrator and one for ordinary queue users.
conn / as sysdba

-- Administrator role. The predefined Oracle role is AQ_ADMINISTRATOR_ROLE;
-- no role called AQ_ADMINISTRATOR exists, so granting that name is rejected
-- and the custom role would end up without any of the three grants.
create role my_aq_adm_role;
grant connect, resource, aq_administrator_role to my_aq_adm_role;

-- User role: the predefined AQ_USER_ROLE plus the right to enqueue to and
-- dequeue from any queue, granted without the admin option.
create role my_aq_user_role;
grant connect, resource, aq_user_role to my_aq_user_role;
exec dbms_aqadm.grant_system_privilege(privilege=>'ENQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);
exec dbms_aqadm.grant_system_privilege(privilege=>'DEQUEUE_ANY',grantee=>'my_aq_user_role',admin_option=>false);

Scenario 1: Sending message Point2Point

1. create user aqadm and grant queue admin privilege. create payload
-- Scenario 1, step 1: create the queue owner and the payload type that every
-- queue in this walkthrough uses.
conn / as sysdba
create user aqadm identified by aqadm;
grant my_aq_adm_role to aqadm;
-- aqadm owns the queue tables, so it needs space in its default tablespace.
-- RESOURCE reaches aqadm only through my_aq_adm_role, and a role cannot carry
-- a tablespace quota, so grant the space explicitly. This is demo-grade; on a
-- real system prefer "alter user aqadm quota ... on <tablespace>".
grant unlimited tablespace to aqadm;

conn aqadm/aqadm
-- Message payload: an object type with a single text attribute.
create type my_payload as object (msg varchar2(100));
/
-- Queue users build and read my_payload instances, so they need EXECUTE on it.
grant execute on my_payload to my_aq_user_role;

2. create user aquser1,aquser2 and grant queue user privilege
-- Scenario 1, step 2: two ordinary queue users. aquser1 plays the producer
-- and aquser2 the consumer in the steps that follow.
conn / as sysdba
create user aquser1 identified by aquser1;
create user aquser2 identified by aquser2;

-- Both users receive the same custom queue-user role.
grant my_aq_user_role to aquser1, aquser2;

3. use aqadm create queue table and queue(msg_queue, payload=aqadm.my_payload)
-- Scenario 1, step 3: as the queue owner, create a single-consumer queue
-- table, create the queue inside it, then open it for enqueue and dequeue.
conn aqadm/aqadm
BEGIN
    dbms_aqadm.create_queue_table(
        queue_table        => 'MSG_QUEUE_TAB',
        queue_payload_type => 'aqadm.my_payload',
        multiple_consumers => FALSE);   -- point-to-point: one consumer per message

    dbms_aqadm.create_queue(
        queue_name  => 'MSG_QUEUE',
        queue_table => 'MSG_QUEUE_TAB');

    -- A queue accepts no traffic until it has been started.
    dbms_aqadm.start_queue(
        queue_name => 'MSG_QUEUE',
        enqueue    => TRUE,
        dequeue    => TRUE);
END;
/

4. use aquser1 to enqueue message and aquser2 to dequeue message.(point2point)

-- Producer side: aquser1 puts one my_payload message on aqadm.msg_queue.
conn aquser1/aquser1
DECLARE
    enq_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;      -- left at package defaults
    msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;   -- left at package defaults
    new_msgid  RAW(16);                        -- receives the id of the enqueued message
    outgoing   aqadm.my_payload;
BEGIN
    outgoing := aqadm.my_payload('From aquser1');

    DBMS_AQ.ENQUEUE(
        queue_name         => 'aqadm.msg_queue',
        enqueue_options    => enq_opts,
        message_properties => msg_props,
        payload            => outgoing,
        msgid              => new_msgid);

    -- The enqueue is part of the transaction; commit to make the message visible.
    COMMIT;
END;
/

-- Consumer side: aquser2 takes the next message off aqadm.msg_queue and
-- prints its text.
conn aquser2/aquser2
set serveroutput on
DECLARE
    deq_opts   DBMS_AQ.DEQUEUE_OPTIONS_T;      -- left at package defaults
    msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;
    got_msgid  RAW(2000);                      -- receives the id of the dequeued message
    incoming   aqadm.my_payload;
BEGIN
    DBMS_AQ.DEQUEUE(
        queue_name         => 'aqadm.msg_queue',
        dequeue_options    => deq_opts,
        message_properties => msg_props,
        payload            => incoming,
        msgid              => got_msgid);

    -- Commit so the message is permanently removed from the queue.
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('This message is ' || incoming.msg);
END;
/

 

Scenario 2: Sending message via Publisher/Subscriber

1. create user aqadm and grant queue admin privilege.

2. create user aquser1,aquser2 and grant queue user privilege

3. use aqadm create queue table and queue

-- Scenario 2, step 3: as the queue owner, create a multi-consumer queue,
-- start it, and register one named subscriber.
conn aqadm/aqadm
BEGIN
    dbms_aqadm.create_queue_table(
        queue_table        => 'MSG_QUEUE_TAB_PS',
        queue_payload_type => 'aqadm.my_payload',
        multiple_consumers => TRUE);    -- publish/subscribe needs a multi-consumer table

    dbms_aqadm.create_queue(
        queue_name  => 'MSG_QUEUE_PS',
        queue_table => 'MSG_QUEUE_TAB_PS');

    dbms_aqadm.start_queue(
        queue_name => 'MSG_QUEUE_PS',
        enqueue    => TRUE,
        dequeue    => TRUE);

    -- 'AQ_USER1' is an agent (consumer) name, not a database account; the
    -- dequeuing session must present this name as its consumer_name.
    dbms_aqadm.add_subscriber(
        queue_name => 'MSG_QUEUE_PS',
        subscriber => SYS.AQ$_AGENT('AQ_USER1', NULL, NULL));

    -- NOTE(review): the only subscriber has no address, so this local
    -- propagation schedule presumably has nothing to forward. Confirm whether
    -- it is needed.
    dbms_aqadm.schedule_propagation(
        queue_name  => 'MSG_QUEUE_PS',
        destination => NULL);
END;
/

4. use aquser1 to enqueue message and aquser2 to dequeue message.(1-->m)
-- Publisher side: aquser1 posts one message to the multi-consumer queue.
-- No recipient list is set in the message properties.
conn aquser1/aquser1
DECLARE
    enq_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;
    msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;
    new_msgid  RAW(16);
    outgoing   aqadm.my_payload;
BEGIN
    outgoing := aqadm.my_payload('From aquser1');

    DBMS_AQ.ENQUEUE(
        queue_name         => 'aqadm.msg_queue_PS',
        enqueue_options    => enq_opts,
        message_properties => msg_props,
        payload            => outgoing,
        msgid              => new_msgid);

    -- Commit to publish the message.
    COMMIT;
END;
/

-- Subscriber side: aquser2 dequeues from the multi-consumer queue.
-- Be careful: if the consumer name is not set, you cannot dequeue the message.
conn aquser2/aquser2
-- Needed for the DBMS_OUTPUT line below to be displayed.
SET SERVEROUTPUT ON;
DECLARE
    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_id          RAW(2000);
    my_message          aqadm.my_payload;
BEGIN
    -- Dequeue as the subscriber agent registered on MSG_QUEUE_PS.
    queue_options.consumer_name := 'AQ_USER1';
    -- Start from the first available message for this consumer.
    queue_options.navigation := dbms_aq.first_message;
    DBMS_AQ.DEQUEUE(
        queue_name => 'aqadm.msg_queue_PS',
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    -- Commit so the message is consumed for this subscriber.
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('This message is '||my_message.msg);
END;
/

 

Scenario 3: Sending message via Publisher/Subscriber where the subscriber is a queue.

1. create user aqadm and grant queue admin privilege.

2. create user aquser1,aquser2 and grant queue user privilege

3. use aqadm to create queue table and queue; reuse queue "MSG_QUEUE_PS" (from Scenario 2) as the destination queue

-- Scenario 3, step 3: as the queue owner, create a second multi-consumer
-- queue whose subscriber is an agent located at another queue.
conn aqadm/aqadm
BEGIN
    dbms_aqadm.create_queue_table(
        queue_table        => 'MSG_QUEUE_TAB_PQ',
        queue_payload_type => 'aqadm.my_payload',
        multiple_consumers => TRUE);

    dbms_aqadm.create_queue(
        queue_name  => 'MSG_QUEUE_PQ',
        queue_table => 'MSG_QUEUE_TAB_PQ');

    dbms_aqadm.start_queue(
        queue_name => 'MSG_QUEUE_PQ',
        enqueue    => TRUE,
        dequeue    => TRUE);

    -- The agent's address names a queue, so messages for AQ_USER1 are to be
    -- delivered into aqadm.msg_queue_ps rather than read from MSG_QUEUE_PQ.
    dbms_aqadm.add_subscriber(
        queue_name => 'MSG_QUEUE_PQ',
        subscriber => SYS.AQ$_AGENT('AQ_USER1', 'aqadm.msg_queue_ps', NULL));

    -- A NULL destination schedules propagation within the local database.
    dbms_aqadm.schedule_propagation(
        queue_name  => 'MSG_QUEUE_PQ',
        destination => NULL);
END;
/

4. use aquser1 to enqueue message to queue1 and aquser2 to dequeue message from queue2.(1-->m)
Note this: enqueue in "msg_queue_pq", but dequeue in "msg_queue_ps"

-- Publisher side: aquser1 posts one message to the source queue msg_queue_pq.
conn aquser1/aquser1
DECLARE
    enq_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;
    msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;
    new_msgid  RAW(16);
    outgoing   aqadm.my_payload;
BEGIN
    outgoing := aqadm.my_payload('From aquser1_to_queue2');

    DBMS_AQ.ENQUEUE(
        queue_name         => 'aqadm.msg_queue_pq',
        enqueue_options    => enq_opts,
        message_properties => msg_props,
        payload            => outgoing,
        msgid              => new_msgid);

    -- Commit to publish the message.
    COMMIT;
END;
/

-- Subscriber side: aquser2 reads the message from msg_queue_ps, the queue
-- named in the subscriber address, not from msg_queue_pq where it was enqueued.
conn aquser2/aquser2
-- Needed for the DBMS_OUTPUT line below to be displayed.
SET SERVEROUTPUT ON;
DECLARE
    queue_options       DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_id          RAW(2000);
    my_message          aqadm.my_payload;
BEGIN
    -- Dequeue as the agent named in the MSG_QUEUE_PQ subscription.
    queue_options.consumer_name := 'AQ_USER1';
    queue_options.navigation := dbms_aq.first_message;
    -- No wait option is set, so the package default applies. Presumably the
    -- call blocks until the propagated message arrives; confirm the default.
    DBMS_AQ.DEQUEUE(
        queue_name => 'aqadm.msg_queue_ps',
        dequeue_options => queue_options,
        message_properties => message_properties,
        payload => my_message,
        msgid => message_id );
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('This message is '||my_message.msg);
END;
/

抱歉!评论已关闭.