1 切面 是个类
2 切入点
3 连接点
4 通知 是个方法
5 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring application context: component scanning, master/slave Druid pools,
  the read/write routing data source, MyBatis wiring, declarative
  transactions (tx:advice) and the AOP aspect that picks the read or write
  database for every service-layer call.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">

    <!-- Load property files -->
    <context:property-placeholder location="classpath:*.properties" />

    <!-- Component auto-scanning -->
    <context:component-scan base-package="com.hengxin.qianee">
        <!--
          A regex filter has to match the fully-qualified class name as a
          whole, so the pattern needs the trailing \..* to cover every class
          inside the package; the bare package name never matched anything.
          NOTE(review): this makes the exclusion effective; confirm the wechat
          controllers are scanned by another (e.g. the MVC) context.
        -->
        <context:exclude-filter type="regex" expression="com\.hengxin\.qianee\.wechat\.controller\..*"/>
    </context:component-scan>

    <!-- Master (write) connection pool -->
    <bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_master}" />
        <property name="username" value="${jdbc_username_master}" />
        <property name="password" value="${jdbc_password_master}" />
        <!-- Initial number of connections -->
        <property name="initialSize" value="0" />
        <!-- Maximum number of active connections in the pool -->
        <property name="maxActive" value="20" />
        <!-- Maximum idle connections -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- Minimum idle connections -->
        <property name="minIdle" value="0" />
        <!-- Maximum wait time when acquiring a connection -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- Interval between checks for idle connections that should be closed, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- Minimum time a connection stays in the pool, in milliseconds -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- Enable the removeAbandoned feature -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800 seconds, i.e. 30 minutes -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- Log an error when an abandoned connection is closed -->
        <property name="logAbandoned" value="true" />
        <!-- Database monitoring -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>

    <!-- Slave (read) connection pool; same pool settings as the master -->
    <bean id="slaveDataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="${jdbc_url_slave}" />
        <property name="username" value="${jdbc_username_slave}" />
        <property name="password" value="${jdbc_password_slave}" />
        <!-- Initial number of connections -->
        <property name="initialSize" value="0" />
        <!-- Maximum number of active connections in the pool -->
        <property name="maxActive" value="20" />
        <!-- Maximum idle connections -->
        <!-- <property name="maxIdle" value="20" /> -->
        <!-- Minimum idle connections -->
        <property name="minIdle" value="0" />
        <!-- Maximum wait time when acquiring a connection -->
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="${validationQuery}" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <!-- Interval between checks for idle connections that should be closed, in milliseconds -->
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <!-- Minimum time a connection stays in the pool, in milliseconds -->
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <!-- Enable the removeAbandoned feature -->
        <property name="removeAbandoned" value="true" />
        <!-- 1800 seconds, i.e. 30 minutes -->
        <property name="removeAbandonedTimeout" value="1800" />
        <!-- Log an error when an abandoned connection is closed -->
        <property name="logAbandoned" value="true" />
        <!-- Database monitoring -->
        <!-- <property name="filters" value="stat" /> -->
        <property name="filters" value="mergeStat" />
    </bean>

    <!-- Routing data source: one write target, a named map of read targets
         (all four entries currently point at the same slave pool) -->
    <bean id="readWriteDataSource" class="com.hengxin.qianee.util.ReadWriteDataSource">
        <property name="writeDataSource" ref="masterDataSource"/>
        <property name="readDataSourceMap">
            <map>
                <entry key="readDataSource1" value-ref="slaveDataSource"/>
                <entry key="readDataSource2" value-ref="slaveDataSource"/>
                <entry key="readDataSource3" value-ref="slaveDataSource"/>
                <entry key="readDataSource4" value-ref="slaveDataSource"/>
            </map>
        </property>
    </bean>

    <!-- Bean post-processor + aspect bean that decides read vs. write per method -->
    <bean id="readWriteDataSourceTransactionProcessor" class="com.hengxin.qianee.util.ReadWriteDataSourceProcessor">
        <property name="forceChoiceReadWhenWrite" value="false"/>
    </bean>

    <!-- MyBatis session factory -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="readWriteDataSource" />
        <!-- Scan the mapper XML files automatically, avoiding manual entries in Configuration.xml -->
        <property name="mapperLocations" value="classpath:com/hengxin/qianee/mapper/xml/*.xml" />
    </bean>

    <!-- MyBatis mapper interface scanner -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.hengxin.qianee.mapper" />
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
    </bean>

    <!-- Transaction manager -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="readWriteDataSource" />
    </bean>

    <!-- Interceptor-style (declarative) transaction configuration.
         Methods marked read-only="true" are the ones the processor bean
         above treats as read operations. -->
    <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="add*" propagation="REQUIRED" />
            <tx:method name="append*" propagation="REQUIRED" />
            <tx:method name="insert*" propagation="REQUIRED" />
            <tx:method name="save*" propagation="REQUIRED" />
            <tx:method name="update*" propagation="REQUIRED" />
            <tx:method name="modify*" propagation="REQUIRED" />
            <tx:method name="edit*" propagation="REQUIRED" />
            <tx:method name="delete*" propagation="REQUIRED" />
            <tx:method name="remove*" propagation="REQUIRED" />
            <tx:method name="repair" propagation="REQUIRED" />
            <tx:method name="delAndRepair" propagation="REQUIRED" />
            <tx:method name="load*" propagation="REQUIRED" />
            <tx:method name="do*" propagation="REQUIRED" />
            <tx:method name="send*" propagation="REQUIRED" />
            <!-- NOTE(review): put* is declared read-only; confirm no put* method writes -->
            <tx:method name="put*" read-only="true"/>
            <tx:method name="query*" read-only="true"/>
            <tx:method name="use*" read-only="true"/>
            <tx:method name="get*" read-only="true" />
            <tx:method name="count*" read-only="true" />
            <tx:method name="find*" read-only="true" />
            <tx:method name="list*" read-only="true" />
            <tx:method name="select*" read-only="true" />
            <tx:method name="is*" read-only="true" />
            <tx:method name="*" propagation="REQUIRED" />
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <!-- Pointcut: every method of the service implementation classes -->
        <aop:pointcut id="transactionPointcut"
                      expression="(execution(* com.hengxin.qianee.service.impl.*.*(..))) or (execution(* com.hengxin.qianee.wechat.service.impl.*.*(..)))" />
        <!-- Advisor: applies the transaction advice at the pointcut -->
        <aop:advisor pointcut-ref="transactionPointcut" advice-ref="transactionAdvice" />
        <!-- Aspect: order is Integer.MIN_VALUE so the read/write decision is made first -->
        <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor">
            <!-- Around advice -->
            <aop:around pointcut-ref="transactionPointcut" method="determineReadOrWriteDB"/>
        </aop:aspect>
    </aop:config>
