初探active mq

mq(message queue),即消息队列,目前比较流行消息队列是active mq 和kafka。本文介绍如何简单的使用active mq。

ActiveMQ官网下载地址:http://activemq.apache.org/download.html

ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,可简单使用windows版本。下载解压启动即可,进入apache-activemq-5.15.8\bin\win64目录,双击activemq.bat即可启动。

从它的目录来说,还是很简单的:

    • bin存放的是脚本文件
    • conf存放的是基本配置文件
    • data存放的是日志文件
    • docs存放的是说明文档
    • examples存放的是简单的实例
    • lib存放的是activemq所需jar包
    • webapps用于存放项目的目录

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 
  admin:http://127.0.0.1:8161/admin/

  我们在浏览器打开链接之后输入账号密码(这里和tomcat 服务器类似)

  默认账号:admin

  密码:admin

到这里为止,ActiveMQ 服务端就启动完毕了。

接下来是代码:

pom.xml添加如下:

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.8</version>
    </dependency>

创建4个类,分别是两个工具类和两个测试类;

消息队列分为生成者(producter)和消费者(consumer)

1、创建生产者工具类

package com.wzl.demo.utils;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProducter {

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
            while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "producter:生产了一条消息!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "producter:生产了一条消息!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2、创建消费者工具类

package com.wzl.demo.utils;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:消费了一条消息:"+msg.getText()+"-->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3、创建生产者测试类

package com.wzl.demo.action;

import com.wzl.demo.utils.ActiveMQProducter;

public class TestMQProducter {
    public static void main(String[] args){
        ActiveMQProducter producter = new ActiveMQProducter();
        producter.init();
        TestMQProducter testMq = new TestMQProducter();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        ActiveMQProducter producter;
        public ProductorMq(ActiveMQProducter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4、创建消费者测试类

package com.wzl.demo.action;

import com.wzl.demo.utils.ActiveMQConsumer;

public class TestMQConsumer {
    public static void main(String[] args){
        ActiveMQConsumer comsumer = new ActiveMQConsumer();
        comsumer.init();
        TestMQConsumer testConsumer = new TestMQConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        ActiveMQConsumer comsumer;
        public ConsumerMq(ActiveMQConsumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行生成者测试类和消费测试类,可以观看服务端可视化界面的数量变化。

原文地址:https://www.cnblogs.com/wzlblog/p/10231172.html

时间: 2024-12-15 16:11:05

初探active mq的相关文章

JMS 之 Active MQ 的消息传输

本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors) (一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立bro

Active MQ 启动报错

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 特性 ⒈ 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP ⒉ 完全支持J

JMS 之 Active MQ 启动嵌入式Broke

一.如何启动active MQ 服务 (一).使用命令启动 a./usr/local/activemq-5.9.0/bin 目录下 ./activemq start 默认使用conf/activemq.xml 配置文件 b.[[email protected] bin]# ./activemq start xbean:file:../conf/activemq-slave1.xml 使用指定的配置文件启动 (二).代码启动broker 在程序中可以通过编码的方式启动broker,如果要启动多个b

Active MQ学习笔记

一.Active MQ介绍 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 主要特点: 1. 多种语言和协议编写客户端.语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Stomp REST,WS Notif

active MQ搭建

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 1安装 定义activemq安装目录为/usr/local/activemq 定义activemq数据存放目录为 /data/postmall/activemq/ cd /tmp wget http://archive.apache

JMS进阶-Spring整合Active MQ

只能说,Spring太流弊了,啥都能整合~~~~ First of all, start the service of Active MQ 项目目录结构如下 用到的jar包如下 activemq-client-5.13.1.jar commons-logging-1.1.3.jar geronimo-j2ee-management_1.1_spec-1.0.1.jar geronimo-jms_1.1_spec-1.1.1.jar hamcrest-core-1.3.jar junit-4.12

Active MQ C#实现

原文链接: Active MQ C#实现 内容概要 主要以源码的形式介绍如何用C#实现同Active MQ 的通讯.本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础. 正文 JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息. Message 由消息头,属性和消息体三部份组成. Active MQ支持过滤机制,即生产者可以设置消息的属性(Prope

WebSphere MQ&&Active MQ

WebSphere MQ&&ActiveMQ WebSphere MQ 1.  中间件处于应用软件与系统软件之间,是一种以自己的复杂换取企业应用简单化的可复用的基础软件,它使用系统软件所提供的基础服务(功能),衔接网络上应用系统的各个部分或不同的应用,能够达到资源共享.功能共享的目的. 2.  三种通信技术: RPC(remote process call):同步: CPI-C:同步: MQI(message queue interface):异步通信方式,通信的方式与传送协议无关. 3.

JMS 之 Active MQ 消息存储

一.消息的存储方式 ActiveMQ支持JMS规范中的持久化消息与非持久化消息 持久化消息通常用于不管是否消费者在线,它们都会保证消息会被消费者消费.当消息被确认消费后,会从存储中删除 非持久化消息通常用于发送通知以及实时数据,通常要求性能优先,消息可靠性并不是必须的情况 MQ支持可插拔式的消息存储,如:内存.文件和关系数据库等方式 Queue消息模型在ActiveMQ的存储 采用存储采用先进先出(FIFO),一个消息只能被一个消费者消费,当消息被确认消费之后才会被删除. Topic消息模型(针