AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / dba / 问题 / 344924
Accepted
Martin Smith
Martin Smith
Asked: 2025-01-26 03:28:24 +0800 CST2025-01-26 03:28:24 +0800 CST 2025-01-26 03:28:24 +0800 CST

JSON 数组是否可以以流式方式作为存储过程参数发送?

  • 772

使用数据库设置

CREATE TYPE dbo.TableType AS TABLE (
prop1 int, 
prop2 datetime2, 
prop3 varchar(1000)
);


GO

CREATE OR ALTER PROC dbo.TestTableTypePerf
@Data dbo.TableType READONLY
AS
SELECT COUNT(*)
FROM @Data;

以下代码以流式传输方式传递 TVP 值。可枚举值直到调用后才被评估ExecuteScalarAsync,并且不需要将全部 5,000,000 个元素具体化为客户端中的集合。

越来越流行的是放弃 TVP 而选择 JSON 字符串,对于 JSON 能做类似的事情吗?

using System.Data;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient.Server;

const string connectionString =
    @"...";

await TvpTest();

return;

static async Task TvpTest()
{
    await using var conn = new SqlConnection(connectionString);
    await conn.OpenAsync();
    await using var cmd = new SqlCommand("dbo.TestTableTypePerf", conn);
    cmd.CommandType = CommandType.StoredProcedure;

    cmd.Parameters.Add(new SqlParameter
    {
        ParameterName = "@Data",
        SqlDbType = SqlDbType.Structured,
        TypeName = "dbo.TableType",
        Value = GetEnumerableOfRandomSqlDataRecords(5_000_000)
    });

    Console.WriteLine($"calling ExecuteScalarAsync at {DateTime.Now:O}");
    var result = await cmd.ExecuteScalarAsync();

    Console.WriteLine($"writing result at {DateTime.Now:O}");
    Console.WriteLine(result);
}

static IEnumerable<SqlDataRecord> GetEnumerableOfRandomSqlDataRecords(uint length)
{
    SqlMetaData[] metaData =
    [
        new SqlMetaData("prop1", SqlDbType.Int),
        new SqlMetaData("prop2", SqlDbType.DateTime2),
        new SqlMetaData("prop3", SqlDbType.VarChar, 1000)
    ];

    foreach (var dto in GetEnumerableOfRandomDto(length))
    {
        var record = new SqlDataRecord(metaData);
        record.SetInt32(0, dto.Prop1);
        record.SetDateTime(1, dto.Prop2);
        record.SetString(2, dto.Prop3);

        yield return record;
    }
}


static IEnumerable<Dto> GetEnumerableOfRandomDto(uint length)
{
    var rnd = new Random();

    for (var i = 0; i < length; i++)
    {
        yield return new Dto(rnd.Next(1, int.MaxValue), 
                             DateTime.Now.AddMinutes(rnd.Next(1, 10000)),
                             Guid.NewGuid().ToString()
                             );

        if ((i + 1) % 100_000 == 0)
            Console.WriteLine($"Generated enumerable {i + 1} at {DateTime.Now:O}");
    }
}


internal record Dto(int Prop1, DateTime Prop2, string Prop3);
sql-server
  • 2 2 个回答
  • 219 Views

