java分布式事务

原文地址:http://blog.csdn.net/moonpure/article/details/52779794 

本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子。

  请通过以下方式下载github源代码:

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

  本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库。

  Java通过JTA完成分布式事务,JTA本身只是一种规范,不同的应用服务器都包含有自己的实现(比如JbossJTA),同时还存在独立于应用服务器的单独JTA实现,比如本篇中要讲到的Atomikos。对于JTA的原理,这里不细讲,读者可以通过这篇文章了解相关知识。

  在本篇文章中,我们将实现以下一个应用场景:你在网上购物,下了订单之后,订单数据将保存在系统的数据库中,同时为了安排物流,订单信息将以消息(Message)的方式发送到物流部门以便送货。

  以上操作同时设计到数据库操作和JMS消息发送,为了使整个操作成为一个原子操作,我们只能选择分布式事务。我们首先设计一个service层,定义OrderService接口:

package davenkin;

public interface OrderService {

    public void makeOrder(Order order);
}

  为了简单起见,我们设计一个非常简单的领域对象Order:

@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {

    @XmlElement(name = "Id",required = true)
    private long id;

    @XmlElement(name = "ItemName",required = true)
    private String itemName;

    @XmlElement(name = "Price",required = true)
    private double price;

    @XmlElement(name = "BuyerName",required = true)
    private String buyerName;

    @XmlElement(name = "MailAddress",required = true)
    private String mailAddress;

    public Order() {
    }

  为了采用JAXB对Order对象进行Marshal和Unmarshal,我们在Order类中加入了JAXB相关的Annotation。 我们将使用hibernate来完成数据持久化,然后使用spring提供的JmsTemplate将Order转成xml后以TextMessage的形式发送到物流部门的ORDER.QUEUE中。

(一)准备数据库

  为了方便,我们将采用Spring提供的embedded数据库,默认情况下Spring采用HSQL作为后台数据库,虽然在本例中我们将采用HSQL的非XA的DataSource,但是通过Atomikos包装之后依然可以参与分布式事务。

  SQL脚本包含在createDB.sql文件中:

CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);

  在Spring中配置DataSource如下:

    <jdbc:embedded-database id="dataSource">
        <jdbc:script location="classpath:createDB.sql"/>
    </jdbc:embedded-database>

(二)启动ActiveMQ

  我们将采用embedded的ActiveMQ,在测试之前启动ActiveMQ提供的BrokerService,在测试执行完之后关闭BrokerService。

  @BeforeClass
    public static void startEmbeddedActiveMq() throws Exception {
        broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }

    @AfterClass
    public static void stopEmbeddedActiveMq() throws Exception {
        broker.stop();
    }

(三)实现OrderService

  创建一个DefaultOrderService,该类实现了OrderService接口,并维护一个JmsTemplate和一个Hibernate的SessionFactory实例变量,分别用于Message的发送和数据库处理。

package davenkin;

import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;

public class DefaultOrderService  implements OrderService{
    private JmsTemplate jmsTemplate;
    private SessionFactory sessionFactory;

    @Override
    @Transactional
    public void makeOrder(Order order) {
        Session session = sessionFactory.getCurrentSession();
        session.save(order);
        jmsTemplate.convertAndSend(order);

    }

    @Required
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Required
    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }
}

(四)创建Order的Mapping配置文件

<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
        "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">

<hibernate-mapping>
    <class name="davenkin.Order" table="USER_ORDER">
        <id name="id" type="long">
            <column name="ID" />
            <generator class="increment" />
        </id>
        <property name="itemName" type="string">
            <column name="ITEM_NAME" />
        </property>
        <property name="price" type="double">
            <column name="PRICE"/>
        </property>
        <property name="buyerName" type="string">
            <column name="BUYER_NAME"/>
        </property>
        <property name="mailAddress" type="string">
            <column name="MAIL_ADDRESS"/>
        </property>
    </class>
</hibernate-mapping>

(五)配置Atomikos事务

  在Spring的IoC容器中,我们需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:

  <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce">
        <constructor-arg>
            <props>
                <prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop>
            </props>
        </constructor-arg>
    </bean>

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService">
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService">
        <property name="transactionTimeout" value="300" />
    </bean>

    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService">
        <property name="transactionManager" ref="atomikosTransactionManager" />
        <property name="userTransaction" ref="atomikosUserTransaction" />
    </bean>

    <tx:annotation-driven transaction-manager="jtaTransactionManager" />

(六)配置JMS

  对于JMS,为了能使ActiveMQ加入到分布式事务中,我们需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外还需要配置MessageConvertor在Order对象和XML之间互转。

    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init">
        <property name="uniqueResourceName" value="XAactiveMQ" />
        <property name="xaConnectionFactory" ref="jmsXaConnectionFactory" />
        <property name="poolSize" value="5"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="receiveTimeout" value="2000" />
        <property name="defaultDestination" ref="orderQueue"/>
        <property name="sessionTransacted" value="true" />
        <property name="messageConverter" ref="oxmMessageConverter"/>
    </bean>

    <bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="ORDER.QUEUE"/>
    </bean>

    <bean id="oxmMessageConverter"
          class="org.springframework.jms.support.converter.MarshallingMessageConverter">
        <property name="marshaller" ref="marshaller"/>
        <property name="unmarshaller" ref="marshaller"/>
    </bean>

    <oxm:jaxb2-marshaller id="marshaller">
        <oxm:class-to-be-bound name="davenkin.Order"/>
    </oxm:jaxb2-marshaller>