</beans>
6 ServiceImpl 层:每个 service 方法都是被切入点(pointcut)匹配到的连接点(join point),dao 方法不在切入点范围内
package com.hengxin.qianee.service.impl;

/**
 * Front-end home page service: user-name / recommender checks, SMS code
 * sending, user registration and in-site letters.
 *
 * NOTE(review): this excerpt references collaborators that are not declared
 * here (UserService, smsService, cache, userDao, userEventsDao,
 * fileUploadService, messageStationTemplatesDao, messageSendingDao,
 * statisticUserAuditItemsDao, ContentNewsDao) and shows no imports;
 * presumably they were trimmed from the listing.
 *
 * @author user
 */
@Service
public class FrontMainServiceImpl implements FrontMainService {

	@Autowired
	private ContentAdvertisementsDao ContentAdvertisementsDao;// large-advertisement DAO

	/**
	 * Ajax check of whether a user name already exists.
	 *
	 * @param name      user name to check
	 * @param errorInfo result holder; code is set to -1 (blank or already
	 *                  taken) or 1 (available)
	 * @return the same errorInfo instance
	 */
	@Override
	public ErrorInfo hasNameExist(String name, ErrorInfo errorInfo) {
		// Reject blank input
		if(StringUtils.isBlank(name)){
			errorInfo.code = -1;
			errorInfo.msg = "用户名不能为空";
			return errorInfo;
		}
		// Check whether the user name is available
		int rows = UserService.isNameExist(name);
		if(rows>0){
			errorInfo.code = -1;
			errorInfo.msg = "该用户名已存在";
		}else{
			errorInfo.code = 1;
		}
		return errorInfo;
	}

