Reusing dialogs with a dialog pool -- 一个 SQL Server Service Broker 例子

一个 SQL Server Service Broker 例子

-----------------------------------
-- Work from master so the demo database itself can be dropped and recreated.
USE master
GO
--------------------------------------------------
-- Create demo database section
--------------------------------------------------

-- Start from a clean slate: remove any previous copy of the demo database.
IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
      DROP DATABASE [SsbDemoDb];

CREATE DATABASE [SsbDemoDb]
GO

-- Everything below is created inside the demo database.
USE [SsbDemoDb];
GO

--------------------------------------------------
-- Dialog pool section
-------------------------------------------------- 

--------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
--------------------------------------------------

-- Recreate the dialog pool table from scratch.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
      DROP TABLE [DialogPool]
GO

-- One row per pooled dialog.
--   FromService / ToService / OnContract : lookup key used to find a reusable dialog.
--   Handle       : conversation handle of the dialog (unique within the pool).
--   OwnerSPID    : session currently holding the dialog; -1 marks it as free
--                  (see usp_get_dialog / usp_free_dialog).
--   CreationTime : when the dialog was added to the pool (auditing).
--   SendCount    : number of messages sent on the dialog so far (auditing / retirement).
CREATE TABLE [DialogPool] (
      FromService SYSNAME NOT NULL,
      ToService SYSNAME NOT NULL,
      OnContract SYSNAME NOT NULL,
      Handle UNIQUEIDENTIFIER NOT NULL,
      OwnerSPID INT NOT NULL,
      CreationTime DATETIME NOT NULL,
      SendCount BIGINT NOT NULL,
      UNIQUE (Handle));
GO

--------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of message previously sent on dialog.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
      DROP PROC usp_get_dialog
GO

-- usp_get_dialog
-- Claims a free dialog from [DialogPool] for the given services/contract, or
-- begins a new dialog and registers it in the pool when none is free.
--   @fromService  : initiating service name.
--   @toService    : target service name.
--   @onContract   : contract name.
--   @dialogHandle : OUTPUT, conversation handle of the claimed or new dialog.
--   @sendCount    : OUTPUT, messages previously sent on the dialog (0 for a new one).
-- The claimed row is marked with the caller's @@SPID as owner.
CREATE PROCEDURE [usp_get_dialog] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @dialogHandle UNIQUEIDENTIFIER OUTPUT,
      @sendCount BIGINT OUTPUT)
AS
BEGIN
      SET NOCOUNT ON;

      -- Receives the row claimed by the UPDATE below (at most one row).
      DECLARE @dialog TABLE
      (
          FromService SYSNAME NOT NULL,
          ToService SYSNAME NOT NULL,
          OnContract SYSNAME NOT NULL,
          Handle UNIQUEIDENTIFIER NOT NULL,
          OwnerSPID INT NOT NULL,
          CreationTime DATETIME NOT NULL,
          SendCount BIGINT NOT NULL
      );

      BEGIN TRANSACTION;

      -- Try to claim an unused dialog (OwnerSPID = -1) in [DialogPool].
      -- READPAST skips rows locked by other sessions instead of blocking on them.
      UPDATE TOP(1) [DialogPool] WITH(READPAST)
             SET OwnerSPID = @@SPID
             OUTPUT INSERTED.* INTO @dialog
             WHERE FromService = @fromService
                   AND ToService = @toService
                   AND OnContract = @onContract
                   AND OwnerSPID = -1;

      IF @@ROWCOUNT > 0
      BEGIN
           -- A free dialog was claimed: hand back its handle and send count.
           SELECT @dialogHandle = Handle,
                  @sendCount = SendCount
           FROM @dialog;
      END
      ELSE
      BEGIN
           -- No free dialogs: create a new one and add it to the pool,
           -- already owned by this session.
           BEGIN DIALOG CONVERSATION @dialogHandle
                 FROM SERVICE @fromService
                 TO SERVICE @toService
                 ON CONTRACT @onContract
                 WITH ENCRYPTION = OFF;
           INSERT INTO [DialogPool]
                  (FromService, ToService, OnContract, Handle, OwnerSPID,
                      CreationTime, SendCount)
                  VALUES
                  (@fromService, @toService, @onContract, @dialogHandle, @@SPID,
                      GETDATE(), 0);
           SET @sendCount = 0;
      END
      COMMIT
END;
GO

