ActiveMQ queue 代码示例

生产者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducer {
    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session = null;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //消息队列名称
        String queueName = "helloWord"; 

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接自定义队列名称的消息队列
            destination = session.createQueue(queueName);
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < SENDNUM; i++) {
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
            System.out.println("发送消息:Activemq 发送消息" + i);

            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
}

消费者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {
    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息的消费者
        MessageConsumer messageConsumer;
        //消息队列名称
        String queueName = "helloWord";
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接自定义队列名称的消息队列
            destination = session.createQueue(queueName);
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText());
                }else {
                    break;
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

多线程生产者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducerMultithreading implements Runnable{
    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 3;

    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < SENDNUM; i++) {
            //获取当前线程id
            String threadId = Thread.currentThread().getId()+"";
            //创建一条文本消息
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);
            //控制台打印
            System.out.println("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);
            //通过消息生产者发出消息
            messageProducer.send(message);
        }

    }
    @Override
    public void run() {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session = null;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //消息队列名称
        String queueName = "Multithreading";
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为HelloWorld的消息队列
            destination = session.createQueue(queueName);
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

多线程消费者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumerMultithreading implements Runnable{
    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    @Override
    public void run() {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //消息队列名称
        String queueName = "Multithreading";

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue(queueName);
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);
            String threadId = Thread.currentThread().getId()+"";
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText()+" 消费者线程编号="+threadId);
                }else {
                    break;
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

多线程生产者测试类:

package com.111.activemq;

public class JMSProducerMultithreadingTest {
    public static void main(String[] args) {

        JMSProducerMultithreading jpm = new JMSProducerMultithreading();
        //启动10个生产者线程
        for(int i = 0 ; i < 10 ; i++){
            Thread t = new Thread(jpm);
            t.start();
        }

    }
}

多线程消费者测试类:

package com.111.activemq;

public class JMSConsumerMultithreadingTest {
    public static void main(String[] args) {

        JMSConsumerMultithreading jcm = new JMSConsumerMultithreading();
        //启动3个消费者者线程
        for(int i = 0 ; i < 3 ; i++){
            Thread t = new Thread(jcm);
            t.start();
        }

    }
}
时间: 2024-12-20 10:03:03

ActiveMQ queue 代码示例的相关文章

计算DXFReader中多边形的面积代码示例

在DXFReader中, 一般的多边形的面积计算绝对值 其中K表是顶点的数目,它们的坐标,用于在求和和, 所以用下面的代码就可以计算出一个封闭的多段线的区域: view source print? 01 Dim Vertex As Object 02 Dim Entity As Object 03 Dim k As Long 04 Dim i As Long 05 Dim Area As Single 06 07 With DXFReader1 08 09  For Each Entity In

代码示例:一些简单技巧优化JavaScript编译器工作详解,让你写出高性能运行的更快JavaScript代码

告诉你一些简单的技巧来优化JavaScript编译器工作,从而让你的JavaScript代码运行的更快.尤其是在你游戏中发现帧率下降或是当垃圾回收器有大量的工作要完成的时候. 单一同态: 当你定义了一个两个参数的函数,编译器会接受你的定义,如果函数参数的类型.个数或者返回值的类型改变编译器的工作会变得艰难.通常情况下,单一同态的数据结构和个数相同的参数会让你的程序会更好的工作. function example(a, b) { // 期望a,b都为数值类型 console.log(++a * +

jquery操作单选钮代码示例

jquery操作单选钮代码示例:radio单选按钮是最重要的表单元素之一,下面介绍一下常用的几个jquery对radio单选按钮操作.一.取消选中: $(".theclass").each(function(){ if($(this).attr('checked')) { $(this).attr('checked',false); } }); 以上代码可以将class属性值为theclass的被选中单选按钮取消选中.二.获取被选中的单选按钮的值: var val=$('.thecla

Python实现各种排序算法的代码示例总结

Python实现各种排序算法的代码示例总结 作者:Donald Knuth 字体:[增加 减小] 类型:转载 时间:2015-12-11我要评论 这篇文章主要介绍了Python实现各种排序算法的代码示例总结,其实Python是非常好的算法入门学习时的配套高级语言,需要的朋友可以参考下 在Python实践中,我们往往遇到排序问题,比如在对搜索结果打分的排序(没有排序就没有Google等搜索引擎的存在),当然,这样的例子数不胜数.<数据结构>也会花大量篇幅讲解排序.之前一段时间,由于需要,我复习了

领域驱动开发推荐代码示例 — Microsoft NLayerApp

简介: Microsoft NLayerApp是由微软西班牙团队出品的基于.NET 4.0的“面向领域N层分布式架构”代码示例,在codeplex上的地址是:http://microsoftnlayerapp.codeplex.com/. 架构图: 点击查看大图 代码下载:http://microsoftnlayerapp.codeplex.com/releases/view/56660 所用到的软件: - Microsoft Visual Studio 2010  - Microsoft Ex

Aspectj快速上手代码示例之Before,After,Around

本文不打算解释AOP的相关专业名词和概念,仅通过几个代码示例来展示Aspectj(对AOP实现的)的基本使用,并且使用的Aspectj是目前最新版本. 1.搭建环境 本文使用Maven来构建工程,通过aspectj-maven-plugin插件来编译*.aj文件至.class. Maven的具体配置: <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>aspectj-maven-plugin&

jxl创建Excel文件java代码示例

记得要下载 并 导入 jxl.jar 包,免积分下载地址:http://download.csdn.net/detail/u010011052/7561041 package Test; import java.io.*; import jxl.*; import jxl.format.Colour; import jxl.write.*; public class JXLTest { private static WritableWorkbook book; private static Wr

java 翻盖hashCode()深入探讨 代码示例

package org.rui.collection2.hashcode; /** * 覆盖hashcode * 设计HashCode时最重要的因素 就是:无论何时,对同一个对象调用HashCode都应该产生同样的值, * 如果你的HashCode方法依赖于对象中易变的数据,用户就要当心了,因为此数据发生变化 时 * HashCode就会生成一个不同的散列码,相当于产生一个不同的健 * 此外 也不应该使HashCode依赖于具有唯一性的对象信息,尤其是使用this的值,这只能很糟糕, * 因为这

python 之初学者的代码示例(短小精悍)(一)

学习Python也有个把月了,最近整理自己初学的代码示例,一个是为了增加自己对细节的把握,一个是让像我一样的初学者能够熟练地使用基础,基础的重要性就不说了,我希望自己能够把这些精巧的小而短的示例分享给大家,共同进步 #help(execfile) Help on built-in function execfile in module __builtin__: execfile(...) execfile(filename[, globals[, locals]]) Read and execu