spring AOP 实现事务和主从读写分离

1 切面 是个类

2 切入点

3 连接点

4 通知 是个方法

5 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">

    <!-- 引入属性文件 -->
    <context:property-placeholder location="classpath:*.properties" />

    <!-- 组件自动扫描 -->
<context:component-scan base-package="com.hengxin.qianee">
  <context:exclude-filter type="regex" expression="com.hengxin.qianee.wechat.controller"/>
    </context:component-scan>

    <bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_master}" />
        <property name="username" value="${jdbc_username_master}" />
        <property name="password" value="${jdbc_password_master}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="0" />
        <!-- 连接池最大使用连接数量 -->
        <property name="maxActive" value="20" />
        <!-- 连接池最大空闲 -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="0" />
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- 打开removeAbandoned功能 -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800秒,也就是30分钟 -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- 关闭abanded连接时输出错误日志 -->
        <property name="logAbandoned" value="true" />
        <!-- 监控数据库 -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>

    <bean id="slaveDataSource" class="com.alibaba.druid.pool.DruidDataSource"
        init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_slave}" />
        <property name="username" value="${jdbc_username_slave}" />
        <property name="password" value="${jdbc_password_slave}" />
        <!-- 初始化连接大小 -->
        <property name="initialSize" value="0" />
        <!-- 连接池最大使用连接数量 -->
        <property name="maxActive" value="20" />
        <!-- 连接池最大空闲 -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- 连接池最小空闲 -->
        <property name="minIdle" value="0" />
        <!-- 获取连接最大等待时间 -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- 打开removeAbandoned功能 -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800秒,也就是30分钟 -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- 关闭abanded连接时输出错误日志 -->
        <property name="logAbandoned" value="true" />
        <!-- 监控数据库 -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>

        <bean id="readWriteDataSource" class="com.hengxin.qianee.util.ReadWriteDataSource">
        <property name="writeDataSource" ref="masterDataSource"/>
        <property name="readDataSourceMap">
           <map>
              <entry key="readDataSource1" value-ref="slaveDataSource"/>
              <entry key="readDataSource2" value-ref="slaveDataSource"/>
              <entry key="readDataSource3" value-ref="slaveDataSource"/>
              <entry key="readDataSource4" value-ref="slaveDataSource"/>
           </map>
        </property>
    </bean>

    <bean id="readWriteDataSourceTransactionProcessor" class="com.hengxin.qianee.util.ReadWriteDataSourceProcessor">
       <property name="forceChoiceReadWhenWrite" value="false"/>
    </bean>
    <!-- myBatis文件 -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="readWriteDataSource" />
        <!-- 自动扫描entity目录, 省掉Configuration.xml里的手工配置 -->
        <property name="mapperLocations" value="classpath:com/hengxin/qianee/mapper/xml/*.xml" />
    </bean>

    <!-- myBatis扫描文件 -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.hengxin.qianee.mapper" />
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
    </bean>

    <!-- 配置事务管理器 -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="readWriteDataSource" />
    </bean>

    <!-- 拦截器方式配置事物 -->
    <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="add*" propagation="REQUIRED" />
            <tx:method name="append*" propagation="REQUIRED" />
            <tx:method name="insert*" propagation="REQUIRED" />
            <tx:method name="save*" propagation="REQUIRED" />
            <tx:method name="update*" propagation="REQUIRED" />
            <tx:method name="modify*" propagation="REQUIRED" />
            <tx:method name="edit*" propagation="REQUIRED" />
            <tx:method name="delete*" propagation="REQUIRED" />
            <tx:method name="remove*" propagation="REQUIRED" />
            <tx:method name="repair" propagation="REQUIRED" />
            <tx:method name="delAndRepair" propagation="REQUIRED" />
            <tx:method name="load*" propagation="REQUIRED" />
            <tx:method name="do*" propagation="REQUIRED" />
            <tx:method name="send*" propagation="REQUIRED" />
            <tx:method name="put*" read-only="true"/>
            <tx:method name="query*" read-only="true"/>
            <tx:method name="use*" read-only="true"/>
            <tx:method name="get*" read-only="true" />
            <tx:method name="count*" read-only="true" />
            <tx:method name="find*" read-only="true" />
            <tx:method name="list*" read-only="true" />
            <tx:method name="select*" read-only="true" />
            <tx:method name="is*" read-only="true" />
            <tx:method name="*" propagation="REQUIRED" />
        </tx:attributes>
    </tx:advice>
    <aop:config>
            <!-- 切点 -->
        <aop:pointcut id="transactionPointcut"
            expression="(execution(* com.hengxin.qianee.service.impl.*.*(..)))
                        or (execution(* com.hengxin.qianee.wechat.service.impl.*.*(..)))" />
        <!-- 建议 -->
        <aop:advisor pointcut-ref="transactionPointcut"
            advice-ref="transactionAdvice" />
            <!-- 切面 -->
            <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
          <!-- 环绕通知 -->
           <aop:around pointcut-ref="transactionPointcut" method="determineReadOrWriteDB"/>
        </aop:aspect>
    </aop:config>