	/**
	 * Checks whether the recommender exists.
	 *
	 * @param recommend 3DES-encrypted recommender name (decrypted with
	 *                  Constants.ENCRYPTION_KEY)
	 * @param errorInfo result holder; code is 5 when recommend is blank,
	 *                  -1 when no such user exists, 4 when the user exists
	 * @return the same errorInfo instance
	 */
	@Override
	public ErrorInfo isRecommendExist(String recommend, ErrorInfo errorInfo) {
		String recoName = "";
		if(!StringUtils.isNotBlank(recommend)){
			recoName = "";
			errorInfo.code = 5;// returns "5" when the recommender is empty
			return errorInfo;
		}else{
			// Recommender given: check whether the invitation code maps to a user
			recoName = Encrypt.decrypt3DES(recommend, Constants.ENCRYPTION_KEY);
			// Check whether the user name exists
			int rows = UserService.isNameExist(recoName);
			if(rows<=0){
				errorInfo.code = -1;
				errorInfo.msg = "该推荐人不存在,请选填";
			}else{
				errorInfo.code = 4;
				errorInfo.msg = "该推荐人存在";
				return errorInfo;
			}
		}
		return errorInfo;
	}

	/**
	 * Sends the SMS code on the registration page.
	 *
	 * @param mobile    mobile number to validate and send to
	 * @param errorInfo result holder; code is -1 when the number is blank,
	 *                  malformed or already registered
	 * @return the same errorInfo instance
	 */
	@Override
	public ErrorInfo verifyMobileRe(String mobile,ErrorInfo errorInfo) {
		// Validate non-blank
		if (StringUtils.isBlank(mobile)) {
			errorInfo.code = -1;
			errorInfo.msg = "请输入手机号码";
			return errorInfo;
		}
		// Validate format
		if (!RegexUtils.isMobileNum(mobile)) {
			errorInfo.code = -1;
			errorInfo.msg = "请输入正确的手机号码";
			return errorInfo;
		}
		// NOTE(review): user is freshly constructed, so "user == null" can never
		// be true; presumably its mobile is unset and the lookup always runs - confirm.
		Users user = new Users();
		boolean flag = false;
		if (user == null || StringUtils.isBlank(user.getMobile()) || !user.getMobile().equals(mobile)) {
			flag = UserService.isMobileExistFlag(mobile);
		}
		if(!flag){
			// Send the SMS
			smsService.sendCode(mobile, errorInfo);
		}else{
			errorInfo.code = -1;
			errorInfo.msg = "该手机号码已存在";
		}
		return errorInfo;
	}

