Jms消费者模式

JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。

Apache官网上下载activeMQ(http://activemq.apache.org/download.html),解压运行bin目录下activemq.bat文件启动activeMQ

对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

1.      点对点的消息模式(Point to Point Messaging)

下面的JMS对象在点对点消息模式中是必须的:

a.      队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象

b.     队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列

ConnectionQueue来取得与JMS点对点消息提供者的链接。

c.      链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之

间,客户用它创建一个或者多个JMS队列会话(QueueSession)

d.     队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand

QueueReceiver)

e.     消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列

f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息

2.      发布订阅模式(publish – subscribe Mode)

a.      主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象

b.     主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题

ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。

c.      链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间

d.     会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher  and

TopicSubscribers)

e.     消息发送者MessageProducer) – 发送消息到已经声明的主题

f.       消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息

以感知数据为例子

activemq.properties配置文件:

topic=csp.jxmessages
ipaddress=10.100.70.102
#ipaddress=localhost
port=61616
username=user
password=user

消费者模式:

package com.ship;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Controller;
import com.common.utils.PropertyLoader;

@Controller
public class JmsConsumer implements MessageListener {
    private java.sql.Connection con = null;
    private static JmsConsumer instance = null;

    public JmsConsumer() {
        if (instance != null)
            throw new RuntimeException();
        instance = this;
    }

