Oracle高级队列介绍

原始链接:http://www.oracle-developer.net/display.php?id=411

oracle高级队列介绍

高级队列Advanced Queuing(AQ)在oracle多个版本都可得到。他是oracle原生消息软件并且在每一个版本都在加强。

这篇文章提供了一个AQ的高级概览。尤其是我们将看到如何启动一个队列并进行入列--出列操作,还有通过通知创建异步出列。

注意AQ支持数据库意外的消息监听(例如JMS消息队列)。本文仅涉及数据库内部消息通信。

要求:

本例要求指定的角色和权限(除了标准CREATE SESSION/TABLE/PROCEDURE/TYPE和表空间配额外)

1、AQ_ADMINISTRATOR_ROLE: 用于创建队列表和队列;

2、EXECUTE ON DBMS_AQ:用于通知案例中启用PLSQL存储过程编译

另外,需要入列/出列消息的标准应用用户要求AQ权限通过DBMS_AQADM[GRANT|REVOKE]_QUEUE_PRIVILIEGE API提供。

以下例子可以运行在任何拥有以上权限的用户下。

1、创建并启动一个队列

AQ处理的消息称为"有效负荷"(payloads) 。消息格式可以是用户自定义对象或XMLType或ANYDATA。当我们创建一个队列,需要告诉oracle

有效负荷的结构,所以我们先创建一个简单对象类型。

CREATE TYPE demo_queue_payload_type AS OBJECT

( message VARCHAR2(4000) );

/

我们有效负荷类型包含一个属性,而现实中可能更复杂。下面创建队列表用于存储队列消息直到永久出列。

BEGIN

DBMS_AQADM.CREATE_QUEUE_TABLE (

queue_table        => ‘demo_queue_table‘,

queue_payload_type => ‘demo_queue_payload_type‘

);

END;

/

接着创建队列并启动:

BEGIN

DBMS_AQADM.CREATE_QUEUE (

queue_name  => ‘demo_queue‘,

queue_table => ‘demo_queue_table‘

);

DBMS_AQADM.START_QUEUE (

queue_name => ‘demo_queue‘

);

END;

/

至此,我们已经创建了队列有效负荷,队列表和队列。来看下有哪些相关对象:

SELECT object_name, object_type

FROM   user_objects

WHERE  object_name != ‘DEMO_QUEUE_PAYLOAD_TYPE‘;

OBJECT_NAME                    OBJECT_TYPE

------------------------------ ---------------

DEMO_QUEUE_TABLE               TABLE

SYS_C009392                    INDEX

SYS_LOB0000060502C00030$$      LOB

AQ$_DEMO_QUEUE_TABLE_T         INDEX

AQ$_DEMO_QUEUE_TABLE_I         INDEX

AQ$_DEMO_QUEUE_TABLE_E         QUEUE

AQ$DEMO_QUEUE_TABLE            VIEW

DEMO_QUEUE                     QUEUE

我们看到一个队列带出了一系列自动生成对象,有些是被后面直接用到的。不过有趣的是,创建了第二个队列。这就是所谓的异常队列(exception

queue)。如果AQ无法从我们的队列接收消息,将记录在该异常队列中。

2、入列消息(enqueuing messages)

我们已经准备好使用DBMS_AQ.ENQUEUE API去入列一个消息。接下来的例子,我们使用ENQUEUE过程入列一个单条消息。

DBMS_AQ有大范围的记录和数组类型来支持其接口并使我们去修改其行为(我们将在下面的例子看到2个此类引用)。

DECLARE

r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle     RAW(16);

o_payload            demo_queue_payload_type;

BEGIN

o_payload := demo_queue_payload_type(‘Here is a message‘);

DBMS_AQ.ENQUEUE(

queue_name         => ‘demo_queue‘,

enqueue_options    => r_enqueue_options,

message_properties => r_message_properties,

payload            => o_payload,

msgid              => v_message_handle

);

COMMIT;

END;

/

可以看到入列消息很简单。入列操作是一个基本的事务(就像往队列表Insert),因此我们需要提交。