--------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
      DROP PROC usp_free_dialog
GO

-- usp_free_dialog
-- Returns a dialog to the pool and stores its updated send count.
--   @dialogHandle : conversation handle of the dialog to release.
--   @sendCount    : new value for the dialog's SendCount column.
-- Raises a logged severity-16 error when the handle matches no pool row
-- (or more than one row).
CREATE PROCEDURE [usp_free_dialog] (
      @dialogHandle UNIQUEIDENTIFIER,
      @sendCount BIGINT)
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @rowcount INT;
      DECLARE @string VARCHAR(50);

      BEGIN TRANSACTION;

      -- Release dialog by setting OwnerSPID to -1.
      UPDATE [DialogPool]
             SET OwnerSPID = -1,
                 SendCount = @sendCount
             WHERE Handle = @dialogHandle;
      -- Capture immediately: @@ROWCOUNT is reset by the next statement.
      SELECT @rowcount = @@ROWCOUNT;

      IF @rowcount = 0
      BEGIN
           -- RAISERROR's %s needs a string, so convert the handle first.
           SET @string = CAST(@dialogHandle AS VARCHAR(50));
           RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;
      END
      ELSE IF @rowcount > 1
      BEGIN
           SET @string = CAST(@dialogHandle AS VARCHAR(50));
           RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;
      END
      COMMIT
END;

GO

--------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
--------------------------------------------------

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
      DROP PROC usp_delete_dialog
GO

