结合ThreadLocal来看spring事务源码,感受下清泉般的洗涤!

  在我的博客spring事务源码解析中,提到了一个很关键的点:将connection绑定到当前线程来保证这个线程中的数据库操作用的是同一个connection。但是没有细致的讲到如何绑定,以及为什么这么绑定;另外也没有讲到连接池的相关问题:如何从连接池获取,如何归还连接到连接池等等。那么下面就请听我慢慢道来。

ThreadLocal

  讲spring事务之前,我们先来看看ThreadLocal,它在spring事务中是占据着比较重要的地位;不管你对ThreadLocal熟悉与否,且都静下心来听我唐僧般的念叨。

  先强调一点:ThreadLocal不是用来解决共享变量问题的,它与多线程的并发问题没有任何关系。

  基本介绍

    当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本,如下例:

public class ThreadLocalTest
{
    ThreadLocal<Long> longLocal = new ThreadLocal<Long>();
    ThreadLocal<String> stringLocal = new ThreadLocal<String>();

    public void set()
    {
        longLocal.set(1L);
        stringLocal.set(Thread.currentThread().getName());
    }

    public long getLong()
    {
        return longLocal.get();
    }

    public String getString()
    {
        return stringLocal.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        final ThreadLocalTest test = new ThreadLocalTest();

        test.set();     // 初始化ThreadLocal
        for (int i=0; i<10; i++)
        {
            System.out.println(test.getString() + " : " + test.getLong() + i);
        }

        Thread thread1 = new Thread(){
            public void run() {
                test.set();
                for (int i=0; i<10; i++)
                {
                    System.out.println(test.getString() + " : " + test.getLong() + i);
                }
            };
        };
        thread1.start();

        Thread thread2 = new Thread(){
            public void run() {
                test.set();
                for (int i=0; i<10; i++)
                {
                    System.out.println(test.getString() + " : " + test.getLong() + i);
                }
            };
        };
        thread2.start();
    }
}

    执行结果如下

    可以看到,各个线程的longLocal值与stringLocal值是相互独立的,本线程的累加操作不会影响到其他线程的值,真正达到了线程内部隔离的效果。

  源码解读

    这里我就不进行ThreadLocal的源码解析,建议大家去看我参考的博客,个人认为看那两篇博客就能对ThreadLocal有个很深地认知了。

    做个重复的强调(引用[Java并发包学习七]解密ThreadLocal中的一段话):

Thread与ThreadLocal对象之间的引用关系图
 
看了ThreadLocal源码,不知道大家有没有一个疑惑:为什么像上图那么设计? 如果给你设计,你会怎么设计?相信大部分人会有这样的想法,我也是这样的想法:
  ”每个ThreadLocal类创建一个Map,然后用线程的ID作为Map的key,实例对象作为Map的value,这样就能达到各个线程的值隔离的效果“
JDK最早期的ThreadLocal就是这样设计的。(不确定是否是1.3)之后ThreadLocal的设计换了一种方式,也就是目前的方式,那有什么优势了:
  1、这样设计之后每个Map的Entry数量变小了:之前是Thread的数量,现在是ThreadLocal的数量,能提高性能,据说性能的提升不是一点两点(没有亲测)
  2、当Thread销毁之后对应的ThreadLocalMap也就随之销毁了,能减少内存使用量。

