JDBC高级特性(二)分布式事和JTA基本原理

在进入主题之前我们首先来了解一下JNDI和连接池~

一、JNDI

1)是一组在Java应用中访问命名和目录服务的API(Java命名与目录接口)

命名服务将名称和对象联系起来,使得我们可以通过名称访问对象。

目录服务是命名服务的扩展,两者之间的关键差别是目录服务中对象可以有属性(例如:用户有email地址),而命名服务中对象没有属性。

2)JNDI API提供了一种统一的方式,可以在本地或网络上查找和访问服务

各种服务在命名服务器上注册一个名称,需要使用服务的应用程序通过JNDI找到对应服务就可以使用。

3)远程服务可以是任何的企业服务

DNS、LDAP(Lightweight Directory Access Protocol 轻型目录访问协议)、 CORBA对象服务、数据源对象,文件系统、Windows XP/2000/NT/Me/9x的注册表、RMI、EJB、JMS……

4)JNDI主要有两部分组成:应用程序编程接口和服务供应商接口。

应用程序编程接口提供了Java应用程序访问各种命名和目录服务的功能(API)

服务供应商接口提供了任意一种服务的供应商使用的功能(SPI:service provider  interface)

JDBC与JNDI区别:

1)在JNDI中注册能操作数据(JDBC)的服务,应用程序找到服务访问数据

JDBC:本地有驱动程序,知道数据在哪里,只有一个应用程序能用

JNDI:可以获得访问数据相关的对象,从而操作数据。本地不需要驱动,也不需要知道数据在哪里,所有需要访问数据库的都可以用

2)JNDI中的数据访问

驱动代码的加载由服务提供方负责

应用程序需要URL找到指定的服务

3)JNDI  API使应用程序可以通过逻辑名称获得指定的数据源

4)JNDI  API提供数据库源服务需要以下内容:

与数据源关联的数据库驱动程序

数据源对象的引用名称

JDBC数据源接口

javax.sql.DataSource接口是在Java程序设计时使用JNDI获得数据库连接的工具

应用程序使用JNDI通过注册名称获得的就是DataSource实现类对象

通过DataSource实现类对象可以获得数据库连接对象(Connection)

所有需要访问数据库的应用程序都可以使用此服务,所以需要同时返回多个连接对象—连接池

由驱动程序供应商实现

实现DataSource接口的对象在基于JNDI  API的命名服务中注册后,应用程序就可以通过JNDI获得相关对象,从而访问数据库服务器。

JNDI的范例程序也很少,在网上广泛流传的是一些与应用服务器结合的实例,如在Tomcat、JBoss、WebLogic中配置了JNDI的数据源,然后在程序中

去发现使用。这些例子大家都会,没意思。

能否在没有应用服务器的程序中使用JNDI技术呢?经过验证,答案是:可以!

package com.cn;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable; 

public class TestFileSystemJNDI {
	public static void main(String[] args) throws NamingException {
		Hashtable env = new Hashtable();
		String name = "F:\\denghbjar\\fscontext-1_2-beta3.zip";
		// 文件系统服务的提供者
		env.put(Context.INITIAL_CONTEXT_FACTORY,
				"com.sun.jndi.fscontext.RefFSContextFactory");
		Context ctx = new InitialContext(env);
		// 通过上下文查找名称对应的对象
		Object obj = ctx.lookup(name);
		System.out.println("名称:[" + name + "]绑定的对象是:" + obj);
	}
}

运行结果:

名称:[F:\邓海波jar\fscontext-1_2-beta3.zip]绑定的对象是:F:\邓海波jar\fscontext-1_2-beta3.zip

注意,JNDI的概念说的明白,你需要有这个服务,才可以用JNDI API来获取。

因此还需要安装文件系统服务。这个例子我不用安装什么服务,下载个文件服务包就行了。其中的两个jar文件包:fscontext.jar和providerutil.jar

二、连接池

1)是缓存在内存中的多个可重复使用的数据库连接

2)客户端请求连接池获得一个连接对象(Connection),然后就可以使用连接对象访问数据库

不需要客户端注册驱动程序

3)大大提高了数据访问性能

连接对象拿来就直接使用,不需要注册驱动及使用驱动创建连接对象

配置连接池:

连接池应该是在服务器或中间件中使用,客户端通过连接池获得连接对象用来访问数据库

使用DBCP实现连接池

commons-collections-3.2.1.jar

commons-dbcp-1.4.jar

commons-pool-1.6.jar

数据库驱动(mysql-connector-java-5.1.18-bin.jar)

创建xxx.properties文件对连接池属性进行配置

