ActiveMQ(三) 转

package pfs.y2017.m11.mq.activemq.demo03;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    // ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory factory;

    Connection connection;

    Session session;

    MessageProducer producer;

    Destination[] destinations;

    public Consumer() throws JMSException {
        factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    } 

    public static void main(String[] args) throws JMSException {
        Consumer consumer = new Consumer();
        String[] arg= {"aa","bb"};

        for (String stock : arg) {
        Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
        MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
        messageConsumer.setMessageListener(new Listener());
        }
    }  

    public Session getSession() {
        return session;
    }  

}
package pfs.y2017.m11.mq.activemq.demo03;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

public class Publisher {

    // ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory factory;

    Connection connection;

    Session session;

    MessageProducer producer;

    Destination[] destinations;

    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
        connection = factory.createConnection();
        try {
            connection.start();
        } catch (JMSException jmse) {
            connection.close();
            throw jmse;
        }
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
    }

    protected void setTopics(String[] stocks) throws JMSException {
        destinations = new Destination[stocks.length];
        for (int i = 0; i < stocks.length; i++) {
            destinations[i] = session.createTopic("STOCKS." + stocks[i]);
        }
    }

    protected void sendMessage(String[] stocks) throws JMSException {
        for (int i = 0; i < stocks.length; i++) {
            Message message = createStockMessage(stocks[i], session);
            System.out.println("Sending: " + ((ActiveMQMapMessage) message).getContentMap() + " on destination: "
                    + destinations[i]);
            producer.send(destinations[i], message);
        }
    }

    protected Message createStockMessage(String stock, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        message.setString("stock", stock);
        message.setDouble("price", 1.00);
        message.setDouble("offer", 0.01);
        message.setBoolean("up", true);

        return message;
    }

    public static void main(String[] args) throws JMSException {
//        if (args.length < 1)
//            throw new IllegalArgumentException();

        // Create publisher
        Publisher publisher = new Publisher();

        String[] arg= {"aa","bb"};
        // Set topics
        publisher.setTopics(arg);

        for (int i = 0; i < 10; i++) {
            publisher.sendMessage(arg);
            System.out.println("Publisher ‘" + i + " price messages");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Close all resources
        publisher.close();
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
}
package pfs.y2017.m11.mq.activemq.demo03;

import java.text.DecimalFormat;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class Listener implements MessageListener {

    public void onMessage(Message message) {
        try {
            MapMessage map = (MapMessage) message;
            String stock = map.getString("stock");
            double price = map.getDouble("price");
            double offer = map.getDouble("offer");
            boolean up = map.getBoolean("up");
            DecimalFormat df = new DecimalFormat("#,###,###,##0.00");
            System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
时间: 2024-11-12 13:39:24

ActiveMQ(三) 转的相关文章

ActiveMQ消息队列的搭建

今天来写下消息队列 一.首先介绍下什么是activeMQ? ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Sto

ActiveMQ 简介与安装

一. 概述与介绍 ActiveMQ 是Apache出品,最流行的.功能强大的即时通讯和集成模式的开源服务器.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现.提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能. 二. 特性 1. 多种语言和协议编写客户端.语言: Java.C.C++.C#.Ruby.Perl.Python.PHP.应用协议:OpenWire.Stomp REST.WS N

JMS学习(三)ActiveMQ Message Persistence(转)

1,JMS规范支持两种类型的消息传递:persistent and non-persistent.ActiveMQ在支持这两种类型的传递方式时,还支持消息的恢复.中间状态的消息(message are cached in memory) 2,ActiveMQ可将消息存储在三种类型介质中:file-based(存储在文件中).in-memory(存储在内存中).relational databases(存储在关系数据库中) 3,Persistence Message有何用处? Persistent

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

ActiveMQ持久化消息的三种方式

详见:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt362 本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle.下面逐一介绍. A:持久化为文件 这个你装ActiveMQ时默认就是这种,只要你设置消息为持久化就可以了.涉及到的配置和代码有 <persistenceAdapter>            <kahaDB directory="${activemq.base}/data/kahadb

JMS学习(三)ActiveMQ Message Persistence

1,JMS规范支持两种类型的消息传递:persistent and non-persistent.ActiveMQ在支持这两种类型的传递方式时,还支持消息的恢复.中间状态的消息(message are cached in memory) 2,ActiveMQ可将消息存储在三种类型介质中:file-based(存储在文件中).in-memory(存储在内存中).relational databases(存储在关系数据库中) 3,Persistence Message有何用处? Persistent

activemq 实战三 了解连接器的URI-Understanding connector URIs

Before discussing the details of connectors and their role in the overall ActiveMQ architecture, it’s important to understand connector URIs. Uniform resource identifiers (URIs), as a concept, aren’t new, and you’ve probably used them over and over a

ActiveMQ入门系列三:发布/订阅模式

在上一篇<ActiveMQ入门系列二:入门代码实例(点对点模式)>中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub & Sub),详细介绍了点对点模式并用代码实例进行说明,今天就介绍下发布/订阅模式. 一.理论基础 发布/订阅模式的工作示意图: 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息. 和点对点方式不同,发布到topic的消息会被所有订阅者消费. 当生产者发布消息,不管是否有消费者,都不会保存消息. 一定要先

ActiveMQ(三)——理解和掌握JMS

一.JMS基本概念 JMS是什么JMS Java Message Service,Java消息服务,是JavaEE中的一个技术. JMS规范JMS定义了Java中访问消息中间件的接囗,并没有给予实现,实现JMS接囗的消息中间件称为JMS Provider,例如ActiveMQ?JMS provider:实现JMS接囗和规范的消息中间件?JMS message:JMS的消息,JMS消息由以下三部分组成:1:消息头:每个消息头字段都有相应的getter和setter方法2消息属性:如果需要除消息头字