PLSQL NOTE--------Advanced Queue demo

1. sysdba 用户下赋权限

# sysprivilege.sql

prompt ---- connect to sysdba
set serveroutput on;

prompt ---- create user aq and aq_user
drop user aq cascade;
drop user aq_user cascade;
create user aq identified by aq;
create user aq_user identified by aq_user;

prompt ---- grant sys privilege
grant connect,resource to aq,aq_user;
grant aq_administrator_role,unlimited tablespace to aq;
grant execute on dbms_aq to aq;
grant execute on dbms_aqadm to aq;
grant select any dictionary to aq;
grant execute on dbms_aq to aq_user;
grant execute on dbms_aqadm to aq_user;

prompt ---- grant aq queue admin privilege
begin
	dbms_aqadm.grant_system_privilege(‘ENQUEUE_ANY‘, ‘aq‘, FALSE);
	dbms_aqadm.grant_system_privilege(‘DEQUEUE_ANY‘, ‘aq‘, FALSE);
end;
/
show errors;

2. advanced queue 管理员    aq用户下

# aqinstall.sql

prompt ---- connect to aq
set serveroutput on;

prompt ---- construct message type
create or replace type aq_message force as object(
request clob,
id integer);
/
show errors;
grant execute on aq_message to aq_user; 

prompt ---- drop queue and queue table
begin
	DBMS_AQADM.STOP_QUEUE(
		queue_name => ‘demo_queue‘
	);
	DBMS_AQADM.DROP_QUEUE(
		queue_name => ‘demo_queue‘
	);
	DBMS_AQADM.DROP_QUEUE_TABLE(
		queue_table => ‘demo_queue_table‘
	);
end;
/
show errors;

prompt ---- create queue table and queue
begin
	DBMS_AQADM.CREATE_QUEUE_TABLE (
		queue_table => ‘demo_queue_table‘,
		queue_payload_type => ‘aq_message‘,
		multiple_consumers => TRUE
	);
	DBMS_AQADM.CREATE_QUEUE (
		queue_name => ‘demo_queue‘,
		queue_table => ‘demo_queue_table‘
	);
END;
/
show errors;

prompt ---- grant aq_user queue privilege
begin
	dbms_aqadm.GRANT_QUEUE_PRIVILEGE(‘ALL‘,‘demo_queue‘,‘aq_user‘, true);
end;
/
show errors;

prompt ---- add queue subscriber
begin
	DBMS_AQADM.ADD_SUBSCRIBER (
		queue_name => ‘demo_queue‘,
		subscriber => SYS.AQ$_AGENT(‘demo_sub‘,NULL,NULL)
	);
end;
/
show errors;

prompt ---- subscriber register
begin
	DBMS_AQ.REGISTER (
		SYS.AQ$_REG_INFO_LIST(
		SYS.AQ$_REG_INFO(
		‘DEMO_QUEUE:demo_sub‘,
		DBMS_AQ.NAMESPACE_AQ,
		‘plsql://DEMO_QUEUE_CALLBACK_PROCEDURE‘,
		HEXTORAW(‘FF‘))),1);
END;
/
show errors;

prompt ---- add queue subscriber_1
begin
	DBMS_AQADM.ADD_SUBSCRIBER (
		queue_name => ‘demo_queue‘,
		subscriber => SYS.AQ$_AGENT(‘demo_sub_1‘,NULL,NULL)
	);
end;
/
show errors;

prompt ---- subscriber_1 register
begin
	DBMS_AQ.REGISTER (
		SYS.AQ$_REG_INFO_LIST(
		SYS.AQ$_REG_INFO(
		‘DEMO_QUEUE:demo_sub_1‘,
		DBMS_AQ.NAMESPACE_AQ,
		‘plsql://demo_queue_call_procedure‘,
		HEXTORAW(‘FF‘))),1);
END;
/
show errors;

prompt ---- start queue
begin
	DBMS_AQADM.START_QUEUE (
		queue_name => ‘demo_queue‘
	);
end;
/
show errors;

prompt ---- subscriber count
SELECT count(*) FROM aq$demo_queue_table_s;

3. aq 用户下  服务端处理

# plsqlcallback.sql

set serveroutput on;
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000),id integer );

