在我们的遗产中,我们设置了 Service Broker 来执行一项操作:
- 根据计划执行查询,并通过服务代理将其结果发送到远程服务器上的其他服务
- 消息被目标服务器上的激活过程接收和消费。
问题是由于某种原因,消息在目标服务器上被多次使用,尽管它只发送了一次。
我对 Service Broker 还很陌生,正在寻求关于什么可能是确切问题以及可能是什么原因的建议/指导。
开始对话框的过程代码:
CREATE PROCEDURE [ServiceBroker].[SendMessage]
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRY
DECLARE @Message XML
, @ConversationHandle UNIQUEIDENTIFIER
, @Counter INT = 1
, @Error INT
, @TRANCOUNT INT = @@TRANCOUNT
, @SavePoint CHAR(32) = REPLACE(NEWID(), '-', '')
, @FromService sysname = 'SimpleServiceInitiator_' + DB_NAME();
SET @Message = (
SELECT 1 AS Value
FOR XML PATH('Message'), ELEMENTS XSINIL, TYPE
);
BEGIN TRANSACTION;
SAVE TRANSACTION @SavePoint;
WHILE 1 = 1
BEGIN
SET @ConversationHandle = NULL;
BEGIN DIALOG @ConversationHandle
FROM SERVICE @FromService
TO SERVICE 'SimpleServiceTarget'
ON CONTRACT SimpleContract
WITH ENCRYPTION = OFF;
-- Set an two minute timer on the conversation
BEGIN CONVERSATION TIMER (@ConversationHandle) TIMEOUT = 120;
-- Attempt to SEND on the associated conversation
SEND
ON CONVERSATION @ConversationHandle
MESSAGE TYPE SimpleMessage (@Message);
-- Successful send, just exit the loop
SET @Error = @@ERROR;
IF @Error = 0 BREAK;
SET @Counter += 1;
IF @Counter > 3
BEGIN
-- We failed 3 times in a row, something must be broken
RAISERROR (N'Failed to SEND on a conversation for more than 3 times. Error %i.', 16, 1, @Error);
BREAK;
END;
END;
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
IF XACT_STATE() = 1
BEGIN
-- Rollback any active or uncommittable transactions before inserting information in the ErrorLog.
-- We can still save the other activities in the transaction.
ROLLBACK TRANSACTION @SavePoint;
COMMIT TRANSACTION;
END;
ELSE IF XACT_STATE() = -1 AND @TRANCOUNT = 0
BEGIN
-- If the tran is doomed, and the @TRANCOUNT was 0, we have to do a full roll back
ROLLBACK TRANSACTION;
END;
EXECUTE Logs.LogError;
THROW;
END CATCH;
END;
GO
以及目标上的激活SP:
CREATE PROCEDURE [ServiceBroker].[ProcessSimpleMessagesFromQueue]
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRY
DECLARE @ConversationHandle UNIQUEIDENTIFIER
, @MessageType sysname
, @MessageBody XML
, @ServiceName sysname
, @IsRecognised BIT = 0
, @ErrorCode INT
, @ErrorDescription NVARCHAR(4000);
WHILE 1 = 1
BEGIN
SET @ConversationHandle = NULL;
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1) @ConversationHandle = conversation_handle
, @MessageBody = message_body
, @MessageType = message_type_name
, @ServiceName = service_name
-- SELECT *
FROM dbo.SimpleQueue
), TIMEOUT 5000;
IF @@ROWCOUNT = 0
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END;
ELSE IF @MessageType = N'EndOfStream'
BEGIN
END CONVERSATION @ConversationHandle;
END;
ELSE IF @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @ConversationHandle;
END;
ELSE IF @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
EXECUTE ServiceBroker.ExtractError @MessageBody = @MessageBody
, @ErrorCode = @ErrorCode OUTPUT
, @ErrorDescription = @ErrorDescription OUTPUT;
END CONVERSATION @ConversationHandle;
INSERT INTO Logs.ConversationError (ServiceName, ConversationHandle, MessageType, MessageBody, ErrorCode, ErrorDescription)
VALUES (@ServiceName, @ConversationHandle, @MessageType, @MessageBody, @ErrorCode, @ErrorDescription);
END;
ELSE IF @MessageType = N'SimpleMessage'
BEGIN
SET @IsRecognised = 1;
/* DO -TSQL MAGIC*/
END CONVERSATION @ConversationHandle;
END;
COMMIT TRANSACTION;
END;
END TRY
BEGIN CATCH
SET @ErrorCode = ERROR_NUMBER();
SET @ErrorDescription = ERROR_MESSAGE();
IF @IsRecognised = 1
BEGIN
-- 6a) Commit otherwise end the conversation to get the message off the queue **
IF @@TRANCOUNT > 0
BEGIN
COMMIT TRANSACTION;
END;
ELSE
BEGIN
SET @ErrorCode = 127;
SET @ErrorDescription = N'Unable to process message';
END CONVERSATION @ConversationHandle
WITH ERROR = @ErrorCode
DESCRIPTION = @ErrorDescription;
END;
END;
ELSE IF XACT_STATE() IN (-1, 1) AND @@TRANCOUNT > 0
BEGIN
ROLLBACK TRANSACTION;
END;
INSERT INTO Logs.ConversationError (ServiceName, ConversationHandle, MessageType, MessageBody, ErrorCode, ErrorDescription)
VALUES (@ServiceName, @ConversationHandle, @MessageType, @MessageBody, @ErrorCode, @ErrorDescription);
END CATCH;
END;
这是应该结束对话的启动器上的激活 SP:
CREATE PROCEDURE [ServiceBroker].[ProcessSimpleMessagesFromQueue_Initiator]
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRY
DECLARE @ConversationHandle UNIQUEIDENTIFIER
, @MessageType sysname
, @MessageBody XML
, @ServiceName sysname
, @ErrorCode INT
, @ErrorDescription NVARCHAR(4000);
WHILE 1 = 1
BEGIN
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1) @ConversationHandle = conversation_handle
, @MessageBody = message_body
, @MessageType = message_type_name
, @ServiceName = service_name
FROM dbo.SimpleQueue
), TIMEOUT 5000;
IF @@ROWCOUNT = 0
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END;
ELSE IF @MessageType = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @ConversationHandle;
END;
ELSE IF @MessageType = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
EXECUTE ServiceBroker.ExtractError @MessageBody = @MessageBody
, @ErrorCode = @ErrorCode OUTPUT
, @ErrorDescription = @ErrorDescription OUTPUT;
END CONVERSATION @ConversationHandle;
INSERT INTO Logs.ConversationError (ServiceName, ConversationHandle, MessageType, MessageBody, ErrorCode, ErrorDescription)
VALUES (@ServiceName, @ConversationHandle, @MessageType, @MessageBody, @ErrorCode, @ErrorDescription);
END;
COMMIT TRANSACTION;
END;
END TRY
BEGIN CATCH
IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;
SET @ErrorCode = ERROR_NUMBER();
SET @ErrorDescription = ERROR_MESSAGE();
END CONVERSATION @ConversationHandle
WITH ERROR = @ErrorCode
DESCRIPTION = @ErrorDescription;
EXECUTE Logs.LogError;
THROW;
END CATCH;
END;
GO
Dan Guzman给了我一些很好的建议,经过进一步研究,我发现我的路线配置不正确。