通过Java操作ActiveMQ的代码记录

ActiveMQ的数据发送类

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

public class ActiveMQSend {

    private final String ip;
    private final String port;
    private PooledConnection pooledConnection;

    /**
     * 构造方法(传入需要连接的IP和端口)
     *
     * @param ip (AvctiveMQ的服务IP)
     * @param port (ActiveMQ的服务端口)
     */
    public ActiveMQSend(String ip, String port) {
        this.ip = ip;
        this.port = port;
        this.init();
    }

    /**
     * 初始化ActiveMQ的连接池
     */
    private void init() {
        try {
            String[] ips = this.ip.split(",");
            String[] ports = this.port.split(",");
            StringBuilder tcpLink = new StringBuilder();
            for (int i = 0; i < ips.length; i++) {
                tcpLink.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
            }
            String mqLink = tcpLink.toString();
            if (tcpLink.length() > 0) {
                if (‘,‘ == tcpLink.charAt(tcpLink.length() - 1)) {
                    mqLink = tcpLink.substring(0, tcpLink.length() - 1);
                }
            }

            String url = String.format("failover:(%s)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2", mqLink);
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
            factory.setMaxThreadPoolSize(50);
            PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory);
            pooledConnection = (PooledConnection) poolFactory.createConnection();
            pooledConnection.start();
        } catch (Exception ex) {
            LogUtil.error(ex);
            this.destroy();
        }
    }

    /**
     * 向ActiveMQ中发送数据
     *
     * @param needSendMsg 需要发送的数据信息
     * @param sendMQName 需要发送到的队列名称
     */
    public void send(String needSendMsg, String sendMQName) {
        if (this.pooledConnection == null) {
            this.init();
        }

        if (this.pooledConnection != null) {
            try {
                Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(sendMQName);
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                TextMessage message = session.createTextMessage(needSendMsg);
                producer.send(message);
                session.close();
            } catch (Exception ex) {
                LogUtil.error(ex);
                this.destroy();
            }
        }
    }

    /**
     * 回收连接池
     */
    public void destroy() {
        try {
            if (pooledConnection != null) {
                pooledConnection.stop();
            }
        } catch (Exception e) {
            LogUtil.error(e);
        }
        try {
            if (pooledConnection != null) {
                pooledConnection.close();
            }
        } catch (Exception e) {
            LogUtil.error(e);
        }
        pooledConnection = null;
    }
}

调用发送

public static void main(String[] args) {

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616");
                for (int i = 0; i < 50000; i++) {
                    String msg = String.format("这是TestQueue 2 第 %s 条发送的数据!", i);
                    sen.send(msg, "TestQueue2");
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                ActiveMQSend sen = new ActiveMQSend("127.0.0.1", "61616");
                for (int i = 0; i < 50000; i++) {
                    String msg = String.format("这是TestQueue 1 第 %s 条发送的数据!", i);
                    sen.send(msg, "TestQueue1");
                }
            }
        });

        t1.start();
        t2.start();
    }

ActiveMQ数据接收类

import javax.jms.MessageConsumer;
import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;

public class Receiver {

    private PooledConnection pooledConnection;

    private void init() {
        try {
            String url = "failover:(tcp://192.168.10.219:61616)?initialReconnectDelay=1000&timeout=3000&startupMaxReconnectAttempts=2";
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
            factory.setMaxThreadPoolSize(100);
            PooledConnectionFactory poolFactory = new PooledConnectionFactory(factory);
            pooledConnection = (PooledConnection) poolFactory.createConnection();
            pooledConnection.start();
        } catch (Exception ex) {
            Log.error(ex);
            this.destroy();
        }
    }

    public void receive(String queueName) {
        if (this.pooledConnection == null) {
            this.init();
        }
        if (this.pooledConnection != null) {
            try {
                Session session = this.pooledConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(queueName);
                MessageConsumer consumer = session.createConsumer(destination);
                while (true) {
                    TextMessage message = (TextMessage) consumer.receive(10);
                    if (null != message) {
                        System.out.println("收到消息" + message.getText());
                    }
                }
            } catch (Exception ex) {
                Log.info(ex.getMessage());
                this.destroy();
            }
        }
    }