	/**
	 * Registers a user from the front end.
	 *
	 * Steps, in order: apply the initial credit line from the cached
	 * "backstageSet", reject names containing a forbidden keyword, resolve the
	 * recommender, generate and upload a QR code, insert the user, store the
	 * two MD5 signatures, record the register event, queue the in-site letter
	 * and create the audit items.
	 *
	 * @param user        user being registered (mutated)
	 * @param path        local directory the QR code image is written to
	 * @param contextPath prefix of the URL encoded in the QR code
	 * @param errorInfo   result holder, cleared first; code 0 on success,
	 *                    -1 on failure
	 * @param recoName    recommender user name, "" when there is none
	 * @return the same errorInfo instance
	 */
	@Override
	public ErrorInfo addregisterUser(Users user,String path,String contextPath,ErrorInfo errorInfo,String recoName) {
		errorInfo.clear();
		BackstageSet backstageSet = (BackstageSet) cache.getObject("backstageSet");
		user.setCreditLine(backstageSet.getInitialAmount());
		user.setLastCreditLine(backstageSet.getInitialAmount());
		// Fetch the forbidden keywords for registration (e.g. xijinping)
		String keyWord = backstageSet.getKeywords();
		if(StringUtils.isNotBlank(keyWord)){
			String [] keywords = keyWord.split(",");
			for(String word : keywords) {
				if(user.getName().contains(word)) {
					errorInfo.code = -1;
					errorInfo.msg = "对不起,注册的用户名包含敏感词汇,请重新输入用户名";
					return errorInfo;
				}
			}
		}
		// NOTE(review): throws NullPointerException when recoName is null
		if(!recoName.equals("")){
			// Look up the recommender's id by the user name decrypted from the front-end referral code
			long recommendedId = userDao.queryIdByUserName(recoName);
			if(recommendedId>0){
				user.setRecommendUserId(recommendedId);
				user.setRecommendRewardType(backstageSet.getCpsRewardType());
				user.setRecommendTime(new Date());
			}else{
				user.setRecommendUserId(0L);
				user.setRecommendRewardType(-1);
				user.setRecommendTime(null);
			}
		}else{
			// No recommender: recommender id is 0 (non-null)
			user.setRecommendUserId(0L);
		}
		String uuid = UUID.randomUUID().toString();
		try {
			Qrcode.create(contextPath + "/loginAndRegister/register?un=" + Encrypt.encrypt3DES(user.getName(), Constants.ENCRYPTION_KEY), BarcodeFormat.QR_CODE, 100, 100, new File(path,uuid+".png").getAbsolutePath(), "png");
			// Read the local file (same path the QR code was just written to)
			String fileName = uuid.split("\\.")[0]+".png";
			File file = new File(path, fileName);
			// Total number of users
			int userCount = UserService.selectUserCount();
			// Upload to the server
			Map<String, Object> map = fileUploadService.registeredUploadFiles(file, Constants.FileFormat.IMG, userCount, errorInfo );
			// Delete the local file once it has been uploaded
			file.delete();
			// NOTE(review): assumes map is non-null and "fileName" contains '%' - confirm
			System.out.println(((String) map.get("fileName")).split("%")[1]);
			// Keep the segment after the '%' separator, without the file extension
			user.setQrCode(((String) map.get("fileName")).split("%")[1].split("\\.")[0]);
			// Whether login is forbidden (false: login allowed)
			user.setIsAllowLogin(false);
		} catch (WriterException e) {
			// QR code failure is logged only; registration continues
			e.printStackTrace();
			System.err.println("生成二维码图像失败!");
		} catch (IOException e) {
			e.printStackTrace();
			System.err.println("生成二维码图像失败!");
		}
		// Insert the user. This service method is what the pointcut matches; the DAO call is not advised.
		int rows = userDao.insertUser(user);
		if(rows<=0){
			errorInfo.code = -1;
			errorInfo.msg = "此次注册失败!";
			return errorInfo;
		}
		MD5 md5 = new MD5();
		// String concatenation renders each 0.00 literal as "0.0"
		String sign1 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+Constants.ENCRYPTION_KEY);
		String sign2 = md5.getMD5ofStr(""+user.getId()+0.00+0.00+0.00+0.00+Constants.ENCRYPTION_KEY);
		int updateSign = userDao.updateSign(sign1, sign2, user.getId());
		if(updateSign<=0){
			errorInfo.code = -1;
			errorInfo.msg = "此次注册失败!";
			return errorInfo;
		}
		userEventsDao.inserUserEvent(user.getId(), UserEvent.REGISTER, "注册成功", errorInfo);
		if(errorInfo.code < 0){
			return errorInfo;
		}
		// Send the registration in-site letter
		addSendLetter(user, Constants.M_REGISTER,errorInfo);
		// Create the audit items
		statisticUserAuditItemsDao.createAuditItem(user.getId());
		errorInfo.code = 0;
		errorInfo.msg = "恭喜你,注册成功!";
		return errorInfo;
	}

	/**
	 * Queues an in-site letter for the user from a message template.
	 *
	 * @param user  recipient
	 * @param id    template id
	 * @param error result holder; code is set to -1 when the task insert
	 *              affects no rows
	 * @return the same error instance
	 */
	@Override
	public ErrorInfo addSendLetter(Users user,long id,ErrorInfo error) {
		// Fetch the content and title to send
		// NOTE(review): mst is dereferenced without a null check - confirm the DAO never returns null
		MessageStationTemplates mst = messageStationTemplatesDao.fandMessageStationTemplates(id);
		// Enabled flag (default true: enabled, false: disabled)
		if(mst.getStatus()){
			// Add the message task (sent on a schedule)
			int rows = messageSendingDao.addMessageTask(user.getId(), mst.getTitle(), mst.getContent());
			if(rows<=0){
				error.code = -1;
				error.msg = "添加失败";
			}
		}
		return error;
	}

	/**
	 * Sends the SMS code on the forgot-password page.
	 *
	 * @param mobile    mobile number to validate and send to
	 * @param errorInfo result holder; code is -1 when the number is blank or
	 *                  malformed
	 * @return the same errorInfo instance
	 */
	@Override
	public ErrorInfo sendMobileMessage(String mobile, ErrorInfo errorInfo) {
		// Validate non-blank
		if (StringUtils.isBlank(mobile)) {
			errorInfo.code = -1;
			errorInfo.msg = "请输入手机号码";
			return errorInfo;
		}
		// Validate format
		if (!RegexUtils.isMobileNum(mobile)) {
			errorInfo.code = -1;
			errorInfo.msg = "请输入正确的手机号码";
			return errorInfo;
		}
		// Send the SMS
		smsService.sendCode(mobile, errorInfo);
		return errorInfo;
	}

	/**
	 * Queries the user registration agreement.
	 *
	 * @param id content id
	 * @return the value returned by ContentNewsDao.queryContent(id)
	 */
	@Override
	public String queryContent(long id) {
		return ContentNewsDao.queryContent(id);
	}
}
7 切面
package com.hengxin.qianee.util;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.NestedRuntimeException;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;