</beans>                                

6 serviceimpl 层 每个service方法是个切点 dao方法不是

package com.hengxin.qianee.service.impl;

/**
 * 前台首页服务
 * @author user
 *
 */
@Service
public class FrontMainServiceImpl implements FrontMainService {

    @Autowired
    private ContentAdvertisementsDao ContentAdvertisementsDao;//大广告Dao

    /**
     *  Ajax验证用户名是否已存在
     */
    @Override
    public ErrorInfo hasNameExist(String name, ErrorInfo errorInfo) {

        // 判断否空
        if(StringUtils.isBlank(name)){
            errorInfo.code = -1;
            errorInfo.msg = "用户名不能为空";

            return errorInfo;
        }

        // 判断用户名是否可用
        int rows = UserService.isNameExist(name);
        if(rows>0){
            errorInfo.code = -1;
            errorInfo.msg = "该用户名已存在";
        }else{
            errorInfo.code = 1;
        }

        return errorInfo;
    }

    /**
     * 推荐人是否存在
     */
    @Override
    public ErrorInfo isRecommendExist(String recommend, ErrorInfo errorInfo) {

        String recoName = "";

        if(!StringUtils.isNotBlank(recommend)){
            recoName = "";
            errorInfo.code = 5;//当推荐人为空时 返回 "5"
            return errorInfo;
        }else{
            //推荐人不为空时,判断邀请码有无此人
            recoName = Encrypt.decrypt3DES(recommend, Constants.ENCRYPTION_KEY);

            //判断用户名是否可用
            int rows = UserService.isNameExist(recoName);
            if(rows<=0){
                errorInfo.code = -1;
                errorInfo.msg = "该推荐人不存在,请选填";
            }else{
                errorInfo.code = 4;
                errorInfo.msg = "该推荐人存在";
                return errorInfo;
            }

        }

        return errorInfo;
    }