3、浏览消息(browsing messages)

在我们出列消息之前,我们将"浏览"队列内容。首先我们可以查询AQ$DEMO_QUEUE_TABLE视图看到多少消息已经入列。正如我们早些看到的,

该视图是在前面DBMS_AQADM.CREATE_QUEUE创建队列自动生成的。

SELECT COUNT(*)

FROM   aq$demo_queue_table;

COUNT(*)

----------

1

和我们预期一样,队列中只有一条消息。我们有2种方法可以浏览消息内容:

1)直接查询视图:

SELECT user_data

FROM   aq$demo_queue_table;

USER_DATA(MESSAGE)

------------------------------------------------------------

DEMO_QUEUE_PAYLOAD_TYPE(‘Here is a message‘)

2)我们可以使用DBMS_AQ.DEQUEUE API浏览。根据名字可以看出,该过程用于出列消息。为了达到只浏览不删除的目的,我们可以使用

DBMS_AQ.BROWSE修改出列属性(默认是DBMS_AQ.REMOVE)。

DECLARE

r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle     RAW(16);

o_payload            demo_queue_payload_type;

BEGIN

r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

DBMS_AQ.DEQUEUE(

queue_name         => ‘demo_queue‘,

dequeue_options    => r_dequeue_options,

message_properties => r_message_properties,

payload            => o_payload,

msgid              => v_message_handle

);

DBMS_OUTPUT.PUT_LINE(

‘*** Browsed message is [‘ || o_payload.message || ‘] ***‘

);

END;

/

*** Browsed message is [Here is a message] ***

再次查询视图可以确定消息确实没被移除:

SELECT user_data

FROM   aq$demo_queue_table;

USER_DATA(MESSAGE)

------------------------------------------------------------

DEMO_QUEUE_PAYLOAD_TYPE(‘Here is a message‘)

4、出列消息(dequeuing messages)

现在我们将实际出列消息。该操作不要求在同一会话进行(记住入列是AQ基于表的提交事务)。像入列,出列也是一个事务(从队列表

移除消息)。

DECLARE

r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle     RAW(16);

o_payload            demo_queue_payload_type;

BEGIN

DBMS_AQ.DEQUEUE(

queue_name         => ‘demo_queue‘,

dequeue_options    => r_dequeue_options,

message_properties => r_message_properties,

payload            => o_payload,

msgid              => v_message_handle

);

DBMS_OUTPUT.PUT_LINE(

‘*** Dequeued message is [‘ || o_payload.message || ‘] ***‘

);

COMMIT;

END;

/

*** Dequeued message is [Here is a message] ***

PL/SQL procedure successfully completed.

再次查询视图发现消息确已出列:

SELECT COUNT(*)

FROM   aq$demo_queue_table;

COUNT(*)

----------

0

5、通知(notification)

文章的剩余部分,我们将看一下通过通知自动出列。通过这种方式无论消息何时入列,Oracle都将通知一个代理执行一个注册的PLSQL