/**
 * <pre>
 * This class has two responsibilities (merged to keep the class count down):
 *   - read/write dynamic database selection processor (bean post-processor)
 *   - read/write selection through an AOP around advice
 *
 * Read/write dynamic database selection processor
 * 1. Reads the transaction attributes configured through &lt;tx:advice&gt;.
 *
 * 2. Every method pattern declared with read-only="true" is treated as a
 *    read operation; everything else goes to the write database, e.g.
 *    &lt;tx:method name="xxx" read-only="true"/&gt;
 *
 * 3. forceChoiceReadWhenWrite decides what happens when the current
 *    operation is a write (a transaction is open) and the next one is a read:
 *    false: the read joins the write transaction (reads from the write
 *           database), which avoids not finding freshly written rows.
 *           Implemented by setting propagation SUPPORTS.
 *    true:  reads always go to a read database, whatever the current
 *           transaction is. Implemented by setting propagation
 *           NOT_SUPPORTED, which suspends the current transaction and
 *           resumes it afterwards.
 *
 * 4. Configuration
 *    &lt;bean id="readWriteDataSourceTransactionProcessor"
 *          class="com.hengxin.qianee.util.ReadWriteDataSourceProcessor"&gt;
 *        &lt;property name="forceChoiceReadWhenWrite" value="false"/&gt;
 *    &lt;/bean&gt;
 *
 * 5. Only &lt;tx:advice&gt; is supported for now. TODO support @Transactional.
 *
 * Read/write selection through the AOP aspect
 * 1. The current method name is matched against the read patterns collected
 *    by the processor above.
 * 2. On a match the call is a read:
 *    2.1 forceChoiceReadWhenWrite true: always use a read database;
 *    2.2 the previous choice was write and forceChoiceReadWhenWrite is
 *        false: keep reading from the write database;
 *    2.3 otherwise use a read database.
 * 3. Without a match the write database is used.
 * 4. Configuration
 *    &lt;aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor"&gt;
 *        &lt;aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/&gt;
 *    &lt;/aop:aspect&gt;
 *    4.1 order = Integer.MIN_VALUE, i.e. the highest precedence
 *        (see http://jinnianshilongnian.iteye.com/blog/1423489)
 *    4.2 the pointcut is the same one the transaction advice uses
 *    4.3 determineReadOrWriteDB makes the read/write decision
 * </pre>
 *
 * @see ReadWriteDataSourceDecision
 * @see ReadWriteDataSource
 * @author Zhang Kaitao
 */
public class ReadWriteDataSourceProcessor implements BeanPostProcessor {
    // private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class);

    private boolean forceChoiceReadWhenWrite = false;

    /** Read-only method-name pattern -> whether reads are forced to a read database. */
    private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>();