-- usp_delete_dialog
-- Removes the row for the given dialog from [DialogPool].
-- Only the pool entry is deleted; no END CONVERSATION is issued here.
--   @dialogHandle : conversation handle of the dialog to remove.
CREATE PROCEDURE [usp_delete_dialog] (
      @dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
      SET NOCOUNT ON;
      BEGIN TRANSACTION;
      DELETE FROM [DialogPool] WHERE Handle = @dialogHandle;
      COMMIT
END;
GO

--------------------------------------------------
-- Application setup section.
-------------------------------------------------- 

--------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
--------------------------------------------------
-- This table stores the messages on the target side

-- Recreate the table that stores messages processed on the target side.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
      DROP TABLE MsgTable
GO

-- message_type : message type name of the received message.
-- message_body : message payload as text.
CREATE TABLE MsgTable ( message_type SYSNAME, message_body NVARCHAR(4000))
GO

-- Activated stored procedure for the initiator to receive messages.

-- initiator_queue_activated_procedure
-- Receives at most one message from [SsbInitiatorQueue] (waiting up to
-- 5 seconds) and, when it is an EndDialog message, ends the initiator's
-- side of that conversation. Other message types are received and ignored.
CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
     DECLARE @handle UNIQUEIDENTIFIER;
     DECLARE @message_type SYSNAME;

     BEGIN TRANSACTION;

     -- TIMEOUT is in milliseconds.
     WAITFOR (
          RECEIVE TOP(1) @handle = [conversation_handle],
            @message_type = [message_type_name]
          FROM [SsbInitiatorQueue]), TIMEOUT 5000;

     IF @@ROWCOUNT = 1
     BEGIN
          -- Expect target response to EndOfStream message.
          IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
          BEGIN
               END CONVERSATION @handle;
          END
     END
     COMMIT
END;

GO

-- Activated stored procedure for the target to receive messages.

-- target_queue_activated_procedure
-- Drains [SsbTargetQueue]: repeatedly receives all available messages (waiting
-- up to 5 seconds per round), processes them in queuing order, and exits once
-- a round returns no messages.
--   SsbMsgType  : body is stored in MsgTable.
--   EndOfStream : the target ends its side of the conversation.
--   Service Broker Error : the conversation is ended and the error is recorded.
-- Per-message failures are logged to target_processing_errors; when the
-- transaction is still committable, processing continues with the next message.
-- A doomed transaction is rolled back, logged, and the procedure exits.
CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
    -- Variable table for received messages.
    -- The body is kept as NVARCHAR because usp_send sends NVARCHAR payloads;
    -- casting those UTF-16 bytes to VARCHAR would mangle the text.
    DECLARE @receive_table TABLE(
            queuing_order BIGINT,
            conversation_handle UNIQUEIDENTIFIER,
            message_type_name SYSNAME,
            message_body NVARCHAR(MAX));

    -- Cursor for received message table.
    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY
            FOR SELECT
            conversation_handle,
            message_type_name,
            message_body
            FROM @receive_table ORDER BY queuing_order;

     DECLARE @conversation_handle UNIQUEIDENTIFIER;
     DECLARE @message_type SYSNAME;
     DECLARE @message_body NVARCHAR(4000);

     -- Error variables.
     DECLARE @error_number INT;
     DECLARE @error_message VARCHAR(4000);
     DECLARE @error_severity INT;
     DECLARE @error_state INT;
     DECLARE @error_procedure SYSNAME;
     DECLARE @error_line INT;
     DECLARE @error_dialog VARCHAR(50);

     BEGIN TRY
       WHILE (1 = 1)
       BEGIN
         BEGIN TRANSACTION;
         -- Receive all available messages into the table.
         -- Wait 5 seconds for messages.
         WAITFOR (
            RECEIVE
               [queuing_order],
               [conversation_handle],
               [message_type_name],
               CAST([message_body] AS NVARCHAR(4000))
            FROM [SsbTargetQueue]
            INTO @receive_table
         ), TIMEOUT 5000;

         IF @@ROWCOUNT = 0
         BEGIN
              -- Nothing arrived within the timeout: close the transaction and exit.
              COMMIT;
              BREAK;
         END
         ELSE
         BEGIN
              OPEN message_cursor;
              WHILE (1=1)
              BEGIN
                  FETCH NEXT FROM message_cursor
                            INTO @conversation_handle,
                                 @message_type,
                                 @message_body;
                  IF (@@FETCH_STATUS != 0) BREAK;
                  -- Process a message.
                  -- If an exception occurs, catch and attempt to recover.
                  BEGIN TRY
                      IF @message_type = 'SsbMsgType'
                      BEGIN
                          -- process the msg. Here we will just insert it into a table
                          INSERT INTO MsgTable (message_type, message_body)
                                 VALUES (@message_type, @message_body);
                      END
                      ELSE IF @message_type = 'EndOfStream'
                      BEGIN
                          -- initiator is signaling end of message stream: end the dialog
                          END CONVERSATION @conversation_handle;
                      END
                      ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                      BEGIN
                           -- If the message_type indicates that the message is an error,
                           -- end the conversation and raise the error.
                           WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                           SELECT
                           @error_number = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                           @error_message = CAST(@message_body AS XML).value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');
                           SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));
                           -- End the conversation BEFORE raising: a severity-16 RAISERROR
                           -- inside TRY jumps straight to CATCH, so anything placed after
                           -- it would never run and the dialog would never be ended.
                           END CONVERSATION @conversation_handle;
                           RAISERROR('Error in dialog %s: %s (%i)', 16, 1, @error_dialog, @error_message, @error_number);
                      END
                  END TRY
                  BEGIN CATCH
                     SET @error_number = ERROR_NUMBER();
                     SET @error_message = ERROR_MESSAGE();
                     SET @error_severity = ERROR_SEVERITY();
                     SET @error_state = ERROR_STATE();
                     SET @error_procedure = ERROR_PROCEDURE();
                     SET @error_line = ERROR_LINE();

                     IF XACT_STATE() = -1
                     BEGIN
                          -- The transaction is doomed. Only rollback possible.
                          -- This could disable the queue if done 5 times consecutively!
                          ROLLBACK TRANSACTION;

                          -- Record the error (in its own transaction, since the
                          -- original one has just been rolled back).
                          BEGIN TRANSACTION;
                          INSERT INTO target_processing_errors
                                 (error_conversation, error_number, error_message,
                                  error_severity, error_state, error_procedure,
                                  error_line, doomed_transaction)
                                 VALUES (@conversation_handle, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure, @error_line, 1);
                          COMMIT;

                          -- For this level of error, it is best to exit the proc
                          -- and give the queue monitor control.
                          -- Breaking to the outer catch will accomplish this.
                          RAISERROR ('Message processing error', 16, 1);
                     END
                     ELSE IF XACT_STATE() = 1
                     BEGIN
                          -- Record error and continue processing messages.
                          -- Failing message could also be put aside for later processing here.
                          -- Otherwise it will be discarded.
                          INSERT INTO target_processing_errors
                                 (error_conversation, error_number, error_message,
                                  error_severity, error_state, error_procedure,
                                  error_line, doomed_transaction)
                                 VALUES (@conversation_handle, @error_number, @error_message,
                                 @error_severity, @error_state, @error_procedure, @error_line, 0);
                     END
                  END CATCH
              END
              CLOSE message_cursor;
              -- Empty the buffer so the next round only sees newly received messages.
              DELETE @receive_table;
         END
         COMMIT;
       END
     END TRY
     BEGIN CATCH
         -- Process the error and exit the proc to give the queue monitor control
         SET @error_number = ERROR_NUMBER();
         SET @error_message = ERROR_MESSAGE();
         SET @error_severity = ERROR_SEVERITY();
         SET @error_state = ERROR_STATE();
         SET @error_procedure = ERROR_PROCEDURE();
         SET @error_line = ERROR_LINE();

         IF XACT_STATE() = -1
         BEGIN
              -- The transaction is doomed. Only rollback possible.
              -- This could disable the queue if done 5 times consecutively!
              ROLLBACK TRANSACTION;
              -- Record the error.
              -- No conversation is recorded here: the failure is not tied to a
              -- specific message at this level.
              BEGIN TRANSACTION;
              INSERT INTO target_processing_errors
                     (error_conversation, error_number, error_message,
                      error_severity, error_state, error_procedure,
                      error_line, doomed_transaction)
                     VALUES (NULL, @error_number, @error_message,
                     @error_severity, @error_state, @error_procedure, @error_line, 1);
              COMMIT;
         END
         ELSE IF XACT_STATE() = 1
         BEGIN
              -- Record error and commit transaction.
              -- Here you could also save anything else you want before exiting.
              INSERT INTO target_processing_errors
                     (error_conversation, error_number, error_message,
                      error_severity, error_state, error_procedure,
                      error_line, doomed_transaction)
                     VALUES (NULL, @error_number, @error_message,
                     @error_severity, @error_state, @error_procedure, @error_line, 0);
              COMMIT;
         END
     END CATCH