2 个回答

  • Voted
  1. Best Answer
    Charlieface
    2025-03-30T09:41:44+08:002025-03-30T09:41:44+08:00

    我已经对您的 JSON 测试进行了更高效、更简单的版本,并使用 BenchmarkDotNet(.NET 基准测试中的黄金标准)将其与 TVP 测试进行了比较。

    改进:

    • 我们对 SqlClient 使用同步 API,因为异步 API 目前存在严重的性能缺陷。
    • 为了确保我们确实读取了所有三列,我SELECT DISTINCT在所有列上添加了派生子查询。
    [Params(1, 100, 1000, 50000)]
    public uint Length { get; set;}
    
    [Benchmark]
    public int JsonTest()
    {
        const string query = """
                                SELECT COUNT(*)
                                FROM  (
                                    SELECT DISTINCT Prop1, Prop2, Prop3
                                    FROM OPENJSON(@JsonData)
                                    WITH (
                                        Prop1 int,
                                        Prop2 datetime2(7),
                                        Prop3 nvarchar(1000)
                                    ) j
                                ) t;
                             """;
        using var conn = new SqlConnection(connectionString);
        conn.Open();
        using var cmd = new SqlCommand(query, conn);
        cmd.CommandTimeout = 0;
    
        var data = GetEnumerableOfRandomDto(Length);
        using var reader = new StreamReader(GetJsonStream(data));
        cmd.Parameters.Add("@JsonData", SqlDbType.NVarChar, -1).Value = reader;
    
        return (int)cmd.ExecuteScalar();
    }
    
    • DTO 是使用prop2和的相同值创建的prop3,以避免Random和字符串插值的开销。
    static IEnumerable<Dto> GetEnumerableOfRandomDto(uint length)
    {
        var rnd = new Random();
        var s = Guid.NewGuid().ToString();
        var dt = DateTime.Now.AddMinutes(rnd.Next(1, 10000));
    
        for (var i = 0; i < length; i++)
        {
            yield return new Dto(i,
                dt,
                s
            );
        }
    }
    
    • TextReader使用新的API得到的结果System.IO.Pipelines如下:
      • 创建一个Pipe。
      • 在单独的线程上,尽可能快地将 DTO 推送到管道写入器。
      • 将管道读取器作为 并将Stream其传递给StreamReader。
      • 这避免了必须使用整个假直通Stream并发症,因为它Pipe为我们完成了这一切。
    public static Stream GetJsonStream<T>(T obj, JsonSerializerOptions? options = null)
    {
        var pipe = new Pipe();
        _ = Task.Run(() =>
        {
            try
            {
                var utf8writer = new Utf8JsonWriter(pipe.Writer);
                JsonSerializer.Serialize(utf8writer, obj);
                pipe.Writer.Complete();
            }
            catch(Exception ex)
            {
                pipe.Writer.Complete(ex);
            }
        });
        var stream = pipe.Reader.AsStream();
        return stream;
    }
    
    • 对于TvpTest,您不应重新创建。与地球上SqlDataRecord的所有其他用法相反,建议重复使用。注意,比例是明确指定的。IEnumerableSqlDataRecordDateTime2
    static IEnumerable<SqlDataRecord> GetRecordsOfRandomDto(uint length)
    {
        var list = GetEnumerableOfRandomDto(length);
        var record = new SqlDataRecord(
            new("prop1", SqlDbType.Int),
            new("prop2", SqlDbType.DateTime2, 0, 7),
            new("prop3", SqlDbType.NVarChar, 1000));
    
        foreach (var dto in list)
        {
            record.SetInt32(0, dto.Prop1);
            record.SetDateTime(1, dto.Prop2);
            record.SetString(2, dto.Prop3);
            yield return record;
        }
    }
    

    BenchmarkDotNet 结果如下。您可以看到,对于非常小的数据集,使用 JSON 的速度要快得多,但 TVP 在上限时速度会大幅落后。因此,最好只在数据集可能达到 1000 行时使用 TVP,因为启动成本明显较大。

    方法 长度 意思是 错误 标准差 中位数 Gen0 第一代 已分配
    Json测试 1 235.5 微秒 9.78 微秒 27.41 微秒 229.0 微秒 2.9297 - 10.07 千字节
    测试 1 1,014.8 微秒 36.34 微秒 102.49 微秒 991.1 微秒 3.9063 - 12.26 千字节
    Json测试 100 929.4 微秒 21.54 微秒 60.03 微秒 921.2 微秒 3.9063 - 16.19 千字节
    测试 100 1,283.3 微秒 58.77 微秒 168.62 微秒 1,230.0 微秒 5.8594 - 21.54 千字节
    Json测试 1000 5,925.9 μs 117.72 微秒 243.12 微秒 5,898.3 微秒 15.6250 - 54.16 千字节
    测试 1000 2,449.4 微秒 43.35 微秒 96.07 微秒 2,424.2 微秒 31.2500 - 106.3 千字节
    Json测试 50000 398,435.2 微秒 7,684.96 微秒 11,021.55 微秒 397,393.5 微秒 1000.0000 - 6877.39 千字节
    测试 50000 47,100.7 微秒 831.21 微秒 736.85 微秒 47,121.9 微秒 909.0909 545.4545 5045.5 千字节

    我还对两个版本中的单个Prop1 int列进行了测试,并对每个列进行了查询SELECT MAX(Prop1) FROM。结果很有趣:TVP 版本现在即使对于 50000 行也使用更少的内存,但 JSON 使用的内存相同,并且高行数时的速度差异也更大。

    方法 长度 意思是 错误 标准差 中位数 Gen0 第一代 已分配
    Json测试 1 249.2 微秒 13.82微秒 38.06 微秒 246.8 微秒 2.9297 - 10.07 千字节
    测试 1 1,003.7 微秒 39.79 微秒 110.26 微秒 984.1 微秒 0.9766 - 5.33 千字节
    Json测试 100 743.7 微秒 19.87 微秒 54.05 微秒 738.7 微秒 4.8828 - 16.21 千字节
    测试 100 966.3 微秒 28.30 微秒 76.98 微秒 937.2 微秒 0.9766 - 5.33 千字节
    Json测试 1000 4,167.8 微秒 83.25 微秒 197.85 微秒 4,179.7 微秒 15.6250 - 54.16 千字节
    测试 1000 1,413.6 微秒 21.73 微秒 22.32微秒 1,415.0 微秒 - - 5.33 千字节
    Json测试 50000 282,662.2 微秒 5,414.21 微秒 13,780.90 μs 279,611.1 微秒 1000.0000 500.0000 6874.92 千字节
    测试 50000 19,903.0微秒 393.06 微秒 482.71 微秒 19,872.4 微秒 - - 5.47 千字节

    进一步说明:

    • 使用StreamReader确实存在从 UTF-8 转换为 UTF-16 的缺点。我确实看到在某些情况下通过使用varbinary并将 UTF-8Stream直接写入服务器,然后在CAST服务器端将其转换为varchar并转换为,会略有改善nvarchar,但并不一致。不幸的是,SqlClient 不接受StreamUTF-8 字节的值varchar。
    • 更不幸的是,没有办法用 来System.Text.Json编写 UTF-16,而OPENJSON只接受nvarchar,因此转换时几乎肯定会损失一些效率。
    • 我也尝试过使用 XML 和.nodes各种.values方法。在所有情况下,它们都不如其他两种方法有效。
      • XmlReader需要string传入值,这意味着转换后的int和DateTime值会生成大量字符串。对于仅涉及字符串的用例,这仍然是一种选择。
      • XmlSerializer将仅使用一个XmlReader输出。
      • XDocument可以生成一个XmlReader,但是您仍然需要生成所有这些字符串。
      • 我尝试使用 手动生成 UTF-8 和 UTF-16 格式的 XML 并使用 读取Pipe,StreamReader以及Stream直接将其作为 传递并CAST在服务器上执行。

    This was tested using SqlClient v6.0.1, and LocalDB 2019 RTM-CU29-GDR. I did not test using TDS v8.0, but some improvements have been made there to streaming large unknown-length parameters.

    • 5
  2. Martin Smith
    2025-01-26T03:28:24+08:002025-01-26T03:28:24+08:00

    主题示例:流式传输到 SQL Server确实展示了如何从流中设置参数。

    虽然System.Text.Json可以序列化为流,但对于我来说,是否/如何可以让它序列化为可枚举,并将可枚举的评估推迟到用作等效存储过程的参数值为止,并不清楚。

    CREATE OR ALTER PROC dbo.TestJsonPerf
    @JsonData NVARCHAR(MAX)
    AS
    
    SELECT COUNT(*)
    FROM OPENJSON(@JsonData)
    WITH (Prop1 int, Prop2 datetime2, Prop3 varchar(1000))
    

    不过,我确实在这个 StackOverflow 答案中找到了可能的解决方案。下面的解决方案(现在非常松散)基于此,并将对可枚举的评估推迟到调用之后ExecuteScalarAsync。客户端进程中的内存始终很低。

    ⚠️ 但这种方法的性能仍然比使用 TVP 差。在我的本地机器上使用问题中的 TVP 代码,运行 500 万行数据通常需要 8 到 9 秒,而 JSON 流版本则需要大约 45 秒。其中大约 25 秒似乎花在发送参数值上,因此参数传递和查询执行时间都差得多!

    参数传递代码可能仍然可以进行一些性能改进,但这不会有助于查询性能。

    TVP 查询在我的计算机上以批处理模式以 DOP 4 运行,查询存储中显示的每个持续时间平均为 80 毫秒,而 JSON 查询则获得串行计划,平均持续时间 > 20 秒。但实际上,不太可能有人会将 500 万个 JSON 元素发送到 SQL Server 只是为了计算它们,在更现实的用例中,差异可能不那么明显。

    using System.Data;
    using System.Text;
    using System.Text.Json;
    using Microsoft.Data.SqlClient;
    
    const string connectionString =
        @"...";
    
    await JsonTest();
    
    return;
    
    static async Task JsonTest()
    {
        await using var conn = new SqlConnection(connectionString);
        await conn.OpenAsync();
        await using var cmd = new SqlCommand("dbo.TestJsonPerf", conn);
        cmd.CommandType = CommandType.StoredProcedure;
        cmd.CommandTimeout = 300;
    
    
        var data = GetEnumerableOfRandomDto(5_000_000);
        var stream = new JsonArrayStream<Dto>(data);
    
        cmd.Parameters.Add("@JsonData", SqlDbType.NVarChar, -1).Value =
            new StreamReader(stream, Encoding.UTF8, bufferSize: 1_048_576);
    
        Console.WriteLine($"calling ExecuteScalarAsync at {DateTime.Now:O}");
        var result = await cmd.ExecuteScalarAsync();
    
        Console.WriteLine($"writing result at {DateTime.Now:O}. Result was {result}");
    }
    
    static IEnumerable<Dto> GetEnumerableOfRandomDto(uint length)
    {
        var rnd = new Random();
    
        for (var i = 0; i < length; i++)
        {
            yield return new Dto(i,
                DateTime.UtcNow.AddMinutes(rnd.Next(1, 10000)),
                Guid.NewGuid().ToString()
            );
    
            if ((i + 1) % 100_000 == 0)
                Console.WriteLine($"Generated enumerable {i + 1} at {DateTime.Now:O}");
        }
    }
    
    
    internal record Dto(int Prop1, DateTime Prop2, string Prop3);
    
    public class JsonArrayStream<T> : Stream, IDisposable
    {
        private readonly IEnumerator<T> _input;
        private bool _arrayClosed;
    
        private byte[] _bytesToAppendNextRead = [(byte) '['];
        private bool _firstElement = true;
        private bool _disposed;
    
        public JsonArrayStream(IEnumerable<T> input)
        {
            _input = input.GetEnumerator();
        }
    
        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;
        public override long Length => 0;
        public override long Position { get; set; }
    
        void IDisposable.Dispose()
        {
            if (_disposed)
                return;
            _disposed = true;
            _input.Dispose();
        }
    
        public override int Read(byte[] buffer, int offset, int count)
        {
            var bytesWrittenThisRead = 0;
    
            if (!TryAppendToBuffer(_bytesToAppendNextRead, buffer, ref offset, count, ref bytesWrittenThisRead))
                return bytesWrittenThisRead;
    
      
            while (_input.MoveNext())
            {
                if (!_firstElement)
                {
                    //regardless of result of this call will proceed to the next step to ensure bytes for _input.Current are saved before MoveNext called
                    TryAppendToBuffer([(byte) ','], buffer, ref offset, count, ref bytesWrittenThisRead);
                }
    
                _firstElement = false;
    
                if (!TryAppendToBuffer(JsonSerializer.SerializeToUtf8Bytes(_input.Current), buffer, ref offset, count, ref bytesWrittenThisRead))
                    return bytesWrittenThisRead;
            }
    
            if (!_arrayClosed)
            {
                TryAppendToBuffer([(byte)']'], buffer, ref offset, count, ref bytesWrittenThisRead);
                _arrayClosed = true;
            }
    
            return bytesWrittenThisRead;
        }
    
        //Appends to stream buffer if possible. If no capacity saves any excess in _bytesToAppendNextRead
        private bool TryAppendToBuffer(byte[] bytesToAppend, byte[] buffer, ref int offset, int count,
            ref int bytesWrittenThisRead)
        {
            var bytesToAppendLength = bytesToAppend.Length;
    
            if (bytesToAppendLength > 0)
            {
                var lengthToWrite = Math.Min(bytesToAppendLength, count - bytesWrittenThisRead);
                Buffer.BlockCopy(bytesToAppend, 0, buffer, offset, lengthToWrite);
                offset += lengthToWrite;
                bytesWrittenThisRead += lengthToWrite;
    
                if (bytesToAppendLength > lengthToWrite)
                {
                    _bytesToAppendNextRead = _bytesToAppendNextRead.Length == 0 ? 
                        bytesToAppend[lengthToWrite..] : 
                        _bytesToAppendNextRead.Concat(bytesToAppend[lengthToWrite..]).ToArray();
    
                    return false;
                }
    
                _bytesToAppendNextRead = [];
            }
    
            return true;
        }
    
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }
    
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }
    
        public override void Flush()
        {
            throw new NotSupportedException();
        }
    }
    
    • 4