"回调"(callback)过程(可选择地,代理还可以通知一个邮箱地址或HTTP://地址)。

为了说明,我们将创建和注册一个PLSQL过程以通过通知方式管理我们的出列。这个回调过程将出列消息并写到一个数据库表,以模拟

标准数据库操作。

作为开始,我们清理之前创建的对象。

BEGIN

DBMS_AQADM.STOP_QUEUE(

queue_name => ‘demo_queue‘

);

DBMS_AQADM.DROP_QUEUE(

queue_name => ‘demo_queue‘

);

DBMS_AQADM.DROP_QUEUE_TABLE(

queue_table => ‘demo_queue_table‘

);

END;

/

现在我们重新创建队列表以允许多个消费者(consumers)。一个消费者是一个出列消息代理(agent)启用多个消费者是自动通知实现的

前提条件。

BEGIN

DBMS_AQADM.CREATE_QUEUE_TABLE (

queue_table        => ‘demo_queue_table‘,

queue_payload_type => ‘demo_queue_payload_type‘,

multiple_consumers => TRUE

);

END;

/

接着重新创建并启动我们的队列。

BEGIN

DBMS_AQADM.CREATE_QUEUE (

queue_name  => ‘demo_queue‘,

queue_table => ‘demo_queue_table‘

);

DBMS_AQADM.START_QUEUE (

queue_name => ‘demo_queue‘

);

END;

/

为了证明通知的异步特点,我们将把出列消息存在一个应用表中。

CREATE TABLE demo_queue_message_table

( message VARCHAR2(4000) );

现在我们有一个应用表,我们可以创建回调PL/SQL。这个过程将出列触发了通知的入列消息。程序参数必须命名并类型化。入列消息将

包含入列时间戳,这样插入到应用表中我们将看到消息入列和通知出列的异步延迟。

CREATE PROCEDURE demo_queue_callback_procedure(

context  RAW,

reginfo  SYS.AQ$_REG_INFO,

descr    SYS.AQ$_DESCRIPTOR,

payload  RAW,

payloadl NUMBER

) AS

r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle     RAW(16);

o_payload            demo_queue_payload_type;

BEGIN

r_dequeue_options.msgid := descr.msg_id;

r_dequeue_options.consumer_name := descr.consumer_name;

DBMS_AQ.DEQUEUE(

queue_name         => descr.queue_name,

dequeue_options    => r_dequeue_options,

message_properties => r_message_properties,

payload            => o_payload,

msgid              => v_message_handle

);

INSERT INTO demo_queue_message_table ( message )

VALUES ( ‘Message [‘ || o_payload.message || ‘] ‘ ||

‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,

‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘ );

COMMIT;

END;

/

我们还未完成通知步骤。我们需要向队列增加一个订阅者(subsriber)并注册订阅者接到通知时的动作(例如将执行我们的回调过程)。

BEGIN

DBMS_AQADM.ADD_SUBSCRIBER (

queue_name => ‘demo_queue‘,

subscriber => SYS.AQ$_AGENT(

‘demo_queue_subscriber‘,

NULL,

NULL )

);

DBMS_AQ.REGISTER (

SYS.AQ$_REG_INFO_LIST(

SYS.AQ$_REG_INFO(

‘DEMO_QUEUE:DEMO_QUEUE_SUBSCRIBER‘,

DBMS_AQ.NAMESPACE_AQ,

‘plsql://DEMO_QUEUE_CALLBACK_PROCEDURE‘,

HEXTORAW(‘FF‘)

)

),

1

);

END;

/

现在我们可以通过入列消息来测试。这个消息将仅包含一个时间戳以便我们对比入列和自动出列发生的时间差。

DECLARE

r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;

r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;

v_message_handle     RAW(16);

o_payload            demo_queue_payload_type;

BEGIN

o_payload := demo_queue_payload_type(

TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘ )

);

DBMS_AQ.ENQUEUE(

queue_name         => ‘demo_queue‘,

enqueue_options    => r_enqueue_options,

message_properties => r_message_properties,

payload            => o_payload,

msgid              => v_message_handle

);

COMMIT;

END;

/

为了查看我们的小时是否自动出列,我们将检查应用表(DEMO_QUEUE_MESSAGE_TABLE)。

SELECT message

FROM   demo_queue_message_table;

MESSAGE

---------------------------------------------------------------------------

Message [21-JUL-2005 21:54:51.156] dequeued at [21-JUL-2005 21:54:56.015]

我们可以看到通过通知异步出列大约在入列操作后5秒后发生。

6、进一步阅读

我们已经在本文接触了AQ的能力。AQ是一个覆盖面巨大的应用,原超出本文的这点介绍。更多信息请参考:

http://docs.oracle.com/cd/B10501_01/appdev.920/a96587/toc.htm

ASKTOM:  https://asktom.oracle.com/pls/asktom/f?p=100:11:0::::P11_QUESTION_ID:8760267539329

全部源代码下载:源码下载

-------------------------

Dylan  Presents.

时间: 2024-08-07 17:02:26

Oracle高级队列介绍的相关文章

oracle 高级队列