使用DBCP连接池,在xxx.properties文件中对连接池属性进行配置:

driverClassName=com.mysql.jdbc.Driver    //配置驱动类

url=jdbc:mysql://localhost:3306/test   //配置连接字符串

username=root   //登陆数据库的用户名

password=root    //登陆数据库的密码

maxActive=50    //最大活动连接数,设为0为没有限制

maxIdle=20    //最大空闲连接数,设为0为没有限制

maxWait=60000      //最大等待毫秒数,设为-1为没有限制

……

package com.cn;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSourceFactory;

public class Test {
	public static void main(String[] args) {
		//创建属性文件对象
		Properties p=new Properties();
		//连接
		Connection conn=null;
		//预编译
		PreparedStatement pst=null;
		//结果集
		ResultSet rs=null;
		String sql = "select * from score;";
		/*
		 * File f=new File("jdbc.properties");
		 * InputStream is=new FileInputStream(f);
		 * pro.load(is);
		 */
		try {
			// 从输入流中读取属性列表(键和元素对)。
			p.load(new FileInputStream("jdbc.properties"));
			//得到数据源
			DataSource ds=BasicDataSourceFactory.createDataSource(p);
			//得到连接对象
			conn=ds.getConnection();
			//预编译,执行sql得到结果集
			pst=conn.prepareStatement(sql);
			rs=pst.executeQuery();
			System.out.println("id\tChinese\tEnglish\thistory");
			while(rs.next()){
				System.out.print(rs.getInt("id")+"\t");
				System.out.print(rs.getInt("Chinese")+"\t");
				System.out.print(rs.getInt("English")+"\t");
				System.out.println(rs.getInt("history")+"\t");
			}
			conn.close();
		} catch (FileNotFoundException e) {

			e.printStackTrace();
		} catch (IOException e) {

			e.printStackTrace();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}
}

三、分布式事务

1. 分布式事务

事务可分为本地事务和分布式事务(Spring中对应局部事务和全局事务).

本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),

分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了

JDBC和JMS或者某个操作需要访问多个不同的数据库。

一个事务处理定义了一个工作逻辑单元,要么彻底成功要么不产生任何结果。

一个分布式事务处理只是一个在两个或更多网络资源上访问和更新数据的事务处理,

因此它在那些资源之间必然是等价的。

对于下面这个应用场景: 将用户 A 账户中的 500 元人民币转移到用户 B 的账户中,其操作流程如下

1. 将 A 账户中的金额减少 500

2. 将 B 账户中的金额增加 500

这两个操作必须保正 ACID 的事务属性:即要么全部成功,要么全部失败;

假若没有事务保障,用户的账号金额将可能发生问题:

假如第一步操作成功而第二步失败,那么用户 A 账户中的金额将就减少 500 元而

用户 B 的账号却没有任何增加(不翼而飞);同样如果第一步出错 而第二步成功,

那么用户 A 的账户金额不变而用户 B 的账号将增加 500 元(凭空而生)。

上述任何一种错误都会产生严重的数据不一致问题,

事务的缺失对于一个稳定的生产系统是不可接受的。

针对上面的应用场景

(1) 对于本地事务(账户A和账户B存储在同一个数据库中),

我们需要获取一个普通的Connection连接,然后调用Connection的

setAutoCommit(false);conn.commit();conn.rollback();等方法即可实现事务控制.

如代码1所示.

(2)分布式事务处理(账户A和账户B分别在连接为connA和connB的数据库中),这时

实现分布式事务,需要事务管理器(TransactionManager),用户事务(UserTransaction,处理分布式事务),

分布式资源管理器(XADatasource),产生符合特定需求的资源(XAResource,能被TransactionManager管理的).

其中分布式资源用于产生分布式资源(如connA和connB的数据库连接).通过UserTransaction实现分布式事务.

如下代码2所示.

代码1(本地事务)

public void transferAccount()
  {
    Connection conn = null;
    Statement stmt = null;
    try
    {
      conn = getDataSource().getConnection();
      // 将自动提交设置为 false,
      // 若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
      conn.setAutoCommit(false);  

      stmt = conn.createStatement();
      // 将 A 账户中的金额减少 500
      stmt.execute("update t_account set amount = amount - 500 where account_id = 'A'");
      // 将 B 账户中的金额增加 500
      stmt.execute("update t_account set amount = amount + 500 where account_id = 'B'");  

      // 提交事务
        conn.commit();
      // 事务提交:转账的两步操作同时成功
    } catch(SQLException sqle)
    {
      try
      {
        // 发生异常,回滚在本事务中的操做
               conn.rollback();
        // 事务回滚:转账的两步操作完全撤销
                stmt.close();
                conn.close();
      }
      catch(Exception ignore)
      {   

      }
      sqle.printStackTrace();
    }
  }  