    /**
     * 注册页面发送短信
     */
    @Override
    public ErrorInfo verifyMobileRe(String mobile,ErrorInfo errorInfo) {

        // 校验非空
        if (StringUtils.isBlank(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "请输入手机号码";

            return errorInfo;
        }

        // 校验格式
        if (!RegexUtils.isMobileNum(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "请输入正确的手机号码";

            return errorInfo;
        }

        Users user = new Users();
        boolean flag = false;

        if (user == null || StringUtils.isBlank(user.getMobile()) || !user.getMobile().equals(mobile)) {
            flag = UserService.isMobileExistFlag(mobile);
        }

        if(!flag){
            //发短信
            smsService.sendCode(mobile, errorInfo);
        }else{
            errorInfo.code = -1;
            errorInfo.msg = "该手机号码已存在";
        }

        return errorInfo;
    }

    /**
     * 前台注册用户
     */
    @Override
    public ErrorInfo addregisterUser(Users user,String path,String contextPath,ErrorInfo errorInfo,String recoName) {
        errorInfo.clear();

        BackstageSet backstageSet = (BackstageSet) cache.getObject("backstageSet");
        user.setCreditLine(backstageSet.getInitialAmount());
        user.setLastCreditLine(backstageSet.getInitialAmount());

        // 获取注册关键否定词(如:xijinping)
        String keyWord = backstageSet.getKeywords();

        if(StringUtils.isNotBlank(keyWord)){
            String [] keywords = keyWord.split(",");

            for(String word : keywords) {
                if(user.getName().contains(word)) {
                    errorInfo.code = -1;
                    errorInfo.msg = "对不起,注册的用户名包含敏感词汇,请重新输入用户名";

                    return errorInfo;
                }
            }
        }

        if(!recoName.equals("")){
            // 根据用户在前台的推荐码解密成推荐人用户名查Id
            long recommendedId = userDao.queryIdByUserName(recoName);
            if(recommendedId>0){
                user.setRecommendUserId(recommendedId);
                user.setRecommendRewardType(backstageSet.getCpsRewardType());
                user.setRecommendTime(new Date());
            }else{
                user.setRecommendUserId(0L);
                user.setRecommendRewardType(-1);
                user.setRecommendTime(null);
            }
        }else{
            // 没有推荐人,推荐人id为0(非空)
            user.setRecommendUserId(0L);
        }

        String uuid = UUID.randomUUID().toString();

        try {
            Qrcode.create(contextPath + "/loginAndRegister/register?un=" + Encrypt.encrypt3DES(user.getName(), Constants.ENCRYPTION_KEY),
                    BarcodeFormat.QR_CODE,
                    100, 100,
                    new File(path,uuid+".png").getAbsolutePath(), "png");
            // 读取本地文件
            String fileName = uuid.split("\\.")[0]+".png";
            File file = new File(path, fileName);

            // 用户总数
            int userCount = UserService.selectUserCount();
            // 准备上传至服务器
            Map<String, Object> map = fileUploadService.registeredUploadFiles(file, Constants.FileFormat.IMG, userCount, errorInfo );
            // 取完之后删除文件
            file.delete();
            System.out.println(((String) map.get("fileName")).split("%")[1]);
            // 截取时间后面一节
            user.setQrCode(((String) map.get("fileName")).split("%")[1].split("\\.")[0]);
            // 是否禁止登录(false:可以登录)
            user.setIsAllowLogin(false);
        } catch (WriterException e) {
            e.printStackTrace();
            System.err.println("生成二维码图像失败!");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("生成二维码图像失败!");
        }

        // 注册成功添加用户 每个service方法是个切点 dao方法不是
        int rows = userDao.insertUser(user);

        if(rows<=0){
            errorInfo.code = -1;
            errorInfo.msg = "此次注册失败!";
            return errorInfo;
        }

        MD5 md5 = new MD5();
        String sign1 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+Constants.ENCRYPTION_KEY);
        String sign2 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+0.00+0.00+Constants.ENCRYPTION_KEY);

        int updateSign = userDao.updateSign(sign1, sign2, user.getId());

        if(updateSign<=0){
            errorInfo.code = -1;
            errorInfo.msg = "此次注册失败!";
            return errorInfo;
        }

        userEventsDao.inserUserEvent(user.getId(), UserEvent.REGISTER, "注册成功", errorInfo);

        if(errorInfo.code < 0){
            return errorInfo;
        }

        // 发送注册站内信
        addSendLetter(user, Constants.M_REGISTER,errorInfo);

        //创建审计项目
        statisticUserAuditItemsDao.createAuditItem(user.getId());

        errorInfo.code = 0;
        errorInfo.msg = "恭喜你,注册成功!";

        return errorInfo;
    }

