We use Spring Batch to ETL pipe-delimited files into a database. Each record in the file has many fields and is identified by ClaimNumber:
ClaimNumber|AdjustmentVersion|.....
0038017282|3|....
0071517729|3|....
0081517745|3|....

The batch step uses a conventional read-process-write flow:
<step id="stagingDataDump" next="gatherStats">
    <tasklet>
        <!-- <chunk reader="genericBatchItemReader" writer="genericBatchItemWriter" -->
        <chunk reader="genericBatchItemReader" writer="compositeWriter" processor="validationProcessor"
               commit-interval="1000" skip-limit="100000">
            <skippable-exception-classes>
                <batch:include class="org.springframework.batch.item.file.FlatFileParseException" />
                <batch:include class="org.beanio.BeanIOException" />
            </skippable-exception-classes>
        </chunk>
        <listeners>
            <listener ref="genericItemSkipListener"/>
        </listeners>
    </tasklet>
</step>

The reader is a BeanIOFlatFileItemReader:
<bean id="genericBatchItemReader" class="org.beanio.spring.BeanIOFlatFileItemReader" scope="step"
      p:streamMapping="classpath:beanio-mapping.xml"
      p:streamName="#{jobParameters[feedProcessorLauncherImpl.BEANIO_STREAM_MAPPING]}"
      p:resource="file://#{jobParameters[feedProcessorLauncherImpl.RESOURCE_FILE_NAME_UNENCRYPTED]}"
      p:errorHandler-ref="beanIoRecordErrorHandler"/>

The processor stage encapsulates item validation:
<util:map id="handlerRegistryContents">
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).MEDI}" value-ref="medicalClaimsValidator"/>
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).LAB}" value-ref="labClaimsValidator"/>
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).RXPD}" value-ref="pharmaClaimsValidator"/>
</util:map>
<bean id="validationProcessor" class="org.fuwt.iws.claims.validation.springbatch.ValidationProcessor" scope="step">
    <property name="handlerRegistry" ref="handlerRegistryContents"/>
</bean>

The writer is a composite:
<bean id="genericBatchItemWriter" class="org.fuwt.iws.claims.springbatch.GenericBatchItemWriter" scope="step"
      p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>
<bean id="softValidationsItemWriter" class="org.fuwt.iws.claims.springbatch.SoftValidationsItemWriter" scope="step"
      p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter" scope="step">
    <property name="delegates">
        <list>
            <!-- Order here is significant as ID's, which are generated by the first writer - genericBatchItemWriter - need to be passed around -->
            <ref bean="genericBatchItemWriter"/>
            <ref bean="softValidationsItemWriter"/>
        </list>
    </property>
</bean>

In the processing/validation step above, ValidationProcessor determines the type of the record and, based on that type, selects the appropriate composite validator (MedicalClaimsValidator in this case). All the individual validations for that type are configured inside the composite validator (Composite pattern); HCPCSCodeLength is the one that appears in the logs below.
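Each item (claim) is validated through this infrastructure, and the errors found are accumulated in the item's errors field, a Map<String, Collection<String>>, where the failure of a particular validation is keyed by the validation name and described in the map's value.

This brings me to the erroneous behavior we are seeing:

After carefully inspecting the log file for a load of the above test CSV file with 3 claim records, we found the following (strange!?) behavior: each record goes through validation as many times as its ordinal position in the file. That is, the first record (claim) is validated once, producing one message in its error map; the second record is validated twice, and its error map contains the message repeated twice; the third record is validated three times, so the same message appears three times in its error map.

The records in the file are nearly identical in terms of their invalidity, so the expected result is that each record should end up with the same collection of errors.

The actual result is that the error values keep growing with each subsequent record:

First record: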
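To make the shape of that infrastructure easier to follow, here is a minimal, framework-free sketch of the two ideas described above: a registry that picks a validator by record type, and validations that append their messages to the item's error map under the validation name. Every type in it (`ContentSubType`, `ClaimItem`, `Validation`, `RegistryProcessor`, `HcpcsCodePresent`) is a hypothetical stand-in invented for illustration and is not the project's real code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical subset of the record types used as registry keys.
enum ContentSubType { MEDI, LAB, RXPD }

// Hypothetical item that carries its own error map, keyed by validation name.
class ClaimItem {
    final ContentSubType type;
    final String hcpcsCode;
    final Map<String, Collection<String>> errors = new LinkedHashMap<>();

    ClaimItem(ContentSubType type, String hcpcsCode) {
        this.type = type;
        this.hcpcsCode = hcpcsCode;
    }

    // Appends one message under the given validation name.
    void addError(String validationName, String message) {
        errors.computeIfAbsent(validationName, k -> new ArrayList<>()).add(message);
    }
}

// Hypothetical single-method validation contract.
interface Validation {
    void validate(ClaimItem item);
}

// Illustrative rule: reports a blank or missing code.
class HcpcsCodePresent implements Validation {
    @Override
    public void validate(ClaimItem item) {
        if (item.hcpcsCode == null || item.hcpcsCode.isEmpty()) {
            item.addError("HCPCSCode", "supplied HCPCS code is blank or null");
        }
    }
}

// Looks up the validator registered for the item's type and applies it.
class RegistryProcessor {
    private final Map<ContentSubType, Validation> handlerRegistry =
            new EnumMap<>(ContentSubType.class);

    void register(ContentSubType type, Validation validation) {
        handlerRegistry.put(type, validation);
    }

    ClaimItem process(ClaimItem item) {
        Validation validation = handlerRegistry.get(item.type);
        if (validation != null) {
            validation.validate(item);
        }
        return item;
    }
}
```

With this shape, one pass over an invalid item should leave exactly one message per failed validation in its error map, which is the expected result described in this question.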
INFO 2016-06-23 10:16:24,214 [main] org.fuwt.iws.claims.validation.springbatch.medical.MedicalClaimsValidator: Service date to: Thu Dec 10 00:00:00 EST 2015
INFO 2016-06-23 10:16:24,216 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0038017282
INFO 2016-06-23 10:16:24,223 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Second record:
INFO 2016-06-23 10:16:24,227 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Third record:
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,229 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Versions used:
spring-batch-core: 2.2.0.RELEASE
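beanio: 2.1.0

Question:

Is something causing Spring Batch to issue repeated calls to the processor? If so, what is Spring Batch's normal behavior here, and how can I stop the repetition and get the expected behavior described above?

Update:

The following validation component exhibits the erroneous behavior: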
@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {
    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired private AbstractMedicalClaimValidation HCPCSCodeLength;

    List<ClaimValidation> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override
    public Map<String, Collection<String>> validate(Object item, MessageSource messageSource) {
        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;
        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        //medicalClaimValidations.add(new ServiceDateFromGreaterThanTo());
        //medicalClaimValidations.add(new ProcedureCodeLength());
        medicalClaimValidations.add(HCPCSCodeLength/*new HCPCSCodeLength()*/);
        //medicalClaimValidations.add(new TypeOfBillPresenseAndLengthForInstitutionalClaims());
        //medicalClaimValidations.add(new DischargeStatusPresenseAndLengthForInpatientClaims());
        //medicalClaimValidations.add(new DiagnosisCodeFormat());

        for (ClaimValidation validation : medicalClaimValidations) {
            logger.info("validation type: {}", validation.getClass());
            validation.validate(medicalClaim, messageSource);
        }
        return medicalClaim.getErrors();
    }
}

The following workaround hides the erroneous behavior:
@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {
    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired @Qualifier("HCPCSCodeLength") private AbstractMedicalClaimValidation HCPCSCodeLength;
    @Autowired @Qualifier("serviceDateFromGreaterThanTo") private AbstractMedicalClaimValidation serviceDateFromGreaterThanTo;
    @Autowired @Qualifier("procedureCodeLength") private AbstractMedicalClaimValidation procedureCodeLength;
    @Autowired @Qualifier("typeOfBillPresenseAndLengthForInstitutionalClaims") private AbstractMedicalClaimValidation typeOfBillPresenseAndLengthForInstitutionalClaims;
    @Autowired @Qualifier("dischargeStatusPresenseAndLengthForInpatientClaims") private AbstractMedicalClaimValidation dischargeStatusPresenseAndLengthForInpatientClaims;
    @Autowired @Qualifier("diagnosisCodeFormat") private AbstractMedicalClaimValidation diagnosisCodeFormat;

    List<ValidationProcessTuple> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override
    public Map<String, Collection<String>> validate(Object item, MessageSource messageSource) {
        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;
        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
        medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));

        for (ValidationProcessTuple tuple : medicalClaimValidations) {
            if (!tuple.processed) {//to counteract the erroneous behavior whereby validation calls get repeated as many times as there are records
                tuple.validation.validate(item, messageSource);
                tuple.processed = true;
            }
        }
        return medicalClaim.getErrors();
    }
}

I still don't know why this behavior occurs in the first place; any explanation would certainly be welcome.
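Answered 2016-06-24 16:49:53

In the MedicalClaimsValidator class, why are you adding to this list inside the validate() method? I can't see a reason for it. Every time a new row is processed, the same validations are appended to the list again, so the list keeps growing and each later record is validated once more than the one before it. You can define the validation rules outside the method, in an init method or in the constructor.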
//TODO: A candidate for externalization into a config file once we have all the known rules
medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));

https://stackoverflow.com/questions/37996187
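To make the answer above concrete, here is a small stand-alone sketch, without Spring, that reproduces the 1, 2, 3 pattern from the logs and then shows the suggested fix of building the rule list once. All types in it (`Claim`, `Rule`, `BlankCodeRule`, `GrowingValidator`, `FixedValidator`) are hypothetical stand-ins for illustration, not the project's classes. It assumes the validator is a long-lived instance that is reused for every item, which is consistent with the identical `HCPCSCodeLength@1a9ddcb7` instance appearing in every log line.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a claim item that carries its own error map.
class Claim {
    final Map<String, Collection<String>> errors = new LinkedHashMap<>();
}

// Hypothetical stand-in for a single validation rule.
interface Rule {
    void validate(Claim claim);
}

// Adds one message per invocation, like the rule seen in the logs.
class BlankCodeRule implements Rule {
    @Override
    public void validate(Claim claim) {
        claim.errors.computeIfAbsent("HCPCSCode", k -> new ArrayList<>())
                    .add("supplied HCPCS code is blank or null");
    }
}

// Reproduces the problem: the instance is reused, and its list grows on every call.
class GrowingValidator {
    private final Rule hcpcsCodeLength = new BlankCodeRule();
    private final List<Rule> rules = new ArrayList<>();

    Map<String, Collection<String>> validate(Claim claim) {
        rules.add(hcpcsCodeLength); // executed once per item, never cleared
        for (Rule rule : rules) {
            rule.validate(claim);   // the n-th item is validated n times
        }
        return claim.errors;
    }
}

// Suggested fix: assemble the rule list once, at construction time.
class FixedValidator {
    private final List<Rule> rules;

    FixedValidator(List<Rule> rules) {
        this.rules = new ArrayList<>(rules); // defensive copy, never modified afterwards
    }

    Map<String, Collection<String>> validate(Claim claim) {
        for (Rule rule : rules) {
            rule.validate(claim);   // every item is validated exactly once per rule
        }
        return claim.errors;
    }
}
```

Calling `GrowingValidator.validate` on three fresh claims yields 1, 2 and 3 messages, while `FixedValidator` yields one message each time. In a Spring bean the same one-time setup could live in the constructor or in a method annotated with `@PostConstruct`, so that the autowired validations are already available when the list is built. Note that the `processed` flag in the workaround only skips the stale tuples; the list itself still grows by six entries per record for as long as the bean lives.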