情况:我使用Service Broker开发数据推送方式。
现在我考虑一个场景:
- 使用具有服务、队列和存储过程的单独数据库(称为代理)来发送数据。
- 在必要的数据库和表上使用触发器,将数据转换为 JSON 并从Broker数据库执行 SP以发送数据。我在 I/U/D 的每个表中有 39 个数据库/264632 个表/单独的触发器。793896 触发。是的,我知道它很大,但我们有这样的数据模型,我无法更改它。
- 现在我不使用激活存储过程,因为一些客户端会消耗来自 SB 目标队列的数据。
在Broker数据库中安装 Service Broker 的脚本:
-- installation
use master
go
if exists ( select * from sys.databases where name = 'Broker' )
begin
alter database [Broker] set restricted_user with rollback immediate;
drop database [Broker];
end
go
create database [Broker]
go
alter database [Broker] set enable_broker with rollback immediate;
alter database [Broker] set read_committed_snapshot on;
alter database [Broker] set allow_snapshot_isolation on;
alter database [Broker] set recovery full;
go
use [Broker]
go
create message type datachanges_messagetype
validation = none;
go
create contract datachanges_contract ( datachanges_messagetype sent by initiator );
go
create queue dbo.datachanges_initiatorqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create queue dbo.datachanges_targetqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create service datachanges_initiatorservice
on queue datachanges_initiatorqueue
( datachanges_contract );
go
create service datachanges_targetservice
on queue datachanges_targetqueue
( datachanges_contract );
go
-- conversation additional table
create table dbo.[SessionConversationsSPID] (
spid int not null
, handle uniqueidentifier not null
, primary key ( spid )
, unique ( handle )
)
go
-- SP which is used to send data from triggers
create procedure dbo.trackChanges_send
@json nvarchar(max)
as
begin
set nocount on;
if ( @json is null or @json = '' )
begin
raiserror( 'DWH Service Broker: An attempt to send empty message occurred', 16, 1);
return;
end
declare @handle uniqueidentifier = null
, @counter int = 1
, @error int;
begin transaction
while ( 1 = 1 )
begin
select @handle = handle
from dbo.[SessionConversationsSPID]
where spid = @@SPID;
if @handle is null
begin
begin dialog conversation @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
insert into dbo.[SessionConversationsSPID] ( spid, handle )
values ( @@SPID, @handle );
end;
send on conversation @handle
message type datachanges_messagetype( @json );
set @error = @@error;
if @error = 0
break;
set @counter += 1;
if @counter > 5
begin
declare @mes varchar(max) = 'db - ' + @db + '. schema - ' + @sch;
raiserror( N'DWH Service Broker: Failed to SEND on a conversation for more than 10 times. Source: %s. Error: %i.', 16, 2, @mes, @error );
break;
end
delete from dbo.[SessionConversationsSPID]
where handle = @handle;
set @handle = null;
end
commit;
end
go
-- And dialogs creation to mitigate hot spot problem on sys.sysdesend table.
-- Described here: https://learn.microsoft.com/en-us/previous-versions/sql/sql-server-2008/dd576261
declare @i int, @spid int, @handle uniqueidentifier
select @i = 0, @spid = 50;
while (@i < 150*3000) -- 450000 dialogs
begin
set @i = @i + 1
begin dialog @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
if ((@i % 150) = 0)
begin
set @spid += 1;
insert into dbo.SessionConversationsSPID ( spid, handle ) values (@spid, @handle)
end
end
用户数据库中的典型触发代码:
create trigger [<SCHEMA>].[<TABLE>_TR_I]
on [<SCHEMA>].[<TABLE>]
with execute as caller
after insert
as
begin
set xact_abort off;
set nocount on;
declare @rc int = ( select count(*) from inserted );
if ( @rc = 0 )
begin
return;
end
begin try
declare @db_name sysname = db_name();
declare @json nvarchar(max);
set @json = (
select getutcdate() as get_date, ''I'' as tr_operation, current_transaction_id() as cur_tran_id, ''<TABLE>'' as table_name, @@servername as server_name, @db_name as db_name, ''<SCHEMA>'' as tenant_schemaname
, *
from inserted
for json auto, include_null_values
);
exec dbo.trackChanges_send
@json = @json;
end try
begin catch
declare @error_message nvarchar(max);
set @error_message = ''['' + isnull( cast( error_number() as nvarchar( max ) ), '''' ) +''] ''
+ isnull( cast( error_severity() as nvarchar( max ) ), '''' )
+'' State: ''+ isnull( cast( error_state() as nvarchar( max ) ), '''' )
+'' Trigger: '' + ''[<SCHEMA>].[<TABLE>_TR_I]''
+'' Line: '' + isnull( cast( error_line() as nvarchar( max ) ), '''' )
+'' Msg: '' + isnull( cast( error_message() as nvarchar( max ) ), '''' );
raiserror( ''DWH Service Broker: An error has been occured while sending data changes. Error: %s'', 0, 0, @error_message ) with log;
return;
end catch
end
go
所以,我的问题是:
- 有时我会在触发器执行期间看到很长的 PAGELATCH_EX / PAGELATCH_SH 等待。问题是在 target_queue 表上等待闩锁。我不明白为什么当我将数据发送到 Service Broker 时,我会在目标队列dbo.datachanges_targetqueue上看到某种热点。我会理解是否存在与发送系统表或传输队列相关的等待。我在resource_description列 看到目标队列:
使用dbcc 页面,我看到该页面属于sys.queue_messages_597577167,它是 dbo.datachanges_targetqueue 的包装器。那一刻的等待会话总数约为 450,因此它可能是一个瓶颈。
在那段时间触发器执行的时间很长(超过 10 秒,而通常少于 1 秒)。它发生在随机时间,所以我在这里看不到任何依赖
- 我的第二个问题也与锁定有关。而且它发生在随机时间。我从目标队列(外部客户端仿真)读取数据的假脚本是
declare @i int = 0;
while ( 1 = 1 )
begin
declare @mb varbinary( max );
receive top ( 1000 ) @mb = message_body from dbo.datachanges_targetqueue
set @i = @@rowcount;
if @i = 0
break;
end
由于触发活动,它也可以被阻止定期执行。我不明白为什么。
可以使用一个队列和约 800000 个触发器吗?:) 我的意思是也许我需要考虑一些门槛。
使用“我的”方法(一个数据库是发送者和目标)或使用“每个数据库都是发送者和一个目标”的优点/缺点是什么