springbatch的封装与使用实例详解
SpringBatch官网介绍:
Alightweight,comprehensivebatchframeworkdesignedtoenablethedevelopmentofrobustbatchapplicationsvitalforthedailyoperationsofenterprisesystems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)
springbatch
主要实现批量数据的处理,我对batch进行的封装,提出了jobBase类型,具体job需要实现它即可。SpringBatch不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。
几个组件
•job
•step
•read
•write
•listener
•process
•validator
JobBase定义了几个公用的方法
/** *springBatch的job基础类. */ publicabstractclassJobBase{ /** *批次. */ protectedintchunkCount=5000; /** *监听器. */ privateJobExecutionListenerjobExecutionListener; /** *处理器. */ privateValidatingItemProcessor validatingItemProcessor; /** *job名称. */ privateStringjobName; /** *检验器. */ privateValidator validator; @Autowired privateJobBuilderFactoryjob; @Autowired privateStepBuilderFactorystep; /** *初始化. * *@paramjobNamejob名称 *@paramjobExecutionListener监听器 *@paramvalidatingItemProcessor处理器 *@paramvalidator检验 */ publicJobBase(StringjobName, JobExecutionListenerjobExecutionListener, ValidatingItemProcessor validatingItemProcessor, Validator validator){ this.jobName=jobName; this.jobExecutionListener=jobExecutionListener; this.validatingItemProcessor=validatingItemProcessor; this.validator=validator; } /** *job初始化与启动. */ publicJobgetJob()throwsException{ returnjob.get(jobName).incrementer(newRunIdIncrementer()) .start(syncStep()) .listener(jobExecutionListener) .build(); } /** *执行步骤. * *@return */ publicStepsyncStep()throwsException{ returnstep.get("step1") . chunk(chunkCount) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } /** *单条处理数据. * *@return */ publicItemProcessor processor(){ validatingItemProcessor.setValidator(processorValidator()); returnvalidatingItemProcessor; } /** *校验数据. * *@return */ @Bean publicValidator processorValidator(){ returnvalidator; } /** *批量读数据. * *@return *@throwsException */ publicabstractItemReader reader()throwsException; /** *批量写数据. * *@return */ @Bean publicabstractItemWriter writer(); }
主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体JOB去实现的。
具体Job实现
@Configuration @EnableBatchProcessing publicclassSyncPersonJobextendsJobBase{ @Autowired privateDataSourcedataSource; @Autowired @Qualifier("primaryJdbcTemplate") privateJdbcTemplatejdbcTemplate; /** *初始化,规则了job名称和监视器. */ publicSyncPersonJob(){ super("personJob",newPersonJobListener(),newPersonItemProcessor(),newBeanValidator<>()); } @Override publicItemReader reader()throwsException{ StringBuffersb=newStringBuffer(); sb.append("select*fromperson"); Stringsql=sb.toString(); JdbcCursorItemReader jdbcCursorItemReader= newJdbcCursorItemReader<>(); jdbcCursorItemReader.setSql(sql); jdbcCursorItemReader.setRowMapper(newBeanPropertyRowMapper<>(Person.class)); jdbcCursorItemReader.setDataSource(dataSource); returnjdbcCursorItemReader; } @Override @Bean("personJobWriter") publicItemWriter writer(){ JdbcBatchItemWriter writer=newJdbcBatchItemWriter (); writer.setItemSqlParameterSourceProvider(newBeanPropertyItemSqlParameterSourceProvider ()); Stringsql="insertintoperson_export"+"(id,name,age,nation,address)" +"values(:id,:name,:age,:nation,:address)"; writer.setSql(sql); writer.setDataSource(dataSource); returnwriter; } }
写操作需要定义自己的bean的声明
注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱
/** *批量写数据. * *@return */ @Override @Bean("personVerson2JobWriter") publicItemWriterwriter(){ }
添加一个api,手动触发
@Autowired SyncPersonJobsyncPersonJob; @Autowired JobLauncherjobLauncher; voidexec(Jobjob)throwsException{ JobParametersjobParameters=newJobParametersBuilder() .addLong("time",System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(job,jobParameters); } @RequestMapping("/run1") publicStringrun1()throwsException{ exec(syncPersonJob.getJob()); return"personJobsuccess"; }
总结
以上所述是小编给大家介绍的springbatch的封装与使用实例详解,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!