Spring+MyBatis实现数据库读写分离方案
推荐第四种
方案1
通过MyBatis配置文件创建读写分离两个DataSource,每个SqlSessionFactoryBean对象的mapperLocations属性制定两个读写数据源的配置文件。将所有读的操作配置在读文件中,所有写的操作配置在写文件中。
优点:实现简单
缺点:维护麻烦,需要对原有的xml文件进行重新修改,不支持多读,不易扩展
实现方式
<beanid="abstractDataSource"abstract="true"class="com.alibaba.druid.pool.DruidDataSource"init-method="init"
destroy-method="close">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--配置获取连接等待超时的时间-->
<propertyname="maxWait"value="60000"/>
<!--配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒-->
<propertyname="timeBetweenEvictionRunsMillis"value="60000"/>
<!--配置一个连接在池中最小生存的时间,单位是毫秒-->
<propertyname="minEvictableIdleTimeMillis"value="300000"/>
<propertyname="validationQuery"value="SELECT'x'"/>
<propertyname="testWhileIdle"value="true"/>
<propertyname="testOnBorrow"value="false"/>
<propertyname="testOnReturn"value="false"/>
<!--打开PSCache,并且指定每个连接上PSCache的大小-->
<propertyname="poolPreparedStatements"value="true"/>
<propertyname="maxPoolPreparedStatementPerConnectionSize"value="20"/>
<propertyname="filters"value="config"/>
<propertyname="connectionProperties"value="config.decrypt=true"/>
</bean>
<beanid="readDataSource"parent="abstractDataSource">
<!--基本属性url、user、password-->
<propertyname="url"value="${read.jdbc.url}"/>
<propertyname="username"value="${read.jdbc.user}"/>
<propertyname="password"value="${read.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${read.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="10"/>
<propertyname="maxActive"value="${read.jdbc.maxPoolSize}"/>
</bean>
<beanid="writeDataSource"parent="abstractDataSource">
<!--基本属性url、user、password-->
<propertyname="url"value="${write.jdbc.url}"/>
<propertyname="username"value="${write.jdbc.user}"/>
<propertyname="password"value="${write.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${write.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="10"/>
<propertyname="maxActive"value="${write.jdbc.maxPoolSize}"/>
</bean>
<beanid="readSqlSessionFactory"class="org.mybatis.spring.SqlSessionFactoryBean">
<!--实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件-->
<propertyname="dataSource"ref="readDataSource"/>
<propertyname="mapperLocations"value="classpath:mapper/read/*.xml"/>
</bean>
<beanid="writeSqlSessionFactory"class="org.mybatis.spring.SqlSessionFactoryBean">
<!--实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件-->
<propertyname="dataSource"ref="writeDataSource"/>
<propertyname="mapperLocations"value="classpath:mapper/write/*.xml"/>
</bean>
方案2
通过SpringAOP在业务层实现读写分离,在DAO层调用前定义切面,利用Spring的AbstractRoutingDataSource解决多数据源的问题,实现动态选择数据源
优点:通过注解的方法在DAO每个方法上配置数据源,原有代码改动量少,易扩展,支持多读
缺点:需要在DAO每个方法上配置注解,人工管理,容易出错
实现方式
//定义枚举类型,读写
publicenumDynamicDataSourceGlobal{
READ,WRITE;
}
importjava.lang.annotation.ElementType;
importjava.lang.annotation.Retention;
importjava.lang.annotation.RetentionPolicy;
importjava.lang.annotation.Target;
/**
*RUNTIME
*定义注解
*编译器将把注释记录在类文件中,在运行时VM将保留注释,因此可以反射性地读取。
*@authorshma1664
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public@interfaceDataSource{
publicDynamicDataSourceGlobalvalue()defaultDynamicDataSourceGlobal.READ;
}
/**
*CreatedbyIDEA
*本地线程设置和获取数据源信息
*User:mashaohua
*Date:2016-07-0713:35
*Desc:
*/
publicclassDynamicDataSourceHolder{
privatestaticfinalThreadLocal<DynamicDataSourceGlobal>holder=newThreadLocal<DynamicDataSourceGlobal>();
publicstaticvoidputDataSource(DynamicDataSourceGlobaldataSource){
holder.set(dataSource);
}
publicstaticDynamicDataSourceGlobalgetDataSource(){
returnholder.get();
}
publicstaticvoidclearDataSource(){
holder.remove();
}
}
importorg.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
importjava.util.HashMap;
importjava.util.List;
importjava.util.Map;
importjava.util.concurrent.ThreadLocalRandom;
importjava.util.concurrent.atomic.AtomicLong;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-1410:56
*Desc:动态数据源实现读写分离
*/
publicclassDynamicDataSourceextendsAbstractRoutingDataSource{
privateObjectwriteDataSource;//写数据源
privateList<Object>readDataSources;//多个读数据源
privateintreadDataSourceSize;//读数据源个数
privateintreadDataSourcePollPattern=0;//获取读数据源方式,0:随机,1:轮询
privateAtomicLongcounter=newAtomicLong(0);
privatestaticfinalLongMAX_POOL=Long.MAX_VALUE;
privatefinalLocklock=newReentrantLock();
@Override
publicvoidafterPropertiesSet(){
if(this.writeDataSource==null){
thrownewIllegalArgumentException("Property'writeDataSource'isrequired");
}
setDefaultTargetDataSource(writeDataSource);
Map<Object,Object>targetDataSources=newHashMap<>();
targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(),writeDataSource);
if(this.readDataSources==null){
readDataSourceSize=0;
}else{
for(inti=0;i<readDataSources.size();i++){
targetDataSources.put(DynamicDataSourceGlobal.READ.name()+i,readDataSources.get(i));
}
readDataSourceSize=readDataSources.size();
}
setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
@Override
protectedObjectdetermineCurrentLookupKey(){
DynamicDataSourceGlobaldynamicDataSourceGlobal=DynamicDataSourceHolder.getDataSource();
if(dynamicDataSourceGlobal==null
||dynamicDataSourceGlobal==DynamicDataSourceGlobal.WRITE
||readDataSourceSize<=0){
returnDynamicDataSourceGlobal.WRITE.name();
}
intindex=1;
if(readDataSourcePollPattern==1){
//轮询方式
longcurrValue=counter.incrementAndGet();
if((currValue+1)>=MAX_POOL){
try{
lock.lock();
if((currValue+1)>=MAX_POOL){
counter.set(0);
}
}finally{
lock.unlock();
}
}
index=(int)(currValue%readDataSourceSize);
}else{
//随机方式
index=ThreadLocalRandom.current().nextInt(0,readDataSourceSize);
}
returndynamicDataSourceGlobal.name()+index;
}
publicvoidsetWriteDataSource(ObjectwriteDataSource){
this.writeDataSource=writeDataSource;
}
publicvoidsetReadDataSources(List<Object>readDataSources){
this.readDataSources=readDataSources;
}
publicvoidsetReadDataSourcePollPattern(intreadDataSourcePollPattern){
this.readDataSourcePollPattern=readDataSourcePollPattern;
}
}
importorg.apache.log4j.Logger;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.reflect.MethodSignature;
importjava.lang.reflect.Method;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-0713:39
*Desc:定义选择数据源切面
*/
publicclassDynamicDataSourceAspect{
privatestaticfinalLoggerlogger=Logger.getLogger(DynamicDataSourceAspect.class);
publicvoidpointCut(){};
publicvoidbefore(JoinPointpoint)
{
Objecttarget=point.getTarget();
StringmethodName=point.getSignature().getName();
Class<?>[]clazz=target.getClass().getInterfaces();
Class<?>[]parameterTypes=((MethodSignature)point.getSignature()).getMethod().getParameterTypes();
try{
Methodmethod=clazz[0].getMethod(methodName,parameterTypes);
if(method!=null&&method.isAnnotationPresent(DataSource.class)){
DataSourcedata=method.getAnnotation(DataSource.class);
DynamicDataSourceHolder.putDataSource(data.value());
}
}catch(Exceptione){
logger.error(String.format("ChooseDataSourceerror,method:%s,msg:%s",methodName,e.getMessage()));
}
}
publicvoidafter(JoinPointpoint){
DynamicDataSourceHolder.clearDataSource();
}
}
<?xmlversion="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/txhttp://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-4.1.xsd">
<beanid="abstractDataSource"abstract="true"class="com.alibaba.druid.pool.DruidDataSource"init-method="init"destroy-method="close">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--配置获取连接等待超时的时间-->
<propertyname="maxWait"value="60000"/>
<!--配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒-->
<propertyname="timeBetweenEvictionRunsMillis"value="60000"/>
<!--配置一个连接在池中最小生存的时间,单位是毫秒-->
<propertyname="minEvictableIdleTimeMillis"value="300000"/>
<propertyname="validationQuery"value="SELECT'x'"/>
<propertyname="testWhileIdle"value="true"/>
<propertyname="testOnBorrow"value="false"/>
<propertyname="testOnReturn"value="false"/>
<!--打开PSCache,并且指定每个连接上PSCache的大小-->
<propertyname="poolPreparedStatements"value="true"/>
<propertyname="maxPoolPreparedStatementPerConnectionSize"value="20"/>
<propertyname="filters"value="config"/>
<propertyname="connectionProperties"value="config.decrypt=true"/>
</bean>
<beanid="dataSourceRead1"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${read1.jdbc.url}"/>
<propertyname="username"value="${read1.jdbc.user}"/>
<propertyname="password"value="${read1.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${read1.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${read1.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${read1.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSourceRead2"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${read2.jdbc.url}"/>
<propertyname="username"value="${read2.jdbc.user}"/>
<propertyname="password"value="${read2.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${read2.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${read2.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${read2.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSourceWrite"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${write.jdbc.url}"/>
<propertyname="username"value="${write.jdbc.user}"/>
<propertyname="password"value="${write.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${write.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${write.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${write.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSource"class="com.test.api.dao.datasource.DynamicDataSource">
<propertyname="writeDataSource"ref="dataSourceWrite"/>
<propertyname="readDataSources">
<list>
<refbean="dataSourceRead1"/>
<refbean="dataSourceRead2"/>
</list>
</property>
<!--轮询方式-->
<propertyname="readDataSourcePollPattern"value="1"/>
<propertyname="defaultTargetDataSource"ref="dataSourceWrite"/>
</bean>
<tx:annotation-driventransaction-manager="transactionManager"/>
<beanid="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<propertyname="dataSource"ref="dataSource"/>
</bean>
<!--针对myBatis的配置项-->
<!--配置sqlSessionFactory-->
<beanid="sqlSessionFactory"class="org.mybatis.spring.SqlSessionFactoryBean">
<!--实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件-->
<propertyname="dataSource"ref="dataSource"/>
<propertyname="mapperLocations"value="classpath:mapper/*.xml"/>
</bean>
<!--配置扫描器-->
<beanclass="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!--扫描包以及它的子包下的所有映射接口类-->
<propertyname="basePackage"value="com.test.api.dao.inte"/>
<propertyname="sqlSessionFactoryBeanName"value="sqlSessionFactory"/>
</bean>
<!--配置数据库注解aop-->
<beanid="dynamicDataSourceAspect"class="com.test.api.dao.datasource.DynamicDataSourceAspect"/>
<aop:config>
<aop:aspectid="c"ref="dynamicDataSourceAspect">
<aop:pointcutid="tx"expression="execution(*com.test.api.dao.inte..*.*(..))"/>
<aop:beforepointcut-ref="tx"method="before"/>
<aop:afterpointcut-ref="tx"method="after"/>
</aop:aspect>
</aop:config>
<!--配置数据库注解aop-->
</beans>
方案3
通过Mybatis的Plugin在业务层实现数据库读写分离,在MyBatis创建Statement对象前通过拦截器选择真正的数据源,在拦截器中根据方法名称不同(select、update、insert、delete)选择数据源。
优点:原有代码不变,支持多读,易扩展
缺点:
实现方式
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-1915:40
*Desc:创建Connection代理接口
*/
publicinterfaceConnectionProxyextendsConnection{
/**
*根据传入的读写分离需要的key路由到正确的connection
*@paramkey数据源标识
*@return
*/
ConnectiongetTargetConnection(Stringkey);
}
importjava.lang.reflect.InvocationHandler;
importjava.lang.reflect.InvocationTargetException;
importjava.lang.reflect.Method;
importjava.lang.reflect.Proxy;
importjava.sql.Connection;
importjava.sql.SQLException;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.logging.Logger;
importjavax.sql.DataSource;
importorg.springframework.beans.factory.InitializingBean;
importorg.springframework.jdbc.datasource.AbstractDataSource;
importorg.springframework.jdbc.datasource.lookup.DataSourceLookup;
importorg.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;
importorg.springframework.util.Assert;
publicabstractclassAbstractDynamicDataSourceProxyextendsAbstractDataSourceimplementsInitializingBean{
privateList<Object>readDataSources;
privateList<DataSource>resolvedReadDataSources;
privateObjectwriteDataSource;
privateDataSourceresolvedWriteDataSource;
privateintreadDataSourcePollPattern=0;
privateintreadDsSize;
privatebooleandefaultAutoCommit=true;
privateintdefaultTransactionIsolation=Connection.TRANSACTION_READ_COMMITTED;
publicstaticfinalStringREAD="read";
publicstaticfinalStringWRITE="write";
privateDataSourceLookupdataSourceLookup=newJndiDataSourceLookup();
@Override
publicConnectiongetConnection()throwsSQLException{
return(Connection)Proxy.newProxyInstance(
com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),
newClass[]{com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},
newRWConnectionInvocationHandler());
}
@Override
publicConnectiongetConnection(Stringusername,Stringpassword)
throwsSQLException{
return(Connection)Proxy.newProxyInstance(com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class.getClassLoader(),
newClass[]{com.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy.class},
newRWConnectionInvocationHandler(username,password));
}
publicintgetReadDsSize(){
returnreadDsSize;
}
publicList<DataSource>getResolvedReadDataSources(){
returnresolvedReadDataSources;
}
publicvoidafterPropertiesSet()throwsException{
if(writeDataSource==null){
thrownewIllegalArgumentException("Property'writeDataSource'isrequired");
}
this.resolvedWriteDataSource=resolveSpecifiedDataSource(writeDataSource);
resolvedReadDataSources=newArrayList<DataSource>(readDataSources.size());
for(Objectitem:readDataSources){
resolvedReadDataSources.add(resolveSpecifiedDataSource(item));
}
readDsSize=readDataSources.size();
}
protectedDataSourcedetermineTargetDataSource(Stringkey){
Assert.notNull(this.resolvedReadDataSources,"DataSourcerouternotinitialized");
if(WRITE.equals(key)){
returnresolvedWriteDataSource;
}else{
returnloadReadDataSource();
}
}
publicLoggergetParentLogger(){
//NOOPJustignore
returnnull;
}
/**
*获取真实的datasource
*@paramdataSource(jndi|realdatasource)
*@return
*@throwsIllegalArgumentException
*/
protectedDataSourceresolveSpecifiedDataSource(ObjectdataSource)throwsIllegalArgumentException{
if(dataSourceinstanceofDataSource){
return(DataSource)dataSource;
}
elseif(dataSourceinstanceofString){
returnthis.dataSourceLookup.getDataSource((String)dataSource);
}
else{
thrownewIllegalArgumentException(
"Illegaldatasourcevalue-only[javax.sql.DataSource]andStringsupported:"+dataSource);
}
}
protectedabstractDataSourceloadReadDataSource();
publicvoidsetReadDsSize(intreadDsSize){
this.readDsSize=readDsSize;
}
publicList<Object>getReadDataSources(){
returnreadDataSources;
}
publicvoidsetReadDataSources(List<Object>readDataSources){
this.readDataSources=readDataSources;
}
publicObjectgetWriteDataSource(){
returnwriteDataSource;
}
publicvoidsetWriteDataSource(ObjectwriteDataSource){
this.writeDataSource=writeDataSource;
}
publicvoidsetResolvedReadDataSources(List<DataSource>resolvedReadDataSources){
this.resolvedReadDataSources=resolvedReadDataSources;
}
publicDataSourcegetResolvedWriteDataSource(){
returnresolvedWriteDataSource;
}
publicvoidsetResolvedWriteDataSource(DataSourceresolvedWriteDataSource){
this.resolvedWriteDataSource=resolvedWriteDataSource;
}
publicintgetReadDataSourcePollPattern(){
returnreadDataSourcePollPattern;
}
publicvoidsetReadDataSourcePollPattern(intreadDataSourcePollPattern){
this.readDataSourcePollPattern=readDataSourcePollPattern;
}
/**
*InvocationhandlerthatdefersfetchinganactualJDBCConnection
*untilfirstcreationofaStatement.
*/
privateclassRWConnectionInvocationHandlerimplementsInvocationHandler{
privateStringusername;
privateStringpassword;
privateBooleanreadOnly=Boolean.FALSE;
privateIntegertransactionIsolation;
privateBooleanautoCommit;
privatebooleanclosed=false;
privateConnectiontarget;
publicRWConnectionInvocationHandler(){
}
publicRWConnectionInvocationHandler(Stringusername,Stringpassword){
this();
this.username=username;
this.password=password;
}
publicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{
//InvocationonConnectionProxyinterfacecomingin...
if(method.getName().equals("equals")){
//WemustavoidfetchingatargetConnectionfor"equals".
//Onlyconsiderequalwhenproxiesareidentical.
return(proxy==args[0]?Boolean.TRUE:Boolean.FALSE);
}
elseif(method.getName().equals("hashCode")){
//WemustavoidfetchingatargetConnectionfor"hashCode",
//andwemustreturnthesamehashcodeevenwhenthetarget
//Connectionhasbeenfetched:usehashCodeofConnectionproxy.
returnnewInteger(System.identityHashCode(proxy));
}
elseif(method.getName().equals("getTargetConnection")){
//HandlegetTargetConnectionmethod:returnunderlyingconnection.
returngetTargetConnection(method,args);
}
if(!hasTargetConnection()){
//NophysicaltargetConnectionkeptyet->
//resolvetransactiondemarcationmethodswithoutfetching
//aphysicalJDBCConnectionuntilabsolutelynecessary.
if(method.getName().equals("toString")){
return"RWRoutingDataSourceProxy";
}
elseif(method.getName().equals("isReadOnly")){
returnthis.readOnly;
}
elseif(method.getName().equals("setReadOnly")){
this.readOnly=(Boolean)args[0];
returnnull;
}
elseif(method.getName().equals("getTransactionIsolation")){
if(this.transactionIsolation!=null){
returnthis.transactionIsolation;
}
returndefaultTransactionIsolation;
//ElsefetchactualConnectionandcheckthere,
//becausewedidn'thaveadefaultspecified.
}
elseif(method.getName().equals("setTransactionIsolation")){
this.transactionIsolation=(Integer)args[0];
returnnull;
}
elseif(method.getName().equals("getAutoCommit")){
if(this.autoCommit!=null)
returnthis.autoCommit;
returndefaultAutoCommit;
//ElsefetchactualConnectionandcheckthere,
//becausewedidn'thaveadefaultspecified.
}
elseif(method.getName().equals("setAutoCommit")){
this.autoCommit=(Boolean)args[0];
returnnull;
}
elseif(method.getName().equals("commit")){
//Ignore:nostatementscreatedyet.
returnnull;
}
elseif(method.getName().equals("rollback")){
//Ignore:nostatementscreatedyet.
returnnull;
}
elseif(method.getName().equals("getWarnings")){
returnnull;
}
elseif(method.getName().equals("clearWarnings")){
returnnull;
}
elseif(method.getName().equals("isClosed")){
return(this.closed?Boolean.TRUE:Boolean.FALSE);
}
elseif(method.getName().equals("close")){
//Ignore:notargetconnectionyet.
this.closed=true;
returnnull;
}
elseif(this.closed){
//Connectionproxyclosed,withouteverhavingfetcheda
//physicalJDBCConnection:throwcorrespondingSQLException.
thrownewSQLException("Illegaloperation:connectionisclosed");
}
}
//TargetConnectionalreadyfetched,
//ortargetConnectionnecessaryforcurrentoperation->
//invokemethodontargetconnection.
try{
returnmethod.invoke(target,args);
}
catch(InvocationTargetExceptionex){
throwex.getTargetException();
}
}
/**
*ReturnwhethertheproxycurrentlyholdsatargetConnection.
*/
privatebooleanhasTargetConnection(){
return(this.target!=null);
}
/**
*ReturnthetargetConnection,fetchingitandinitializingitifnecessary.
*/
privateConnectiongetTargetConnection(Methodoperation,Object[]args)throwsSQLException{
if(this.target==null){
Stringkey=(String)args[0];
//NotargetConnectionheld->fetchone.
if(logger.isDebugEnabled()){
logger.debug("Connectingtodatabaseforoperation'"+operation.getName()+"'");
}
//FetchphysicalConnectionfromDataSource.
this.target=(this.username!=null)?
determineTargetDataSource(key).getConnection(this.username,this.password):
determineTargetDataSource(key).getConnection();
//Ifwestilllackdefaultconnectionproperties,checkthemnow.
//checkDefaultConnectionProperties(this.target);
//Applykepttransactionsettings,ifany.
if(this.readOnly.booleanValue()){
this.target.setReadOnly(this.readOnly.booleanValue());
}
if(this.transactionIsolation!=null){
this.target.setTransactionIsolation(this.transactionIsolation.intValue());
}
if(this.autoCommit!=null&&this.autoCommit.booleanValue()!=this.target.getAutoCommit()){
this.target.setAutoCommit(this.autoCommit.booleanValue());
}
}
else{
//TargetConnectionalreadyheld->returnit.
if(logger.isDebugEnabled()){
logger.debug("Usingexistingdatabaseconnectionforoperation'"+operation.getName()+"'");
}
}
returnthis.target;
}
}
}
importjavax.sql.DataSource;
importjava.util.concurrent.ThreadLocalRandom;
importjava.util.concurrent.atomic.AtomicLong;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-1916:04
*Desc:
*/
publicclassDynamicRoutingDataSourceProxyextendsAbstractDynamicDataSourceProxy{
privateAtomicLongcounter=newAtomicLong(0);
privatestaticfinalLongMAX_POOL=Long.MAX_VALUE;
privatefinalLocklock=newReentrantLock();
@Override
protectedDataSourceloadReadDataSource(){
intindex=1;
if(getReadDataSourcePollPattern()==1){
//轮询方式
longcurrValue=counter.incrementAndGet();
if((currValue+1)>=MAX_POOL){
try{
lock.lock();
if((currValue+1)>=MAX_POOL){
counter.set(0);
}
}finally{
lock.unlock();
}
}
index=(int)(currValue%getReadDsSize());
}else{
//随机方式
index=ThreadLocalRandom.current().nextInt(0,getReadDsSize());
}
returngetResolvedReadDataSources().get(index);
}
}
importorg.apache.ibatis.executor.statement.RoutingStatementHandler;
importorg.apache.ibatis.executor.statement.StatementHandler;
importorg.apache.ibatis.mapping.MappedStatement;
importorg.apache.ibatis.mapping.SqlCommandType;
importorg.apache.ibatis.plugin.*;
importjava.sql.Connection;
importjava.util.Properties;
/**
*拦截器
*/
@Intercepts({@Signature(type=StatementHandler.class,method="prepare",args={Connection.class})})
publicclassDynamicPluginimplementsInterceptor{
publicObjectintercept(Invocationinvocation)throwsThrowable{
Connectionconn=(Connection)invocation.getArgs()[0];
//如果是采用了我们代理,则路由数据源
if(conninstanceofcom.autohome.api.dealer.tuan.dao.rwmybatis.ConnectionProxy){
StatementHandlerstatementHandler=(StatementHandler)invocation
.getTarget();
MappedStatementmappedStatement=null;
if(statementHandlerinstanceofRoutingStatementHandler){
StatementHandlerdelegate=(StatementHandler)ReflectionUtils
.getFieldValue(statementHandler,"delegate");
mappedStatement=(MappedStatement)ReflectionUtils.getFieldValue(
delegate,"mappedStatement");
}else{
mappedStatement=(MappedStatement)ReflectionUtils.getFieldValue(
statementHandler,"mappedStatement");
}
Stringkey=AbstractDynamicDataSourceProxy.WRITE;
if(mappedStatement.getSqlCommandType()==SqlCommandType.SELECT){
key=AbstractDynamicDataSourceProxy.READ;
}else{
key=AbstractDynamicDataSourceProxy.WRITE;
}
ConnectionProxyconnectionProxy=(ConnectionProxy)conn;
connectionProxy.getTargetConnection(key);
}
returninvocation.proceed();
}
publicObjectplugin(Objecttarget){
returnPlugin.wrap(target,this);
}
publicvoidsetProperties(Propertiesproperties){
//NOOP
}
}
importorg.apache.ibatis.logging.Log;
importorg.apache.ibatis.logging.LogFactory;
importjava.lang.reflect.*;
publicclassReflectionUtils{
privatestaticfinalLoglogger=LogFactory.getLog(ReflectionUtils.class);
/**
*直接设置对象属性值,无视private/protected修饰符,不经过setter函数.
*/
publicstaticvoidsetFieldValue(finalObjectobject,finalStringfieldName,finalObjectvalue){
Fieldfield=getDeclaredField(object,fieldName);
if(field==null)
thrownewIllegalArgumentException("Couldnotfindfield["+fieldName+"]ontarget["+object+"]");
makeAccessible(field);
try{
field.set(object,value);
}catch(IllegalAccessExceptione){
}
}
/**
*直接读取对象属性值,无视private/protected修饰符,不经过getter函数.
*/
publicstaticObjectgetFieldValue(finalObjectobject,finalStringfieldName){
Fieldfield=getDeclaredField(object,fieldName);
if(field==null)
thrownewIllegalArgumentException("Couldnotfindfield["+fieldName+"]ontarget["+object+"]");
makeAccessible(field);
Objectresult=null;
try{
result=field.get(object);
}catch(IllegalAccessExceptione){
}
returnresult;
}
/**
*直接调用对象方法,无视private/protected修饰符.
*/
publicstaticObjectinvokeMethod(finalObjectobject,finalStringmethodName,finalClass<?>[]parameterTypes,
finalObject[]parameters)throwsInvocationTargetException{
Methodmethod=getDeclaredMethod(object,methodName,parameterTypes);
if(method==null)
thrownewIllegalArgumentException("Couldnotfindmethod["+methodName+"]ontarget["+object+"]");
method.setAccessible(true);
try{
returnmethod.invoke(object,parameters);
}catch(IllegalAccessExceptione){
}
returnnull;
}
/**
*循环向上转型,获取对象的DeclaredField.
*/
protectedstaticFieldgetDeclaredField(finalObjectobject,finalStringfieldName){
for(Class<?>superClass=object.getClass();superClass!=Object.class;superClass=superClass
.getSuperclass()){
try{
returnsuperClass.getDeclaredField(fieldName);
}catch(NoSuchFieldExceptione){
}
}
returnnull;
}
/**
*循环向上转型,获取对象的DeclaredField.
*/
protectedstaticvoidmakeAccessible(finalFieldfield){
if(!Modifier.isPublic(field.getModifiers())||!Modifier.isPublic(field.getDeclaringClass().getModifiers())){
field.setAccessible(true);
}
}
/**
*循环向上转型,获取对象的DeclaredMethod.
*/
protectedstaticMethodgetDeclaredMethod(Objectobject,StringmethodName,Class<?>[]parameterTypes){
for(Class<?>superClass=object.getClass();superClass!=Object.class;superClass=superClass
.getSuperclass()){
try{
returnsuperClass.getDeclaredMethod(methodName,parameterTypes);
}catch(NoSuchMethodExceptione){
}
}
returnnull;
}
/**
*通过反射,获得Class定义中声明的父类的泛型参数的类型.
*eg.
*publicUserDaoextendsHibernateDao<User>
*
*@paramclazzTheclasstointrospect
*@returnthefirstgenericdeclaration,orObject.classifcannotbedetermined
*/
@SuppressWarnings("unchecked")
publicstatic<T>Class<T>getSuperClassGenricType(finalClassclazz){
returngetSuperClassGenricType(clazz,0);
}
/**
*通过反射,获得Class定义中声明的父类的泛型参数的类型.
*eg.
*publicUserDaoextendsHibernateDao<User>
*
*@paramclazzTheclasstointrospect
*@returnthefirstgenericdeclaration,orObject.classifcannotbedetermined
*/
@SuppressWarnings("unchecked")
publicstaticClassgetSuperClassGenricType(finalClassclazz,finalintindex){
TypegenType=clazz.getGenericSuperclass();
if(!(genTypeinstanceofParameterizedType)){
logger.warn(clazz.getSimpleName()+"'ssuperclassnotParameterizedType");
returnObject.class;
}
Type[]params=((ParameterizedType)genType).getActualTypeArguments();
if(index>=params.length||index<0){
logger.warn("Index:"+index+",Sizeof"+clazz.getSimpleName()+"'sParameterizedType:"
+params.length);
returnObject.class;
}
if(!(params[index]instanceofClass)){
logger.warn(clazz.getSimpleName()+"notsettheactualclassonsuperclassgenericparameter");
returnObject.class;
}
return(Class)params[index];
}
/**
*将反射时的checkedexception转换为uncheckedexception.
*/
publicstaticIllegalArgumentExceptionconvertToUncheckedException(Exceptione){
if(einstanceofIllegalAccessException||einstanceofIllegalArgumentException
||einstanceofNoSuchMethodException)
returnnewIllegalArgumentException("RefelctionException.",e);
else
returnnewIllegalArgumentException(e);
}
}
<?xmlversion="1.0"encoding="UTF-8"?> <!DOCTYPEconfigurationPUBLIC"-//mybatis.org//DTDSQLMapConfig3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> <configuration> <plugins> <plugininterceptor="com.test.api.dao.mybatis.DynamicPlugin"> </plugin> </plugins> </configuration>
<?xmlversion="1.0"encoding="UTF-8"?>
<beansxmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop-4.1.xsd">
<beanid="abstractDataSource"abstract="true"class="com.alibaba.druid.pool.DruidDataSource"init-method="init"destroy-method="close">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--配置获取连接等待超时的时间-->
<propertyname="maxWait"value="60000"/>
<!--配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒-->
<propertyname="timeBetweenEvictionRunsMillis"value="60000"/>
<!--配置一个连接在池中最小生存的时间,单位是毫秒-->
<propertyname="minEvictableIdleTimeMillis"value="300000"/>
<propertyname="validationQuery"value="SELECT'x'"/>
<propertyname="testWhileIdle"value="true"/>
<propertyname="testOnBorrow"value="false"/>
<propertyname="testOnReturn"value="false"/>
<!--打开PSCache,并且指定每个连接上PSCache的大小-->
<propertyname="poolPreparedStatements"value="true"/>
<propertyname="maxPoolPreparedStatementPerConnectionSize"value="20"/>
<propertyname="filters"value="config"/>
<propertyname="connectionProperties"value="config.decrypt=true"/>
</bean>
<beanid="dataSourceRead1"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${read1.jdbc.url}"/>
<propertyname="username"value="${read1.jdbc.user}"/>
<propertyname="password"value="${read1.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${read1.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${read1.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${read1.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSourceRead2"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${read2.jdbc.url}"/>
<propertyname="username"value="${read2.jdbc.user}"/>
<propertyname="password"value="${read2.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${read2.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${read2.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${read2.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSourceWrite"parent="abstractDataSource">
<propertyname="driverClassName"value="com.microsoft.sqlserver.jdbc.SQLServerDriver"/>
<!--基本属性url、user、password-->
<propertyname="url"value="${write.jdbc.url}"/>
<propertyname="username"value="${write.jdbc.user}"/>
<propertyname="password"value="${write.jdbc.password}"/>
<!--配置初始化大小、最小、最大-->
<propertyname="initialSize"value="${write.jdbc.initPoolSize}"/>
<propertyname="minIdle"value="${write.jdbc.minPoolSize}"/>
<propertyname="maxActive"value="${write.jdbc.maxPoolSize}"/>
</bean>
<beanid="dataSource"class="com.test.api.dao.datasource.DynamicRoutingDataSourceProxy">
<propertyname="writeDataSource"ref="dataSourceWrite"/>
<propertyname="readDataSources">
<list>
<refbean="dataSourceRead1"/>
<refbean="dataSourceRead2"/>
</list>
</property>
<!--轮询方式-->
<propertyname="readDataSourcePollPattern"value="1"/>
</bean>
<tx:annotation-driventransaction-manager="transactionManager"/>
<beanid="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<propertyname="dataSource"ref="dataSource"/>
</bean>
<!--针对myBatis的配置项-->
<!--配置sqlSessionFactory-->
<beanid="sqlSessionFactory"class="org.mybatis.spring.SqlSessionFactoryBean">
<!--实例化sqlSessionFactory时需要使用上述配置好的数据源以及SQL映射文件-->
<propertyname="dataSource"ref="dataSource"/>
<propertyname="mapperLocations"value="classpath:mapper/*.xml"/>
<propertyname="configLocation"value="classpath:mybatis-plugin-config.xml"/>
</bean>
<beanid="sqlSessionTemplate"class="org.mybatis.spring.SqlSessionTemplate">
<constructor-argref="sqlSessionFactory"/>
</bean>
<!--通过扫描的模式,扫描目录下所有的mapper,根据对应的mapper.xml为其生成代理类-->
<beanid="mapper"class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<propertyname="basePackage"value="com.test.api.dao.inte"/>
<propertyname="sqlSessionTemplate"ref="sqlSessionTemplate"></property>
</bean>
</beans>
方案4
如果你的后台结构是spring+mybatis,可以通过spring的AbstractRoutingDataSource和mybatisPlugin拦截器实现非常友好的读写分离,原有代码不需要任何改变。推荐第四种方案
importorg.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
importjava.util.HashMap;
importjava.util.Map;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-1410:56
*Desc:动态数据源实现读写分离
*/
publicclassDynamicDataSourceextendsAbstractRoutingDataSource{
privateObjectwriteDataSource;//写数据源
privateObjectreadDataSource;//读数据源
@Override
publicvoidafterPropertiesSet(){
if(this.writeDataSource==null){
thrownewIllegalArgumentException("Property'writeDataSource'isrequired");
}
setDefaultTargetDataSource(writeDataSource);
Map<Object,Object>targetDataSources=newHashMap<>();
targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(),writeDataSource);
if(readDataSource!=null){
targetDataSources.put(DynamicDataSourceGlobal.READ.name(),readDataSource);
}
setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
@Override
protectedObjectdetermineCurrentLookupKey(){
DynamicDataSourceGlobaldynamicDataSourceGlobal=DynamicDataSourceHolder.getDataSource();
if(dynamicDataSourceGlobal==null
||dynamicDataSourceGlobal==DynamicDataSourceGlobal.WRITE){
returnDynamicDataSourceGlobal.WRITE.name();
}
returnDynamicDataSourceGlobal.READ.name();
}
publicvoidsetWriteDataSource(ObjectwriteDataSource){
this.writeDataSource=writeDataSource;
}
publicObjectgetWriteDataSource(){
returnwriteDataSource;
}
publicObjectgetReadDataSource(){
returnreadDataSource;
}
publicvoidsetReadDataSource(ObjectreadDataSource){
this.readDataSource=readDataSource;
}
}
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-07-1410:58
*Desc:
*/
publicenumDynamicDataSourceGlobal{
READ,WRITE;
}
publicfinalclassDynamicDataSourceHolder{
privatestaticfinalThreadLocal<DynamicDataSourceGlobal>holder=newThreadLocal<DynamicDataSourceGlobal>();
privateDynamicDataSourceHolder(){
//
}
publicstaticvoidputDataSource(DynamicDataSourceGlobaldataSource){
holder.set(dataSource);
}
publicstaticDynamicDataSourceGlobalgetDataSource(){
returnholder.get();
}
publicstaticvoidclearDataSource(){
holder.remove();
}
}
importorg.springframework.jdbc.datasource.DataSourceTransactionManager;
importorg.springframework.transaction.TransactionDefinition;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-08-1014:34
*Desc:
*/
publicclassDynamicDataSourceTransactionManagerextendsDataSourceTransactionManager{
/**
*只读事务到读库,读写事务到写库
*@paramtransaction
*@paramdefinition
*/
@Override
protectedvoiddoBegin(Objecttransaction,TransactionDefinitiondefinition){
//设置数据源
booleanreadOnly=definition.isReadOnly();
if(readOnly){
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);
}else{
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);
}
super.doBegin(transaction,definition);
}
/**
*清理本地线程的数据源
*@paramtransaction
*/
@Override
protectedvoiddoCleanupAfterCompletion(Objecttransaction){
super.doCleanupAfterCompletion(transaction);
DynamicDataSourceHolder.clearDataSource();
}
}
importorg.apache.ibatis.executor.Executor;
importorg.apache.ibatis.executor.keygen.SelectKeyGenerator;
importorg.apache.ibatis.mapping.BoundSql;
importorg.apache.ibatis.mapping.MappedStatement;
importorg.apache.ibatis.mapping.SqlCommandType;
importorg.apache.ibatis.plugin.*;
importorg.apache.ibatis.session.ResultHandler;
importorg.apache.ibatis.session.RowBounds;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.transaction.support.TransactionSynchronizationManager;
importjava.util.Locale;
importjava.util.Map;
importjava.util.Properties;
importjava.util.concurrent.ConcurrentHashMap;
/**
*CreatedbyIDEA
*User:mashaohua
*Date:2016-08-1011:09
*Desc:
*/
@Intercepts({
@Signature(type=Executor.class,method="update",args={
MappedStatement.class,Object.class}),
@Signature(type=Executor.class,method="query",args={
MappedStatement.class,Object.class,RowBounds.class,
ResultHandler.class})})
publicclassDynamicPluginimplementsInterceptor{
protectedstaticfinalLoggerlogger=LoggerFactory.getLogger(DynamicPlugin.class);
privatestaticfinalStringREGEX=".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";
privatestaticfinalMap<String,DynamicDataSourceGlobal>cacheMap=newConcurrentHashMap<>();
@Override
publicObjectintercept(Invocationinvocation)throwsThrowable{
booleansynchronizationActive=TransactionSynchronizationManager.isSynchronizationActive();
if(!synchronizationActive){
Object[]objects=invocation.getArgs();
MappedStatementms=(MappedStatement)objects[0];
DynamicDataSourceGlobaldynamicDataSourceGlobal=null;
if((dynamicDataSourceGlobal=cacheMap.get(ms.getId()))==null){
//读方法
if(ms.getSqlCommandType().equals(SqlCommandType.SELECT)){
//!selectKey为自增id查询主键(SELECTLAST_INSERT_ID())方法,使用主库
if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)){
dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE;
}else{
BoundSqlboundSql=ms.getSqlSource().getBoundSql(objects[1]);
Stringsql=boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]","");
if(sql.matches(REGEX)){
dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE;
}else{
dynamicDataSourceGlobal=DynamicDataSourceGlobal.READ;
}
}
}else{
dynamicDataSourceGlobal=DynamicDataSourceGlobal.WRITE;
}
logger.warn("设置方法[{}]use[{}]Strategy,SqlCommandType[{}]..",ms.getId(),dynamicDataSourceGlobal.name(),ms.getSqlCommandType().name());
cacheMap.put(ms.getId(),dynamicDataSourceGlobal);
}
DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
}
returninvocation.proceed();
}
@Override
publicObjectplugin(Objecttarget){
if(targetinstanceofExecutor){
returnPlugin.wrap(target,this);
}else{
returntarget;
}
}
@Override
publicvoidsetProperties(Propertiesproperties){
//
}
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持毛票票!