当我创建IntegrationFlow
设置时sendTimeout()
,值SourcePollingChannelAdapterSpec
被忽略,导致轮询线程阻塞。如果我以编程方式创建,则不会SourcePollingChannelAdapter
发生这种情况。例如,以编程方式使用@Configuration
类:
@Configuration
public class JdbcPollingConfig {
@Bean
protected MessageChannel jdbcInboundChannel() {
return new QueueChannel(1);
}
@Bean
public ThreadPoolTaskScheduler jdbcTaskExecutor() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(1);
taskScheduler.initialize();
return taskScheduler;
}
@Bean
protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
final JdbcPollingChannelAdapter adapter =
new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
return adapter;
}
@Bean
protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
final JdbcPollingChannelAdapter adapter,
final MessageChannel jdbcInboundChannel,
final ThreadPoolTaskScheduler jdbcTaskExecutor) {
final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
spcAdapter.setSource(adapter);
spcAdapter.setOutputChannel(jdbcInboundChannel);
spcAdapter.setSendTimeout(500); // Sets the send-to-channel timeout - throws exception on timeout though.
spcAdapter.setTaskScheduler(jdbcTaskExecutor);
spcAdapter.setTrigger( new PeriodicTrigger(Duration.ofSeconds(2L)));
return spcAdapter;
}
}
请注意,该类CustomJdbcPollingChannelAdapter
是我自己的,它只是进行了扩展,以便我可以doReceive()
使用一些日志调用来覆盖该方法:
@Log4j2
public class CustomJdbcPollingChannelAdapter extends JdbcPollingChannelAdapter {
public CustomJdbcPollingChannelAdapter(DataSource dataSource, String selectQuery) {
super(dataSource, selectQuery);
}
public CustomJdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery) {
super(jdbcOperations, selectQuery);
}
@Override
protected Object doReceive() {
log.info("Jdbc Polling.");
return super.doReceive();
}
}
在配置了通道队列的情况下,此配置会按预期抛出异常,因为没有通道的消费者jdbcInboundChannel
。对比一下使用和IntegrationFlow
配置:
@Configuration
public class JdbcDSLConfig {
@Bean
public QueueChannelSpec jdbcInboundChannel() {
return MessageChannels.queue(1);
}
@Bean
public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
}
@Bean
public ThreadPoolTaskExecutor jdbcExecutor() {
final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(1);
taskExecutor.setMaxPoolSize(1);
taskExecutor.initialize();
return taskExecutor;
}
@Bean
public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
final QueueChannelSpec jdbcInboundChannel,
@Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
return IntegrationFlow.from(jdbcMessageSource,
c -> c.poller(Pollers
.fixedDelay(2000)
.sendTimeout(1) // this has no effect.
.taskExecutor(jdbcExecutor)))
.channel(jdbcInboundChannel)
.get();
}
}
即使在配置中设置了 sendTimeout,此配置也会在数据库的第二次轮询时阻塞。