    /**
     * 发送站内信
     */
    @Override
    public ErrorInfo addSendLetter(Users user,long id,ErrorInfo error) {

        // 获取发送内容和标题
        MessageStationTemplates mst = messageStationTemplatesDao.fandMessageStationTemplates(id);

        // 开启状态(默认 true:开启 false:关闭)
        if(mst.getStatus()){
            // 添加消息的任务(定时发送)
            int rows = messageSendingDao.addMessageTask(user.getId(), mst.getTitle(), mst.getContent());
            if(rows<=0){
                error.code = -1;
                error.msg = "添加失败";
            }
        }
        return error;
    }

    /**
     * 忘记密码页面发送短信
     */
    @Override
    public ErrorInfo sendMobileMessage(String mobile, ErrorInfo errorInfo) {

        // 校验非空
        if (StringUtils.isBlank(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "请输入手机号码";

            return errorInfo;
        }

        // 校验格式
        if (!RegexUtils.isMobileNum(mobile)) {
            errorInfo.code = -1;
            errorInfo.msg = "请输入正确的手机号码";

            return errorInfo;
        }

        //发短信
        smsService.sendCode(mobile, errorInfo);

        return errorInfo;
    }

    /**
     * 查询用户注册协议
     */
    @Override
    public String queryContent(long id) {
        return ContentNewsDao.queryContent(id);
    }

}

7 切面

package com.hengxin.qianee.util;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;

/**
 *
 *
 * <pre>
 *
 * 此类实现了两个职责(为了减少类的数量将两个功能合并到一起了):
 *   读/写动态数据库选择处理器
 *   通过AOP切面实现读/写选择
 *
 *
 * ★★读/写动态数据库选择处理器★★
 * 1、首先读取<tx:advice>事务属性配置
 *
 * 2、对于所有读方法设置 read-only="true" 表示读取操作(以此来判断是选择读还是写库),其他操作都是走写库
 *    如<tx:method name="×××" read-only="true"/>
 *
 * 3、 forceChoiceReadOnWrite用于确定在如果目前是写(即开启了事务),下一步如果是读,
 *    是直接参与到写库进行读,还是强制从读库读<br/>
 *      forceChoiceReadOnWrite:false 表示目前是写,下一步如果是读,强制参与到写事务(即从写库读)
 *                                  这样可以避免写的时候从读库读不到数据
 *
 *                                  通过设置事务传播行为:SUPPORTS实现
 *
 *      forceChoiceReadOnWrite:true 表示不管当前事务是写/读,都强制从读库获取数据
 *                                  通过设置事务传播行为:NOT_SUPPORTS实现(连接是尽快释放)
 *                                  『此处借助了 NOT_SUPPORTS会挂起之前的事务进行操作 然后再恢复之前事务完成的』
 * 4、配置方式
 *  <bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor">
 *      <property name="forceChoiceReadWhenWrite" value="false"/>
 *  </bean>
 *
 * 5、目前只适用于<tx:advice>情况 TODO 支持@Transactional注解事务
 *
 *
 *
 * ★★通过AOP切面实现读/写库选择★★
 *
 * 1、首先将当前方法 与 根据之前【读/写动态数据库选择处理器】  提取的读库方法 进行匹配
 *
 * 2、如果匹配,说明是读取数据:
 *  2.1、如果forceChoiceReadOnWrite:true,即强制走读库
 *  2.2、如果之前是写操作且forceChoiceReadOnWrite:false,将从写库进行读取
 *  2.3、否则,到读库进行读取数据
 *
 * 3、如果不匹配,说明默认将使用写库进行操作
 *
 * 4、配置方式
 *      <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
 *          <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/>
 *      </aop:aspect>
 *  4.1、此处order = Integer.MIN_VALUE 即最高的优先级(请参考http://jinnianshilongnian.iteye.com/blog/1423489)
 *  4.2、切入点:txPointcut 和 实施事务的切入点一样
 *  4.3、determineReadOrWriteDB方法用于决策是走读/写库的,请参考
 *       @see cn.javass.common.datasource.ReadWriteDataSourceDecision
 *       @see cn.javass.common.datasource.ReadWriteDataSource
 *
 * </pre>
 * @author Zhang Kaitao
 *
 */
public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
//    private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);

