When I create an IntegrationFlow configuration, the sendTimeout() value on SourcePollingChannelAdapterSpec is ignored, which causes the polling thread to block. This does not happen if I create a SourcePollingChannelAdapter programmatically. For example, programmatically with a @Configuration class:
@Configuration
public class JdbcPollingConfig {

    @Bean
    protected MessageChannel jdbcInboundChannel() {
        return new QueueChannel(1);
    }

    @Bean
    public ThreadPoolTaskScheduler jdbcTaskExecutor() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Bean
    protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
        final JdbcPollingChannelAdapter adapter =
                new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        return adapter;
    }

    @Bean
    protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
            final JdbcPollingChannelAdapter adapter,
            final MessageChannel jdbcInboundChannel,
            final ThreadPoolTaskScheduler jdbcTaskExecutor) {
        final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
        spcAdapter.setSource(adapter);
        spcAdapter.setOutputChannel(jdbcInboundChannel);
        spcAdapter.setSendTimeout(500); // Sets the send-to-channel timeout - throws exception on timeout though.
        spcAdapter.setTaskScheduler(jdbcTaskExecutor);
        spcAdapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(2L)));
        return spcAdapter;
    }
}
Note that the CustomJdbcPollingChannelAdapter class is my own. It extends JdbcPollingChannelAdapter only so that I can override the doReceive() method with some logging calls:
@Log4j2
public class CustomJdbcPollingChannelAdapter extends JdbcPollingChannelAdapter {

    public CustomJdbcPollingChannelAdapter(DataSource dataSource, String selectQuery) {
        super(dataSource, selectQuery);
    }

    public CustomJdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery) {
        super(jdbcOperations, selectQuery);
    }

    @Override
    protected Object doReceive() {
        log.info("Jdbc Polling.");
        return super.doReceive();
    }
}
With the queue channel configured as above, this configuration throws an exception as expected, because there is no consumer on the jdbcInboundChannel channel. Compare that with an IntegrationFlow configuration:
@Configuration
public class JdbcDSLConfig {

    @Bean
    public QueueChannelSpec jdbcInboundChannel() {
        return MessageChannels.queue(1);
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
        return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
    }

    @Bean
    public ThreadPoolTaskExecutor jdbcExecutor() {
        final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
            final QueueChannelSpec jdbcInboundChannel,
            @Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
        return IntegrationFlow.from(jdbcMessageSource,
                        c -> c.poller(Pollers
                                .fixedDelay(2000)
                                .sendTimeout(1) // this has no effect.
                                .taskExecutor(jdbcExecutor)))
                .channel(jdbcInboundChannel)
                .get();
    }
}
This configuration blocks on the second poll of the database, even though a sendTimeout is set in the configuration.
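
To make the two behaviours concrete, the difference looks like the difference between a timed and an untimed insert into a bounded queue with capacity 1 and no consumer. The sketch below is only a JDK-level analogy under that assumption. It does not use or reproduce Spring Integration code, and the class name BoundedSendDemo and its two helper methods are hypothetical names invented for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: models "send with timeout" versus "send without timeout"
// on a capacity-1 queue that nobody consumes from.
public class BoundedSendDemo {

    // Timed send: returns false if no space becomes available within timeoutMillis.
    public static boolean sendWithTimeout(BlockingQueue<String> queue, String message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Untimed send: runs put() on a helper thread and reports whether that thread
    // is still blocked after waitMillis. The helper thread is interrupted afterwards.
    public static boolean untimedSendStillBlocked(BlockingQueue<String> queue, String message, long waitMillis) {
        Thread sender = new Thread(() -> {
            try {
                queue.put(message); // blocks until the queue has free capacity
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();
        try {
            sender.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean blocked = sender.isAlive();
        sender.interrupt(); // release the helper thread so the demo can exit
        return blocked;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // First message fits into the empty queue.
        System.out.println("first accepted:  " + sendWithTimeout(queue, "first", 500));
        // Queue is now full and has no consumer, so the timed send gives up.
        System.out.println("second accepted: " + sendWithTimeout(queue, "second", 500));
        // An untimed send on the full queue stays blocked.
        System.out.println("third blocked:   " + untimedSendStillBlocked(queue, "third", 500));
    }
}
```

In this analogy the programmatic configuration behaves like sendWithTimeout (the second send fails after the timeout and the failure is reported), while the IntegrationFlow configuration behaves like the untimed put (the second send never returns).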