一个 SQL Server Service Broker 例子
-----------------------------------
USE master;
GO

--------------------------------------------------
-- Create demo database section
--------------------------------------------------
-- Drop any previous copy of the demo database so the script always starts
-- from an empty database.
IF EXISTS (SELECT name FROM sys.databases WHERE name = 'SsbDemoDb')
    DROP DATABASE [SsbDemoDb];

CREATE DATABASE [SsbDemoDb];
GO

USE [SsbDemoDb];
GO

--------------------------------------------------
-- Dialog pool section
--------------------------------------------------

--------------------------------------------------
-- The dialog pool table.
-- Obtain a conversation handle using from service, to service, and contract.
-- Also indicates age and usage of dialog for auditing purposes.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'DialogPool')
    DROP TABLE [DialogPool];
GO

CREATE TABLE [DialogPool] (
    FromService  SYSNAME          NOT NULL,  -- initiating service of the dialog
    ToService    SYSNAME          NOT NULL,  -- target service of the dialog
    OnContract   SYSNAME          NOT NULL,  -- contract the dialog was begun on
    Handle       UNIQUEIDENTIFIER NOT NULL,  -- conversation handle of the pooled dialog
    OwnerSPID    INT              NOT NULL,  -- session holding the dialog; -1 means free
    CreationTime DATETIME         NOT NULL,  -- when the dialog was added to the pool
    SendCount    BIGINT           NOT NULL,  -- number of messages sent on the dialog
    UNIQUE (Handle)
);
GO

--------------------------------------------------
-- Get dialog procedure.
-- Reuse a free dialog in the pool or create a new one in case
-- no free dialogs exist.
-- Input is from service, to service, and contract.
-- Output is dialog handle and count of message previously sent on dialog.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_get_dialog')
    DROP PROC usp_get_dialog;
GO

-- Parameters:
--   @fromService   initiating service name
--   @toService     target service name
--   @onContract    contract name
--   @dialogHandle  OUTPUT: handle of the claimed (or newly begun) dialog
--   @sendCount     OUTPUT: messages already sent on that dialog (0 for a new one)
CREATE PROCEDURE [usp_get_dialog] (
    @fromService  SYSNAME,
    @toService    SYSNAME,
    @onContract   SYSNAME,
    @dialogHandle UNIQUEIDENTIFIER OUTPUT,
    @sendCount    BIGINT OUTPUT)
AS
BEGIN
    SET NOCOUNT ON;

    -- Captures the one pool row claimed by the UPDATE below.
    DECLARE @claimed TABLE (
        Handle    UNIQUEIDENTIFIER NOT NULL,
        SendCount BIGINT           NOT NULL
    );

    BEGIN TRANSACTION;

    -- Try to claim an unused dialog in [DialogPool].
    -- READPAST option avoids blocking on locked dialogs.
    UPDATE TOP(1) [DialogPool] WITH (READPAST)
    SET OwnerSPID = @@SPID
    OUTPUT INSERTED.Handle, INSERTED.SendCount
        INTO @claimed (Handle, SendCount)
    WHERE FromService = @fromService
      AND ToService   = @toService
      AND OnContract  = @onContract   -- spelled exactly as declared: variable names
                                      -- are case-sensitive under a case-sensitive collation
      AND OwnerSPID   = -1;

    -- @@ROWCOUNT must be read immediately after the UPDATE.
    IF @@ROWCOUNT > 0
    BEGIN
        SELECT @dialogHandle = Handle,
               @sendCount    = SendCount
        FROM @claimed;
    END
    ELSE
    BEGIN
        -- No free dialogs: need to create a new one and register it in the
        -- pool as already owned by this session.
        BEGIN DIALOG CONVERSATION @dialogHandle
            FROM SERVICE @fromService
            TO SERVICE @toService
            ON CONTRACT @onContract
            WITH ENCRYPTION = OFF;

        INSERT INTO [DialogPool]
            (FromService, ToService, OnContract, Handle, OwnerSPID, CreationTime, SendCount)
        VALUES
            (@fromService, @toService, @onContract, @dialogHandle, @@SPID, GETDATE(), 0);

        SET @sendCount = 0;
    END

    COMMIT;
END;
GO

--------------------------------------------------
-- Free dialog procedure.
-- Return the dialog to the pool.
-- Inputs are dialog handle and updated send count.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_free_dialog')
    DROP PROC usp_free_dialog;
GO