    public JmsConsumer getInstance() {
        return instance;
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage txtMsg = (TextMessage) message;
            try {
                String content = txtMsg.getText();
                System.out.println("数据是:\n" + content);
                String[] arrcontent = content.split("\n");
                // con.setAutoCommit(false);
                // Statement statement = con.createStatement();
                for (int i = 0; i < arrcontent.length; i++) {
                    String[] shipinfo = arrcontent[i].split(",");
                    // System.out.println(shipinfo[0]); //测试船名
                    // 存储过程
                    // String sql = "call ganzhi()";
                    // 添加批处理
                    // statement.addBatch(sql);
                }
                // 批处理 因为存在读写延迟
                // statement.executeBatch();
                // con.commit();
                // statement.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @PostConstruct
    public void init() {
        try {
            // 数据库连接
            buildSqlCon();
            // jms处理
            buildJmsCon();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void buildSqlCon() {
        // TODO Auto-generated method stub
        Properties jdbcprop = PropertyLoader.getPropertiesFromClassPath(
                "jdbc.properties", "UTF-8");
        String driverClassName = jdbcprop.getProperty("jdbc.driverClassName");
        String jdbcurl = jdbcprop.getProperty("jdbc.url");
        String uname = jdbcprop.getProperty("jdbc.username");
        String pwd = jdbcprop.getProperty("jdbc.password");
        try {
            Class.forName(driverClassName);
            con = DriverManager.getConnection(jdbcurl + "&user=" + uname
                    + "&password=" + pwd);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void buildJmsCon() {
        // 消费者的主要流程
        Connection connection = null;
        Properties properties = PropertyLoader.getPropertiesFromClassPath(
                "activemq.properties", "UTF-8");
        String topic = properties.getProperty("topic");
        String username = properties.getProperty("username");
        String password = properties.getProperty("password");
        String ipaddress = properties.getProperty("ipaddress");
        String port = properties.getProperty("port");
        String brokerURL = "failover://tcp://" + ipaddress + ":" + port;
        try {
            // 1.初始化connection工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    username, password, brokerURL);

            // 2.创建Connection
            connection = connectionFactory.createConnection();

            // 3.打开连接
            connection.start();

            System.out.println("连接成功..................");

            // 4.创建session
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // 5.创建消息目标
            Destination destination = session.createTopic(topic);

            // 6.创建消费者
            MessageConsumer consumer = session.createConsumer(destination);

            // 7.配置监听
            consumer.setMessageListener(getInstance());

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        if (con != null)
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
    }

}

测试:

数据是:
浙余杭货01803,120.16185,30.098183,0.0,140,140,2016-06-02 09:40:13:000,,WM2HZ803244,1,0
浙富阳货00759,119.844083,29.932533,null,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
浙长兴货1952,120.320117,30.630767,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ820795,1,0
浙富阳货00759,119.844083,29.932533,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
浙钱江货35077,120.122567,30.400967,0.0,340,340,2016-06-02 09:40:13:000,,WM2HZ802275,1,0
浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
浙余杭货01667,120.12385,30.402033,0.0,20,20,2016-06-02 09:40:13:000,,WM2HZ801640,1,0
浙越城货0670,120.736333,32.040183,0.0,210,210,2016-06-02 09:40:14:000,,WM2HZ817006,1,0
浙安吉货2389,120.149133,30.636817,0.0,220,220,2016-06-02 09:40:14:000,,WM2HZ811204,1,0
浙钱江货00628,120.053833,30.047217,8.0,210,210,2016-06-02 09:40:14:000,,WM2HZ800153,1,0
浙绍运6-25,120.473033,30.6393,6.0,230,230,2016-06-02 09:40:14:000,,WM2HZ801842,1,0
浙上虞货0553,119.705217,29.82325,null,330,330,2016-06-02 09:40:14:000,,WM2HZ801018,1,0
浙余杭货01782,120.07065,30.399117,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ801764,1,0
浙桐庐货00483,120.217017,30.269617,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ803645,1,0
汇海集026,120.1237,30.401283,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ811220,1,0
海源186,121.394967,31.485683,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ815369,1,0
任强1号,120.085533,30.0672,0.0,30,30,2016-06-02 09:40:14:000,,WM2HZ818227,1,0
金顺668,120.166217,30.125183,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ819091,1,0
浙富阳货00636,119.705533,29.823683,null,200,200,2016-06-02 09:40:14:000,,WM2HZ804624,1,0
浙临安游026,119.769083,30.230167,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ817016,1,0
浙萧山货23922,120.29145,30.501783,7.0,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
浙萧山货23922,120.29145,30.501783,null,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
浙嘉善货03216,120.141717,30.355,0.0,280,280,2016-06-02 09:40:14:000,,WM2HZ800918,1,0
浙萧山货03166,119.835467,29.907917,0.0,310,310,2016-06-02 09:40:14:000,,WM2HZ801393,1,0
浙萧山货23751,120.156167,30.100917,0.0,100,100,2016-06-02 09:40:14:000,,WM2HZ802551,1,0
合肥武运628,120.16455,30.125717,null,0,0,2016-06-02 09:40:14:000,,WM2HZ802141,1,0

时间: 2024-10-07 19:29:41

Jms消费者模式的相关文章

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者消费者模式

什么是生产者消费者模式   在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式的优点 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那

同步函数 生产者和消费者模式 加强版(多人生产和多人消费)

曾经搞了半天, 生产者和消费者模式  加强版(多人生产 多人消费 ).. 以前的代码格式就不再吐槽了(以后努力改进) //输出结果是一个无限循环 import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 多个生产者&多个消费者模式 * 多个生产者不断生产,多个消费者不停的消费

使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.使用场景. 首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出:在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享.强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程. BlockingQueue的

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

生产者消费者模式(吃包子例子)

生产者-消费者问题是一个经典的进程同步问 题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空 缓冲区中供消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费 者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 生产者消费者模式是并发.多线程编程中经典的设计

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

day11(多线程,唤醒机制,生产消费者模式,多线程的生命周期)

A:进程: 进程指正在运行的程序.确切的来说,当一个程序进入内存运行,即变成一个进程,进程是处于运行过程中的程序,并且具有一定独立功能. B:线程: 线程是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程.一个进程中是可以有多个线程的,这个应用程序也可以称之为多线程程序. C:简而言之: 一个程序运行后至少有一个进程,一个进程中可以包含多个线程 线程实现 实现的两种方式 继承Thread public class MyThread extends Thread{ @Ove