Spring+MyBatis实现数据库读写分离方案
推荐第四种
方案1
通过MyBatis配置文件创建读、写两个DataSource,并为每个SqlSessionFactoryBean对象的mapperLocations属性分别指定读、写数据源各自的映射文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。
优点:实现简单
缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展
实现方式
<!-- Shared Druid pool settings; the read and write data sources below inherit from this bean. -->
<bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
    <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
    <!-- Maximum time to wait when obtaining a connection, in milliseconds -->
    <property name="maxWait" value="60000"/>
    <!-- Interval between runs of the check that closes idle connections, in milliseconds -->
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <!-- Minimum time a connection lives in the pool, in milliseconds -->
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="validationQuery" value="SELECT 'x'"/>
    <property name="testWhileIdle" value="true"/>
    <property name="testOnBorrow" value="false"/>
    <property name="testOnReturn" value="false"/>
    <!-- Enable PSCache and set its size per connection -->
    <property name="poolPreparedStatements" value="true"/>
    <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
    <property name="filters" value="config"/>
    <property name="connectionProperties" value="config.decrypt=true"/>
</bean>

<bean id="readDataSource" parent="abstractDataSource">
    <!-- Basic properties: url, user, password -->
    <property name="url" value="${read.jdbc.url}"/>
    <property name="username" value="${read.jdbc.user}"/>
    <property name="password" value="${read.jdbc.password}"/>
    <!-- Pool sizing: initial, minimum, maximum -->
    <property name="initialSize" value="${read.jdbc.initPoolSize}"/>
    <property name="minIdle" value="10"/>
    <property name="maxActive" value="${read.jdbc.maxPoolSize}"/>
</bean>

<bean id="writeDataSource" parent="abstractDataSource">
    <!-- Basic properties: url, user, password -->
    <property name="url" value="${write.jdbc.url}"/>
    <property name="username" value="${write.jdbc.user}"/>
    <property name="password" value="${write.jdbc.password}"/>
    <!-- Pool sizing: initial, minimum, maximum -->
    <property name="initialSize" value="${write.jdbc.initPoolSize}"/>
    <property name="minIdle" value="10"/>
    <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>
</bean>

<bean id="readSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <!-- The session factory is built from the read data source and the read-only mapper files -->
    <property name="dataSource" ref="readDataSource"/>
    <property name="mapperLocations" value="classpath:mapper/read/*.xml"/>
</bean>

<bean id="writeSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <!-- The session factory is built from the write data source and the write mapper files -->
    <property name="dataSource" ref="writeDataSource"/>
    <property name="mapperLocations" value="classpath:mapper/write/*.xml"/>
</bean>
方案2
通过SpringAOP在业务层实现读写分离,在DAO层调用前定义切面,利用Spring的AbstractRoutingDataSource解决多数据源的问题,实现动态选择数据源
优点:通过注解的方法在DAO每个方法上配置数据源,原有代码改动量少,易扩展,支持多读
缺点:需要在DAO每个方法上配置注解,人工管理,容易出错
实现方式
/**
 * Kind of data source an operation is routed to: {@code READ} or {@code WRITE}.
 */
public enum DynamicDataSourceGlobal {
    READ, WRITE;
}
importjava.lang.annotation.ElementType; importjava.lang.annotation.Retention; importjava.lang.annotation.RetentionPolicy; importjava.lang.annotation.Target; /** *RUNTIME *定义注解 *编译器将把注释记录在类文件中,在运行时VM将保留注释,因此可以反射性地读取。 *@authorshma1664 * */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public@interfaceDataSource{ publicDynamicDataSourceGlobalvalue()defaultDynamicDataSourceGlobal.READ; }
/** *CreatedbyIDEA *本地线程设置和获取数据源信息 *User:mashaohua *Date:2016-07-0713:35 *Desc: */ publicclassDynamicDataSourceHolder{ privatestaticfinalThreadLocal<DynamicDataSourceGlobal>holder=newThreadLocal<DynamicDataSourceGlobal>(); publicstaticvoidputDataSource(DynamicDataSourceGlobaldataSource){ holder.set(dataSource); } publicstaticDynamicDataSourceGlobalgetDataSource(){ returnholder.get(); } publicstaticvoidclearDataSource(){ holder.remove(); } }
importorg.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; importjava.util.HashMap; importjava.util.List; importjava.util.Map; importjava.util.concurrent.ThreadLocalRandom; importjava.util.concurrent.atomic.AtomicLong; importjava.util.concurrent.locks.Lock; importjava.util.concurrent.locks.ReentrantLock; /** *CreatedbyIDEA *User:mashaohua *Date:2016-07-1410:56 *Desc:动态数据源实现读写分离 */ publicclassDynamicDataSourceextendsAbstractRoutingDataSource{ privateObjectwriteDataSource;//写数据源 privateList<Object>readDataSources;//多个读数据源 privateintreadDataSourceSize;//读数据源个数 privateintreadDataSourcePollPattern=0;//获取读数据源方式,0:随机,1:轮询 privateAtomicLongcounter=newAtomicLong(0); privatestaticfinalLongMAX_POOL=Long.MAX_VALUE; privatefinalLocklock=newReentrantLock(); @Override publicvoidafterPropertiesSet(){ if(this.writeDataSource==null){ thrownewIllegalArgumentException("Property'writeDataSource'isrequired"); } setDefaultTargetDataSource(writeDataSource); Map<Object,Object>targetDataSources=newHashMap<>(); targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(),writeDataSource); if(this.readDataSources==null){ readDataSourceSize=0; }else{ for(inti=0;i<readDataSources.size();i++){ targetDataSources.put(DynamicDataSourceGlobal.READ.name()+i,readDataSources.get(i)); } readDataSourceSize=readDataSources.size(); } setTargetDataSources(targetDataSources); super.afterPropertiesSet(); } @Override protectedObjectdetermineCurrentLookupKey(){ DynamicDataSourceGlobaldynamicDataSourceGlobal=DynamicDataSourceHolder.getDataSource(); if(dynamicDataSourceGlobal==null ||dynamicDataSourceGlobal==DynamicDataSourceGlobal.WRITE ||readDataSourceSize<=0){ returnDynamicDataSourceGlobal.WRITE.name(); } intindex=1; if(readDataSourcePollPattern==1){ //轮询方式 longcurrValue=counter.incrementAndGet(); if((currValue+1)>=MAX_POOL){ try{ lock.lock(); if((currValue+1)>=MAX_POOL){ counter.set(0); } }finally{ lock.unlock(); } } index=(int)(currValue%readDataSourceSize); }else{ //随机方式 
index=ThreadLocalRandom.current().nextInt(0,readDataSourceSize); } returndynamicDataSourceGlobal.name()+index; } publicvoidsetWriteDataSource(ObjectwriteDataSource){ this.writeDataSource=writeDataSource; } publicvoidsetReadDataSources(List<Object>readDataSources){ this.readDataSources=readDataSources; } publicvoidsetReadDataSourcePollPattern(intreadDataSourcePollPattern){ this.readDataSourcePollPattern=readDataSourcePollPattern; } }
importorg.apache.log4j.Logger; importorg.aspectj.lang.JoinPoint; importorg.aspectj.lang.reflect.MethodSignature; importjava.lang.reflect.Method; /** *CreatedbyIDEA *User:mashaohua *Date:2016-07-0713:39 *Desc:定义选择数据源切面 */ publicclassDynamicDataSourceAspect{ privatestaticfinalLoggerlogger=Logger.getLogger(DynamicDataSourceAspect.class); publicvoidpointCut(){}; publicvoidbefore(JoinPointpoint) { Objecttarget=point.getTarget(); StringmethodName=point.getSignature().getName(); Class<?>[]clazz=target.getClass().getInterfaces(); Class<?>[]parameterTypes=((MethodSignature)point.getSignature()).getMethod().getParameterTypes(); try{ Methodmethod=clazz[0].getMethod(methodName,parameterTypes); if(method!=null&&method.isAnnotationPresent(DataSource.class)){ DataSourcedata=method.getAnnotation(DataSource.class); DynamicDataSourceHolder.putDataSource(data.value()); } }catch(Exceptione){ logger.error(String.format("ChooseDataSourceerror,method:%s,msg:%s",methodName,e.getMessage())); } } publicvoidafter(JoinPointpoint){ DynamicDataSourceHolder.clearDataSource(); } }
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

    <!-- Shared Druid pool settings inherited by every concrete data source below -->
    <bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Maximum time to wait when obtaining a connection, in milliseconds -->
        <property name="maxWait" value="60000"/>
        <!-- Interval between runs of the check that closes idle connections, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <!-- Minimum time a connection lives in the pool, in milliseconds -->
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="validationQuery" value="SELECT 'x'"/>
        <property name="testWhileIdle" value="true"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <!-- Enable PSCache and set its size per connection -->
        <property name="poolPreparedStatements" value="true"/>
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
        <property name="filters" value="config"/>
        <property name="connectionProperties" value="config.decrypt=true"/>
    </bean>

    <bean id="dataSourceRead1" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${read1.jdbc.url}"/>
        <property name="username" value="${read1.jdbc.user}"/>
        <property name="password" value="${read1.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${read1.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${read1.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${read1.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSourceRead2" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${read2.jdbc.url}"/>
        <property name="username" value="${read2.jdbc.user}"/>
        <property name="password" value="${read2.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${read2.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${read2.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${read2.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSourceWrite" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${write.jdbc.url}"/>
        <property name="username" value="${write.jdbc.user}"/>
        <property name="password" value="${write.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${write.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${write.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSource" class="com.test.api.dao.datasource.DynamicDataSource">
        <property name="writeDataSource" ref="dataSourceWrite"/>
        <property name="readDataSources">
            <list>
                <ref bean="dataSourceRead1"/>
                <ref bean="dataSourceRead2"/>
            </list>
        </property>
        <!-- Round-robin mode -->
        <property name="readDataSourcePollPattern" value="1"/>
        <property name="defaultTargetDataSource" ref="dataSourceWrite"/>
    </bean>

    <tx:annotation-driven transaction-manager="transactionManager"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- MyBatis configuration -->
    <!-- sqlSessionFactory -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <!-- The session factory is built from the routing data source and the SQL mapper files -->
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath:mapper/*.xml"/>
    </bean>

    <!-- Mapper scanner -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <!-- Scan this package and its sub-packages for mapper interfaces -->
        <property name="basePackage" value="com.test.api.dao.inte"/>
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
    </bean>

    <!-- AOP configuration for the data source annotation -->
    <bean id="dynamicDataSourceAspect" class="com.test.api.dao.datasource.DynamicDataSourceAspect"/>
    <aop:config>
        <aop:aspect id="c" ref="dynamicDataSourceAspect">
            <aop:pointcut id="tx" expression="execution(* com.test.api.dao.inte..*.*(..))"/>
            <aop:before pointcut-ref="tx" method="before"/>
            <aop:after pointcut-ref="tx" method="after"/>
        </aop:aspect>
    </aop:config>
</beans>
方案3
通过Mybatis的Plugin在业务层实现数据库读写分离,在MyBatis创建Statement对象前通过拦截器选择真正的数据源,在拦截器中根据方法名称不同(select、update、insert、delete)选择数据源。
优点:原有代码不变,支持多读,易扩展
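缺点:拦截器需要通过反射读取MyBatis的内部字段(delegate、mappedStatement),依赖MyBatis的内部实现;同一个连接代理一旦取得真实连接就不再切换,后续语句都会沿用第一条语句所选择的数据源。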
实现方式
import java.sql.Connection;

/**
 * Connection proxy interface that exposes the routed physical connection.
 *
 * <p>User: mashaohua, Date: 2016-07-19 15:40
 */
public interface ConnectionProxy extends Connection {

    /**
     * Routes to the physical connection that matches the given read/write key.
     *
     * @param key data source identifier
     * @return the target connection
     */
    Connection getTargetConnection(String key);
}
importjava.lang.reflect.InvocationHandler; importjava.lang.reflect.InvocationTargetException; importjava.lang.reflect.Method; importjava.lang.reflect.Proxy; importjava.sql.Connection; importjava.sql.SQLException; importjava.util.ArrayList; importjava.util.List; importjava.util.logging.Logger; importjavax.sql.DataSource; importorg.springframework.beans.factory.InitializingBean; importorg.springframework.jdbc.datasource.AbstractDataSource; importorg.springframework.jdbc.datasource.lookup.DataSourceLookup; importorg.springframework.jdbc.datasource.lookup.JndiDataSourceLookup; importorg.springframework.util.Assert; publicabstractclassAbstractDynamicDataSourceProxyextendsAbstractDataSourceimplementsInitializingBean{ privateList<Object>readDataSources; privateList<DataSource>resolvedReadDataSources; privateObjectwriteDataSource; privateDataSourceresolvedWriteDataSource; privateintreadDataSourcePollPattern=0; privateintreadDsSize; privatebooleandefaultAutoCommit=true; privateintdefaultTransactionIsolation=Connection.TRANSACTION_READ_COMMITTED; publicstaticfinalStringREAD="read"; publicstaticfinalStringWRITE="write"; privateDataSourceLookupdataSourceLookup=newJndiDataSourceLookup(); @Override publicConnectiongetConnection()throwsSQLException{ return(Connection)Proxy.newProxyInstance( com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(), newClass[]{com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class}, newRWConnectionInvocationHandler()); } @Override publicConnectiongetConnection(Stringusername,Stringpassword) throwsSQLException{ return(Connection)Proxy.newProxyInstance(com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(), newClass[]{com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class}, newRWConnectionInvocationHandler(username,password)); } publicintgetReadDsSize(){ returnreadDsSize; } publicList<DataSource>getResolvedReadDataSources(){ returnresolvedReadDataSources; } 
publicvoidafterPropertiesSet()throwsException{ if(writeDataSource==null){ thrownewIllegalArgumentException("Property'writeDataSource'isrequired"); } this.resolvedWriteDataSource=resolveSpecifiedDataSource(writeDataSource); resolvedReadDataSources=newArrayList<DataSource>(readDataSources.size()); for(Objectitem:readDataSources){ resolvedReadDataSources.add(resolveSpecifiedDataSource(item)); } readDsSize=readDataSources.size(); } protectedDataSourcedetermineTargetDataSource(Stringkey){ Assert.notNull(this.resolvedReadDataSources,"DataSourcerouternotinitialized"); if(WRITE.equals(key)){ returnresolvedWriteDataSource; }else{ returnloadReadDataSource(); } } publicLoggergetParentLogger(){ //NOOPJustignore returnnull; } /** *获取真实的datasource *@paramdataSource(jndi|realdatasource) *@return *@throwsIllegalArgumentException */ protectedDataSourceresolveSpecifiedDataSource(ObjectdataSource)throwsIllegalArgumentException{ if(dataSourceinstanceofDataSource){ return(DataSource)dataSource; } elseif(dataSourceinstanceofString){ returnthis.dataSourceLookup.getDataSource((String)dataSource); } else{ thrownewIllegalArgumentException( "Illegaldatasourcevalue-only[javax.sql.DataSource]andStringsupported:"+dataSource); } } protectedabstractDataSourceloadReadDataSource(); publicvoidsetReadDsSize(intreadDsSize){ this.readDsSize=readDsSize; } publicList<Object>getReadDataSources(){ returnreadDataSources; } publicvoidsetReadDataSources(List<Object>readDataSources){ this.readDataSources=readDataSources; } publicObjectgetWriteDataSource(){ returnwriteDataSource; } publicvoidsetWriteDataSource(ObjectwriteDataSource){ this.writeDataSource=writeDataSource; } publicvoidsetResolvedReadDataSources(List<DataSource>resolvedReadDataSources){ this.resolvedReadDataSources=resolvedReadDataSources; } publicDataSourcegetResolvedWriteDataSource(){ returnresolvedWriteDataSource; } publicvoidsetResolvedWriteDataSource(DataSourceresolvedWriteDataSource){ this.resolvedWriteDataSource=resolvedWriteDataSource; } 
publicintgetReadDataSourcePollPattern(){ returnreadDataSourcePollPattern; } publicvoidsetReadDataSourcePollPattern(intreadDataSourcePollPattern){ this.readDataSourcePollPattern=readDataSourcePollPattern; } /** *InvocationhandlerthatdefersfetchinganactualJDBCConnection *untilfirstcreationofaStatement. */ privateclassRWConnectionInvocationHandlerimplementsInvocationHandler{ privateStringusername; privateStringpassword; privateBooleanreadOnly=Boolean.FALSE; privateIntegertransactionIsolation; privateBooleanautoCommit; privatebooleanclosed=false; privateConnectiontarget; publicRWConnectionInvocationHandler(){ } publicRWConnectionInvocationHandler(Stringusername,Stringpassword){ this(); this.username=username; this.password=password; } publicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{ //InvocationonConnectionProxyinterfacecomingin... if(method.getName().equals("equals")){ //WemustavoidfetchingatargetConnectionfor"equals". //Onlyconsiderequalwhenproxiesareidentical. return(proxy==args[0]?Boolean.TRUE:Boolean.FALSE); } elseif(method.getName().equals("hashCode")){ //WemustavoidfetchingatargetConnectionfor"hashCode", //andwemustreturnthesamehashcodeevenwhenthetarget //Connectionhasbeenfetched:usehashCodeofConnectionproxy. returnnewInteger(System.identityHashCode(proxy)); } elseif(method.getName().equals("getTargetConnection")){ //HandlegetTargetConnectionmethod:returnunderlyingconnection. returngetTargetConnection(method,args); } if(!hasTargetConnection()){ //NophysicaltargetConnectionkeptyet-> //resolvetransactiondemarcationmethodswithoutfetching //aphysicalJDBCConnectionuntilabsolutelynecessary. 
if(method.getName().equals("toString")){ return"RWRoutingDataSourceProxy"; } elseif(method.getName().equals("isReadOnly")){ returnthis.readOnly; } elseif(method.getName().equals("setReadOnly")){ this.readOnly=(Boolean)args[0]; returnnull; } elseif(method.getName().equals("getTransactionIsolation")){ if(this.transactionIsolation!=null){ returnthis.transactionIsolation; } returndefaultTransactionIsolation; //ElsefetchactualConnectionandcheckthere, //becausewedidn'thaveadefaultspecified. } elseif(method.getName().equals("setTransactionIsolation")){ this.transactionIsolation=(Integer)args[0]; returnnull; } elseif(method.getName().equals("getAutoCommit")){ if(this.autoCommit!=null) returnthis.autoCommit; returndefaultAutoCommit; //ElsefetchactualConnectionandcheckthere, //becausewedidn'thaveadefaultspecified. } elseif(method.getName().equals("setAutoCommit")){ this.autoCommit=(Boolean)args[0]; returnnull; } elseif(method.getName().equals("commit")){ //Ignore:nostatementscreatedyet. returnnull; } elseif(method.getName().equals("rollback")){ //Ignore:nostatementscreatedyet. returnnull; } elseif(method.getName().equals("getWarnings")){ returnnull; } elseif(method.getName().equals("clearWarnings")){ returnnull; } elseif(method.getName().equals("isClosed")){ return(this.closed?Boolean.TRUE:Boolean.FALSE); } elseif(method.getName().equals("close")){ //Ignore:notargetconnectionyet. this.closed=true; returnnull; } elseif(this.closed){ //Connectionproxyclosed,withouteverhavingfetcheda //physicalJDBCConnection:throwcorrespondingSQLException. thrownewSQLException("Illegaloperation:connectionisclosed"); } } //TargetConnectionalreadyfetched, //ortargetConnectionnecessaryforcurrentoperation-> //invokemethodontargetconnection. try{ returnmethod.invoke(target,args); } catch(InvocationTargetExceptionex){ throwex.getTargetException(); } } /** *ReturnwhethertheproxycurrentlyholdsatargetConnection. 
*/ privatebooleanhasTargetConnection(){ return(this.target!=null); } /** *ReturnthetargetConnection,fetchingitandinitializingitifnecessary. */ privateConnectiongetTargetConnection(Methodoperation,Object[]args)throwsSQLException{ if(this.target==null){ Stringkey=(String)args[0]; //NotargetConnectionheld->fetchone. if(logger.isDebugEnabled()){ logger.debug("Connectingtodatabaseforoperation'"+operation.getName()+"'"); } //FetchphysicalConnectionfromDataSource. this.target=(this.username!=null)? determineTargetDataSource(key).getConnection(this.username,this.password): determineTargetDataSource(key).getConnection(); //Ifwestilllackdefaultconnectionproperties,checkthemnow. //checkDefaultConnectionProperties(this.target); //Applykepttransactionsettings,ifany. if(this.readOnly.booleanValue()){ this.target.setReadOnly(this.readOnly.booleanValue()); } if(this.transactionIsolation!=null){ this.target.setTransactionIsolation(this.transactionIsolation.intValue()); } if(this.autoCommit!=null&&this.autoCommit.booleanValue()!=this.target.getAutoCommit()){ this.target.setAutoCommit(this.autoCommit.booleanValue()); } } else{ //TargetConnectionalreadyheld->returnit. if(logger.isDebugEnabled()){ logger.debug("Usingexistingdatabaseconnectionforoperation'"+operation.getName()+"'"); } } returnthis.target; } } }
importjavax.sql.DataSource; importjava.util.concurrent.ThreadLocalRandom; importjava.util.concurrent.atomic.AtomicLong; importjava.util.concurrent.locks.Lock; importjava.util.concurrent.locks.ReentrantLock; /** *CreatedbyIDEA *User:mashaohua *Date:2016-07-1916:04 *Desc: */ publicclassDynamicRoutingDataSourceProxyextendsAbstractDynamicDataSourceProxy{ privateAtomicLongcounter=newAtomicLong(0); privatestaticfinalLongMAX_POOL=Long.MAX_VALUE; privatefinalLocklock=newReentrantLock(); @Override protectedDataSourceloadReadDataSource(){ intindex=1; if(getReadDataSourcePollPattern()==1){ //轮询方式 longcurrValue=counter.incrementAndGet(); if((currValue+1)>=MAX_POOL){ try{ lock.lock(); if((currValue+1)>=MAX_POOL){ counter.set(0); } }finally{ lock.unlock(); } } index=(int)(currValue%getReadDsSize()); }else{ //随机方式 index=ThreadLocalRandom.current().nextInt(0,getReadDsSize()); } returngetResolvedReadDataSources().get(index); } }
importorg.apache.ibatis.executor.statement.RoutingStatementHandler; importorg.apache.ibatis.executor.statement.StatementHandler; importorg.apache.ibatis.mapping.MappedStatement; importorg.apache.ibatis.mapping.SqlCommandType; importorg.apache.ibatis.plugin.*; importjava.sql.Connection; importjava.util.Properties; /** *拦截器 */ @Intercepts({@Signature(type=StatementHandler.class,method="prepare",args={Connection.class})}) publicclassDynamicPluginimplementsInterceptor{ publicObjectintercept(Invocationinvocation)throwsThrowable{ Connectionconn=(Connection)invocation.getArgs()[0]; //如果是采用了我们代理,则路由数据源 if(conninstanceofcom.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy){ StatementHandlerstatementHandler=(StatementHandler)invocation .getTarget(); MappedStatementmappedStatement=null; if(statementHandlerinstanceofRoutingStatementHandler){ StatementHandlerdelegate=(StatementHandler)ReflectionUtils .getFieldValue(statementHandler,"delegate"); mappedStatement=(MappedStatement)ReflectionUtils.getFieldValue( delegate,"mappedStatement"); }else{ mappedStatement=(MappedStatement)ReflectionUtils.getFieldValue( statementHandler,"mappedStatement"); } Stringkey=AbstractDynamicDataSourceProxy.WRITE; if(mappedStatement.getSqlCommandType()==SqlCommandType.SELECT){ key=AbstractDynamicDataSourceProxy.READ; }else{ key=AbstractDynamicDataSourceProxy.WRITE; } ConnectionProxyconnectionProxy=(ConnectionProxy)conn; connectionProxy.getTargetConnection(key); } returninvocation.proceed(); } publicObjectplugin(Objecttarget){ returnPlugin.wrap(target,this); } publicvoidsetProperties(Propertiesproperties){ //NOOP } }
importorg.apache.ibatis.logging.Log; importorg.apache.ibatis.logging.LogFactory; importjava.lang.reflect.*; publicclassReflectionUtils{ privatestaticfinalLoglogger=LogFactory.getLog(ReflectionUtils.class); /** *直接设置对象属性值,无视private/protected修饰符,不经过setter函数. */ publicstaticvoidsetFieldValue(finalObjectobject,finalStringfieldName,finalObjectvalue){ Fieldfield=getDeclaredField(object,fieldName); if(field==null) thrownewIllegalArgumentException("Couldnotfindfield["+fieldName+"]ontarget["+object+"]"); makeAccessible(field); try{ field.set(object,value); }catch(IllegalAccessExceptione){ } } /** *直接读取对象属性值,无视private/protected修饰符,不经过getter函数. */ publicstaticObjectgetFieldValue(finalObjectobject,finalStringfieldName){ Fieldfield=getDeclaredField(object,fieldName); if(field==null) thrownewIllegalArgumentException("Couldnotfindfield["+fieldName+"]ontarget["+object+"]"); makeAccessible(field); Objectresult=null; try{ result=field.get(object); }catch(IllegalAccessExceptione){ } returnresult; } /** *直接调用对象方法,无视private/protected修饰符. */ publicstaticObjectinvokeMethod(finalObjectobject,finalStringmethodName,finalClass<?>[]parameterTypes, finalObject[]parameters)throwsInvocationTargetException{ Methodmethod=getDeclaredMethod(object,methodName,parameterTypes); if(method==null) thrownewIllegalArgumentException("Couldnotfindmethod["+methodName+"]ontarget["+object+"]"); method.setAccessible(true); try{ returnmethod.invoke(object,parameters); }catch(IllegalAccessExceptione){ } returnnull; } /** *循环向上转型,获取对象的DeclaredField. */ protectedstaticFieldgetDeclaredField(finalObjectobject,finalStringfieldName){ for(Class<?>superClass=object.getClass();superClass!=Object.class;superClass=superClass .getSuperclass()){ try{ returnsuperClass.getDeclaredField(fieldName); }catch(NoSuchFieldExceptione){ } } returnnull; } /** *循环向上转型,获取对象的DeclaredField. 
*/ protectedstaticvoidmakeAccessible(finalFieldfield){ if(!Modifier.isPublic(field.getModifiers())||!Modifier.isPublic(field.getDeclaringClass().getModifiers())){ field.setAccessible(true); } } /** *循环向上转型,获取对象的DeclaredMethod. */ protectedstaticMethodgetDeclaredMethod(Objectobject,StringmethodName,Class<?>[]parameterTypes){ for(Class<?>superClass=object.getClass();superClass!=Object.class;superClass=superClass .getSuperclass()){ try{ returnsuperClass.getDeclaredMethod(methodName,parameterTypes); }catch(NoSuchMethodExceptione){ } } returnnull; } /** *通过反射,获得Class定义中声明的父类的泛型参数的类型. *eg. *publicUserDaoextendsHibernateDao<User> * *@paramclazzTheclasstointrospect *@returnthefirstgenericdeclaration,orObject.classifcannotbedetermined */ @SuppressWarnings("unchecked") publicstatic<T>Class<T>getSuperClassGenricType(finalClassclazz){ returngetSuperClassGenricType(clazz,0); } /** *通过反射,获得Class定义中声明的父类的泛型参数的类型. *eg. *publicUserDaoextendsHibernateDao<User> * *@paramclazzTheclasstointrospect *@returnthefirstgenericdeclaration,orObject.classifcannotbedetermined */ @SuppressWarnings("unchecked") publicstaticClassgetSuperClassGenricType(finalClassclazz,finalintindex){ TypegenType=clazz.getGenericSuperclass(); if(!(genTypeinstanceofParameterizedType)){ logger.warn(clazz.getSimpleName()+"'ssuperclassnotParameterizedType"); returnObject.class; } Type[]params=((ParameterizedType)genType).getActualTypeArguments(); if(index>=params.length||index<0){ logger.warn("Index:"+index+",Sizeof"+clazz.getSimpleName()+"'sParameterizedType:" +params.length); returnObject.class; } if(!(params[index]instanceofClass)){ logger.warn(clazz.getSimpleName()+"notsettheactualclassonsuperclassgenericparameter"); returnObject.class; } return(Class)params[index]; } /** *将反射时的checkedexception转换为uncheckedexception. 
*/ publicstaticIllegalArgumentExceptionconvertToUncheckedException(Exceptione){ if(einstanceofIllegalAccessException||einstanceofIllegalArgumentException ||einstanceofNoSuchMethodException) returnnewIllegalArgumentException("RefelctionException.",e); else returnnewIllegalArgumentException(e); } }
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD SQL Map Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <plugins>
        <!-- Registers the read/write routing interceptor -->
        <plugin interceptor="com.test.api.dao.mybatis.DynamicPlugin">
        </plugin>
    </plugins>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd">

    <!-- Shared Druid pool settings inherited by every concrete data source below -->
    <bean id="abstractDataSource" abstract="true" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Maximum time to wait when obtaining a connection, in milliseconds -->
        <property name="maxWait" value="60000"/>
        <!-- Interval between runs of the check that closes idle connections, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <!-- Minimum time a connection lives in the pool, in milliseconds -->
        <property name="minEvictableIdleTimeMillis" value="300000"/>
        <property name="validationQuery" value="SELECT 'x'"/>
        <property name="testWhileIdle" value="true"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <!-- Enable PSCache and set its size per connection -->
        <property name="poolPreparedStatements" value="true"/>
        <property name="maxPoolPreparedStatementPerConnectionSize" value="20"/>
        <property name="filters" value="config"/>
        <property name="connectionProperties" value="config.decrypt=true"/>
    </bean>

    <bean id="dataSourceRead1" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${read1.jdbc.url}"/>
        <property name="username" value="${read1.jdbc.user}"/>
        <property name="password" value="${read1.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${read1.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${read1.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${read1.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSourceRead2" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${read2.jdbc.url}"/>
        <property name="username" value="${read2.jdbc.user}"/>
        <property name="password" value="${read2.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${read2.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${read2.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${read2.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSourceWrite" parent="abstractDataSource">
        <property name="driverClassName" value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
        <!-- Basic properties: url, user, password -->
        <property name="url" value="${write.jdbc.url}"/>
        <property name="username" value="${write.jdbc.user}"/>
        <property name="password" value="${write.jdbc.password}"/>
        <!-- Pool sizing: initial, minimum, maximum -->
        <property name="initialSize" value="${write.jdbc.initPoolSize}"/>
        <property name="minIdle" value="${write.jdbc.minPoolSize}"/>
        <property name="maxActive" value="${write.jdbc.maxPoolSize}"/>
    </bean>

    <bean id="dataSource" class="com.test.api.dao.datasource.DynamicRoutingDataSourceProxy">
        <property name="writeDataSource" ref="dataSourceWrite"/>
        <property name="readDataSources">
            <list>
                <ref bean="dataSourceRead1"/>
                <ref bean="dataSourceRead2"/>
            </list>
        </property>
        <!-- Round-robin mode -->
        <property name="readDataSourcePollPattern" value="1"/>
    </bean>

    <tx:annotation-driven transaction-manager="transactionManager"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- MyBatis configuration -->
    <!-- sqlSessionFactory -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <!-- The session factory is built from the routing data source and the SQL mapper files -->
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath:mapper/*.xml"/>
        <property name="configLocation" value="classpath:mybatis-plugin-config.xml"/>
    </bean>

    <bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
        <constructor-arg ref="sqlSessionFactory"/>
    </bean>

    <!-- Scans the package for mapper interfaces and creates a proxy for each from its mapper.xml -->
    <bean id="mapper" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.test.api.dao.inte"/>
        <!-- Refer to the template by bean name: a direct ref would create the template (and the
             data sources behind it) before the ${...} placeholders above are resolved. -->
        <property name="sqlSessionTemplateBeanName" value="sqlSessionTemplate"/>
    </bean>
</beans>
方案4
如果你的后台结构是 Spring + MyBatis,可以通过 Spring 的 AbstractRoutingDataSource 和 MyBatis 的 Plugin 拦截器实现非常友好的读写分离,原有代码不需要任何改变。推荐使用第四种方案。
importorg.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; importjava.util.HashMap; importjava.util.Map; /** *CreatedbyIDEA *User:mashaohua *Date:2016-07-1410:56 *Desc:动态数据源实现读写分离 */ publicclassDynamicDataSourceextendsAbstractRoutingDataSource{ privateObjectwriteDataSource;//写数据源 privateObjectreadDataSource;//读数据源 @Override publicvoidafterPropertiesSet(){ if(this.writeDataSource==null){ thrownewIllegalArgumentException("Property'writeDataSource'isrequired"); } setDefaultTargetDataSource(writeDataSource); Map<Object,Object>targetDataSources=newHashMap<>(); targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(),writeDataSource); if(readDataSource!=null){ targetDataSources.put(DynamicDataSourceGlobal.READ.name(),readDataSource); } setTargetDataSources(targetDataSources); super.afterPropertiesSet(); } @Override protectedObjectdetermineCurrentLookupKey(){ DynamicDataSourceGlobaldynamicDataSourceGlobal=DynamicDataSourceHolder.getDataSource(); if(dynamicDataSourceGlobal==null ||dynamicDataSourceGlobal==DynamicDataSourceGlobal.WRITE){ returnDynamicDataSourceGlobal.WRITE.name(); } returnDynamicDataSourceGlobal.READ.name(); } publicvoidsetWriteDataSource(ObjectwriteDataSource){ this.writeDataSource=writeDataSource; } publicObjectgetWriteDataSource(){ returnwriteDataSource; } publicObjectgetReadDataSource(){ returnreadDataSource; } publicvoidsetReadDataSource(ObjectreadDataSource){ this.readDataSource=readDataSource; } }
/**
 * Created by IDEA
 * User: mashaohua
 * Date: 2016-07-14 10:58
 * Desc: identifies which data source a call should be routed to.
 */
public enum DynamicDataSourceGlobal {
    /** Route to the read data source. */
    READ,
    /** Route to the write data source. */
    WRITE;
}
publicfinalclassDynamicDataSourceHolder{ privatestaticfinalThreadLocal<DynamicDataSourceGlobal>holder=newThreadLocal<DynamicDataSourceGlobal>(); privateDynamicDataSourceHolder(){ // } publicstaticvoidputDataSource(DynamicDataSourceGlobaldataSource){ holder.set(dataSource); } publicstaticDynamicDataSourceGlobalgetDataSource(){ returnholder.get(); } publicstaticvoidclearDataSource(){ holder.remove(); } }
importorg.springframework.jdbc.datasource.DataSourceTransactionManager; importorg.springframework.transaction.TransactionDefinition; /** *CreatedbyIDEA *User:mashaohua *Date:2016-08-1014:34 *Desc: */ publicclassDynamicDataSourceTransactionManagerextendsDataSourceTransactionManager{ /** *只读事务到读库,读写事务到写库 *@paramtransaction *@paramdefinition */ @Override protectedvoiddoBegin(Objecttransaction,TransactionDefinitiondefinition){ //设置数据源 booleanreadOnly=definition.isReadOnly(); if(readOnly){ DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ); }else{ DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE); } super.doBegin(transaction,definition); } /** *清理本地线程的数据源 *@paramtransaction */ @Override protectedvoiddoCleanupAfterCompletion(Objecttransaction){ super.doCleanupAfterCompletion(transaction); DynamicDataSourceHolder.clearDataSource(); } }
importorg.apache.ibatis.executor.Executor; importorg.apache.ibatis.executor.keygen.SelectKeyGenerator; importorg.apache.ibatis.mapping.BoundSql; importorg.apache.ibatis.mapping.MappedStatement; importorg.apache.ibatis.mapping.SqlCommandType; importorg.apache.ibatis.plugin.*; importorg.apache.ibatis.session.ResultHandler; importorg.apache.ibatis.session.RowBounds; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.transaction.support.TransactionSynchronizationManager; importjava.util.Locale; importjava.util.Map; importjava.util.Properties; importjava.util.concurrent.ConcurrentHashMap; /** *CreatedbyIDEA *User:mashaohua *Date:2016-08-1011:09 *Desc: */ @Intercepts({ @Signature(type=Executor.class,method="update",args={ MappedStatement.class,Object.class}), @Signature(type=Executor.class,method="query",args={ MappedStatement.class,Object.class,RowBounds.class, ResultHandler.class})}) publicclassDynamicPluginimplementsInterceptor{ protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(DynamicPlugin.class); privatestaticfinalStringREGEX=".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*"; privatestaticfinalMap<String,DynamicDataSourceGlobal>cacheMap=newConcurrentHashMap<>(); @Override publicObjectintercept(Invocationinvocation)throwsThrowable{ booleansynchronizationActive=TransactionSynchronizationManager.isSynchronizationActive(); if(!synchronizationActive){ Object[]objects=invocation.getArgs(); MappedStatementms=(MappedStatement)objects[0]; DynamicDataSourceGlobaldynamicDataSourceGlobal=null; if((dynamicDataSourceGlobal=cacheMap.get(ms.getId()))==null){ //读方法 if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)){ //!selectKey为自增id查询主键(SELECTLAST_INSERT_ID())方法,使用主库 if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)){ dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE; }else{ BoundSqlboundSql=ms.getSqlSource().getBoundSql(objects[1]); Stringsql=boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]",""); 
if(sql.matches(REGEX)){ dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE; }else{ dynamicDataSourceGlobal=DynamicDataSourceGlobal.READ; } } }else{ dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE; } logger.warn("设置方法[{}]use[{}]Strategy,SqlCommandType[{}]..",ms.getId(),dynamicDataSourceGlobal.name(),ms.getSqlCommandType().name()); cacheMap.put(ms.getId(),dynamicDataSourceGlobal); } DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal); } returninvocation.proceed(); } @Override publicObjectplugin(Objecttarget){ if(targetinstanceofExecutor){ returnPlugin.wrap(target,this); }else{ returntarget; } } @Override publicvoidsetProperties(Propertiesproperties){ // } }
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持毛票票!