Situation: I'm building a data-push mechanism on top of Service Broker.
The scenario I have in mind:
- A separate database (call it Broker) holds the service, the queues, and the stored procedure used to send data.
- Triggers on the relevant databases and tables convert the changed rows to JSON and call the SP in the Broker database to send them. I have 39 databases / 264,632 tables / a separate trigger per table for each of I/U/D, i.e. 793,896 triggers. Yes, I know it's huge, but that's the data model we have and I can't change it.
- For now there is no activation stored procedure, because external clients will consume the data from the SB target queue.
The script that installs Service Broker in the Broker database:
-- installation
use master
go
if exists ( select * from sys.databases where name = 'Broker' )
begin
alter database [Broker] set restricted_user with rollback immediate;
drop database [Broker];
end
go
create database [Broker]
go
alter database [Broker] set enable_broker with rollback immediate;
alter database [Broker] set read_committed_snapshot on;
alter database [Broker] set allow_snapshot_isolation on;
alter database [Broker] set recovery full;
go
use [Broker]
go
create message type datachanges_messagetype
validation = none;
go
create contract datachanges_contract ( datachanges_messagetype sent by initiator );
go
create queue dbo.datachanges_initiatorqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create queue dbo.datachanges_targetqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create service datachanges_initiatorservice
on queue datachanges_initiatorqueue
( datachanges_contract );
go
create service datachanges_targetservice
on queue datachanges_targetqueue
( datachanges_contract );
go
-- conversation additional table
create table dbo.[SessionConversationsSPID] (
spid int not null
, handle uniqueidentifier not null
, primary key ( spid )
, unique ( handle )
)
go
-- SP which is used to send data from triggers
create procedure dbo.trackChanges_send
@json nvarchar(max)
as
begin
set nocount on;
if ( @json is null or @json = '' )
begin
raiserror( 'DWH Service Broker: An attempt to send empty message occurred', 16, 1);
return;
end
declare @handle uniqueidentifier = null
, @counter int = 1
, @error int;
begin transaction
while ( 1 = 1 )
begin
-- Reuse the dialog assigned to this session, if any.
select @handle = handle
from dbo.[SessionConversationsSPID]
where spid = @@SPID;
if @handle is null
begin
begin dialog conversation @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
insert into dbo.[SessionConversationsSPID] ( spid, handle )
values ( @@SPID, @handle );
end;
send on conversation @handle
message type datachanges_messagetype( @json );
set @error = @@error;
if @error = 0
break;
set @counter += 1;
if @counter > 5
begin
declare @mes varchar(max) = 'db - ' + db_name() + ', spid - ' + cast( @@spid as varchar(11) );
raiserror( N'DWH Service Broker: Failed to SEND on a conversation more than 5 times. Source: %s. Error: %i.', 16, 2, @mes, @error );
break;
end
-- The SEND failed (e.g. the dialog is in an error state): discard the
-- cached handle and retry on a fresh dialog.
delete from dbo.[SessionConversationsSPID]
where handle = @handle;
set @handle = null;
end
commit;
end
go
-- And dialogs creation to mitigate hot spot problem on sys.sysdesend table.
-- Described here: https://learn.microsoft.com/en-us/previous-versions/sql/sql-server-2008/dd576261
-- Pre-create a pool of dialogs; keep only every 150th handle for reuse
-- (assumed to map to SPIDs 51-3050). The dialogs in between are created
-- solely to spread the kept handles across many pages of sys.sysdesend.
declare @i int, @spid int, @handle uniqueidentifier;
select @i = 0, @spid = 50;
while ( @i < 150 * 3000 ) -- 450000 dialogs
begin
set @i += 1;
begin dialog conversation @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
if ( ( @i % 150 ) = 0 )
begin
set @spid += 1;
insert into dbo.SessionConversationsSPID ( spid, handle ) values ( @spid, @handle );
end
end
Typical trigger code in a user database:
create trigger [<SCHEMA>].[<TABLE>_TR_I]
on [<SCHEMA>].[<TABLE>]
with execute as caller
after insert
as
begin
set xact_abort off;
set nocount on;
declare @rc int = ( select count(*) from inserted );
if ( @rc = 0 )
begin
return;
end
begin try
declare @db_name sysname = db_name();
declare @json nvarchar(max);
set @json = (
select getutcdate() as get_date, 'I' as tr_operation, current_transaction_id() as cur_tran_id, '<TABLE>' as table_name, @@servername as server_name, @db_name as db_name, '<SCHEMA>' as tenant_schemaname
, *
from inserted
for json auto, include_null_values
);
-- The sending SP lives in the Broker database.
exec [Broker].dbo.trackChanges_send
@json = @json;
end try
begin catch
declare @error_message nvarchar(max);
set @error_message = '[' + isnull( cast( error_number() as nvarchar( max ) ), '' ) + '] '
+ isnull( cast( error_severity() as nvarchar( max ) ), '' )
+ ' State: ' + isnull( cast( error_state() as nvarchar( max ) ), '' )
+ ' Trigger: ' + '[<SCHEMA>].[<TABLE>_TR_I]'
+ ' Line: ' + isnull( cast( error_line() as nvarchar( max ) ), '' )
+ ' Msg: ' + isnull( cast( error_message() as nvarchar( max ) ), '' );
raiserror( 'DWH Service Broker: An error occurred while sending data changes. Error: %s', 0, 0, @error_message ) with log;
return;
end catch
end
go
So, my questions are:
- Sometimes I see very long PAGELATCH_EX / PAGELATCH_SH waits during trigger execution. The latch waits are on the target queue's table. I don't understand why I get a hot spot on the target queue dbo.datachanges_targetqueue while I'm merely sending data to Service Broker; I could understand waits on the send-side system tables or the transmission queue. Yet I see the target queue in the resource_description column.
Using DBCC PAGE I can see that the page belongs to sys.queue_messages_597577167, which is the internal table backing dbo.datachanges_targetqueue. The total number of waiting sessions at such moments is about 450, so it may well be a bottleneck (a diagnostic sketch for mapping a waited-on page to its object follows below).
During these episodes trigger executions take a long time (over 10 seconds, versus usually under 1 second). It happens at random times, so I can't see any pattern.
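(As a side note: on SQL Server 2019 and later the waited-on page can be mapped to its owning object without DBCC PAGE. A minimal sketch, assuming the contention is still visible in sys.dm_exec_requests while it runs:)
select r.session_id
, r.wait_type
, r.wait_resource
, object_name( pi.object_id, pi.database_id ) as object_name
, pi.page_type_desc
from sys.dm_exec_requests r
cross apply sys.fn_PageResCracker( r.page_resource ) pc
cross apply sys.dm_db_page_info( pc.db_id, pc.file_id, pc.page_id, 'DETAILED' ) pi
where r.wait_type like N'PAGELATCH%';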
- My second question is also about blocking, and it too occurs at random times. My fake script that reads from the target queue (emulating the external client) is:
declare @i int = 0;
while ( 1 = 1 )
begin
declare @mb varbinary( max );
receive top ( 1000 ) @mb = message_body from dbo.datachanges_targetqueue
set @i = @@rowcount;
if @i = 0
break;
end
It also gets blocked periodically by the trigger activity, and I don't understand why.
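(For what it's worth, a snapshot of the standard DMVs taken while it happens should show who is blocking the reader and on which resource; nothing here is Service Broker specific:)
select r.session_id
, r.blocking_session_id
, r.wait_type
, r.wait_time
, r.wait_resource
, t.text as current_sql
from sys.dm_exec_requests r
outer apply sys.dm_exec_sql_text( r.sql_handle ) t
where r.blocking_session_id <> 0
or r.wait_type like N'PAGELATCH%';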
- Is it OK to use one queue with ~800,000 triggers? :) I mean, maybe there are some thresholds I should keep in mind.
- What are the pros and cons of "my" approach (one database is both the sender and the target) versus "each database is a sender, one database is the target"?
No, it's not. You must ensure your triggers are always short-running, or your throughput will suffer. Writing to a single queue from 800,000 triggers is not a good idea: a queue is backed by a regular table, and at some scale page hot spots become your bottleneck. And, per the documentation on SEND:
"When the target service is in a remote SQL Server instance, the messages are written and committed into the transmission queue of each database. But for a target queue in the same instance, messages go directly into the target queue."
I think the bottom line is that writing directly into the target queue is not the right solution here. Imagine an empty target queue at peak transaction throughput: the queue's backing table simply doesn't have enough pages to spread the page latches across the large number of concurrent writers this scenario requires.
If all your tables were in the same database, the transmission queue could become the bottleneck instead. But the transmission queue is structured differently from a regular queue: it has a single clustered index on
(dlgid, finitiator, msgseqnum)
So you won't get hot-page contention on the transmission queue, and you will have as many insertion points as you have dialog conversations (dlgid).
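(You can watch the transmission queue through the sys.transmission_queue catalog view; for same-instance delivery, as the quote above says, it should stay essentially empty:)
select count(*) as pending_messages
from sys.transmission_queue;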
A regular queue, by contrast, has two indexes: a clustered index on
(status, conversation_group_id, priority, conversation_handle, queuing_order)
and a non-clustered index on
(status, priority, queuing_order, conversation_group_id, conversation_handle, service_id)
which you can see with a catalog query like the one below.
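A sketch of such a query over the standard catalog views (queue backing tables are the internal tables tagged QUEUE_MESSAGES in sys.internal_tables):
select object_name( ic.object_id ) as queue_internal_table
, ic.index_id
, ic.key_ordinal
, col_name( ic.object_id, ic.column_id ) as index_column
from sys.index_columns ic
where ic.object_id in (
select object_id
from sys.internal_tables
where internal_type_desc = 'QUEUE_MESSAGES'
)
order by queue_internal_table, ic.index_id, ic.key_ordinal;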
So it is tempting to move the target service to a remote SQL Server instance. That would offload the target-queue writes and reads, and might reduce the bottleneck: your triggers would only have to put messages on the transmission queue, which is what you assumed was happening in the first place.
You can check whether routing and the transmission queue are actually being used with an Extended Events session, for example:
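A sketch; broker event names vary between SQL Server versions, so list what your instance exposes first and adjust the event list (the session and target file names below are illustrative):
-- Events available on this instance:
select name, description
from sys.dm_xe_objects
where object_type = 'event'
and name like 'broker%'
order by name;
go
-- An illustrative session capturing message classification (local vs. remote
-- routing) and transmission-queue activity:
create event session [broker_monitoring] on server
add event sqlserver.broker_message_classify,
add event sqlserver.broker_dialog_transmission_queue_enqueue
add target package0.event_file ( set filename = N'broker_monitoring' );
go
alter event session [broker_monitoring] on server state = start;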
Also, in both your current design and the remote-service option, you can see from the index structures how reusing the right number of dialog conversations can optimize the solution. Too few and you have locking and page contention issues; too many and you have the overhead of creating and managing them, and you can't do message batching. It looks like you've already read Reusing Conversations, and are using a conversation-per-session pattern, which Remus recommends for this kind of workload. It would be interesting to see which index the page latch contention is on, and whether it's a leaf or non-leaf page. But in any case, queue tables with concurrent SEND and RECEIVE don't usually have enough pages to spread out page latch contention.
So the design alternative is to have the triggers drop changes on N intermediate queues, and then have activation procs on those forward the messages to the single destination queue. You may still have waits on the destination queue, but they won't be during your triggers. Also in your intermediate-to-final queue activation procedure you can batch up sends and manage conversations and have many fewer dialog conversations (N), so the receiver can actually fetch 1000 messages per call. A single call to RECEIVE can only fetch messages from a single conversation. So if you have thousands of conversations interleaved, you'll always only fetch single rows.
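A minimal sketch of such a forwarding activation procedure, under these assumptions: an intermediate queue dbo.datachanges_intermediatequeue1, and a table dbo.ForwardConversations holding one pre-created, long-lived conversation handle per intermediate queue (both names are hypothetical):
create procedure dbo.datachanges_forward1
as
begin
set nocount on;
-- One long-lived conversation per intermediate queue keeps related messages
-- together, so the final reader can actually RECEIVE them in batches.
declare @fwd uniqueidentifier = ( select handle from dbo.ForwardConversations where queue_name = N'datachanges_intermediatequeue1' );
declare @batch table
( conversation_handle uniqueidentifier
, message_type_name sysname
, message_body varbinary(max) );
while ( 1 = 1 )
begin
delete from @batch;
begin transaction;
-- Drain up to 1000 messages; give up after 1 second of idleness.
waitfor (
receive top ( 1000 ) conversation_handle, message_type_name, message_body
from dbo.datachanges_intermediatequeue1
into @batch
), timeout 1000;
if not exists ( select 1 from @batch )
begin
commit;
break;
end
-- Re-send every payload on the single outbound conversation, batched in
-- the same transaction as the RECEIVE.
declare @body varbinary(max);
declare payloads cursor local fast_forward for
select message_body from @batch
where message_type_name = N'datachanges_messagetype';
open payloads;
fetch next from payloads into @body;
while @@fetch_status = 0
begin
send on conversation @fwd message type datachanges_messagetype( @body );
fetch next from payloads into @body;
end
close payloads;
deallocate payloads;
commit;
end
end
go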
Or simply have N destination queues and have your readers read from all of them.
There's no fundamental reason why you can't get this working, but it's not going to be simple. The scale is immense, and Service Broker is complex. You should also consider 3rd-party solutions here. Qlik (Attunity) has a log-based CDC solution for SQL Server that can harvest all the changes from the SQL Server transaction logs without triggers or queues, and there are several other solutions based on SQL Server Change Data Capture. Change Data Capture will stage all the changes for you, and you just have to consume them from your external program. Change Tracking is the lightest-weight option, but it doesn't capture intermediate versions of rows: you know which rows changed and whether the change was an insert, update, or delete, but you only have the current version of the row to query. Each of these options will be expensive, tricky, and in need of extensive testing.