create or replace procedure sendmessage(message in clob,id in integer, subcriber in varchar2) is
	r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
	r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
	v_message_handle RAW(16);
	o_payload aq.aq_message;
	rcpt_list dbms_aq.aq$_recipient_list_t;
	l_message clob;
begin
	l_message := message||to_clob(TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘)||‘}‘);
	o_payload := aq.aq_message(l_message,id);

	rcpt_list(0) := sys.aq$_agent(subcriber, null, null);
	r_message_properties.recipient_list := rcpt_list;
	-- r_message_properties.delay := 10;
	DBMS_AQ.ENQUEUE(
		queue_name => ‘aq.demo_queue‘,
		enqueue_options => r_enqueue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace);
end;
/
show errors;

create or replace function api_func(request in clob) return clob is
	l_response clob;
begin
	l_response := to_clob(‘send to client{‘);
	return l_response;
end;
/
show errors;

CREATE or replace PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
i_payload aq.aq_message;
i_message clob;
response clob;
l_id integer;
BEGIN
 	r_dequeue_options.msgid := descr.msg_id;
	r_dequeue_options.consumer_name := descr.consumer_name;

	DBMS_AQ.DEQUEUE(
		queue_name => descr.queue_name,
		dequeue_options => r_dequeue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
	i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘);
	INSERT INTO demo_queue_message_table ( message,id) VALUES (to_char(i_message),o_payload.id);
	response := api_func(i_message);
	l_id := o_payload.id;
	sendmessage(response,l_id,‘demo_sub_1‘);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace);
END;
/
show errors;

4. aq 用户下客户端处理

# plsqlcall.sql

CREATE TABLE demo_queue_message_receive
( message VARCHAR2(4000),id integer);

CREATE PROCEDURE demo_queue_call_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
i_message clob;
BEGIN
 	r_dequeue_options.msgid := descr.msg_id;
	r_dequeue_options.consumer_name := descr.consumer_name;

	DBMS_AQ.DEQUEUE(
		queue_name => descr.queue_name,
		dequeue_options => r_dequeue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
	i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘);
	INSERT INTO demo_queue_message_receive ( message,id) VALUES (to_char(i_message),o_payload.id);

COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace);
END;
/
show errors;

5. message 入队(客户端)

# subcriberenqueue.sql

set serveroutput on;
declare
	r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
	r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
	v_message_handle RAW(16);
	o_payload aq.aq_message;
	message clob;
	rcpt_list dbms_aq.aq$_recipient_list_t;
BEGIN
	message := to_clob(‘send to server{‘||TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘||‘}‘));
	o_payload := aq.aq_message(message,1);

	rcpt_list(0) := sys.aq$_agent(‘demo_sub‘, null, null);
	r_message_properties.recipient_list := rcpt_list;
	--r_message_properties.delay := 10;
	DBMS_AQ.ENQUEUE(
		queue_name => ‘aq.demo_queue‘,
		enqueue_options => r_enqueue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace);
end;
/
show errors;

6. 结果检查

# checkresult.sql

set serveroutput on;
prompt ---- message detail
select ENQ_TIME, CONSUMER_NAME, USER_DATA from AQ$demo_queue_table;

prompt ---- server receive messages
SELECT message FROM demo_queue_message_table where id =1;

prompt ---- client receive messages
SELECT message FROM demo_queue_message_receive where id =1;

7. 安装脚本 && instruction

#!/bin/bash

sysname=sys
syspwd=111111

username=aq
userpwd=aq
port=1522

username1=aq_user
userpwd1=aq_user

sqlplus $sysname/[email protected]:$port as sysdba <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@sysprivilege.sql
EOF

sqlplus $username/[email protected]:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@aqinstall.sql
EOF

sqlplus $username/[email protected]:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@plsqlcallback.sql
EOF

sqlplus $username/[email protected]:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@plsqlcall.sql
EOF

++++++++++++++++++++++ instruction ++++++++++++++++++++

1. execute installaq.sh to install aq environment;
2. connect to user ‘aq_user‘ using password ‘aq_user‘;
3. execute subcriberenqueue.sql to send message;
4. connect to user ‘aq‘ using password ‘aq‘;
5. execute checkresult.sql to print the messages receive by server and receive by client.