    /**
     * Sets whether a read that follows a write is forced to a read database.
     * The default (false) makes such a read use the write database.
     *
     * @param forceChoiceReadWhenWrite true to always read from a read database
     */
    public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) {
        this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite;
    }

    /**
     * For every {@link NameMatchTransactionAttributeSource} bean, rewrites the
     * propagation of its read-only attributes and records their name patterns
     * in {@link #readMethodMap}. All other beans are returned untouched.
     *
     * @param bean     the initialized bean
     * @param beanName its name (unused)
     * @return the same bean instance
     * @throws BeansException declared by the interface
     */
    @Override
    @SuppressWarnings("unchecked")
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(!(bean instanceof NameMatchTransactionAttributeSource)) {
            return bean;
        }

        try {
            NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean;
            // nameMap is private in Spring, so it is read reflectively
            Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
            if(nameMapField == null) {
                // Fail with a clear message instead of a bare NullPointerException
                throw new IllegalStateException("field 'nameMap' not found on NameMatchTransactionAttributeSource");
            }
            nameMapField.setAccessible(true);
            Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource);

            for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) {
                RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue();

                // Only read-only attributes are processed
                if(!attr.isReadOnly()) {
                    continue;
                }

                String methodName = entry.getKey();
                Boolean isForceChoiceRead = Boolean.FALSE;
                if(forceChoiceReadWhenWrite) {
                    // Always read from a read database, even after a write:
                    // NOT_SUPPORTED suspends the surrounding transaction
                    attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value());
                    isForceChoiceRead = Boolean.TRUE;
                } else {
                    // Otherwise SUPPORTS, so the read can join a write transaction
                    attr.setPropagationBehavior(Propagation.SUPPORTS.value());
                }
                System.out.println("read/write transaction process  method:{} force read:{}"+" "+ methodName+" "+ isForceChoiceRead);
                readMethodMap.put(methodName, isForceChoiceRead);
            }

        } catch (Exception e) {
            throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e);
        }

        return bean;
    }

    /** No-op; returns the bean unchanged. */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    /**
     * Raised when the transaction attributes cannot be processed. Static so
     * the exception does not keep a reference to the enclosing processor.
     */
    private static class ReadWriteDataSourceTransactionException extends NestedRuntimeException {
        private static final long serialVersionUID = 1L;

        public ReadWriteDataSourceTransactionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Around advice: marks the current thread as read or write before the
     * advised method runs and restores the caller's decision afterwards.
     *
     * Restoring (rather than unconditionally resetting) matters for nested
     * advised calls: an inner call must not wipe the decision of the outer
     * method, otherwise a later sibling call no longer sees that a write is
     * in progress.
     *
     * @param pjp the join point (the advised service method invocation)
     * @return whatever the advised method returns
     * @throws Throwable whatever the advised method throws
     */
    public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable {
        final String methodName = pjp.getSignature().getName();

        // Snapshot the decision made by an enclosing advised call, if any
        final boolean previousWasWrite = ReadWriteDataSourceDecision.isChoiceWrite();
        final boolean previousWasRead = ReadWriteDataSourceDecision.isChoiceRead();

        if (isChoiceReadDB(methodName)) {
            ReadWriteDataSourceDecision.markRead();
            System.out.println("方法:" + methodName +"进入读库!");
        } else {
            ReadWriteDataSourceDecision.markWrite();
            System.out.println("方法:" + methodName +"进入写库!");
        }

        try {
            return pjp.proceed();
        } finally {
            System.out.println(methodName+" "+"reset方法");
            if (previousWasWrite) {
                ReadWriteDataSourceDecision.markWrite();
            } else if (previousWasRead) {
                ReadWriteDataSourceDecision.markRead();
            } else {
                // Outermost call: nothing to restore, clear the thread state
                ReadWriteDataSourceDecision.reset();
            }
        }
    }

    /**
     * Decides whether the named method should use a read database.
     *
     * @param methodName simple name of the advised method
     * @return true for a read database, false for the write database
     */
    private boolean isChoiceReadDB(String methodName) {

        String bestNameMatch = null;
        for (String mappedName : this.readMethodMap.keySet()) {
            if (isMatch(methodName, mappedName)) {
                bestNameMatch = mappedName;
                break;
            }
        }

        // null when no read pattern matched (HashMap.get(null) returns null)
        Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch);
        // Forced read: always use a read database
        if(Boolean.TRUE.equals(isForceChoiceRead)) {
            System.out.println("表示强制选择 读 库");
            return true;
        }

        // The write database was chosen earlier on this thread: stay on it
        if(ReadWriteDataSourceDecision.isChoiceWrite()) {
            System.out.println("如果之前选择了写库 现在还选择 写库");
            return false;
        }

        // A read pattern matched: use a read database
        if(isForceChoiceRead != null) {
            System.out.println("表示应该选择读库");
            return true;
        }
        // Default: write database
        return false;
    }

    /**
     * Matches a method name against a configured pattern such as "get*".
     *
     * @param methodName actual method name
     * @param mappedName configured name pattern
     * @return true when the pattern matches the method name
     */
    protected boolean isMatch(String methodName, String mappedName) {
        return PatternMatchUtils.simpleMatch(mappedName, methodName);
    }

}
package com.hengxin.qianee.util; /** * <pre> * 读/写动态数据库 决策者 * 根据DataSourceType是write/read 来决定是使用读/写数据库 * 通过ThreadLocal绑定实现选择功能 * </pre> * @author Zhang Kaitao * */ public class ReadWriteDataSourceDecision { public enum DataSourceType { write, read; } private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>(); public static void markWrite() { holder.set(DataSourceType.write); } public static void markRead() { holder.set(DataSourceType.read); } public static void reset() { holder.set(null); } public static boolean isChoiceNone() { return null == holder.get(); } public static boolean isChoiceWrite() { return DataSourceType.write == holder.get(); } public static boolean isChoiceRead() { return DataSourceType.read == holder.get(); } }
package com.hengxin.qianee.util;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.AbstractDataSource;
import org.springframework.util.CollectionUtils;

