我正在尝试使用 Spring KafkaJsonSerializer
从我的生产者发送 JSON 对象,但在发送与序列化相关的消息时出现异常ProducerRecord
。我期望JsonSerializer
处理消息的序列化,但在尝试序列化时失败ProducerRecord
。这是我的设置:
生产者配置:
import com.app.integration.impl.dto.TransitECS;
import com.app.integration.impl.util.KafkaConstants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
return Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
);
}
@Bean
public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, TransitECS> transitECSKafkaTemplate() {
KafkaTemplate<String, TransitECS> kafkaTemplate = new KafkaTemplate<>(transitECSProducerFactory());
kafkaTemplate.setDefaultTopic(KafkaConstants.TRANSIT_ECS_TOPIC);
return kafkaTemplate;
}
}
发件人方式:
@RestController
@RequestMapping("/api/v1/topic")
@RequiredArgsConstructor
public class TopicController {
private final KafkaTemplate<String, TransitECS> transitECSKafkaTemplate;
/**
* This method is used to send the transit ECS to the topic.
*
* @param transitECS The transit ECS to be sent.
*/
@PostMapping("/send/transitECS")
public CompletableFuture<SendResult<String, TransitECS>> sendTransit(@Valid @RequestBody TransitECS transitECS) {
return this.transitECSKafkaTemplate.sendDefault(transitECS);
}
}
异常消息:
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.clients.producer.ProducerRecord and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.kafka.support.SendResult["producerRecord"])
导致异常的代码:
package com.fasterxml.jackson.databind.ser.impl;
public class UnknownSerializer
extends ToEmptyObjectSerializer // since 2.13
{
...
@Override
public void serialize(Object value, JsonGenerator gen, SerializerProvider ctxt) throws IOException
{
// 27-Nov-2009, tatu: As per [JACKSON-201] may or may not fail...
if (ctxt.isEnabled(SerializationFeature.FAIL_ON_EMPTY_BEANS)) {
failForEmpty(ctxt, value); // <------ IS ENTERING HERE
}
super.serialize(value, gen, ctxt);
}
protected void failForEmpty(SerializerProvider prov, Object value)
throws JsonMappingException {
Class<?> cl = value.getClass();
if (NativeImageUtil.needsReflectionConfiguration(cl)) {
prov.reportBadDefinition(handledType(), String.format(
"No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS). This appears to be a native image, in which case you may need to configure reflection for the class that is to be serialized",
cl.getName()));
} else {
prov.reportBadDefinition(handledType(), String.format(
"No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)",
cl.getName()));
}
}
}
异常表明 Jackson 正在尝试序列化ProducerRecord
包含我的有效负载对象和其他 kafka 内容(标头、密钥等)的内容。关键是,怎么可能org.springframework.kafka.support.serializer.JsonSerializer
不能序列化org.apache.kafka.clients.producer.ProducerRecord
类?
Spring Boot 版本:
3.0.1
任何建议或见解都将不胜感激!
我尝试过的:
- 已禁用,
SerializationFeature.FAIL_ON_EMPTY_BEANS
但没有成功。
@Bean
public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
return new DefaultKafkaProducerFactory<>(
producerConfigs(),
new StringSerializer(),
new JsonSerializer<>(mapper)
);
}
[已解决]
我已经决定定义一个 ObjectMapper 类型的 Bean,而不是在 DefaultKafkaProducerFactory Bean 定义期间将其传递给 JsonSerializer 构造函数。