代码2(分布式事务)

public void transferAccount()
  {   

         UserTransaction userTx = null;
         Connection connA = null;
         Statement stmtA = null;   

         Connection connB = null;
         Statement stmtB = null;   

         try
         {
               // 获得 Transaction 管理对象
             userTx = (UserTransaction)getContext().lookup("\
                   java:comp/UserTransaction");
             // 从数据库 A 中取得数据库连接
             connA = getDataSourceA().getConnection();   

             // 从数据库 B 中取得数据库连接
             connB = getDataSourceB().getConnection();   

                        // 启动事务
             userTx.begin();  

             // 将 A 账户中的金额减少 500
             stmtA = connA.createStatement();
             stmtA.execute("
            update t_account set amount = amount - 500 where account_id = 'A'");  

             // 将 B 账户中的金额增加 500
             stmtB = connB.createStatement();
             stmtB.execute("\
             update t_account set amount = amount + 500 where account_id = 'B'");  

             // 提交事务
             userTx.commit();
             // 事务提交:转账的两步操作同时成功(数据库 A 和数据库 B 中的数据被同时更新)
         }
         catch(SQLException sqle)
         {
             try
             {
              // 发生异常,回滚在本事务中的操纵
             userTx.rollback();
                 // 事务回滚:转账的两步操作完全撤销
                 //( 数据库 A 和数据库 B 中的数据更新被同时撤销)  

                 stmt.close();
                 conn.close();
                 ...
             }catch(Exception ignore)
             {   

             }
             sqle.printStackTrace();   

         } catch(Exception ne)
         {
             e.printStackTrace();
         }
     }  

2. JDBC的分布式事务支持

JDBC规范中定义了XADatasource和XAConnection,用于支持分布式事务处理.

这两个接口与普通的DataSource和Connection的区别:

普通的DataSource接口的Connection getConnection() 方法返回的就是常用的JDBC数据库连接.

而XADatasource 为分布式事务提供支持的对象。

可以通过 XAResource 对象在分布式事务中利用 XAConnection 对象。

事务管理器(通常为中间层服务器的一部分)通过 XAResource 对象管理 XAConnection 对象。

XADatasource 接口的接口方法

XAConnection getXAConnection()

返回的XAConnection接口有一个方法XAResource getXAResource(),

public interface XAResource

XAResource 接口是基于 X/Open CAE 规范(分布式事务处理:XA 规范)

的工业标准 XA 接口的 Java 映射。

在分布式事务处理 (DTP) 环境中,XA 接口定义资源管理器和事务管理器之间的协定。

JDBC 驱动程序或 JMS 提供者实现此接口,以支持全局事务与数据库或消息服务连接之间的关联。

可由应用程序在外部事务管理器控制事务的环境中使用的任何事务资源均可支持 XAResource 接口。

数据库管理系统就属于此类资源。应用程序可以通过多个数据库连接访问数据。

通过事务管理器将每个数据库连接作为事务资源添加到列表中。

事务管理器为参与全局事务的每个连接获取 XAResource。

事务管理器使用 start 方法建立全局事务与资源之间的关联,

而使用 end 方法取消事务与资源之间的关联。

资源管理器负责将全局事务关联到在 start 与 end 方法调用之间对其数据执行的所有工作。

在事务提交时,事务管理器通知资源管理器根据二阶段提交协议准备、提交或回滚事务。

另外,XAConnection是继承自PooledConnection接口的:

public interface XAConnectionextends PooledConnection

PooledConnection接口的方法:

void addConnectionEventListener(ConnectionEventListener listener)

注册给定的事件侦听器,以便在此 PooledConnection

对象上发生事件时它将得到通知。

void addStatementEventListener(StatementEventListener listener)

向此 PooledConnection 对象注册一个 StatementEventListener。

void close()

关闭此 PooledConnection 对象表示的物理连接。

Connection getConnection()

创建并返回一个 Connection 对象,它是此 PooledConnection 对象表示的物理连接的句柄。

void removeConnectionEventListener(ConnectionEventListener listener)

从在此 PooledConnection 对象上发生事件时得到通知的组件所组成的列表中移除给定的事件侦听器。

void removeStatementEventListener(StatementEventListener listener)

从一个组件列表中移除指定的 StatementEventListener,该列表由驱动程序检测到

PreparedStatement 已关闭或无效时将获得通知的组件组成。

3. JTA

上文中说过,实现分布式事务,需要事务管理器(TransactionManager),用户事务(UserTransaction,处理分布式事务),

分布式资源管理器(XADatasource),产生符合特定需求的资源(XAResource,能被TransactionManager管理的).

JDBC中只定义了分布式事务所需要的资源(XAResource)机器管理器(XADatasource),

还有事务管理器(TransactionManager),用户事务(UserTransaction)等则没有涉及,而这部分正是

JTA主要关注的.

JTA(Java Transaction API)定义了一种标准的API,应用系统由此可以存取各种事务监控。

4. 分布式事务的关键角色

资源管理器,主要用于产生符合接口的资源XAResource.比如上文中说的XADatasource.这个是JDBC中的规定的接口,

各大数据库厂商提供的数据库驱动需要实现这个接口.貌似有的数据库驱动不支持XA规范.还有的数据库驱动提供

普通版本的JDBC驱动和实现了XA的驱动(支持分布式事务的驱动).

事务管理器,在JTA中定义,TransactionManager,由JTA的实现者负责实现,比如JOTM的实现.

开发人员事务接口:UserTransaction,提供给开发人员的接口,也在JTA中定义.

要理解 JTA 的实现原理首先需要了解其架构:

它包括事务管理器(Transaction Manager)和一个或多个支持 XA 协议的

资源管理器 ( Resource Manager ) 两部分,

我们可以将资源管理器看做任意类型的持久化数据存储;

事务管理器则承担着所有事务参与单元的协调与控制。

根据所面向对象的不同,我们可以将 JTA 的事务管理器和资源管理器理解为两个方面:

面向开发人员的使用接口(事务管理器)和面向服务提供商的实现接口(资源管理器)。

其中开发接口的主要部分即为上述示例中引用的 UserTransaction 对象,

开发人员通过此接口在信息系统中实现分布式事务;

而实现接口则用来规范提供商(如数据库连接提供商)所提供的事务服务,

它约定了事务的资源管理功能,使得 JTA 可以在异构事务资源之间执行协同沟通。

以数据库为例,IBM 公司提供了实现分布式事务的数据库驱动程序,Oracle 也提供了实现分布式事务的数据库驱动程序,

在同时使用 DB2 和 Oracle 两种数据库连接时, JTA 即可以根据约定的接口协调者两种事务资源从而实现分布式事务。

正是基于统一规范的不同实现使得 JTA 可以协调与控制不同数据库或者 JMS 厂商的事务资源,其架构如下

开发人员使用开发人员接口,实现应用程序对全局事务的支持;各提供商(数据库,JMS 等)

依据提供商接口的规范提供事务资源管理功能;

事务管理器( TransactionManager )将应用对分布式事务的使用映射到实际的事务

资源并在事务资源间进行协调与控制。

下面,本文将对包括 UserTransaction、Transaction 和 TransactionManager

在内的三个主要接口以及其定义的方法进行介绍。

面向开发人员的接口为 UserTransaction (使用方法如上例所示),

开发人员通常只使用此接口实现 JTA 事务管理,其定义了如下的方法:

begin()- 开始一个分布式事务,

在后台 TransactionManager 会创建一个 Transaction 事务对象

并把此对象通过 ThreadLocale 关联到当前线程上

commit()- 提交事务

在后台 TransactionManager 会从当前线程下取出事务对象并把此对象所代表的事务提交

rollback()- 回滚事务

在后台 TransactionManager 会从当前线程下取出事务对象并把此对象所代表的事务回滚

getStatus()- 返回关联到当前线程的分布式事务的状态

Status 对象里边定义了所有的事务状态,感兴趣的读者可以参考 API 文档

setRollbackOnly()- 标识关联到当前线程的分布式事务将被回滚

面向提供商的实现接口主要涉及到

TransactionManager 和 Transaction 两个对象

Transaction 代表了一个物理意义上的事务,

在开发人员调用 UserTransaction.begin() 方法时

TransactionManager 会创建一个 Transaction 事务对象(标志着事务的开始)

并把此对象通过 ThreadLocale 关联到当前线程。

UserTransaction 接口中的 commit()、rollback(),getStatus()

等方法都将最终委托给 Transaction 类的对应方法执行。

Transaction 接口定义了如下的方法:

commit()- 协调不同的事务资源共同完成事务的提交

rollback()- 协调不同的事务资源共同完成事务的回滚

setRollbackOnly()- 标识关联到当前线程的分布式事务将被回滚

getStatus()- 返回关联到当前线程的分布式事务的状态

enListResource(XAResource xaRes, int flag)- 将事务资源加入到当前的事务中

在上述示例中,在对数据库 A 操作时 其所代表的事务资源将被关联到当前事务中,

同样,在对数据库 B 操作时其所代表的事务资源也将被关联到当前事务中

delistResourc(XAResource xaRes, int flag)- 将事务资源从当前事务中删除

registerSynchronization(Synchronization sync)- 回调接口,

Hibernate 等 ORM 工具都有自己的事务控制机制来保证事务,

但同时它们还需要一种回调机制以便在事务完成时得到通知从而触发一些处理工作,

如清除缓存等。这就涉及到了 Transaction 的回调接口 registerSynchronization。

工具可以通过此接口将回调程序注入到事务中,当事务成功提交后,回调程序将被激活。

TransactionManager 本身并不承担实际的事务处理功能,

它更多的是充当用户接口和实现接口之间的桥梁。

下面列出了 TransactionManager 中定义的方法,

可以看到此接口中的大部分事务方法与 UserTransaction 和 Transaction 相同。

在开发人员调用 UserTransaction.begin() 方法时 TransactionManager

会创建一个 Transaction 事务对象(标志着事务的开始)并把此对象通过 ThreadLocale 关联到当前线程上;

同样 UserTransaction.commit() 会调用 TransactionManager.commit(),

方法将从当前线程下取出事务对象 Transaction 并把此对象所代表的事务提交,

即调用 Transaction.commit()

begin()- 开始事务

commit()- 提交事务

rollback()- 回滚事务

getStatus()- 返回当前事务状态

setRollbackOnly()

getTransaction()- 返回关联到当前线程的事务

setTransactionTimeout(int seconds)- 设置事务超时时间

resume(Transaction tobj)- 继续当前线程关联的事务

suspend()- 挂起当前线程关联的事务

在系统开发过程中会遇到需要将事务资源暂时排除的操作,此时就需要调用 suspend() 方法将当前的事务挂起:

在此方法后面所做的任何操作将不会被包括在事务中,在非事务性操作完成后调用 resume()以继续事务

注: 要进行此操作需要获得 TransactionManager 对象, 其获得方式在不同的 J2EE 应用服务器上是不一样的

下面将通过具体的代码向读者介绍 JTA 实现原理。

下图列出了示例实现中涉及到的 Java 类,

其中 UserTransactionImpl 实现了 UserTransaction 接口,

TransactionManagerImpl 实现了 TransactionManager 接口,

TransactionImpl 实现了 Transaction 接口

清单 3. 开始事务 - UserTransactionImpl implenments UserTransaction

public void begin() throws NotSupportedException, SystemException
    {
       // 将开始事务的操作委托给 TransactionManagerImpl
       TransactionManagerImpl.singleton().begin();
    }   

清单 4. 开始事务 - TransactionManagerImpl implements TransactionManager

// 此处 transactionHolder 用于将 Transaction 所代表的事务对象关联到线程上
    private static ThreadLocal<TransactionImpl> transactionHolder
            = new ThreadLocal<TransactionImpl>();   

    //TransacationMananger 必须维护一个全局对象,因此使用单实例模式实现
    private static TransactionManagerImpl singleton = new TransactionManagerImpl();   

    private TransactionManagerImpl()
    {   

    }   

    public static TransactionManagerImpl singleton()
    {
    return singleton;
    }   

    public void begin() throws NotSupportedException, SystemException
    {
    //XidImpl 实现了 Xid 接口,其作用是唯一标识一个事务
    XidImpl xid = new XidImpl();
    // 创建事务对象,并将对象关联到线程
    TransactionImpl tx = new TransactionImpl(xid);   

    transactionHolder.set(tx);
    }  

现在我们就可以理解 Transaction 接口上没有定义 begin 方法的原因了:Transaction 对象本身就代表了一个事务,

在它被创建的时候就表明事务已经开始,因此也就不需要额外定义 begin() 方法了。

清单 5. 提交事务 - UserTransactionImpl implenments UserTransaction

public void commit() throws RollbackException, HeuristicMixedException,
    HeuristicRollbackException, SecurityException,
    IllegalStateException, SystemException
      {
    // 检查是否是 Roll back only 事务,如果是回滚事务
      if(rollBackOnly)
      {
        rollback();   

        return;
      }
      else
      {
      // 将提交事务的操作委托给 TransactionManagerImpl
      TransactionManagerImpl.singleton().commit();
      }
    } 

清单 6. 提交事务 - TransactionManagerImpl implenments TransactionManager

public void commit() throws RollbackException, HeuristicMixedException,
       HeuristicRollbackException, SecurityException,
       IllegalStateException, SystemException
   {   

      // 取得当前事务所关联的事务并通过其 commit 方法提交
      TransactionImpl tx = transactionHolder.get();
      tx.commit();
   }   

同理, rollback、getStatus、setRollbackOnly 等方法也采用了与 commit() 相同的方式实现。

UserTransaction 对象不会对事务进行任何控制, 所有的事务方法都是通过 TransactionManager

传递到实际的事务资源即 Transaction 对象上。

上述示例演示了 JTA 事务的处理过程,

下面将为您展示事务资源(数据库连接,JMS)是如何以透明的方式加入到 JTA 事务中的。

首先需要明确的一点是,在 JTA 事务 代码中获得的数据库源 ( DataSource ) 必须是支持分布式事务的。

在如下的代码示例中,尽管所有的数据库操作都被包含在了 JTA 事务中,但是因为 MySql 的数据库连接是通过本地方式获得的,

对 MySql 的任何更新将不会被自动包含在全局事务中。

清单 7. JTA 事务处理

public void transferAccount()
    {   

   UserTransaction userTx = null;
   Connection mySqlConnection = null;
   Statement mySqlStat = null;   

   Connection connB = null;
   Statement stmtB = null;   

   try
   {
     // 获得 Transaction 管理对象
   userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction");
   // 以本地方式获得 mySql 数据库连接
   mySqlConnection = DriverManager.getConnection("localhost:1111");   

   // 从数据库 B 中取得数据库连接, getDataSourceB 返回应用服务器的数据源
   connB = getDataSourceB().getConnection();   

          // 启动事务
   userTx.begin();  

   // 将 A 账户中的金额减少 500
   //mySqlConnection 是从本地获得的数据库连接,不会被包含在全局事务中
   mySqlStat = mySqlConnection.createStatement();
   mySqlStat.execute("
                update t_account set amount = amount - 500 where account_id = 'A'");  

   //connB 是从应用服务器得的数据库连接,会被包含在全局事务中
   stmtB = connB.createStatement();
   stmtB.execute("
                update t_account set amount = amount + 500 where account_id = 'B'");  

   // 事务提交:connB 的操作被提交,mySqlConnection 的操作不会被提交
   userTx.commit();  

   } catch(SQLException sqle){
   // 处理异常代码
   } catch(Exception ne){
   e.printStackTrace();
   }
   }

为什么必须从支持事务的数据源中获得的数据库连接才支持分布式事务呢?

其实支持事务的数据源与普通的数据源是不同的,它实现了额外的 XADataSource 接口。

我们可以简单的将 XADataSource 理解为普通的数据源(继承了 java.sql.PooledConnection),

只是它为支持分布式事务而增加了 getXAResource 方法。

另外,由 XADataSource 返回的数据库连接与普通连接也是不同的,

此连接除了实现 java.sql.Connection 定义的所有功能之外还实现了 XAConnection 接口。

我们可以把 XAConnection 理解为普通的数据库连接,它支持所有 JDBC 规范的数据库操作,

不同之处在于 XAConnection 增加了对分布式事务的支持。

通过下面的类图读者可以对这几个接口的关系有所了解:

图 3. 事务资源类图

应用程序从支持分布式事务的数据源获得的数据库连接是 XAConnection 接口的实现,

而由此数据库连接创建的会话(Statement)也为了支持分布式事务而增加了功能,如下代码所示:

清单 8. JTA 事务资源处理

public void transferAccount()
     {   

    UserTransaction userTx = null;   

    Connection conn = null;
    Statement stmt = null;   

    try
    {
      // 获得 Transaction 管理对象
    userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction");   

    // 从数据库中取得数据库连接, getDataSourceB 返回支持分布式事务的数据源
    conn = getDataSourceB().getConnection();
           // 会话 stmt 已经为支持分布式事务进行了功能增强
    stmt = conn.createStatement();   

           // 启动事务
    userTx.begin();
           stmt.execute("update t_account ... where account_id = 'A'");
    userTx.commit();  

    } catch(SQLException sqle){
    // 处理异常代码
    } catch(Exception ne){
    e.printStackTrace();
    }
    } 

我们来看一下由 XAConnection 数据库连接创建的会话(Statement)部分的代码实现

(不同的 JTA 提供商会有不同的实现方式,此处代码示例只是向您演示事务资源是如何被自动加入到事务中)。

我们以会话对象的 execute 方法为例,通过在方法开始部分增加对 associateWithTransactionIfNecessary 方法的调用,

即可以保证在 JTA 事务期间,对任何数据库连接的操作都会被透明的加入到事务中。

清单 9. 将事务资源自动关联到事务对象 - XAStatement implements Statement

public void execute(String sql)
     {
         // 对于每次数据库操作都检查此会话所在的数据库连接是否已经被加入到事务中
    associateWithTransactionIfNecessary();   

    try
    {
              // 处理数据库操作的代码
         ....   

    } catch(SQLException sqle){
    // 处理异常代码
    } catch(Exception ne){
    e.printStackTrace();
    }
    }   

     public void associateWithTransactionIfNecessary()
     {   

    // 获得 TransactionManager
    TransactionManager tm = getTransactionManager();   

                    Transaction tx = tm.getTransaction();
            // 检查当前线程是否有分布式事务
           if(tx != null){
    // 在分布式事务内,通过 tx 对象判断当前数据连接是否已经被包含在事务中,
    //如果不是那么将此连接加入到事务中
    Connection conn = this.getConnection();
    //tx.hasCurrentResource, xaConn.getDataSource() 不是标准的 JTA
    // 接口方法,是为了实现分布式事务而增加的自定义方法
    if(!tx.hasCurrentResource(conn)){
        XAConnection xaConn = (XAConnection)conn;
        XADataSource xaSource = xaConn.getDataSource();   

        // 调用 Transaction 的接口方法,将数据库事务资源加入到当前事务中
        tx.enListResource(xaSource.getXAResource(), 1);
            }
            }
    }  

XAResource 与 Xid: XAResource 是 Distributed Transaction Processing: The XA Specification 标准的 Java 实现,

它是对底层事务资源的抽象,定义了分布式事务处理过程中事务管理器和资源管理器之间的协议,

各事务资源提供商(如 JDBC 驱动,JMS)将提供此接口的实现。

使用此接口,开发人员可以通过自己的编程实现分布式事务处理,

但这些通常都是由应用服务器实现的(服务器自带实现更加高效,稳定) 为了说明,

我们将举例说明他的使用方式。

在使用分布式事务之前,为了区分事务使之不发生混淆,必须实现一个 Xid 类用来标识事务,

可以把 Xid 想象成事务的一个标志符,每次在新事务创建是都会为事务分配一个 Xid,

Xid 包含三个元素:formatID、gtrid(全局事务标识符)和 bqual(分支修饰词标识符)。

formatID 通常是零,这意味着你将使用 OSI CCR(Open Systems Interconnection Commitment, Concurrency 和 Recovery 标准)来命名;

如果你要使用另外一种格式,那么 formatID 应该大于零,-1 值意味着 Xid 为无效。

gtrid 和 bqual 分别包含 64 个字节二进制码来分别标识全局事务和分支事务,

唯一的要求是 gtrid 和 bqual 必须是全局唯一的。

XAResource 接口中主要定义了如下方法:

commit()- 提交事务

isSameRM(XAResource xares)- 检查当前的 XAResource 与参数是否同一事务资源

prepare()- 通知资源管理器准备事务的提交工作

rollback()- 通知资源管理器回滚事务

在事务被提交时,Transaction 对象会收集所有被当前事务包含的 XAResource 资源,

然后调用资源的提交方法,如下代码所示:

清单 10. 提交事务 - TransactionImpl implements Transaction

public void commit() throws RollbackException, HeuristicMixedException,
    HeuristicRollbackException, SecurityException,
    IllegalStateException, SystemException
    {   

    // 得到当前事务中的所有事务资源
       List<XAResource> list = getAllEnlistedResouces();   

    // 通知所有的事务资源管理器,准备提交事务
     // 对于生产级别的实现,此处需要进行额外处理以处理某些资源准备过程中出现的异常
    for(XAResource xa : list)
    {
    xa.prepare();
    }   

      // 所有事务性资源,提交事务
      for(XAResource xa : list)
      {
         xa.commit();
       }
    }  

参考:JTA 深度历险 - 原理与实现

时间: 2024-10-25 19:06:11

JDBC高级特性(二)分布式事和JTA基本原理的相关文章

JDBC高级特性(二)事务、并发控制和行集

一.事务 事务是指一个工作单元,它包括了一组加入,删除,改动等数据操作命令,这组命令作为一个总体向系统提交运行,要么都运行成功,要么所有恢复 在JDBC中使用事务 1)con.setAutoCommit(false),取消自己主动提交 2)对数据库运行一个或多个操作(一个或多个SQL语句) 3)con.commit().提交事务(上面的第二部的多个操作就作为一个总体提交运行) 4)假设某个操作失败.通过con.rollback()回滚全部操作(撤销以上的操作,将数据恢复为运行前状态) 事务处理依

