以下是 Hikari 连接设置
spring.datasource.hikari.connection-timeout=600000
spring.datasource.hikari.idle-timeout=300000
spring.datasource.hikari.max-lifetime=1800000
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.minimum-idle=5
下面是代码,分配与最大连接数相同的线程池大小,将 1M+ 记录列表按 10k 拆分并插入到数据库中,使用 mybatis 批处理会话
ExecutorService executorService = Executors.newFixedThreadPool(15);
List<List<Employee>> listOfEmployees = new ArrayList<>(IntStream.range(0, totalEmployeeList.size()).boxed().collect(
Collectors.groupingBy(e -> e / 10000, Collectors.mapping(totalEmployeeList::get, Collectors.toList()))).values());
List<Callable<Void>> callables = listOfEmployees.stream().map(sublist ->
(Callable<Void>) () -> {
dao.insertEmployees(sublist);
return null;
}).collect(Collectors.toList());
try {
executorService.invokeAll(callables);
executorService.shutdown();
} catch (InterruptedException e) {
log.error("Exception in executing thread save", e);
}
批处理会话代码配置:
@Bean(value = "batchSqlSession")
@Autowired
public SqlSessionTemplate batchSqlSession(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH);
}
DAO:
public class EmployeeDAO {
private final SqlSessionTemplate sqlSessionTemplate;
private final EmployeeMapper employeeMapper;
public EmployeeDAO (EmployeeMapper employeeMapper,
@Qualifier("batchSqlSession") SqlSessionTemplate sqlSessionTemplate) {
this.sqlSessionTemplate = sqlSessionTemplate;
this.employeeMapper= sqlSessionTemplate.getMapper(EmployeeMapper.class);;
}
@Transactional
public void insertEmployees(List<Employee> employeeList) {
employeeList.stream().parallel().forEach(employee-> {
try{
employeeMapper.insertEmployees(employee);
} catch(Exception ex){
log.error("Exception - {} while inserting data {}",ex,employee.toString());
}
});
sqlSessionTemplate.flushStatements();
sqlSessionTemplate.clearCache();
}
当我运行这个时,我得到了
引起原因:org.apache.ibatis.exceptions.PersistenceException:
更新数据库时出错。原因:org.springframework.jdbc.CannotGetJdbcConnectionException:无法更新
获取JDBC连接
错误可能存在于mapper/EmployeeMapper.xml中
该错误可能涉及 com.fmr.qrit.datasync.mapper.EmployeeMapper.insertEmployees
执行更新时发生错误
原因:org.springframework.jdbc.CannotGetJdbcConnectionException:无法获取 JDBC 连接
org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:84)在 org.mybatis.spring.transaction.SpringManagedTransaction.openConnection(SpringManagedTransaction.java:80)在 org.mybatis.spring.transaction.SpringManagedTransaction.getConnection(SpringManagedTransaction.java:67)在 org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:348)在 org.apache.ibatis.executor.BatchExecutor.doUpdate(BatchExecutor.java:70)在 org.apache.ibatis.executor.BaseExecutor.update(BaseExecutor.java:117)在 org.apache.ibatis.executor.CachingExecutor.update(CachingExecutor.java:76)在org.apache.ibatis.session.defaults.DefaultSqlSession.update(DefaultSqlSession.java:197) ... 另外 22 个原因:java.sql.SQLTransientConnectionException:HikariPool-1 - 连接不可用,请求在 30000 毫秒后超时(总计=10、活动=10、空闲=0、等待=6)位于 com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:686)位于 com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:179)位于 com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:144)位于 com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:127)位于org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource.getConnection(AbstractRoutingDataSource.java:213) 在 org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:160) 在 org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:118) 在 org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:81) ... 还有 29 个
我尝试将连接池大小从 10 增加到 15,然后将连接超时从 300000 增加到 600000,但仍然出现同样的错误
我认为问题出在您的 DAO 方法中的 employeeList.stream() .parallel() .forEach() 语句上。在内部,它会拆分列表并并发执行,最后将结果合并。这可能导致过多的并发数据库连接请求,从而导致此问题。
并行流非常适合 CPU 密集型操作,但不建议用于 I/O 密集型操作,例如数据库交互。
请尝试使用简单的employeeList.stream().forEach操作
您已经在执行器服务中生成了 15 个线程。
然后在事务层中再次执行并行流,这将使用其他空闲线程。
在事务块中,如果当前连接正忙,它将从池中获取一个新连接。
由于连接池只有 15 个连接,而寻求连接的线程数却更多,因此并行流将因上述异常而中断。
修复
将会删除每个并行流并降低
类似于