基于Spring Batch向Elasticsearch批量导入数据示例
1. 介绍
当系统有大量数据需要从数据库导入 Elasticsearch 时，使用 Spring Batch 可以提高导入的效率。Spring Batch 使用 ItemReader 分页读取数据，使用 ItemWriter 批量写入数据。由于 Spring Batch 没有提供 Elasticsearch 的 ItemWriter 和 ItemReader，本示例自定义了一个 ElasticsearchItemWriter（另附一个仅供参考的 ElasticsearchItemReader），用于批量导入。
2. 示例
2.1 pom.xml
本文使用 spring-data-jest 连接 ES（也可以使用 spring-data-elasticsearch 连接 ES），ES 版本为 5.5.3。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the es-etl demo: Spring Boot 2.0 (milestone) + Spring Batch +
  JPA/PostgreSQL as the source, and spring-data-jest (Elasticsearch over HTTP via Jest)
  as the target.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- Milestone release: resolved from the Spring milestone repository declared below. -->
        <version>2.0.0.M7</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Spring Data Elasticsearch repositories backed by the Jest HTTP client. -->
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
2.2 实体类及 Repository
packagecom.hfcsbc.esetl.domain;
importlombok.Data;
importorg.springframework.data.elasticsearch.annotations.Document;
importorg.springframework.data.elasticsearch.annotations.Field;
importorg.springframework.data.elasticsearch.annotations.FieldType;
importjavax.persistence.Entity;
importjavax.persistence.Id;
importjavax.persistence.OneToOne;
/**
*Createbypengchaoon2018/2/23
*/
@Document(indexName="person",type="person",shards=1,replicas=0,refreshInterval="-1")
@Entity
@Data
publicclassPerson{
@Id
privateLongid;
privateStringname;
@OneToOne
@Field(type=FieldType.Nested)
privateAddressaddress;
}
packagecom.hfcsbc.esetl.domain;
importlombok.Data;
importjavax.persistence.Entity;
importjavax.persistence.Id;
/**
*Createbypengchaoon2018/2/23
*/
@Entity
@Data
publicclassAddress{
@Id
privateLongid;
privateStringname;
}
packagecom.hfcsbc.esetl.repository.jpa; importcom.hfcsbc.esetl.domain.Person; importorg.springframework.data.jpa.repository.JpaRepository; /** *Createbypengchaoon2018/2/23 */ publicinterfacePersonRepositoryextendsJpaRepository{ }
packagecom.hfcsbc.esetl.repository.es; importcom.hfcsbc.esetl.domain.Person; importorg.springframework.data.elasticsearch.repository.ElasticsearchRepository; /** *Createbypengchaoon2018/2/23 */ publicinterfaceEsPersonRepositoryextendsElasticsearchRepository{ }
2.3 配置 ElasticsearchItemWriter
packagecom.hfcsbc.esetl.itemWriter; importcom.hfcsbc.esetl.repository.es.EsPersonRepository; importcom.hfcsbc.esetl.domain.Person; importorg.springframework.batch.core.ExitStatus; importorg.springframework.batch.core.ItemWriteListener; importorg.springframework.batch.core.StepExecution; importorg.springframework.batch.core.StepExecutionListener; importorg.springframework.batch.item.ItemWriter; importjava.util.List; /** *Createbypengchaoon2018/2/23 */ publicclassElasticsearchItemWriterimplementsItemWriter,ItemWriteListener ,StepExecutionListener{ privateEsPersonRepositorypersonRepository; publicElasticsearchItemWriter(EsPersonRepositorypersonRepository){ this.personRepository=personRepository; } @Override publicvoidbeforeWrite(Listitems){ } @Override publicvoidafterWrite(Listitems){ } @Override publicvoidonWriteError(Exceptionexception,Listitems){ } @Override publicvoidbeforeStep(StepExecutionstepExecution){ } @Override publicExitStatusafterStep(StepExecutionstepExecution){ returnnull; } @Override publicvoidwrite(Listitems)throwsException{ //实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引 personRepository.saveAll(items); } }
2.4 配置 ElasticsearchItemReader（本示例未使用，仅供参考）
packagecom.hfcsbc.esetl.itemReader; importorg.springframework.batch.item.data.AbstractPaginatedDataItemReader; importorg.springframework.beans.factory.InitializingBean; importorg.springframework.data.elasticsearch.core.ElasticsearchOperations; importorg.springframework.data.elasticsearch.core.query.SearchQuery; importjava.util.Iterator; /** *Createbypengchaoon2018/2/24 */ publicclassElasticsearchItemReaderextendsAbstractPaginatedDataItemReader implementsInitializingBean{ privatefinalElasticsearchOperationselasticsearchOperations; privatefinalSearchQueryquery; privatefinalClasstargetType; publicElasticsearchItemReader(ElasticsearchOperationselasticsearchOperations,SearchQueryquery,ClasstargetType){ this.elasticsearchOperations=elasticsearchOperations; this.query=query; this.targetType=targetType; } @Override protectedIterator doPageRead(){ return(Iterator )elasticsearchOperations.queryForList(query,targetType).iterator(); } @Override publicvoidafterPropertiesSet()throwsException{ } }
2.5 Spring Batch 所需的配置
packagecom.hfcsbc.esetl.config;
importcom.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
importcom.hfcsbc.esetl.repository.es.EsPersonRepository;
importcom.hfcsbc.esetl.domain.Person;
importorg.springframework.batch.core.Job;
importorg.springframework.batch.core.Step;
importorg.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
importorg.springframework.batch.core.configuration.annotation.JobBuilderFactory;
importorg.springframework.batch.core.configuration.annotation.StepBuilderFactory;
importorg.springframework.batch.core.launch.support.RunIdIncrementer;
importorg.springframework.batch.core.repository.JobRepository;
importorg.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
importorg.springframework.batch.item.ItemReader;
importorg.springframework.batch.item.ItemWriter;
importorg.springframework.batch.item.database.JpaPagingItemReader;
importorg.springframework.batch.item.database.orm.JpaNativeQueryProvider;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.transaction.PlatformTransactionManager;
importjavax.persistence.EntityManagerFactory;
importjavax.sql.DataSource;
/**
*Createbypengchaoon2018/2/23
*/
@Configuration
@EnableBatchProcessing
publicclassBatchConfig{
@Autowired
privateEsPersonRepositorypersonRepository;
@Bean
publicItemReaderorderItemReader(EntityManagerFactoryentityManagerFactory){
JpaPagingItemReaderreader=newJpaPagingItemReader();
StringsqlQuery="select*fromperson";
try{
JpaNativeQueryProviderqueryProvider=newJpaNativeQueryProvider();
queryProvider.setSqlQuery(sqlQuery);
queryProvider.setEntityClass(Person.class);
queryProvider.afterPropertiesSet();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(10000);
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
reader.setSaveState(true);
}catch(Exceptione){
e.printStackTrace();
}
returnreader;
}
@Bean
publicElasticsearchItemWriteritemWriter(){
returnnewElasticsearchItemWriter(personRepository);
}
@Bean
publicStepstep(StepBuilderFactorystepBuilderFactory,
ItemReaderitemReader,
ItemWriteritemWriter){
returnstepBuilderFactory
.get("step1")
.chunk(10000)
.reader(itemReader)
.writer(itemWriter)
.build();
}
@Bean
publicJobjob(JobBuilderFactoryjobBuilderFactory,Stepstep){
returnjobBuilderFactory
.get("importJob")
.incrementer(newRunIdIncrementer())
.flow(step)
.end()
.build();
}
/**
*springbatch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
*@paramdataSource
*@parammanager
*@return
*/
@Bean
publicJobRepositoryjobRepository(DataSourcedataSource,PlatformTransactionManagermanager){
JobRepositoryFactoryBeanjobRepositoryFactoryBean=newJobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(manager);
jobRepositoryFactoryBean.setDatabaseType("postgres");
try{
returnjobRepositoryFactoryBean.getObject();
}catch(Exceptione){
e.printStackTrace();
}
returnnull;
}
}
2.6 配置数据库及 ES 的连接地址
spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      # Elasticsearch HTTP endpoint used by spring-data-jest
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    # Spring Boot 2 pools connections with HikariCP and has no spring.datasource.max-active
    # property, so the intended pool limit of 2 is set on the Hikari pool instead.
    hikari:
      maximum-pool-size: 2
  batch:
    # Run the Spring Batch metadata schema script on startup.
    initialize-schema: always
2.7 配置入口类
packagecom.hfcsbc.esetl;
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
importorg.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
importorg.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
importorg.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
importorg.springframework.data.jpa.repository.config.EnableJpaRepositories;
@SpringBootApplication(exclude={ElasticsearchAutoConfiguration.class,ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages="com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages="com.hfcsbc.esetl.repository.jpa")
publicclassEsEtlApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(EsEtlApplication.class,args);
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。