我正在测试一个程序,以验证压缩是否有助于减少主题消息的大小。我的示例主题配置为“max.message.bytes=1024000”,大约为 1MB,在生产者端配置中,我将相同的值设置为“max.request.size”,然后我尝试发送一个大小为 1573015 的字符串,大约为 1.5MB,这引发了低于预期的错误。
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1573015 bytes when
serialized which is larger than 1048576, which is the value of the max.request.size configuration.
接下来,由于我希望在生产者级别负责压缩,我将生产者的压缩配置设置为“zstd”(我也尝试过 gzip),但生产者抛出了相同的错误。我期望压缩配置在发送消息之前将生产者的消息大小减小到 <1MB。
当我在主题级别或生产者级别测试“compression.type”或在主题和生产者上设置compression.type属性时,我也观察到了相同的行为(我想避免在代理级别设置此属性,因为我希望它只对特定主题或该主题的生产者生效)。
我想了解 compression.type 是否真的减少了从生产者发送到 Kafka 代理的消息大小,代理会解压并验证未压缩消息的大小并引发此错误? 或者是因为生产者可能存在配置错误,导致压缩首先没有发生?
如果有人可以阐明与 compression.type 有关的属性 max.request.size 的内部工作原理,我将不胜感激。
使用独立程序,我确实验证了我用于此测试的消息样本可以使用 gzip 和 zstd 压缩到 <1MB。我用于此测试的 kafka 版本是 Confluent Kafka Platform 8.0,它在 Ubuntu WSL 本地的单节点集群上运行。
消息永远不会发送给代理,因为您的生产者将首先进行大小验证。
对于代理,有完全不同的配置 - message.max.bytes “设置代理可以接受的消息的最大大小。默认值为 1 MB。”。在这两种情况下,如果超出限制,
RecordTooLargeException
都会抛出异常,但您的配置包含max.request.size,它指示生产者的配置。文档明确指出生产者负责压缩,但如果主题 compression.type 与生产者 compression.type 不同,则代理可以重新压缩。
https://www.confluence.io/blog/apache-kafka-message-compression/#configuring-compression-type