有一个Spring Integration
由多个组成的应用程序integration flows
,并且该应用程序被部署为容器Kubernetes
。容器根据负载自动扩展。
在这些多个集成流中,有一个这样的流会重试来自 DB 的失败记录,并且在任何时间点都只能运行它的一个实例(类似于Quartz
中的集成Spring Batch
)。
这种行为如何实现Spring Integration
?
有一个Spring Integration
由多个组成的应用程序integration flows
,并且该应用程序被部署为容器Kubernetes
。容器根据负载自动扩展。
在这些多个集成流中,有一个这样的流会重试来自 DB 的失败记录,并且在任何时间点都只能运行它的一个实例(类似于Quartz
中的集成Spring Batch
)。
这种行为如何实现Spring Integration
?
@MessagingGateway
允许defaultRequestChannel
使用属性占位符${} 来设置注释:
@MessagingGateway(defaultRequestChannel = "${gateway.request.channel}")
public interface MessageGateway {
@Gateway(requestTimeout = 2000)
void sendListing(List<Path> entries);
}
但我无法对@Gateway注释进行类似操作:
@MessagingGateway
public interface MessageGateway {
@Gateway(requestTimeout = 2000, requestChannel = "${gateway.request.channel}") // invalid
void sendListing(List<Path> entries);
}
类似地,我无法使用属性占位符或 SpEl 动态设置 ServiceActivator inputChannel
。outputChannel
我是否需要ServiceActivator
使用手动配置ServiceActivatingHandler
?
我正在寻找一种带有拆分/聚合的集成流程。我希望捕获处理程序抛出的异常,并逐个迭代拆分的所有元素,即使某些元素失败。当所有元素都抛出异常,并且捕获异常时,流程永远不会结束。请问我遗漏了什么?我如何才能达到聚合并结束此流程?
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel)
.mappedRequestHeaders(parameter.getJiraHeaderSignature()))
.handle(versionWebhookHandler)
.split(new VersionIssueSplitter())
.handle(updateVersionHandler, s -> s.advice(advice()))
.aggregate()
.get();
public Advice advice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
return advice;
}
我想使用它KinesisMessageDrivenChannelAdapter
从 Kinesis 流中读取记录。首次启动消费者应用程序时,我希望它接收流中已存在的所有记录。但在后续启动时,应用程序应继续从来自 DynamoDb 的最新检查点序列号读取。
我的假设是否正确,adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon())
导致了这种行为?
目前如何KinesisMessageDrivenChannelAdapter
处理多个实例之间的平衡?我希望当我启动另一个实例进行扩展时,碎片分布得比较均匀。
我找到了这张旧票https://github.com/spring-projects/spring-integration-aws/issues/99。 似乎没有重新平衡。
感谢您的意见!
用例是使用 Spring Integration 的 Kafka 延迟重新处理下游失败消息ConcurrentMessageListenerContainer
。假设最大重试次数应为 2,固定延迟为 5 分钟。
Spring Integration 框架中是否有现成的解决方案?
已经过去了DeadLetterPublishingRecoverer
,它只有助于转移到 DLT,除非我们用轮询器监听同一个 DLT,否则不会进行进一步处理。
此问题是 SO 问题的扩展,Spring Integration 将 ReactorContext 恢复为命令式处理
通过 WebFlux 调用后抛出的任何异常WebFluxRequestExecutingMessageHandler
都缺少Trace
信息。
描述该问题的示例可在https://github.com/syedyusufh/simple-handle-reactive.git上找到
这就是正在发生的事情。
WebFluxRequestExecutingMessageHandler
首先抛出的这个异常会通过上下文丢失的地方自动记录例外:
2m2024-06-24T20:27:02.445+04:00 INFO 1436 --- [ scheduling-1] [66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11] c.i.sample.config.TracingConfig : Request Headers: [Content-Type:"application/json", traceparent:"00-66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11-00"]
2024-06-24T20:27:03.997+04:00 INFO 1436 --- [ctor-http-nio-3] [66799e5608c967d694f450b82077c38c-fadc3f15d16b6b11] c.i.sample.config.TracingConfig : Response Headers: [Access-Control-Allow-Origin:"*", Alt-Svc:"h3=":443"; ma=2592000", Content-Type:"application/json", Date:"Mon, 24 Jun 2024 16:27:05 GMT", Server:"Caddy", Vary:"Accept-Encoding", Transfer-Encoding:"chunked"]
2024-06-24T20:27:04.089+04:00 ERROR 1436 --- [oundedElastic-1] [ ] .o.WebFluxRequestExecutingMessageHandler : Failed to send async reply: org.springframework.integration.support.MessageBuilder@5e1691b0
下午好,
我正在使用出站网关来调用可以使用(非 SOAP)XML 并生成(非 SOAP)XML 的服务。我可以编组 JAXB 类来请求 XML,但无法将响应 XML 解组回 JAXB 类,有效负载正文为 null。
流程如下
.subFlowMapping("SomeRequestType", subflow -> subflow
.transform(someRequestTransformer)
.enrichHeaders(header -> header.header("Content-Type","application/xml"))
.handle(someServiceOutboundGateway)
.transform(someResponseTransformer)
)
从someServiceOutboundGateway
@Bean
public HttpMessageConverter m() {
MarshallingHttpMessageConverter c = new MarshallingHttpMessageConverter();
c.setMarshaller(someMarshaller());
c.setUnmarshaller(someUnMarshaller());
return c;
}
@Bean(name="someServiceOutboundGateway")
public MessageHandler someOutboundGateway() {
return Http.outboundGateway(someUrl, lnquiHttp())
.httpMethod(HttpMethod.POST)
.expectedResponseType(Response.class)
.get();
}
@Bean
public RestTemplate lnquiHttp() {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(connections);
connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute);
RequestConfig requestConfig = RequestConfig
.custom()
.setConnectionRequestTimeout(timeout) // timeout to get connection from pool
.setSocketTimeout(timeout) // standard connection timeout
.setConnectTimeout(timeout) // standard connection timeout
.build();
HttpClient httpClient = HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfig).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
RestTemplateBuilder restTemplateBuilder = new RestTemplateBuilder();
RestTemplate restTemplate = restTemplateBuilder
.requestFactory(requestFactory)
.basicAuthorization(userName, password)
.messageConverters(m())
.build();
return restTemplate;
}
@Bean
public Marshaller someMarshaller() {
final Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setContextPath(CONTEXTPATH_REQUEST);
marshaller.setSchema(responseSchema);
marshaller.setSupportJaxbElementClass(Boolean.TRUE);
return marshaller;
}
@Bean
public Unmarshaller someUnMarshaller() {
final Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setContextPath(CONTEXTPATH_RESPONSE);
unmarshaller.setSchema(responseSchema);
unmarshaller.setSupportJaxbElementClass(Boolean.TRUE);
return unmarshaller;
}
@Bean(name = "someJAXBContext")
public JAXBContext someJAXBContext() throws JAXBException {
return JAXBContext.newInstance(Response.class);
}
从someResponseTransformer
@Component
public class SomeResponseTransformer implements GenericHandler<Object> {
@Override
public Object handle(final Object payload, final Map<String, Object> headers) {
ResponseEntity responseEntity = (ResponseEntity)payload;
Object body = responseEntity.getBody();
Response lnqiResponse = (Response)body;
... = buildHeader(lnqiResponse.getHeader());
当我尝试获取标头 ( lnqiResponse.getHeader()
) 时,我收到 NullPointerException。
知道如何解组对 的响应Response
吗?任何帮助,将不胜感激!谢谢!