-- Parameters:
--   @dialogHandle  handle of the dialog being returned to the pool
--   @sendCount     updated number of messages sent on the dialog
-- Raises (severity 16, written to the log) when the handle matches zero rows
-- or more than one row in [DialogPool].
CREATE PROCEDURE [usp_free_dialog] (
    @dialogHandle UNIQUEIDENTIFIER,
    @sendCount    BIGINT)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @rowcount INT;
    DECLARE @string VARCHAR(50);

    BEGIN TRANSACTION;

    -- Release dialog by setting OwnerSPID to -1.
    UPDATE [DialogPool]
    SET OwnerSPID = -1,
        SendCount = @sendCount
    WHERE Handle = @dialogHandle;

    SELECT @rowcount = @@ROWCOUNT;

    IF @rowcount = 0
    BEGIN
        SET @string = CAST(@dialogHandle AS VARCHAR(50));
        RAISERROR('usp_free_dialog: dialog %s not found in dialog pool', 16, 1, @string) WITH LOG;
    END
    ELSE IF @rowcount > 1
    BEGIN
        SET @string = CAST(@dialogHandle AS VARCHAR(50));
        RAISERROR('usp_free_dialog: duplicate dialog %s found in dialog pool', 16, 1, @string) WITH LOG;
    END

    COMMIT;
END;
GO

--------------------------------------------------
-- Delete dialog procedure.
-- Delete the dialog from the pool. This does not end the dialog.
-- Input is dialog handle.
--------------------------------------------------
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_delete_dialog')
    DROP PROC usp_delete_dialog;
GO

CREATE PROCEDURE [usp_delete_dialog] (
    @dialogHandle UNIQUEIDENTIFIER)
AS
BEGIN
    SET NOCOUNT ON;

    BEGIN TRANSACTION;

    DELETE [DialogPool]
    WHERE Handle = @dialogHandle;

    COMMIT;
END;
GO

--------------------------------------------------
-- Application setup section.
--------------------------------------------------

--------------------------------------------------
-- Send messages from initiator to target.
-- Initiator uses dialogs from the dialog pool.
-- Initiator also retires dialogs based on application criteria,
-- which results in recycling dialogs in the pool.
--------------------------------------------------
-- This table stores the messages on the target side.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'MsgTable')
    DROP TABLE MsgTable;
GO

CREATE TABLE MsgTable (
    message_type SYSNAME,
    message_body NVARCHAR(4000)
);
GO