(七)测试

  在测试中,我们首先通过(二)中的方法启动ActiveMQ,再调用DefaultOrderService,最后对数据库和QUEUE进行验证:

   @Test
    public void makeOrder(){
        orderService.makeOrder(createOrder());
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);
        String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();
        assertEquals(dbItemName, messageItemName);
    }

    @Test(expected = IllegalArgumentException.class)
    public void failToMakeOrder()
    {
        orderService.makeOrder(null);
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));
        assertNull(jmsTemplate.receiveAndConvert());
    }

时间: 2024-08-09 23:48:19

java分布式事务的相关文章

[java][db]JAVA分布式事务原理及应用

JTA(Java Transaction API)允许应用程序执行分布式事务处理--在两个或多个网络计算机资源上访问并且更新数据.JDBC驱动程序的JTA支持极大地增强了数据访问能力. 本文的目的是要提供一个关于的Java事务处理API(JTA)的高级的概述,以及与分布式事务相关的内容.一个事务处理定义了一个工作逻辑单元,要么彻底成功要么不产生任何结果. 一个分布式事务处理只是一个在两个或更多网络资源上访问和更新数据的事务处理,因此它在那些资源之间必然是等价的.在本文中,我们主要关心的是如何处理

JAVA分布式事务小结

分布式事务处理(  Distributed Transaction Processing  ,  DTP  )涉及多个分布在不同地方的数据库,但对数据库的操作必须全部被提交或者回滚.只要任一数据库操作时失败,所有参与事务的数据库都需要回滚. 举个例子,A服务部署在tomcat上,执行insert a并调用B和C接口,   B服务部署在weblogic上,执行insert b, C服务部署在Nginx,执行insert c. 现在要执行A,并且在执行到insert c时更新失败, 分布式事务的要求

JAVA分布式事务原理及应用(转)

JTA(Java Transaction API)允许应用程序执行分布式事务处理--在两个或多个网络计算机资源上访问并且更新数据.JDBC驱动程序的JTA支持极大地增强了数据访问能力. 本文的目的是要提供一个关于的Java事务处理API(JTA)的高级的概述,以及与分布式事务相关的内容.一个事务处理定义了一个工作逻辑单元,要么彻底成功要么不产生任何结果. 一个分布式事务处理只是一个在两个或更多网络资源上访问和更新数据的事务处理,因此它在那些资源之间必然是等价的.在本文中,我们主要关心的是如何处理

Java 分布式事务

0 引言 本文主要介绍java中分布式事务以及对应的解决方案. 1 分布式事务产生的原因 1.1 数据库分库分表 当数据库单表一年产生的数据超过1000W,那么就要考虑分库分表,具体分库分表的原理在此不做解释,以后有空详细说,简单的说就是原来的一个数据库变成了多个数据库.这时候,如果一个操作既访问01库,又访问02库,而且要保证数据的一致性,那么就要用到分布式事务. 1.2 SOA优化 所谓的SOA化,就是业务的服务化.比如原来单机支撑了整个电商网站,现在对整个网站进行拆解,分离出了订单中心.用

java分布式事务,及解决方案

1.什么是分布式事务 分布式事务就是指事务的参与者.支持事务的服务器.资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上.以上是百度百科的解释,简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败.本质上来说,分布式事务就是为了保证不同数据库的数据一致性. 2.分布式事务的产生的原因 2.1.数据库分库分表 当数据库单表一年产生的数据超过1000W,那么就要考虑分库分表,具体分库分表的

Java事务处理全解析(八)——分布式事务入门例子(Spring+JTA+Atomikos+Hibernate+JMS)

在本系列先前的文章中,我们主要讲解了JDBC对本地事务的处理,本篇文章将讲到一个分布式事务的例子. 请通过以下方式下载github源代码: git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git 本地事务和分布式事务的区别在于:本地事务只用于处理单一数据源事务(比如单个数据库),分布式事务可以处理多种异构的数据源,比如某个业务操作中同时包含了JDBC和JMS或者某个操作需要访问多个不同的数据库. Java

Java事务(七) - 分布式事务 - spring + JTA + jotm

一. 前言: 在写这篇博客之前,我们需要弄清楚两个概念:本地事务和分布式事务. 本地事务:只处理单一数据源,比如单个数据库. 分布式事务:处理多种异构的数据源, 比如某个业务操作中同时包含JDBC和JMS或者某个操作需要访问多个不同的数据库. Java通过JTA完成分布式事务, JTA本身只是一种规范, 本篇博客将使用JOTM作为实现, 后续还会使用Atomikos实现. 二. 业务背景: 假定我们有这样一个需求:当我们新建一个用户的时候需要往一个DB中插入一条用户记录,还需要往另一个DB中记录

java事务(三)——自己实现分布式事务

在上一篇<java事务(二)——本地事务>中已经提到了事务的类型,并对本地事务做了说明.而分布式事务是跨越多个数据源来对数据来进行访问和更新,在JAVA中是使用JTA(Java Transaction API)来实现分布式的事务管理的.但是在本篇中并不会说明如何使用JTA,而是在不依赖其他框架以及jar包的情况下自己来实现分布式事务,作为对分布式事务的一个理解. 假设现在有两个数据库,可以是在一台机器上也可以是在不同机器上,现在要向其中一个数据库更新用户账户信息,另外一个数据库新增用户的消费信

java微服务架构的分布式事务解决方案

java微服务架构的分布式事务解决方案 课程目录如下: 1.课程介绍20分钟2.解决方案的效果演示(结合支付系统真实应用场景)45分钟3.常用的分布式事务解决方案介绍47分钟4.消息发送一致性(可靠消息的前提保障)20分钟5.消息发送一致性的异常流程处理16分钟6.常规MQ队列消息的处理流程和特点12分钟7.消息重复发送问题及业务接口的幂等性设计18分钟8.可靠消息最终一致性方案1(本地消息服务)的设计19分钟9.可靠消息最终一致性方案2(独立消息服务)的设计24分钟10.可靠消息服务的设计与实