the result is as below:

MESSAGE
--------------------------------------------------------------------------------
Message [send to server{15-APR-2016 18:08:20.548}] dequeued at [15-APR-2016 18:0
8:20.580]

---- client receive messages

MESSAGE
--------------------------------------------------------------------------------
Message [send to client{15-APR-2016 18:08:20.581}] dequeued at [15-APR-2016 18:0
8:20.589]

时间: 2024-07-30 20:13:52

PLSQL NOTE--------Advanced Queue demo的相关文章

PLSQL NOTE --------like 与转义字符

SQL> create table test 2 (id integer, 3 name varchar2(90)); Table created. SQL> insert into test values(100,'aassdd'); 1 row created. SQL> insert into test values(120,null); 1 row created. SQL> insert into test values(110,'aa_see'); 1 row crea

Linux Message Queue Demo

Client: #include <sys/types.h>#include <sys/stat.h>#include <errno.h>#include <fcntl.h>#include <mqueue.h>#include <unistd.h>#include <stdio.h> #define MSG_SERVER "/msgqueue"#define BUF_LEN 20000 int mai

PLSQL note

sql%count 最近一次sql执行的件数SUBSTR(string , int i) // i番目から最後までの文字列を切り取るSUBSTR(string , int i, int j) // i番目からj文二の文字列を切り取るNVL(para, string) // paraはnullであれば.string を戻るFLOOR( i) // iより小さく.一番大きい整数を戻るCEIL(i) // iより大きく.一番小さい整数を戻るROUND(i) // 四捨五入TO_CHAR() //sel

C#基础---Queue(队列)的应用

   Queue队列,特性先进先出. 在一些项目中我们会遇到对一些数据的Check,如果数据不符合条件将会把不通过的信息返回到界面.但是对于有的数据可能会Check很多条件,如果一个数据一旦很多条件不通过,那么全部错误返回到界面,可能会让用户束手无策.我们有时候往往在一个流程中.只将Check流程中第一个不符合条件的错误提示给用户,让用户修改.首先我们就想到了队列,通过队列将所有的Check方法注册,然后依次出列.执行. Demo背景: XX公司招人,对员工的居住地点,姓氏,年龄都有要求. 一.

Java中使用LinkedList实现Queue

LinkedList提供了方法支持队列的行为,并且它实现了Queue接口,因此LinkedList可以用作Queue的一种实现. package cn.usst.queue.demo; import java.util.LinkedList; import java.util.Queue; import java.util.Random; /* * Queue的基本使用 */ public class QueueDemo { public static void main(String[] ar

oracle12c新特性索引压缩 COMPRESS ADVANCED LOW

从oracle 12.1.0.2版本起,创建索引时可以通过COMPRESS ADVANCED LOW对index进行压缩 语法 create index index_name on table_name(col_name) COMPRESS ADVANCED LOW; alter index index_name REBUILD COMPRESS ADVANCED LOW; 压缩空间对比 create table ddeng as select * from dba_objects: creat

oracle-Expdp/impdp命令

建立逻辑路径 create or replace directory dumpdir as 'c:\'; grant read,write on directory dumpdir to scott; 倒入/出 expdp newdr/[email protected] directory=test dumpfile=test_score1.dmp logfile=tes t.log parallel=2 schemas=newdr 错误 ORA-00054 resource busy and

Database Initialization Parameters for Oracle E-Business Suite Release 12

In This Document Section 1: Common Database Initialization Parameters For All Releases Section 2: Release-Specific Database Initialization Parameters For Oracle 10g Release 2 Section 3: Release-Specific Database Initialization Parameters For Oracle 1

AVFoundation 框架初探究(二)

接着第一篇总结 系列第一篇地址:AVFoundation 框架初探究(一) 在第一篇的文章中,我们总结了主要有下面几个点的知识: 1.对AVFoundation框架整体的一个认识 2.AVSpeechSynthesizer这个文字转音频类 3.AVAudioPlayer音频播放类 4.AVAudioRecorder音频录制类 5.AVAudioSession音频会话处理类 上面第一篇说的内容,大致都是关于上面总结的,接着说说我们这第二篇总结什么?其实刚开始的时候,我是想按照<AVFoundati