/**
 * <pre>
 * Data source that picks the write or a read database at connection time.
 *
 * One write database and several read databases are supported; the choice
 * is taken from ReadWriteDataSourceDecision for the current thread.
 *
 * Read databases are used in round-robin order.
 * The write database is the default when no choice has been made.
 *
 * Implemented: one writer / many readers, reads going to the writer while
 * writing, reads forced to a reader while writing.
 * TODO read load balancing, read fail-over
 * </pre>
 *
 * @see ReadWriteDataSourceDecision
 * @see ReadWriteDataSourceDecision.DataSourceType
 * @author Zhang Kaitao
 */
public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean {
    // private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class);

    private DataSource writeDataSource;
    private Map<String, DataSource> readDataSourceMap;

    // Flattened copies of readDataSourceMap, filled in afterPropertiesSet()
    private String[] readDataSourceNames;
    private DataSource[] readDataSources;
    private int readDataSourceCount;

    // Round-robin cursor over the read data sources
    private AtomicInteger counter = new AtomicInteger(1);

    /**
     * Sets the read databases.
     *
     * @param readDataSourceMap name -> DataSource
     */
    public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) {
        this.readDataSourceMap = readDataSourceMap;
    }

    /**
     * Sets the write database.
     *
     * @param writeDataSource the write DataSource
     */
    public void setWriteDataSource(DataSource writeDataSource) {
        this.writeDataSource = writeDataSource;
    }

    /**
     * Validates the configuration and copies the read map into parallel
     * arrays for index-based selection.
     *
     * @throws IllegalArgumentException when the write data source or the
     *         read map is missing
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (null == writeDataSource) {
            throw new IllegalArgumentException("property ‘writeDataSource‘ is required");
        }
        if (CollectionUtils.isEmpty(readDataSourceMap)) {
            throw new IllegalArgumentException("property ‘readDataSourceMap‘ is required");
        }

        final int size = readDataSourceMap.size();
        readDataSourceCount = size;
        readDataSources = new DataSource[size];
        readDataSourceNames = new String[size];

        int slot = 0;
        for (Entry<String, DataSource> named : readDataSourceMap.entrySet()) {
            readDataSourceNames[slot] = named.getKey();
            readDataSources[slot] = named.getValue();
            slot++;
        }
    }

    /**
     * Resolves the target for the current thread: a read database only when
     * the thread is explicitly marked as read, the write database otherwise.
     */
    private DataSource determineDataSource() {
        if (ReadWriteDataSourceDecision.isChoiceRead()) {
            return determineReadDataSource();
        }

        final String message = ReadWriteDataSourceDecision.isChoiceWrite()
                ? "current determine write datasource"
                : "no choice read/write, default determine write datasource";
        System.out.println(message);
        return writeDataSource;
    }

    /**
     * Picks the next read database in order.
     * TODO improve the algorithm
     */
    private DataSource determineReadDataSource() {
        // The remainder turns negative once the counter overflows; abs() folds it back into range
        final int slot = Math.abs(counter.incrementAndGet() % readDataSourceCount);

        System.out.println("current determine read datasource : {}"+" "+readDataSourceNames[slot]);
        return readDataSources[slot];
    }

    /** Opens a connection on the data source chosen for the current thread. */
    @Override
    public Connection getConnection() throws SQLException {
        DataSource target = determineDataSource();
        return target.getConnection();
    }

    /** Same as {@link #getConnection()}, with explicit credentials. */
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        DataSource target = determineDataSource();
        return target.getConnection(username, password);
    }

}
时间: 2024-11-04 20:05:59