我在项目中使用Spring Boot (v2.7.1) + Batch,采用 XML 方法。在此示例中,我读取 FlatFile,FlatFileItemReader
然后利用对ClassifierCompositeItemWriter
项目进行分类,并在达到特定阈值时借助MultiResourceItemWriter
创建文件的多个版本。
以下是错误信息-
错误:
org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (1), where current version is 3
at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:110) ~[spring-batch-core-4.3.6.jar:4.3.6]
at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:204) ~[spring-batch-core-4.3.6.jar:4.3.6]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.21.jar:5.3.21]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.21.jar:5.3.21]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.21.jar:5.3.21]
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.21.jar:5.3.21]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.21.jar:5.3.21]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.21.jar:5.3.21]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.21.jar:5.3.21]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.21.jar:5.3.21]
at com.sun.proxy.$Proxy44.update(Unknown Source) ~[na:na]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:457) ~[spring-batch-core-4.3.6.jar:4.3.6]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.6.jar:4.3.6]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.21.jar:5.3.21]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.6.jar:4.3.6]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.6.jar:4.3.6]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.3.6.jar:4.3.6]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
代码.spring-batch-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="4"/>
<property name="maxPoolSize" value="4"/>
</bean>
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<bean id="suffixCreator" class="com.example.EmployeeResourceSuffixCreator"/>
<batch:job id="employeeJob">
<batch:step id="step1">
<batch:tasklet transaction-manager="transactionManager" task-executor="taskExecutor">
<batch:chunk reader="flatFileItemReader" writer="classifierCompositeWriter" commit-interval="20" >
<batch:streams>
<batch:stream ref="javaSyncSW"/>
<batch:stream ref="pythonSyncSW"/>
<batch:stream ref="cloudSyncSW"/>
</batch:streams>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="classpath:employee.csv"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="empId,firstName,lastName,role"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.example.EmployeeFieldSetMapper"/>
</property>
</bean>
</property>
</bean>
<bean id="classifierCompositeWriter" class="org.springframework.batch.item.support.ClassifierCompositeItemWriter">
<property name="classifier" ref="employeeClassifier"/>
</bean>
<bean id="employeeClassifier" class="com.example.EmployeeClassifier">
<constructor-arg index="0" ref="javaSyncSW"/>
<constructor-arg index="1" ref="pythonSyncSW"/>
<constructor-arg index="2" ref="cloudSyncSW"/>
</bean>
<bean id="javaWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:javaDeveloper-employee.csv"/>
<property name="shouldDeleteIfExists" value="true"/>
<property name="shouldDeleteIfEmpty" value="true"/>
<property name="appendAllowed" value="true"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="empId,firstName,lastName,role"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="javaMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
<property name="name" value="javaMulti"/>
<property name="resource" value="file:javaDeveloper-employee.csv"/>
<property name="itemCountLimitPerResource" value="5"/>
<property name="resourceSuffixCreator" ref="suffixCreator"/>
<property name="delegate" ref="javaWriter"/>
</bean>
<bean id="javaSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
<property name="delegate" ref="javaMultiResource" />
</bean>
<bean id="pythonWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:pythonDeveloper-employee.csv"/>
<property name="shouldDeleteIfExists" value="true"/>
<property name="shouldDeleteIfEmpty" value="true"/>
<property name="appendAllowed" value="true"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="empId,firstName,lastName,role"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="pythonMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
<property name="name" value="javaMulti"/>
<property name="resource" value="file:pythonDeveloper-employee.csv"/>
<property name="itemCountLimitPerResource" value="5"/>
<property name="resourceSuffixCreator" ref="suffixCreator"/>
<property name="delegate" ref="pythonWriter"/>
</bean>
<bean id="pythonSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
<property name="delegate" ref="pythonMultiResource" />
</bean>
<bean id="cloudWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:cloudDeveloper-employee.csv"/>
<property name="shouldDeleteIfExists" value="true"/>
<property name="shouldDeleteIfEmpty" value="true"/>
<property name="appendAllowed" value="true"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="empId,firstName,lastName,role"/>
</bean>
</property>
</bean>
</property>
</bean>
<bean id="cloudMultiResource" class="org.springframework.batch.item.file.MultiResourceItemWriter">
<property name="name" value="javaMulti"/>
<property name="resource" value="file:cloudDeveloper-employee.csv"/>
<property name="itemCountLimitPerResource" value="5"/>
<property name="resourceSuffixCreator" ref="suffixCreator"/>
<property name="delegate" ref="cloudWriter"/>
</bean>
<bean id="cloudSyncSW" class="org.springframework.batch.item.support.SynchronizedItemStreamWriter">
<property name="delegate" ref="cloudMultiResource" />
</bean>
</beans>
员工.java
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
private String empId;
private String firstName;
private String lastName;
private String role;
@Override
public String toString() {
return empId + "," + firstName + "," + lastName + "," + role;
}
}
员工分类器.java
@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
private static final long serialVersionUID = 1L;
private ItemWriter<Employee> javaDeveloperFileItemWriter;
private ItemWriter<Employee> pythonDeveloperFileItemWriter;
private ItemWriter<Employee> cloudDeveloperFileItemWriter;
public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
ItemWriter<Employee> pythonDeveloperFileItemWriter,
ItemWriter<Employee> cloudDeveloperFileItemWriter) {
this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
}
@Override
public ItemWriter<? super Employee> classify(Employee employee) {
if(employee.getRole().equals("Java Developer")){
return javaDeveloperFileItemWriter;
}
else if(employee.getRole().equals("Python Developer")){
return pythonDeveloperFileItemWriter;
}
return cloudDeveloperFileItemWriter;
}
}
EmployeeFieldSetMapper.java
public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
@Override
public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
return Employee.builder()
.empId(fieldSet.readRawString("empId"))
.firstName(fieldSet.readRawString("firstName"))
.lastName(fieldSet.readRawString("lastName"))
.role(fieldSet.readRawString("role"))
.build();
}
}
主应用程序
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchMultiresourceClassifierXmlApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(SpringBatchMultiresourceClassifierXmlApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context.xml");
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("employeeJob");
try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Exit Status : "+ execution.getStatus());
} catch (JobExecutionException e) {
System.out.println("Job ExamResult failed");
e.printStackTrace();
}
}
}
它
FlatFileItemReader
不是线程安全的(请参阅其 javadoc),并且您正在多线程步骤中使用它。这就是线程重叠并导致乐观锁定失败的原因。您需要同步对读取器和写入器的访问才能使其正常工作。一种方法是将读取器/写入器包装在
SynchronizedItemStreamReader
/中SynchronizedItemStreamWriter
。谢谢@Mahmoud Ben Hassine
这是现在的工作代码