    private boolean forceChoiceReadWhenWrite = false;

    private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>();

    /**
     * 当之前操作是写的时候,是否强制从从库读
     * 默认(false) 当之前操作是写,默认强制从写库读
     * @param forceReadOnWrite
     */

    public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {

        this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if(!(bean instanceof NameMatchTransactionAttributeSource)) {
            return bean;
        }

        try {
            NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;
            Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
            nameMapField.setAccessible(true);
            Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);

            for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
                RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue();

                //仅对read-only的处理
                if(!attr.isReadOnly()) {
                    continue;
                }

                String methodName = entry.getKey();
                Boolean isForceChoiceRead = Boolean.FALSE;
                if(forceChoiceReadWhenWrite) {
                    //不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可)
                    //NOT_SUPPORTED会挂起之前的事务
                    attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());
                    isForceChoiceRead = Boolean.TRUE;
                } else {
                    //否则 设置为SUPPORTS(这样可以参与到写事务)
                    attr.setPropagationBehavior(Propagation.SUPPORTS.value());
                }
                System.out.println("read/write transaction process  method:{} force read:{}"+" "+ methodName+" "+ isForceChoiceRead);
                readMethodMap.put(methodName, isForceChoiceRead);
            }

        } catch (Exception e) {
            throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);
        }

        return bean;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    private class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
        public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
            super(message, cause);
        }
    }
      //ProceedingJoinPoint 连接点
    public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable {
        if (isChoiceReadDB(pjp.getSignature().getName())) {
            ReadWriteDataSourceDecision.markRead();
            System.out.println("方法:" + pjp.getSignature().getName() +"进入读库!");
        } else {
            ReadWriteDataSourceDecision.markWrite();
            System.out.println("方法:" + pjp.getSignature().getName() +"进入写库!");
        }

        try {
            return pjp.proceed();

        } finally {
            System.out.println(pjp.getSignature().getName()+" "+"reset方法");
            ReadWriteDataSourceDecision.reset();
        }

    }

    private boolean isChoiceReadDB(String methodName) {

        String bestNameMatch = null;
        for (String mappedName : this.readMethodMap.keySet()) {
            if (isMatch(methodName, mappedName)) {
                bestNameMatch = mappedName;
                break;
            }
        }

        Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);
        //表示强制选择 读 库
        if(isForceChoiceRead == Boolean.TRUE) {
            System.out.println("表示强制选择 读 库");
            return true;
        }

        //如果之前选择了写库 现在还选择 写库
        if(ReadWriteDataSourceDecision.isChoiceWrite()) {
            System.out.println("如果之前选择了写库 现在还选择 写库");
            return false;
        }

        //表示应该选择读库
        if(isForceChoiceRead != null) {
            System.out.println("表示应该选择读库");
            return true;
        }
        //默认选择 写库
        return false;
    }

    protected boolean isMatch(String methodName, String mappedName) {
        return PatternMatchUtils.simpleMatch(mappedName, methodName);
    }

}
package com.hengxin.qianee.util;

/**
 * <pre>
 * 读/写动态数据库 决策者
 * 根据DataSourceType是write/read 来决定是使用读/写数据库
 * 通过ThreadLocal绑定实现选择功能
 * </pre>
 * @author Zhang Kaitao
 *
 */
public class ReadWriteDataSourceDecision {

    public enum DataSourceType {
        write, read;
    }

    private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>();

    public static void markWrite() {
        holder.set(DataSourceType.write);
    }

    public static void markRead() {
        holder.set(DataSourceType.read);
    }

    public static void reset() {
        holder.set(null);
    }

    public static boolean isChoiceNone() {
        return null == holder.get();
    }

    public static boolean isChoiceWrite() {
        return DataSourceType.write == holder.get();
    }

