前言
github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01
下载ActiveMQ :http://activemq.apache.org/download.html
放到自己的目录,大致目录如下:
- bin存放的是脚本文件
- conf存放的是基本配置文件
- data存放的是日志文件
- docs存放的是说明文档
- examples存放的是简单的实例
- lib存放的是activemq所需jar包
- webapps用于存放项目的目录
然后启动ActiveMQ:比如我的目录是:D:\develop tools\apache-activemq-5.15.2\bin\win64下的activemq.bat
出现如下消息则说明启动成功了。
登录上述启动成功的地址:http://127.0.0.1:8161用户名和密码是admin:admin
一、创建一个java项目,加入maven依赖
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency> </dependencies>
二、项目目录如下
三、编写具体的生产者和消费者
package com.slp.activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author sanglp * @create 2017-12-05 11:30 * @desc 生产者 **/ public class Producer { //ActiveMQ默认用户名 private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; //ActiveMQ默认登陆密码 private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ链接地址 private static final String BROKEN_URL= ActiveMQConnection.DEFAULT_BROKER_URL; AtomicInteger count = new AtomicInteger(0); //链接工厂 ConnectionFactory connectionFactory; //链接对象 Connection connection; //事务管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<MessageProducer>(); public void init(){ try { //创建一个链接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //从工厂中创建一个链接 connection = connectionFactory.createConnection(); //开启链接 connection.start(); //创建一个事务(通过参数设置事务的级别) session = connection.createSession(true,Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname){ try { //创建一个消息队列 Queue queue = session.createQueue(disname); //消息生产者 MessageProducer messageProducer = null; if (threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else { messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while (true){ Thread.sleep(1000); int num = count.getAndIncrement(); //创建一条消息 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count); System.out.println(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count); //发送消息 messageProducer.send(msg); //提交事务 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.slp.activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author sanglp * @create 2017-12-05 11:30 * @desc 消费者 **/ public class Consumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>(); AtomicInteger count = new AtomicInteger(); public void init(){ try { connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void getMessage(String disname){ try { Queue queue = session.createQueue(disname); MessageConsumer consumer ; if(threadLocal.get()!=null){ consumer = threadLocal.get(); }else{ consumer = session.createConsumer(queue); threadLocal.set(consumer); } while(true){ Thread.sleep(1000); TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { msg.acknowledge(); System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
四、运行测试
package com.slp.activemq; /** * @author sanglp * @create 2017-12-05 11:31 * @desc mq测试 **/ public class TestMq { public static void main(String[] args){ Producer producer = new Producer(); producer.init(); TestMq testMq = new TestMq(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProducorMq(producer)).start(); //Thread 2 new Thread(testMq.new ProducorMq(producer)).start(); //Thread 3 new Thread(testMq.new ProducorMq(producer)).start(); //Thread 4 new Thread(testMq.new ProducorMq(producer)).start(); //Thread 5 new Thread(testMq.new ProducorMq(producer)).start(); } private class ProducorMq implements Runnable{ Producer producer; public ProducorMq(Producer producer){ this.producer = producer; } public void run() { while(true){ try { producer.sendMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
运行消费者
package com.slp.activemq; /** * @author sanglp * @create 2017-12-05 11:31 * @desc 消费者测试 **/ public class TestConcumer { public static void main(String[] args){ Consumer consumer = new Consumer(); consumer.init(); TestConcumer testConsumer = new TestConcumer(); new Thread(testConsumer.new ConsumerMq(consumer)).start(); new Thread(testConsumer.new ConsumerMq(consumer)).start(); new Thread(testConsumer.new ConsumerMq(consumer)).start(); new Thread(testConsumer.new ConsumerMq(consumer)).start(); } private class ConsumerMq implements Runnable{ Consumer consumer; public ConsumerMq(Consumer consumer){ this.consumer = consumer; } public void run() { while(true){ try { consumer.getMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
时间: 2024-10-08 23:31:04