浅谈HBase在SpringBoot项目里的应用(含HBaseUtil工具类)
背景:
项目这两个月开始使用HBase来读写数据,网上现成的HBase工具类要么版本混杂,要么只是Demo级别的简单实现,各方面都不完善;
而且我发现HBase查询有很多种方式,首先大方向上有Get和Scan两种,其次行键、列族、列名(限定符)、列值(value)、时间戳版本等多种组合条件,还有各种过滤器的选择,协处理器的应用,所以必须根据自己项目需求和HBase行列设计来自定义HBase工具类和实现类!
经过我自己的研究整理,在此分享下初步的实现方案吧~
注:HBase版本:1.3.0-CDH5.13.0、SpringBoot版本:1.5.9
需要注意的是我用的是原生api,没有用和spring或者springboot整合的HbaseTemplate等,因为这方面资料较少而且听说并没有那么好用…
一、pom.xml依赖
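<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>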
二、application.yml项目配置
此处我是自定义HBase配置,后面会有专门的配置类来加载这个配置
hbase:
  conf:
    confMaps:
      'hbase.zookeeper.quorum': 'cdh1:2181,cdh2:2181,cdh3:2181'
三、HbaseConfig自定义配置类
HbaseConfig.java:
importorg.springframework.boot.context.properties.ConfigurationProperties;
importorg.springframework.context.annotation.Configuration;
importjava.util.Map;
/**
*Hbase-Conf配置
*
*@Author:yuanj
*@Date:2018/10/1210:49
*/
@Configuration
@ConfigurationProperties(prefix=HbaseConfig.CONF_PREFIX)
publicclassHbaseConfig{
publicstaticfinalStringCONF_PREFIX="hbase.conf";
privateMapconfMaps;
publicMapgetconfMaps(){
returnconfMaps;
}
publicvoidsetconfMaps(MapconfMaps){
this.confMaps=confMaps;
}
}
不了解@ConfigurationProperties这个注解的兄弟可以去百度下,它可以将application.yml中的配置导入到该类的成员变量里!
也就是说springboot项目启动完成后confMaps变量里已经存在一个key为hbase.zookeeper.quorum,value为cdh1:2181,cdh2:2181,cdh3:2181的entry了!
四、HBaseUtils工具类
首先添加SpringContextHolder工具类,下面会用到:
packagecom.moerlong.credit.core;
importorg.springframework.beans.BeansException;
importorg.springframework.context.ApplicationContext;
importorg.springframework.context.ApplicationContextAware;
importorg.springframework.stereotype.Component;
/**
*Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
*/
@Component
publicclassSpringContextHolderimplementsApplicationContextAware{
privatestaticApplicationContextapplicationContext;
@Override
publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{
SpringContextHolder.applicationContext=applicationContext;
}
publicstaticApplicationContextgetApplicationContext(){
assertApplicationContext();
returnapplicationContext;
}
@SuppressWarnings("unchecked")
publicstaticTgetBean(StringbeanName){
assertApplicationContext();
return(T)applicationContext.getBean(beanName);
}
publicstaticTgetBean(ClassrequiredType){
assertApplicationContext();
returnapplicationContext.getBean(requiredType);
}
privatestaticvoidassertApplicationContext(){
if(SpringContextHolder.applicationContext==null){
thrownewRuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!");
}
}
}
HBaseUtils.java:
importcom.moerlong.credit.config.HbaseConfig;
importcom.moerlong.credit.core.SpringContextHolder;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.hbase.*;
importorg.apache.hadoop.hbase.client.*;
importorg.apache.hadoop.hbase.client.coprocessor.AggregationClient;
importorg.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
importorg.apache.hadoop.hbase.filter.*;
importorg.apache.hadoop.hbase.util.Bytes;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.context.annotation.DependsOn;
importorg.springframework.stereotype.Component;
importorg.springframework.util.StopWatch;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Map;
importjava.util.NavigableMap;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
@DependsOn("springContextHolder")//控制依赖顺序,保证springContextHolder类在之前已经加载
@Component
publicclassHBaseUtils{
privateLoggerlogger=LoggerFactory.getLogger(this.getClass());
//手动获取hbaseConfig配置类对象
privatestaticHbaseConfighbaseConfig=SpringContextHolder.getBean("hbaseConfig");
privatestaticConfigurationconf=HBaseConfiguration.create();
privatestaticExecutorServicepool=Executors.newScheduledThreadPool(20);//设置连接池
privatestaticConnectionconnection=null;
privatestaticHBaseUtilsinstance=null;
privatestaticAdminadmin=null;
/**
 * Creates the shared HBase connection and admin handle the first time an
 * instance is built; later instantiations find the connection already set and
 * do nothing.
 *
 * <p>Every entry of the custom configuration map is copied onto the shared
 * client {@code Configuration} before connecting. A missing map is tolerated:
 * the connection is then created from the unmodified configuration.
 *
 * <p>An {@link IOException} while connecting is logged, not rethrown, so
 * {@code connection} and {@code admin} remain {@code null} in that case.
 */
private HBaseUtils() {
    if (connection != null) {
        return;
    }
    try {
        Map<String, String> confMap = hbaseConfig.getconfMaps();
        // No custom section bound: fall through with the defaults rather than fail with an NPE.
        if (confMap != null) {
            for (Map.Entry<String, String> confEntry : confMap.entrySet()) {
                conf.set(confEntry.getKey(), confEntry.getValue());
            }
        }
        connection = ConnectionFactory.createConnection(conf, pool);
        admin = connection.getAdmin();
    } catch (IOException e) {
        logger.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
    }
}
//简单单例方法,如果autowired自动注入就不需要此方法
publicstaticsynchronizedHBaseUtilsgetInstance(){
if(instance==null){
instance=newHBaseUtils();
}
returninstance;
}
/**
*创建表
*
*@paramtableName表名
*@paramcolumnFamily列族(数组)
*/
publicvoidcreateTable(StringtableName,String[]columnFamily)throwsIOException{
TableNamename=TableName.valueOf(tableName);
//如果存在则删除
if(admin.tableExists(name)){
admin.disableTable(name);
admin.deleteTable(name);
logger.error("createhtableerror!thistable{}alreadyexists!",name);
}else{
HTableDescriptordesc=newHTableDescriptor(name);
for(Stringcf:columnFamily){
desc.addFamily(newHColumnDescriptor(cf));
}
admin.createTable(desc);
}
}
/**
 * Inserts a record (single row, single column family, several columns and values).
 *
 * @param tableName     table name
 * @param row           row key
 * @param columnFamilys column family name
 * @param columns       column names (array)
 * @param values        values (array), expected to line up one-to-one with {@code columns}
 */
// NOTE(review): this span is damaged and cannot compile as shown. The loop
// header below is not valid Java, `rs` and `record` are used without any
// visible declaration, and the method is declared void yet ends in
// `return record`. Presumably the text between the loop condition and a later
// `...>>> map = rs.getMap();` statement of a different method was lost when the
// article was extracted - recover it from the original article before reuse.
publicvoidinsertRecords(StringtableName,Stringrow,StringcolumnFamilys,String[]columns,String[]values)throwsIOException{
TableNamename=TableName.valueOf(tableName);
Tabletable=connection.getTable(name);
Putput=newPut(Bytes.toBytes(row));
// NOTE(review): truncated line - the loop condition and body are missing from here on.
for(inti=0;i>>map=rs.getMap();
// Appends one "row \t family \t qualifier \t value \n" line per cell of `rs` to `record`.
for(Cellcell:rs.rawCells()){
StringBufferstringBuffer=newStringBuffer().append(Bytes.toString(cell.getRow())).append("\t")
.append(Bytes.toString(cell.getFamily())).append("\t")
.append(Bytes.toString(cell.getQualifier())).append("\t")
.append(Bytes.toString(cell.getValue())).append("\n");
Stringstr=stringBuffer.toString();
record+=str;
}
returnrecord;
}
/**
*查找单行单列族单列记录
*
*@paramtablename表名
*@paramrowKey行名
*@paramcolumnFamily列族名
*@paramcolumn列名
*@return
*/
publicstaticStringselectValue(Stringtablename,StringrowKey,StringcolumnFamily,Stringcolumn)throwsIOException{
TableNamename=TableName.valueOf(tablename);
Tabletable=connection.getTable(name);
Getg=newGet(rowKey.getBytes());
g.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column));
Resultrs=table.get(g);
returnBytes.toString(rs.value());
}
/**
*查询表中所有行(Scan方式)
*
*@paramtablename
*@return
*/
publicStringscanAllRecord(Stringtablename)throwsIOException{
Stringrecord="";
TableNamename=TableName.valueOf(tablename);
Tabletable=connection.getTable(name);
Scanscan=newScan();
ResultScannerscanner=table.getScanner(scan);
try{
for(Resultresult:scanner){
for(Cellcell:result.rawCells()){
StringBufferstringBuffer=newStringBuffer().append(Bytes.toString(cell.getRow())).append("\t")
.append(Bytes.toString(cell.getFamily())).append("\t")
.append(Bytes.toString(cell.getQualifier())).append("\t")
.append(Bytes.toString(cell.getValue())).append("\n");
Stringstr=stringBuffer.toString();
record+=str;
}
}
}finally{
if(scanner!=null){
scanner.close();
}
}
returnrecord;
}
/**
*根据rowkey关键字查询报告记录
*
*@paramtablename
*@paramrowKeyword
*@return
*/
publicListscanReportDataByRowKeyword(Stringtablename,StringrowKeyword)throwsIOException{
ArrayList<>list=newArrayList<>();
Tabletable=connection.getTable(TableName.valueOf(tablename));
Scanscan=newScan();
//添加行键过滤器,根据关键字匹配
RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator(rowKeyword));
scan.setFilter(rowFilter);
ResultScannerscanner=table.getScanner(scan);
try{
for(Resultresult:scanner){
//TODO此处根据业务来自定义实现
list.add(null);
}
}finally{
if(scanner!=null){
scanner.close();
}
}
returnlist;
}
/**
*根据rowkey关键字和时间戳范围查询报告记录
*
*@paramtablename
*@paramrowKeyword
*@return
*/
publicListscanReportDataByRowKeywordTimestamp(Stringtablename,StringrowKeyword,LongminStamp,LongmaxStamp)throwsIOException{
ArrayList<>list=newArrayList<>();
Tabletable=connection.getTable(TableName.valueOf(tablename));
Scanscan=newScan();
//添加scan的时间范围
scan.setTimeRange(minStamp,maxStamp);
RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL,newSubstringComparator(rowKeyword));
scan.setFilter(rowFilter);
ResultScannerscanner=table.getScanner(scan);
try{
for(Resultresult:scanner){
//TODO此处根据业务来自定义实现
list.add(null);
}
}finally{
if(scanner!=null){
scanner.close();
}
}
returnlist;
}
/**
*删除表操作
*
*@paramtablename
*/
publicvoiddeleteTable(Stringtablename)throwsIOException{
TableNamename=TableName.valueOf(tablename);
if(admin.tableExists(name)){
admin.disableTable(name);
admin.deleteTable(name);
}
}
/**
*利用协处理器进行全表count统计
*
*@paramtablename
*/
publicLongcountRowsWithCoprocessor(Stringtablename)throwsThrowable{
TableNamename=TableName.valueOf(tablename);
HTableDescriptordescriptor=admin.getTableDescriptor(name);
StringcoprocessorClass="org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
if(!descriptor.hasCoprocessor(coprocessorClass)){
admin.disableTable(name);
descriptor.addCoprocessor(coprocessorClass);
admin.modifyTable(name,descriptor);
admin.enableTable(name);
}
//计时
StopWatchstopWatch=newStopWatch();
stopWatch.start();
Scanscan=newScan();
AggregationClientaggregationClient=newAggregationClient(conf);
Longcount=aggregationClient.rowCount(name,newLongColumnInterpreter(),scan);
stopWatch.stop();
System.out.println("RowCount:"+count+",全表count统计耗时:"+stopWatch.getTotalTimeMillis());
returncount;
}
}
五、使用
接下来只需要在项目业务类里注入hbaseUtils就可以使用了:
@Autowired
privateHBaseUtilshBaseUtils;
补充知识:springboot整合Hbase
springboot项目需要整合SpringCloud
依赖
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-shaded-client</artifactId>
    <version>1.2.6</version>
</dependency>
yml配置:
自定义配置读取zookeeper配置
hbase:
  zookeeper:
    quorum: hbase126-node2:2181
config配置:
importnet.cc.commons.exception.CCRuntimeException;
importorg.apache.hadoop.hbase.HBaseConfiguration;
importorg.apache.hadoop.hbase.HConstants;
importorg.apache.hadoop.hbase.client.Connection;
importorg.apache.hadoop.hbase.client.ConnectionFactory;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.context.annotation.Scope;
importjava.io.IOException;
importjava.util.function.Supplier;
/**
*@Authorwangqiubao
*@Date2019/9/2415:28
*@Description
**/
@Configuration
publicclassUcareHbaseConfiguration{
/**
*读取HBase的zookeeper地址
*/
@Value("${hbase.zookeeper.quorum}")
privateStringquorum;
/**
*配置HBase连接参数
*
*@return
*/
@Bean
publicorg.apache.hadoop.conf.ConfigurationhbaseConfig(){
org.apache.hadoop.conf.Configurationconfig=HBaseConfiguration.create();
config.set(HConstants.ZOOKEEPER_QUORUM,quorum);
returnconfig;
}
//每次调用get方法就会创建一个Connection
@Bean
publicSupplierhbaseConnSupplier(){
return()->{
try{
returnhbaseConnection();
}catch(IOExceptione){
thrownewCCRuntimeException(e);
}
};
}
@Bean
//@Scope标明模式,默认单例模式.prototype多例模式
//若是在其他类中直接@Autowired引入的,多例就无效了,因为那个类在初始化的时候,已经创建了创建了这个bean了,之后调用的时候,不会重新创建,若是想要实现多例,就要每次调用的时候,手动获取bean
@Scope(value="prototype")
publicConnectionhbaseConnection()throwsIOException{
returnConnectionFactory.createConnection(hbaseConfig());
}
}
使用
spring管理
/** *内部已实现线程安全的连接池 */ @Autowired privateConnectionhbaseConnection;
插入/更新数据
// Illustrative insert/update example: writes one row with a single Put.
// The "....." line is a placeholder from the article, not compilable code.
publicvoidaaaa()throwsIOException{
try(Tabletable=hbaseConnection.getTable(TableName.valueOf("表名"))){// obtain the table handle; closed automatically by try-with-resources
// Build one record
// Row key
Putput=newPut(Bytes.toBytes("key主键"));
put.addColumn(Bytes.toBytes("列族"),Bytes.toBytes("列"),Bytes.toBytes("值"));
.....// every column that carries data needs its own addColumn call
// Write the record with put
table.put(put);
}
}
查询
根据主键查询内容
try(Tabletable=hbaseConnection.getTable(TableName.valueOf("表名"))){
Resultresult=table.get(newGet(asRowKey(date,acid)));
if(result==null)returnnull;
//列名为starttime,最后一条就是该航班最新的航迹
CelllatestCell=Iterables.getLast(result.listCells());
returnAdsbTrackProto.AdsbTrack.parseFrom(CellUtil.cloneValue(latestCell));
}
以上这篇浅谈HBase在SpringBoot项目里的应用(含HBaseUtil工具类)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。