END;
GO

-- Table to store processing errors.

-- Recreate the error log table used by target_queue_activated_procedure.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
      DROP TABLE target_processing_errors;
GO

-- error_conversation : conversation handle related to the error, when recorded.
-- error_number .. error_line : values captured from the ERROR_*() functions.
-- doomed_transaction : 1 when the failing transaction had to be rolled back
--                      (XACT_STATE() = -1), 0 when it was still committable.
CREATE TABLE target_processing_errors (error_conversation UNIQUEIDENTIFIER, error_number INT,
       error_message VARCHAR(4000), error_severity INT, error_state INT, error_procedure SYSNAME NULL,
       error_line INT, doomed_transaction TINYINT)
GO

-- Create Initiator and Target side SSB entities
-- Message types.
-- SsbMsgType carries the application payload and must be well-formed XML.
CREATE MESSAGE TYPE [SsbMsgType]
       VALIDATION = WELL_FORMED_XML;

-- EndOfStream tells the target that the initiator is done with a dialog.
CREATE MESSAGE TYPE [EndOfStream];

-- Contract: both message types may only be sent by the initiator.
CREATE CONTRACT [SsbContract] (
       [SsbMsgType]  SENT BY INITIATOR,
       [EndOfStream] SENT BY INITIATOR
);

-- Queues, each serviced by a single activated reader running as the queue owner.
CREATE QUEUE [SsbInitiatorQueue]
       WITH ACTIVATION (
            STATUS = ON,
            MAX_QUEUE_READERS = 1,
            PROCEDURE_NAME = [initiator_queue_activated_procedure],
            EXECUTE AS OWNER
       );

CREATE QUEUE [SsbTargetQueue]
       WITH ACTIVATION (
            STATUS = ON,
            MAX_QUEUE_READERS = 1,
            PROCEDURE_NAME = [target_queue_activated_procedure],
            EXECUTE AS OWNER
       );

-- Services. The initiator service lists no contracts; the target service
-- accepts conversations on SsbContract.
CREATE SERVICE [SsbInitiatorService]
       ON QUEUE [SsbInitiatorQueue];

CREATE SERVICE [SsbTargetService]
       ON QUEUE [SsbTargetQueue] ([SsbContract]);

GO

-- SEND procedure. Uses a dialog from the dialog pool.
--

IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
      DROP PROC usp_send
GO