-- Activated stored proc for the initiator to receive messages.
-- Receives at most one message per invocation (waiting up to 5 seconds) and
-- ends the initiator side of the conversation when the far end has ended it
-- or has reported an error.
CREATE PROCEDURE initiator_queue_activated_procedure
AS
BEGIN
    DECLARE @handle UNIQUEIDENTIFIER;
    DECLARE @message_type SYSNAME;

    BEGIN TRANSACTION;

    WAITFOR (
        RECEIVE TOP(1)
            @handle       = [conversation_handle],
            @message_type = [message_type_name]
        FROM [SsbInitiatorQueue]
    ), TIMEOUT 5000;

    IF @@ROWCOUNT = 1
    BEGIN
        -- Expect target response to EndOfStream message (EndDialog).
        -- An Error message also requires END CONVERSATION on this side,
        -- otherwise the conversation endpoint is never cleaned up.
        IF @message_type IN (
               'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
               'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        BEGIN
            END CONVERSATION @handle;
        END
    END

    COMMIT;
END;
GO

-- Activated stored proc for the target to receive messages.
-- Repeatedly receives a batch of messages from [SsbTargetQueue] (waiting up
-- to 5 seconds each time) and processes the batch in queuing order:
--   * SsbMsgType  -> stored in MsgTable
--   * EndOfStream -> the conversation is ended
--   * Service Broker Error -> the conversation is ended and the error logged
-- Processing errors are written to target_processing_errors. The procedure
-- returns when a receive times out with no messages, or after an error that
-- dooms the transaction.
CREATE PROCEDURE target_queue_activated_procedure
AS
BEGIN
    -- Variable table for received messages.
    -- Bodies are sent as NVARCHAR, so they are kept as NVARCHAR here; casting
    -- the binary body to VARCHAR would mangle the UTF-16 payload.
    DECLARE @receive_table TABLE (
        queuing_order       BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name   SYSNAME,
        message_body        NVARCHAR(MAX)
    );

    -- Cursor for received message table.
    DECLARE message_cursor CURSOR LOCAL FORWARD_ONLY READ_ONLY FOR
        SELECT conversation_handle, message_type_name, message_body
        FROM @receive_table
        ORDER BY queuing_order;

    DECLARE @conversation_handle UNIQUEIDENTIFIER;
    DECLARE @message_type SYSNAME;
    DECLARE @message_body NVARCHAR(4000);
    DECLARE @error_xml XML;

    -- Error variables.
    DECLARE @error_number INT;
    DECLARE @error_message VARCHAR(4000);
    DECLARE @error_severity INT;
    DECLARE @error_state INT;
    DECLARE @error_procedure SYSNAME;
    DECLARE @error_line INT;
    DECLARE @error_dialog VARCHAR(50);

    BEGIN TRY
        WHILE (1 = 1)
        BEGIN
            BEGIN TRANSACTION;

            -- Receive all available messages into the table.
            -- Wait 5 seconds for messages.
            -- The body is limited to 4000 characters, matching MsgTable.
            WAITFOR (
                RECEIVE
                    [queuing_order],
                    [conversation_handle],
                    [message_type_name],
                    CAST([message_body] AS NVARCHAR(4000))
                FROM [SsbTargetQueue]
                INTO @receive_table
            ), TIMEOUT 5000;

            IF @@ROWCOUNT = 0
            BEGIN
                -- Nothing arrived within the timeout: finish.
                COMMIT;
                BREAK;
            END
            ELSE
            BEGIN
                OPEN message_cursor;

                WHILE (1 = 1)
                BEGIN
                    FETCH NEXT FROM message_cursor
                        INTO @conversation_handle, @message_type, @message_body;

                    IF (@@FETCH_STATUS != 0) BREAK;

                    -- Process a message.
                    -- If an exception occurs, catch and attempt to recover.
                    BEGIN TRY
                        IF @message_type = 'SsbMsgType'
                        BEGIN
                            -- Process the msg. Here we will just insert it into a table.
                            INSERT INTO MsgTable (message_type, message_body)
                            VALUES (@message_type, @message_body);
                        END
                        ELSE IF @message_type = 'EndOfStream'
                        BEGIN
                            -- Initiator is signaling end of message stream: end the dialog.
                            END CONVERSATION @conversation_handle;
                        END
                        ELSE IF @message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
                        BEGIN
                            -- If the message_type indicates that the message is an error,
                            -- end the conversation and raise the error.
                            SET @error_xml = CAST(@message_body AS XML);

                            WITH XMLNAMESPACES ('http://schemas.microsoft.com/SQL/ServiceBroker/Error' AS ssb)
                            SELECT
                                @error_number  = @error_xml.value('(//ssb:Error/ssb:Code)[1]', 'INT'),
                                @error_message = @error_xml.value('(//ssb:Error/ssb:Description)[1]', 'VARCHAR(4000)');

                            SET @error_dialog = CAST(@conversation_handle AS VARCHAR(50));

                            -- End the conversation BEFORE raising: a severity 16
                            -- RAISERROR inside TRY jumps straight to CATCH, so any
                            -- statement placed after it would never run.
                            END CONVERSATION @conversation_handle;

                            RAISERROR('Error in dialog %s: %s (%i)', 16, 1,
                                      @error_dialog, @error_message, @error_number);
                        END
                    END TRY
                    BEGIN CATCH
                        SET @error_number    = ERROR_NUMBER();
                        SET @error_message   = ERROR_MESSAGE();
                        SET @error_severity  = ERROR_SEVERITY();
                        SET @error_state     = ERROR_STATE();
                        SET @error_procedure = ERROR_PROCEDURE();
                        SET @error_line      = ERROR_LINE();

                        IF XACT_STATE() = -1
                        BEGIN
                            -- The transaction is doomed. Only rollback possible.
                            -- This could disable the queue if done 5 times consecutively!
                            ROLLBACK TRANSACTION;

                            -- Record the error in its own transaction so the
                            -- log row survives the rollback above.
                            BEGIN TRANSACTION;
                            INSERT INTO target_processing_errors
                                (error_conversation, error_number, error_message, error_severity,
                                 error_state, error_procedure, error_line, doomed_transaction)
                            VALUES
                                (@conversation_handle, @error_number, @error_message, @error_severity,
                                 @error_state, @error_procedure, @error_line, 1);
                            COMMIT;

                            -- For this level of error, it is best to exit the proc
                            -- and give the queue monitor control.
                            -- Breaking to the outer catch will accomplish this.
                            RAISERROR ('Message processing error', 16, 1);
                        END
                        ELSE IF XACT_STATE() = 1
                        BEGIN
                            -- Record error and continue processing messages.
                            -- Failing message could also be put aside for later processing here.
                            -- Otherwise it will be discarded.
                            INSERT INTO target_processing_errors
                                (error_conversation, error_number, error_message, error_severity,
                                 error_state, error_procedure, error_line, doomed_transaction)
                            VALUES
                                (@conversation_handle, @error_number, @error_message, @error_severity,
                                 @error_state, @error_procedure, @error_line, 0);
                        END
                    END CATCH
                END

                CLOSE message_cursor;

                -- Empty the batch so the next RECEIVE starts from a clean table.
                DELETE @receive_table;
            END

            COMMIT;
        END
    END TRY
    BEGIN CATCH
        -- Process the error and exit the proc to give the queue monitor control.
        SET @error_number    = ERROR_NUMBER();
        SET @error_message   = ERROR_MESSAGE();
        SET @error_severity  = ERROR_SEVERITY();
        SET @error_state     = ERROR_STATE();
        SET @error_procedure = ERROR_PROCEDURE();
        SET @error_line      = ERROR_LINE();

        IF XACT_STATE() = -1
        BEGIN
            -- The transaction is doomed. Only rollback possible.
            -- This could disable the queue if done 5 times consecutively!
            ROLLBACK TRANSACTION;

            -- Record the error.
            BEGIN TRANSACTION;
            INSERT INTO target_processing_errors
                (error_conversation, error_number, error_message, error_severity,
                 error_state, error_procedure, error_line, doomed_transaction)
            VALUES
                (NULL, @error_number, @error_message, @error_severity,
                 @error_state, @error_procedure, @error_line, 1);
            COMMIT;
        END
        ELSE IF XACT_STATE() = 1
        BEGIN
            -- Record error and commit transaction.
            -- Here you could also save anything else you want before exiting.
            INSERT INTO target_processing_errors
                (error_conversation, error_number, error_message, error_severity,
                 error_state, error_procedure, error_line, doomed_transaction)
            VALUES
                (NULL, @error_number, @error_message, @error_severity,
                 @error_state, @error_procedure, @error_line, 0);
            COMMIT;
        END
    END CATCH
END;
GO

-- Table to store processing errors.
IF EXISTS (SELECT name FROM sys.tables WHERE name = 'target_processing_errors')
    DROP TABLE target_processing_errors;
GO

CREATE TABLE target_processing_errors (
    error_conversation UNIQUEIDENTIFIER,
    error_number       INT,
    error_message      VARCHAR(4000),
    error_severity     INT,
    error_state        INT,
    error_procedure    SYSNAME NULL,
    error_line         INT,
    doomed_transaction TINYINT   -- 1 when the failing transaction was rolled back, else 0
);
GO

-- Create Initiator and Target side SSB entities
CREATE MESSAGE TYPE SsbMsgType VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE EndOfStream;

CREATE CONTRACT SsbContract (
    SsbMsgType  SENT BY INITIATOR,
    EndOfStream SENT BY INITIATOR
);

CREATE QUEUE SsbInitiatorQueue
    WITH ACTIVATION (
        STATUS = ON,
        MAX_QUEUE_READERS = 1,
        PROCEDURE_NAME = [initiator_queue_activated_procedure],
        EXECUTE AS OWNER);

CREATE QUEUE SsbTargetQueue
    WITH ACTIVATION (
        STATUS = ON,
        MAX_QUEUE_READERS = 1,
        PROCEDURE_NAME = [target_queue_activated_procedure],
        EXECUTE AS OWNER);

CREATE SERVICE SsbInitiatorService ON QUEUE SsbInitiatorQueue;
CREATE SERVICE SsbTargetService ON QUEUE SsbTargetQueue (SsbContract);
GO

-- SEND procedure. Uses a dialog from the dialog pool.
--
-- Parameters:
--   @fromService       initiating service name
--   @toService         target service name
--   @onContract        contract name
--   @messageType       message type to send
--   @messageBody       message body; NULL sends a message with no body
--   @retireAfterSends  optional (default 1000): once a dialog's send count
--                      exceeds this value it is removed from the pool and the
--                      target is told to end it
-- Returns 0 on success, 1 when SEND failed 10 times in a row (an error is
-- also raised and logged in that case).
IF EXISTS (SELECT name FROM sys.procedures WHERE name = 'usp_send')
    DROP PROC usp_send;
GO

CREATE PROCEDURE [usp_send] (
    @fromService      SYSNAME,
    @toService        SYSNAME,
    @onContract       SYSNAME,
    @messageType      SYSNAME,
    @messageBody      NVARCHAR(MAX),
    @retireAfterSends BIGINT = 1000)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @dialogHandle UNIQUEIDENTIFIER;
    DECLARE @sendCount BIGINT;
    DECLARE @counter INT;
    DECLARE @error INT;

    SELECT @counter = 1;

    BEGIN TRANSACTION;

    -- Will need a loop to retry in case the dialog is
    -- in a state that does not allow transmission.
    WHILE (1 = 1)
    BEGIN
        -- Claim a dialog from the dialog pool.
        -- A new one will be created if none are available.
        EXEC usp_get_dialog @fromService, @toService, @onContract,
                            @dialogHandle OUTPUT, @sendCount OUTPUT;

        -- Attempt to SEND on the dialog. @@ERROR is captured directly after
        -- each SEND so that no other statement can overwrite it.
        IF (@messageBody IS NOT NULL)
        BEGIN
            -- If the @messageBody is not null it must be sent explicitly.
            SEND ON CONVERSATION @dialogHandle
                MESSAGE TYPE @messageType (@messageBody);
            SELECT @error = @@ERROR;
        END
        ELSE
        BEGIN
            -- Messages with no body must *not* specify the body,
            -- cannot send a NULL value argument.
            SEND ON CONVERSATION @dialogHandle
                MESSAGE TYPE @messageType;
            SELECT @error = @@ERROR;
        END

        IF @error = 0
        BEGIN
            -- Successful send, increment count and exit the loop.
            SET @sendCount = @sendCount + 1;
            BREAK;
        END

        -- The SEND failed: delete the associated dialog from the pool so it is
        -- never handed out again (this also covers the final failed attempt).
        EXEC usp_delete_dialog @dialogHandle;
        SELECT @dialogHandle = NULL;

        SELECT @counter = @counter + 1;
        IF @counter > 10
        BEGIN
            -- We failed 10 times in a row, something must be broken.
            -- Commit so the pool clean-up persists and the transaction count
            -- is balanced, then report the failure and stop. Falling through
            -- would return a broken dialog to the pool.
            COMMIT;
            RAISERROR('Failed to SEND on a conversation for more than 10 times. Error %i.', 16, 1, @error) WITH LOG;
            RETURN 1;
        END
    END

    -- "Criterion" for dialog pool removal is send count > @retireAfterSends.
    -- Modify to suit application.
    -- When deleting also inform the target to end the dialog.
    IF @sendCount > @retireAfterSends
    BEGIN
        EXEC usp_delete_dialog @dialogHandle;
        SEND ON CONVERSATION @dialogHandle MESSAGE TYPE [EndOfStream];
    END
    ELSE
    BEGIN
        -- Free the dialog.
        EXEC usp_free_dialog @dialogHandle, @sendCount;
    END

    COMMIT;
END;
GO

--------------------------------------------------------
-- Run application section
--------------------------------------------------------
-- Send some messages
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message1.</xml>';
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message2.</xml>';
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message3.</xml>';
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message4.</xml>';
EXEC usp_send N'SsbInitiatorService', N'SsbTargetService', N'SsbContract', N'SsbMsgType', N'<xml>This is a well formed XML Message5.</xml>';
GO

-- Show the dialog pool
SELECT FromService, ToService, OnContract, Handle, OwnerSPID, CreationTime, SendCount
FROM [DialogPool];
GO

-- Show the dialogs used.
-- SELECT * is kept here on purpose: this is an ad-hoc look at a system view.
SELECT * FROM sys.conversation_endpoints;
GO

-- Check whether the TARGET side has processed the messages.
-- The target queue is read by an activated procedure, so rows may not have
-- arrived yet; re-run these queries if MsgTable is still empty.
SELECT message_type, message_body
FROM MsgTable;

SELECT FromService, ToService, OnContract, Handle, OwnerSPID, CreationTime, SendCount
FROM [DialogPool];

SELECT error_conversation, error_number, error_message, error_severity,
       error_state, error_procedure, error_line, doomed_transaction
FROM dbo.target_processing_errors;

-- Optional clean-up between runs:
--TRUNCATE TABLE MsgTable
GO
时间: 2024-10-27 04:59:42