    public static boolean isChoiceRead() {
        return DataSourceType.read == holder.get();
    }

}
package com.hengxin.qianee.util;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.AbstractDataSource;
import org.springframework.util.CollectionUtils;

/**
 *
 * <pre>
 * 读/写动态选择数据库实现
 * 目前实现功能
 *   一写库多读库选择功能,请参考
 *      @see cn.javass.common.datasource.ReadWriteDataSourceDecision
        @see cn.javass.common.datasource.ReadWriteDataSourceDecision.DataSourceType
 *
 *   默认按顺序轮询使用读库
 *   默认选择写库
 *
 *   已实现:一写多读、当写时默认读操作到写库、当写时强制读操作到读库
 *   TODO 读库负载均衡、读库故障转移
 * </pre>
 * @author Zhang Kaitao
 *
 */
public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {
//    private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class);

    private DataSource writeDataSource;
    private Map<String, DataSource> readDataSourceMap;

    private String[] readDataSourceNames;
    private DataSource[] readDataSources;
    private int readDataSourceCount;

    private AtomicInteger counter = new AtomicInteger(1);

    /**
     * 设置读库(name, DataSource)
     * @param readDataSourceMap
     */
    public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
        this.readDataSourceMap = readDataSourceMap;
    }
    public void setWriteDataSource(DataSource writeDataSource) {
        this.writeDataSource = writeDataSource;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if(writeDataSource == null) {
            throw new IllegalArgumentException("property ‘writeDataSource‘ is required");
        }
        if(CollectionUtils.isEmpty(readDataSourceMap)) {
            throw new IllegalArgumentException("property ‘readDataSourceMap‘ is required");
        }
        readDataSourceCount = readDataSourceMap.size();

        readDataSources = new DataSource[readDataSourceCount];
        readDataSourceNames = new String[readDataSourceCount];

        int i = 0;
        for(Entry<String, DataSource> e : readDataSourceMap.entrySet()) {
            readDataSources[i] = e.getValue();
            readDataSourceNames[i] = e.getKey();
            i++;
        }

    }

    private DataSource determineDataSource() {
        if(ReadWriteDataSourceDecision.isChoiceWrite()) {
            System.out.println("current determine write datasource");
            return writeDataSource;
        }

        if(ReadWriteDataSourceDecision.isChoiceNone()) {
            System.out.println("no choice read/write, default determine write datasource");
            return writeDataSource;
        }
        return determineReadDataSource();
    }

    private DataSource determineReadDataSource() {
        //按照顺序选择读库
        //TODO 算法改进
        int index = counter.incrementAndGet() % readDataSourceCount;
        if(index < 0) {
            index = - index;
        }

        String dataSourceName = readDataSourceNames[index];

        System.out.println("current determine read datasource : {}"+"  "+dataSourceName);

        return readDataSources[index];
    }

    @Override
    public Connection getConnection() throws SQLException {
        return determineDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return determineDataSource().getConnection(username, password);
    }

}
时间: 2024-11-04 20:05:59

spring AOP 实现事务和主从读写分离的相关文章

基于 EntityFramework 的数据库主从读写分离架构 - 目录