JDBC高级特性(一)结果集,批量更新

一.ResultSet的高级特性 1 可滚动ResultSet 1)向前和向后滚动 滚动特性 在JDBC初期版本中, ResultSet仅能向前滚动 在JDBC后续版本中, ResultSet默认能向前滚动或前后滚动 迟缓滚动:记录集可前后滚动,不受数据库数据更新影响 灵敏滚动:记录集可前后滚动,受数据库数据更新影响 由结果集类型设定 con.createStatement() con.createStatement(结果集类型, 结果集并发类型) con.createStatement(结果集

Java JDBC高级特性

1.JDBC批处理 实际开发中需要向数据库发送多条SQL语句,这时,如果逐条执行SQL语句,效率会很低,因此可以使用JDBC提供的批处理机制.Statement和PreparedStatemen都实现了批处理.测试表结构如下: Statement批处理程序示例 1 package server; 2 3 import java.sql.Connection; 4 import java.sql.DriverManager; 5 import java.sql.ResultSet; 6 impor

JVM高级特性-二、JVM在堆中对象的分配、布局、访问过程

前面介绍了jvm运行时数据区域后,下面讲解下对内存中数据的其他细节,看他们是如何创建.布局及访问的 一.对象的创建 对象的创建分配方式主要有两种:指针碰撞和空闲列表 指针碰撞: 假设堆内存中是绝对规整的,那么,在为新对象分配内存空间时,只需要将指针向空闲空间方向移动新对象所需大小的一段出来即可 空闲列表: 如果内存不是规整的,这时就需要维护一个列表,记录哪些内存是空闲的,在分配空间时,从列表中找出一块足够大的空间划分为对象实例并更新列表记录

PHP高级特性二之文件处理

PHP中的文件处理也是一个相当重要的模块,这一篇的主要内容就是PHP中文件系统的简介. 文件系统用途 1. 项目处理都离不开文件处理 2. 可以用文件长时间保存数据 3. 建立缓存,在服务器中进行文件操作 文件系统函数用法详述 1.基本的判断函数 is_dir — 判断给定文件名是否是一个目录 is_file — 判断给定文件名是否为一个文件 is_executable — 判断给定文件名是否可执行 is_link — 判断给定文件名是否为一个符号连接 is_readable — 判断给定文件名

Python学习笔记(二)——高级特性

知识点 切片 切片 取一个list或tuple的部分元素. 原理:调用__getitem__,__setitem__,__delitem__和slice函数. 根据官方的帮助文档(https://docs.python.org/2/library/operator.html)可知,_getitem_返回元素的下标,_setitem_设置元素的值,_remove_删除元素的值. 而slice函数实现最重要的切片功能.            x=a[1:5] --> x._getitem_(slic

(十二)boost库之多线程高级特性

(十二)boost库之多线程高级特性 很多时候,线程不仅仅是执行一些耗时操作,可能我们还需要得到线程的返回值,一般的处理方法就是定义一个全局状态变量,不断轮训状态,就如我目前维护的一个项目,全局变量定义了N中状态,看的让人抓狂.该项目的大体逻辑是这样的,启动K个线程,当线程执行到某一个点时,进行轮训,判断是否所有线程都执行到该点,单独开启了一个线程用于轮训所有线程是否结束,待所有线程结束后会获取数据,生成一个文件,另外还有一个线程就在轮训文件是否生成,然后读取文件进行下一步操作.各种的轮训,显得

JVM高级特性与实践(二):对象存活判定算法(引用) 与 回收

关于垃圾回收器GC(Garbage Collection),多数人意味它是Java语言的伴生产物.事实上,GC的历史远比Java悠远,于1960年诞生在MIT的Lisp是第一门真正使用内存动态分配和垃圾收集技术的语言.当Lisp尚在胚胎时期,开发人员就在思考GC需要完成的3件事情: 哪些内存需要回收? 什么时候回收? 如何回收? 目前GC早已解决了以上问题,内存的动态分配与内存回收机制已经相当成熟,一切似乎“自动化”起来.而开发人员仍旧需要了解GC和内存分配等底层知识,因为在排查各种内存溢出.内

Java事务(七) - 分布式事务 - spring + JTA + jotm

一. 前言: 在写这篇博客之前,我们需要弄清楚两个概念:本地事务和分布式事务. 本地事务:只处理单一数据源,比如单个数据库. 分布式事务:处理多种异构的数据源, 比如某个业务操作中同时包含JDBC和JMS或者某个操作需要访问多个不同的数据库. Java通过JTA完成分布式事务, JTA本身只是一种规范, 本篇博客将使用JOTM作为实现, 后续还会使用Atomikos实现. 二. 业务背景: 假定我们有这样一个需求:当我们新建一个用户的时候需要往一个DB中插入一条用户记录,还需要往另一个DB中记录