-- usp_send
-- Sends one message on a dialog taken from the dialog pool.
--   @fromService : initiating service name.
--   @toService   : target service name.
--   @onContract  : contract name.
--   @messageType : message type to send.
--   @messageBody : message payload; NULL sends a message without a body.
-- On a failed SEND the dialog is removed from the pool and another one is
-- tried, up to 10 attempts, after which a logged severity-16 error is raised.
-- After sending, a dialog whose send count exceeds 1000 is retired (removed
-- from the pool and the target is told to end it); otherwise it is returned
-- to the pool with its updated send count.
CREATE PROCEDURE [usp_send] (
      @fromService SYSNAME,
      @toService SYSNAME,
      @onContract SYSNAME,
      @messageType SYSNAME,
      @messageBody NVARCHAR(MAX))
AS
BEGIN
      SET NOCOUNT ON;
      DECLARE @dialogHandle UNIQUEIDENTIFIER;
      DECLARE @sendCount BIGINT;
      DECLARE @counter INT;
      DECLARE @error INT;
      SELECT @counter = 1;
      BEGIN TRANSACTION;
      -- Will need a loop to retry in case the dialog is
      -- in a state that does not allow transmission
      --
      WHILE (1=1)
      BEGIN
            -- Claim a dialog from the dialog pool.
            -- A new one will be created if none are available.
            --
            EXEC usp_get_dialog @fromService, @toService, @onContract, @dialogHandle OUTPUT, @sendCount OUTPUT;

            -- Attempt to SEND on the dialog
            --
            IF (@messageBody IS NOT NULL)
            BEGIN
                  -- If the @messageBody is not null it must be sent explicitly
                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType (@messageBody);
            END
            ELSE
            BEGIN
                  -- Messages with no body must *not* specify the body,
                  -- cannot send a NULL value argument
                  SEND ON CONVERSATION @dialogHandle MESSAGE TYPE @messageType;
            END

            -- Capture the SEND outcome before any other statement resets @@ERROR.
            SELECT @error = @@ERROR;
            IF @error = 0
            BEGIN
                  -- Successful send, increment count and exit the loop
                  --
                  SET @sendCount = @sendCount + 1;
                  BREAK;
            END

            SELECT @counter = @counter+1;
            IF @counter > 10
            BEGIN
                  -- We failed 10 times in a row, something must be broken
                  --
                  -- NOTE(review): execution continues after the loop, so the dialog
                  -- that just failed is handed back to the pool below -- confirm
                  -- whether it should be deleted instead.
                  RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
                  BREAK;
            END

            -- Delete the associated dialog from the table and try again
            --
            EXEC usp_delete_dialog @dialogHandle;
            SELECT @dialogHandle = NULL;
      END

      -- "Criterion" for dialog pool removal is send count > 1000.
      -- Modify to suit application.
      -- When deleting also inform the target to end the dialog.
      IF @sendCount > 1000
      BEGIN
         EXEC usp_delete_dialog @dialogHandle ;
         SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];
      END
      ELSE
      BEGIN
         -- Free the dialog.
         EXEC usp_free_dialog @dialogHandle, @sendCount;
      END
      COMMIT
END;
GO

--------------------------------------------------------
-- Run application section
--------------------------------------------------------

-- Send some messages

-- Five sends in a row from one session: each call frees its dialog before the
-- next call asks for one, so the pool can hand the same dialog out again.
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>'
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>'
GO

-- Show the dialog pool
SELECT * FROM [DialogPool]
GO

-- Show the dialogs used.
SELECT * FROM sys.conversation_endpoints;
GO

-- Check whether the TARGET side has processed the messages
-- (ad-hoc inspection queries, hence SELECT *).
SELECT * FROM MsgTable
SELECT * FROM [DialogPool]
SELECT * FROM dbo.target_processing_errors

--TRUNCATE TABLE MsgTable
GO
时间: 2024-10-27 04:59:42

Reusing dialogs with a dialog pool--一个sql server service broker例子的相关文章

The SQL Server Service Broker for the current database is not enabled

把一个数据恢复至另一个服务器上,出现了一个异常: The SQL Server Service Broker for the current database is not enabled, and as a result query notifications are not supported.  Please enable the Service Broker for this database if you wish to use notifications. 截图如下: 解决方法: 参

未启用当前数据库的 SQL Server Service Broker,请为此数据库启用 Service Broker