    public void destroy() {
        try {
            if (pooledConnection != null) {
                pooledConnection.stop();
            }
        } catch (Exception e) {
            Log.error(e);
        }
        try {
            if (pooledConnection != null) {
                pooledConnection.close();
            }
        } catch (Exception e) {
            Log.error(e);
        }
        pooledConnection = null;
    }
}

数据接收类的调用

public static void main(String[] args) {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                Receiver r1 = new Receiver();
                r1.receive("TestQueue2");
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                Receiver r2 = new Receiver();
                r2.receive("TestQueue1");
            }
        });
        t1.start();
        t2.start();
    }
时间: 2024-10-07 18:46:43

通过Java操作ActiveMQ的代码记录的相关文章

Java操作Redis(代码演示)

redis-demo演示 一.创建一个maven工程 1.在pom.xml中引入相关redis的相关依赖 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 2 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://mave

java 操作oracle 建表,更新记录

1.  建立表的类 import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.PreparedStatement; public class lx01{ public static void main(String[] args) throws SQLException, ClassNotFoundE

java学习中,匿名函数、构造方法、构造代码块、构造方法中调用构造方法(java 学习中的小记录)

java学习中,匿名函数.构造方法.构造代码块.构造方法中调用构造方法(java 学习中的小记录) 作者:王可利(Star·星星) 匿名函数 匿名对象:没有名字的对象 匿名对象使用的注意点: 1.一般不会用匿名对象给属性赋值,无法获取属性值,每次new 都是一个新的对象. 2.匿名对象永远都不可能是一个对象. 如:person new().name = "星星":是不行的 匿名对象的好处:书写简单. 匿名对象使用的场景: 1.如果一个对象调用一个方法一次的时候,就可以用匿名对象来调用.

java学习中,DVD管理系统纯代码(java 学习中的小记录)

java学习中,DVD管理系统纯代码(java 学习中的小记录)作者:王可利(Star·星星) class DvdMain{ public static void main (String[] args){ DvdMgr dvd = new DvdMgr(); //初始化dvd dvd.initial(); //开始进入切换菜单 dvd.startMenu(); } } 1 class DvdSet { 2 3 //定义三个属性 4 String[] name = new String[50];

java操作memcached入门教程demo代码

原文:java操作memcached入门教程demo代码 源代码下载地址:http://www.zuidaima.com/share/1550463754996736.htm 参考地址: http://www.open-open.com/lib/view/open1357831114183.html http://tech.idv2.com/2008/07/10/memcached-001/ 感谢 京-java牛-ID号1  感谢 锡-SuperPrivate-195 在网上搜索了部分代码,一个

hbase-0.98整合hadoop-2.6,附java操作代码

cd /opt/hbase-0.98.13-hadoop2/conf vi hbase-env.sh export JAVA_HOME=/opt/jdk1.7.0_75 vi hbase-site.xml <!--设置hbase根目录,master为机器的hostname--><property><name>hbase.rootdir</name><value>hdfs://master:9000/hbase</value></

java操作hbase例子

hbase安装方法请参考:hbase-0.94安装方法详解 hbase常用的shell命令请参考:hbase常用的shell命令例子 java操作hbase,在eclipse中创建一个java项目,将hbase安装文件根目录的jar包和lib目录下jar包导入项目,然后就可以编写java代码操作hbase了.下面代码给出来一个简单的示例 /** * @date 2015-07-23 21:28:10 * @author sgl */ package com.songguoliang.hbase;

java写的迷宫代码

迷宫代码:截图如下:package com.zxl.maze; /* * 抽象类表示选择不同的算法*/ public abstract class AbstractMap { /* * 得到数据*/ public abstract boolean[][] getData(int m,int n); /* * 重置*/ public abstract void reset(int m,int n); } package com.zxl.maze; /* *深度优先,生成迷宫*/ import ja

memcached—Java操作Memcached实例

前面博客介绍了如何在Windows操作系统中安装Memcached,总结一下如何使用Java操作Memcached实例: 代码一: package com.ghj.packageoftool; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import jav