转载:http://www.idevelopment.info/data/Oracle/DBA_tips/Advanced_Queuing/AQ_2.shtml Overview This article provides a brief overview on configuring and using Oracle's Advanced Queuing features using PL/SQL. This will demonstrate the basic functionality o

Oracle数据库用户介绍

Oracle数据库创建的时候,创建了一系列默认的用户,有时候可能我们不小心忘记创建了某个用户,比如SCOTT用户,我们就需要使用Oracle提供的脚本来创建,介绍如下: 1.SYS/change_on_InstaLL or Internal 系统用户,数据字典所有者,超级权限所有者(SYSDBA) 创建脚本:?/rdbms/admin/SQL.bsq and various cat*.SQL 建议创建后立即修改密码,此用户不能被删除 2.SYSTEM/manager 数据库默认管理用户,拥有DB

oracle的环境配置-oracle的相关介绍

Oracle的相关介绍: 1.Oracle数据库软件--安装OS上的2.软件的获取路径,商务销售策略.3.数据库版本:9i--10%                      10g--65%                      11g--25%--新建的系统常见               12c--最新版本 8i--Internet   9i   10g--gird compute--10gR2-oracle 10g relaease 10.2.0.x   11g           

oracle 高级分组

oracle 高级分组 博客分类: 数据库基础 oraclesql Java代码   10.高级分组 本章目标: 对于增强的group by需要掌握: 1.使用rollup(也就是roll up累计的意思)操作产生subtotal(小计)的值. 2.使用cube操作产生cross-tabulation(列联交叉表)的值. 3.使用grouping函数标识通过rollup和cube建立的行的值. 4.使用grouping sets产生一个single result set(结果集). 5.使用gr

消息队列属性及常见消息队列介绍

什么是消息队列?消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个队列的消息可以同时被多个消息消费者消费.分布式消息服务DMS则是分布式的队列系统,消息队列中的消息分布存储,且每条消息存储多个副本,以实现高可用性,如下图所示. 一般来说,消息队列具有如下属性: 消息顺序普通队列支持"分区有序"和"全局队列"两种模式,ActiveMQ队列和Kafka队列均为分区有序. 分区有序的队列通过分布式处理,支持更高的并发,但由于队列的分布式特性,

消息队列及常见消息队列介绍

原文链接:https://cloud.tencent.com/community/article/129032 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaM

消息队列介绍、RabbitMQ&Redis的重点介绍与简单应用

消息队列介绍.RabbitMQ.Redis 一.什么是消息队列 这个概念我们百度Google能查到一大堆文章,所以我就通俗的讲下消息队列的基本思路. 还记得原来写过Queue的文章,不管是线程queue还是进程queue他都是一种消息队列.他都是基于生产者消费者模型来处理消息. Python中的进程queue,是用于父进程与子进程,或者同属于一个父进程下的多个子进程之间进行信息交互.注意这种queue只能在同一个python程序下才能用,如果两个python程序,或者Python和别的什么程序,

Oracle的游标介绍

Oracle的游标介绍 Oracle中的PL/SQL的游标是指把数据库中查询出来的数据以临时表的形式存放在内存中,游标可以对存储在内存中的数据进行操作,返回一条或者一组数据,或者一条数据也不返回.PL/SQL中的记录和表类型虽然也能用来存储数据,但对一组存储在内存中的数据进行操作,还是不太方便,游标恰好是这方面的工具. PL/SQL包含隐含游标和显示游标,其中隐含游标用于处理SELECT INTO和DML语句,而显示游标则专门用于处理SELECT语句返回的多行数据,游标的基本操作有: - 声明游

Oracle purge 用法介绍

1.用途: 清除oracle 回收站(recyclebin)中的表和索引并释放与其相关的空间,还可清空回收站,或者清除表空间中记录的已删除的部分表空间. 注意:purge后不能回滚和恢复. 2.语法: 3.示例说明: 1)首先查一下回收站: SELECT * FROM RECYCLEBIN; 2)创建并删除同一表三次: --版本1 CREATE TABLE recycle_tmp(version NUMBER(10)); INSERT INTO recycle_tmp VALUES(1); CO