基于 EntityFramework 的数据库主从读写分离架构 回到目录,完整代码请查看(https://github.com/cjw0511/NDF.Infrastructure)中的目录: src\ NDF.Data.EntityFramework\MasterSlaves 基于 EntityFramework 的数据库主从读写分离架构 - 需求/功能概述 基于 EntityFramework 的数据库主从读写分离架构(1)- 原理概述和基本功能实现 基于 EntityFramework 的

基于 EntityFramework 的数据库主从读写分离服务插件

基于 EntityFramework 的数据库主从读写分离服务插件 1.       版本信息和源码 1.1 版本信息 v1.0 beta(2015-04-02),基于 EF 6.1 开发,支持 EF 6.1 之后的所有 EF6 版本. 1.2 开放源码地址 https://github.com/cjw0511/NDF.Infrastructure 关于该 EF 数据库主从读写分离服务核心源码位于文件夹: src\ NDF.Data.EntityFramework\MasterSlaves 文件

CRL快速开发框架4.4版发布,支持主从读写分离

经过一些调整和优化,4.3已经运行在生产环境,对于不久将会遇到的查询性能,读写分离需求列上日程 读写分离需求 对于一个数据库作了主从发布/订阅,主库为DB1,从库为DB2 所有写入通过DB1,所有查询通过DB2,当然也可以通过DB1 CRL内部实现 在CRL内部调用,请求读和请求写的方法会标记为Read或Write,然后再通过标记实现不同的数据库连接访问对象 如以下代码 1 /// <summary> 2 /// 返回动态对象的查询 3 /// </summary> 4 /// &

mycat 使用 (主从 + 读写分离)

mycat 使用 mycat 源码地址 mycat 概述 是一个开源的分布式数据库系统,一个实现了 MySQL 协议的的 Server.前端用户可以把它看作是一个数据库代理,用 MySQL客户端工具和命令行访问,而其后端可以用 MySQL 原生(Native)协议与多个 MySQL服务器通信,也可以用 JDBC 协议与大多数主流数据库服务器通信.其核心功能是 分表分库,即将一个大表水平分割为 N 个小表,存储在后端 MySQL 服务器里或者其他数据库里. 数据切分 指通过某种特定的条件,将存放在

Amoeba实现mysql主从读写分离

架设amoeba,实现mysql主从读写分 安装amoeba前需要先安装jdk,因为amoeba是JAVA编写的,所以需要JDK环境的支持,至于版本需要在JAVA1.5以后,mysql数据库需要在4.1以后的版本. 以下是我的实验环境. System:    CentOS 6.5 Master mysql:192.168.88.133 Slave mysql:192.168.88.135 Amoeba server:   192.168.88.131 安装mysql及配置mysql主从这里省略,

基于 EntityFramework 的数据库主从读写分离架构(2)- 改进配置和添加事务支持

回到目录,完整代码请查看(https://github.com/cjw0511/NDF.Infrastructure)中的目录: src\ NDF.Data.EntityFramework\MasterSlaves 上一回中(http://www.cnblogs.com/cjw0511/p/4398267.html),我们简单讲述了基于 EF 来实现数据库读写分离的原理.当然,这只是一个 demo 级别的简单实现,实际上,在我们工作环境中,碰到的情况远比这复杂多了,例如数据库连接的配置是通过 c

Spring和MyBatis实现数据的读写分离

1.Spring实现数据库的读写分离 现在大型的电子商务系统,在数据库层面大都采用读写分离技术,就是一个Master数据库,多个Slave数据库.Master库负责数据更新和实时数据查询,Slave库当然负责非实时数据查询.因为在实际的应用中,数据库都是读多写少(读取数据的频率高,更新数据的频率相对较少),而读取数据通常耗时比较长,占用数据库服务器的CPU较多,从而影响用户体验.我们通常的做法就是把查询从主库中抽取出来,采用多个从库,使用负载均衡,减轻每个从库的查询压力. 采用读写分离技术的目标

spring hibernate配置切换数据源,实现读写分离

spring的配置如下 <!-- 主数据源--> <bean id="masterDataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <prope

Amoeba搞定mysql主从读写分离

一个简单完整的 Mysql 主从复制,读写分离的示意图. 1. 首先搭建 Mysql 主从架构,实现 将 mater 数据自动复制到 slave MySQL 复制的工作方式很简单,一台服务器作为主机,一台或多台服务器作为从机.主机会把数据库的变化记录到日志.一旦这些变化被记录到日志,就会立刻(或者以设定的时间间隔)被送到从机. 使用MySQL 复制提供扩展大型网站的能力,这些大型网站的数据库主要是读操作(SELECTs).从机用於复制主机的銷秏是很少的(通常每个从机1%的开销),在大型网站中每个