Spring事务中的ThreadLocal

  最常见的ThreadLocal使用场景为 用来解决数据库连接、Session管理等,那么接下来我们就看看spring事务中ThreadLocal的应用

ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext-jdbc.xml");
DaoImpl daoImpl = (DaoImpl) ac.getBean("daoImpl");
System.out.println(daoImpl.insertUser("yes", 25));

  只要某个类的方法、类或者接口上有事务配置,spring就会对该类的实例生成代理。所以daoImpl是DaoImpl实例的代理实例的引用,而不是DaoImpl的实例(目标实例)的引用;当我们调用目标实例的方法时,实际调用的是代理实例对应的方法,若目标方法没有被@Transactional(或aop注解,当然这里不涉及aop)修饰,那么代理方法直接反射调用目标方法,若目标方法被@Transactional修饰,那么代理方法会先执行增强(例如判断当前线程是否存在connection,不存在则新建并绑定到当前线程等等),然后通过反射执行目标方法,最后回到代理方法执行增强(例如,事务回滚或事务提交、connection归还到连接池等等处理)。这里的绑定connection到当前线程就用到了ThreadLocal,我们来看看源码

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {

        if (txObject.getConnectionHolder() == null ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 从连接池获取一个connection
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // 包装newCon,并赋值到txObject,并标记是新的ConnectionHolder
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don‘t want to do it unnecessarily (for example if we‘ve explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 若是新的ConnectionHolder,则将它绑定到当前线程中
        // Bind the session holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
/**
 * Bind the given resource for the given key to the current thread.
 * @param key the key to bind the value to (usually the resource factory)
 * @param value the value to bind (usually the active resource object)
 * @throws IllegalStateException if there is already a value bound to the thread
 * @see ResourceTransactionManager#getResourceFactory()
 */
public static void bindResource(Object key, Object value) throws IllegalStateException {        //key:通常指资源工厂,也就是connection工厂,value:通常指活动的资源,也就是活动的ConnectionHolder

    // 必要时unwrap给定的连接池; 否则按原样返回给定的连接池。
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found   如果ThreadLocal Map不存在则新建,并将其设置到resources中
    // private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");  // 这就到了ThreadLocal流程了
    if (map == null) {
        map = new HashMap<Object, Object>();
        resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                Thread.currentThread().getName() + "]");
    }
}

总结

  1、ThreadLocal能解决的问题,那肯定不是共享变量(多线程并发)问题,只是看起来有些像并发;像火车票、电影票这样的真正的共享变量的问题用ThreadLocal是解决不了的,同一时间,同一趟车的同一个座位,你敢用ThreadLocal来解决吗?

  2、每个Thread维护一个ThreadLocalMap映射表,这个映射表的key是ThreadLocal实例本身,value是真正需要存储的Object

  3、druid连接池用的是数组来存放的connectionHolder,不是我认为的list,connectionHolder从线程中解绑后,归还到数组连接池中;connectionHolder是connection的封装

疑问:

  private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

  1、 为什么是ThreadLocal<Map<Object, Object>>,而不是ThreadLocal<ConnectionHolder>

  2、 ThreadLocal<Map<Object, Object>> 中的Map的key是为什么是DataSource

  望知道的朋友赐教下,评论留言或者私信都可以,谢谢!

时间: 2024-10-22 14:06:35

结合ThreadLocal来看spring事务源码,感受下清泉般的洗涤!的相关文章

spring事务源码研读1

转载摘录自:Spring事务源码分析(一)Spring事务入门 有时为了保证一些操作要么都成功,要么都失败,这就需要事务来保证. 传统的jdbc事务如下: @Test public void testAdd(){ Connection con=null; try { con=DriverManager.getConnection(url , username , password ) con.setAutoCommit(false); //操作一 PreparedStatement ps = c

spring事务源码分析结合mybatis源码(一)

最近想提升,苦逼程序猿,想了想还是拿最熟悉,之前也一直想看但没看的spring源码来看吧,正好最近在弄事务这部分的东西,就看了下,同时写下随笔记录下,以备后查. spring tx源码分析 这里只分析简单事务也就是DataSourceTransactionManager 首先肯定找入口了,看过spring源码的同学一定都会找spring tx的入口就是在TxAdviceBeanDefinitionParser这里将解析tx的配置,生成TransactionInterceptor对象,这个也就是一

Spring事务源码

启动事务 @EnableTransactionManagement 注解来启用事务能力. 参数解释 proxyTargetClass:默认为false,表示使用 JDK 的代理模式,true表示用 CGLib 的代理模式,仅在 mode 是 PROXY 时才有效. mode:默认为PROXY,表示使用 AOP 代理来实现事务,ASPECTJ表示用 ASPECTJ 来实现事务,ASPECTJ 相比 PROXY 减少了一些使用限制,比如支持在同一个类内部方法调用. order:事务通知执行的顺序,默

spring事务源码分析结合mybatis源码(三)

下面将结合mybatis源码来分析下,这种持久化框架是如何对connection使用,来达到spring事务的控制. 想要在把mybatis跟spring整合都需要这样一个jar包:mybatis-spring-x.x.x.jar,这里面定义了一些主要的整合信息. 在spring配置文件中需要配置如下两个bean: <!-- mybatis配置 --> <bean id="sqlSessionFactory" class="org.mybatis.sprin

Spring 事务源码分析——Hibernate篇

在Spring与Hibernate整合的时候,可以利用Spring的事务管理机制,为我们管理事务的开启.提交.回滚等操作.这样的方式极大的减少了我们的代码量,让我们只专注于业务代码的编写.在使用Hibernate的时候,每一个操作都要经历事务开启与提交这样的操作,他们在业务代码的周围,这样来看是不是就想到了AOP编程,把这部分代码抽取出来.没错,Spring正是这样做的,Spring的事务管理就是基于AOP的. 1 Spring的事务隔离与传播 Srping的事务定义了五个隔离等级(isolat

Spring事务源码解析(二)获取增强

在上一篇文章@EnableTransactionManagement注解解析中,我们搭建了源码阅读的环境,以及解析了开启Spring事务功能的注解@EnableTransactionManagement的实现逻辑 在进行接下来的源码解析之前我想大家应该知道,当我们使用传统的jdbc应用事务的时候是不是做了如下操作: 开启事务 save.update.delete等操作 出现异常进行回滚 正常情况提交事务 而在Spring中我们好像只需要关心第三步,也就是我们的业务,而其他的操作都不需要关心.那么

Spring事务源码分析总结

Spring事务是我们日常工作中经常使用的一项技术,Spring提供了编程.注解.aop切面三种方式供我们使用Spring事务,其中编程式事务因为对代码入侵较大所以不被推荐使用,注解和aop切面的方式可以基于需求自行选择,我们以注解的方式为例来分析Spring事务的原理和源码实现. //配置事务管理器<tx:annotation-driven transaction-manager="transactionManager"/> <bean id="trans

Spring事务源码分析

首先看例子,这例子摘抄自开涛的跟我学spring3. @Test public void testPlatformTransactionManager() { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); def.setPropagationBehavior(T

spring事务源码分析结合mybatis源码(二)

让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) { return handleExistingTransaction(definition, transaction, debugEnabled); } protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txOb