未启用当前数据库的 SQL Server Service Broker,因此查询通知不受支持.如果希望使用通知,请为此数据库启用 Service Broker, 我执行了下面语句解决了问题 ALTER DATABASE [数据库名称] SET NEW_BROKER WITH ROLLBACK IMMEDIATE;ALTER DATABASE [数据库名称] SET ENABLE_BROKER; 未启用当前数据库的 SQL Server Service Broker,请为此数据库启用 Servic

在Windows Server 2008 R2 Server中,连接其他服务器的数据库遇到“未启用当前数据库的 SQL Server Service Broker,因此查询通知不受支持。如果希望使用通知,请为此数据库启用 Service Broker ”

项目代码和数据库部署在不同的Windows Server 2008 R2 Server中,错误日志显示如下: "未启用当前数据库的 SQL Server Service Broker,因此查询通知不受支持.如果希望使用通知,请为此数据库启用 Service Broker." SQL Server Service Broker介绍: SQL Server Service Broker 为消息和队列应用程序提供 SQL Server 数据库引擎本机支持.这使开发人员可以轻松地创建使用数据库

未启用当前数据库的 SQL Server Service Broker,因此查询通知不受支持。如果希望使用通知,请为此数据库启用 Service Broker

昨晚遇到的这个问题,也知道Notifications service依赖底层的Service broker的.本以为只需要执行以下脚本对数据库启用Service broker即可. alter database DBNAME set enable_broker 但是,执行后,脚本一直处于执行状态,不以为然,正好在忙其它事情就没有查看运行结果,结果到今早一看,居然运行还没有结束.虽然是在一个生产数据库上执行的,数据库也只有30G的样子,但也不至于执行一个晚上也未结束,只好终止执行,使用 SELEC

SQL Server Service Broker 中调用存储过程执行跨库操作,不管怎么设置都一直提示 服务器主体 "sa" 无法在当前安全上下文下访问数据库 "dbname"。

用 SQL Server 自带的消息队列 Service Broker,调用存储过程中,执行了一个跨库的操作,先是用了一个用户,权限什么都给够了,但是一直提示 服务器主体 "user" 无法在当前安全上下文下访问数据库 "dbname". 想着是架构方面的问题,换sa还是不行.查到微软的一篇文章 提示需要开数据库的 ALTER DATABASE current_db SET TRUSTWORTHY ON 我把跨的那个库设置了还是不行.最后自己写测试代码,代码如下: cre

错误:未启用当前数据库的SQL Server Service Broker,因此查询通知不受支持。如果希望使用通知,请为此数据库启用 Service Broker。

解决方法: 打开SQL Server,新建查询: ALTER DATABASE 数据库名 SET NEW_BROKER WITH ROLLBACK IMMEDIATE;ALTER DATABASE 数据库名 SET ENABLE_BROKER; 转自:http://www.cnblogs.com/Impulse/articles/5358379.html

sql server Service Broker 相关查询

-- 查看传输队列中的消息 --如果尝试从队列中移除时,列将表明哪里出现了问题 select * from sys.transmission_queue -- 查看Service Broker 激活的存储过程 select * from sys.dm_broker_activated_tasks -- 查看数据库中的每个会话端点.会话端点代表Service Broker 会话的每一端. -- 会话端点视图state列显示会话的状态 select * from sys.conversation_e

SQL Server Service Broker 示例(转)

1.定义数据类型.协议和服务(发送服务和接收服务) USE master; GO ALTER DATABASE 目标数据库 SET ENABLE_BROKER; GO -- 如果上面的操作执行后,长时间无反应,有死机的嫌疑,尝试下面的语句. ALTER DATABASE 目标数据库 SET NEW_BROKER WITH ROLLBACK IMMEDIATE; GO ALTER DATABASE 目标数据库 SET ENABLE_BROKER; GO -- 创建 SayHelloMessage

微软MVP攻略 (如何成为MVP?一个SQL Server MVP的经验之谈)

一.本文所涉及的内容(Contents) 本文所涉及的内容(Contents) 初衷 什么是微软MVP? 成为微软MVP的条件? 如何成为微软MVP? (一) 申请时间划分 (二) 前期准备 (三) 下载/填写申请表格 (四) 申请MVP (五) 各种资料的填写 微软MVP奖项及权益包括什么? 成为微软MVP之后 个人建议 互动资讯 二.初衷 搞微软技术的,大家或多或少都有听说过微软的“最有价值专家”(MVP),网上也有不少资料对这个称谓做了介绍,但是都是一些大体的描述,并没有更加细节方面的,比