相关问题

  • SQL Server - 使用聚集索引时如何存储数据页

  • 我需要为每种类型的查询使用单独的索引,还是一个多列索引可以工作?

  • 什么时候应该使用唯一约束而不是唯一索引?

  • 死锁的主要原因是什么,可以预防吗?

  • 如何确定是否需要或需要索引

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目

    • 12 个回答
  • Marko Smith

    如何让sqlplus的输出出现在一行中?

    • 3 个回答
  • Marko Smith

    选择具有最大日期或最晚日期的日期

    • 3 个回答
  • Marko Smith

    如何列出 PostgreSQL 中的所有模式?

    • 4 个回答
  • Marko Smith

    列出指定表的所有列

    • 5 个回答
  • Marko Smith

    如何在不修改我自己的 tnsnames.ora 的情况下使用 sqlplus 连接到位于另一台主机上的 Oracle 数据库

    • 4 个回答
  • Marko Smith

    你如何mysqldump特定的表?

    • 4 个回答
  • Marko Smith

    使用 psql 列出数据库权限

    • 10 个回答
  • Marko Smith

    如何从 PostgreSQL 中的选择查询中将值插入表中?

    • 4 个回答
  • Marko Smith

    如何使用 psql 列出所有数据库和表?

    • 7 个回答
  • Martin Hope
    Jin 连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目 2014-12-02 02:54:58 +0800 CST
  • Martin Hope
    Stéphane 如何列出 PostgreSQL 中的所有模式? 2013-04-16 11:19:16 +0800 CST
  • Martin Hope
    Mike Walsh 为什么事务日志不断增长或空间不足? 2012-12-05 18:11:22 +0800 CST
  • Martin Hope
    Stephane Rolland 列出指定表的所有列 2012-08-14 04:44:44 +0800 CST
  • Martin Hope
    haxney MySQL 能否合理地对数十亿行执行查询? 2012-07-03 11:36:13 +0800 CST
  • Martin Hope
    qazwsx 如何监控大型 .sql 文件的导入进度? 2012-05-03 08:54:41 +0800 CST
  • Martin Hope
    markdorison 你如何mysqldump特定的表? 2011-12-17 12:39:37 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 对 SQL 查询进行计时? 2011-06-04 02:22:54 +0800 CST
  • Martin Hope
    Jonas 如何从 PostgreSQL 中的选择查询中将值插入表中? 2011-05-28 00:33:05 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 列出所有数据库和表? 2011-02-18 00:45:49 +0800 CST

热门标签

sql-server mysql postgresql sql-server-2014 sql-server-2016 oracle sql-server-2008 database-design query-performance sql-server-2017

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve