【分布式系列之ActiveMq】ActiveMq入门示例

前言

github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01

下载ActiveMQ :http://activemq.apache.org/download.html

放到自己的目录,大致目录如下:

  • bin存放的是脚本文件
  • conf存放的是基本配置文件
  • data存放的是日志文件
  • docs存放的是说明文档
  • examples存放的是简单的实例
  • lib存放的是activemq所需jar包
  • webapps用于存放项目的目录

然后启动ActiveMQ:比如我的目录是:D:\develop tools\apache-activemq-5.15.2\bin\win64下的activemq.bat

出现如下消息则说明启动成功了。

登录上述启动成功的地址:http://127.0.0.1:8161用户名和密码是admin:admin

一、创建一个java项目,加入maven依赖

<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.2</version>
        </dependency>
    </dependencies>

二、项目目录如下

三、编写具体的生产者和消费者

package com.slp.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author sanglp
 * @create 2017-12-05 11:30
 * @desc 生产者
 **/
public class Producer {
    //ActiveMQ默认用户名
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;
    //ActiveMQ默认登陆密码
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ链接地址
    private static final String BROKEN_URL= ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<MessageProducer>();

    public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(通过参数设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if (threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else {
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
            while (true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                System.out.println(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

  

package com.slp.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author sanglp
 * @create 2017-12-05 11:30
 * @desc 消费者
 **/
public class Consumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer ;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 四、运行测试

package com.slp.activemq;

/**
 * @author sanglp
 * @create 2017-12-05 11:31
 * @desc mq测试
 **/
public class TestMq {
    public static void main(String[] args){
        Producer producer = new Producer();
        producer.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 2
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 3
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 4
        new Thread(testMq.new ProducorMq(producer)).start();
        //Thread 5
        new Thread(testMq.new ProducorMq(producer)).start();
    }

    private  class  ProducorMq implements Runnable{
        Producer producer;
        public ProducorMq(Producer producer){
            this.producer = producer;
        }
        public void run() {
            while(true){
                try {
                    producer.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  

 运行消费者

package com.slp.activemq;

/**
 * @author sanglp
 * @create 2017-12-05 11:31
 * @desc 消费者测试
 **/
public class TestConcumer {
    public static void main(String[] args){
        Consumer consumer = new Consumer();
        consumer.init();
        TestConcumer testConsumer = new TestConcumer();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
        new Thread(testConsumer.new ConsumerMq(consumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Consumer consumer;
        public ConsumerMq(Consumer consumer){
            this.consumer = consumer;
        }

        public void run() {
            while(true){
                try {
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  

时间: 2024-10-08 23:31:04

【分布式系列之ActiveMq】ActiveMq入门示例的相关文章

【java开发系列】—— spring简单入门示例

1 JDK安装 2 Struts2简单入门示例 前言 作为入门级的记录帖,没有过多的技术含量,简单的搭建配置框架而已.这次讲到spring,这个应该是SSH中的重量级框架,它主要包含两个内容:控制反转\依赖注入,和AOP面向切面编程. 1 控制反转IOC\依赖注入DI,因为翻译的不同,因此有两个名字. 控制反转意思就是说,当我们调用一个方法或者类时,不再有我们主动去创建这个类的对象,控制权交给别人(spring). 依赖注入意思就是说,spring主动创建被调用类的对象,然后把这个对象注入到我们

分布式服务框架 dubbo/dubbox 入门示例(转)

dubbo是一个分布式的服务架构,可直接用于生产环境作为SOA服务框架. 官网首页:http://dubbo.io/ ,官方用户指南 http://dubbo.io/User+Guide-zh.htm上面的几张图画得不错,完全可以当做SOA架构的学习资料 淘宝将这个项目开源出来以后,得到了不少同行的支持,包括: 当当网的扩展版本dubbox :https://github.com/dangdangdotcom/dubbox 京东的扩展版本jd-hydra: http://www.oschina.

分布式服务框架 dubbo/dubbox 入门示例

dubbo是一个分布式的服务架构,可直接用于生产环境作为SOA服务框架. 官网首页:http://dubbo.io/ ,官方用户指南 http://dubbo.io/User+Guide-zh.htm上面的几张图画得不错,完全可以当做SOA架构的学习资料 淘宝将这个项目开源出来以后,得到了不少同行的支持,包括: 当当网的扩展版本dubbox :https://github.com/dangdangdotcom/dubbox 京东的扩展版本jd-hydra: http://www.oschina.

【java开发系列】—— struts2简单入门示例

前言 最近正好有时间总结一下,过去的知识历程,虽说东西都是入门级的,高手肯定是不屑一顾了,但是对于初次涉猎的小白们,还是可以提供点参考的. struts2其实就是为我们封装了servlet,简化了jsp跳转的复杂操作,并且提供了易于编写的标签,可以快速开发view层的代码. 过去,我们用jsp和servlet搭配,实现展现时,大体的过程是: 1 jsp触发action 2 servlet接受action,交给后台class处理 3 后台class跳转到其他的jsp,实现数据展现 现在有了stru

Dubbo--(十三)分布式服务治理简介、入门示例讲解、管控台部署使用

ActiveMQ从入门到精通(二)

接上一篇<ActiveMQ从入门到精通(一)>,本篇主要讨论的话题是:消息的顺序消费.JMS Selectors.消息的同步/异步接受方式.Message.P2P/PubSub.持久化订阅.持久化消息到MySQL以及与Spring整合等知识. 消息的顺序消费 在上一篇文章中,我们已经明确知道了ActiveMQ并不能保证消费的顺序性,即便我们使用了消息优先级.而在实际开发中,有些场景又是需要对消息进行顺序消费的,比如:用户从下单.到支付.再到发货等.如果使用ActiveMQ该如何保证消费的顺序性

Velocity魔法堂系列一:入门示例

一.前言 Velocity作为历史悠久的模板引擎不单单可以替代JSP作为Java Web的服务端网页模板引擎,而且可以作为普通文本的模板引擎来增强服务端程序文本处理能力.而且Velocity被移植到不同的平台上,如.Net的NVelocity和js的Velocity.js,虽然各平台在使用和实现上略有差别,但大部分语法和引擎核心的实现是一致的,因此学习成本降低不少哦. 最好的学习资源——官网:http://velocity.apache.org/ 本系列打算采用如下结构对Velocity进行较为

Apache Camel框架入门示例

Apache Camel是Apache基金会下的一个开源项目,它是一个基于规则路由和中介引擎,提供企业集成模式的Java对象的实现,通过应用程序接口(或称为陈述式的Java领域特定语言(DSL))来配置路由和中介的规则.领域特定语言意味着Apache Camel支持你在的集成开发工具中使用平常的,类型安全的,可自动补全的Java代码来编写路由规则,而不需要大量的XML配置文件.同时,也支持在Spring中使用XML配置定义路由和中介规则. Camel提供的基于规则的路由(Routing)引擎,可

一起学微软Power BI系列-官方文档-入门指南(6)Power BI与Excel

今天介绍了官方入门文档中有关PowerBI和Excel的知识.前几篇入门文档有点仓促,加上最近时间的研究,会有更多技巧性和入门型的文章或者视频发布,最后2篇入门文档将更加详细一点,因为部分文章进行简单的翻译. 本文原文地址:一起学微软Power BI系列-官方文档-入门指南(6)Power BI与Excel Power BI系列文章地址:微软Power BI技术文章与资源目录 1.系列文章说明 一起学微软Power BI系列 文章将分为 官方文档,文档翻译,中文入门教程,中文视频教程和案例等内容

Gradle学习系列之一——Gradle快速入门

Gradle学习系列之一--Gradle快速入门 这是一个关于Gradle的学习系列,其中包含以下文章: Gradle快速入门 创建Task的多种方法 读懂Gradle语法 增量式构建 自定义Property 使用java Plugin 依赖管理 构建多个Project 自定义Task类型 自定义Plugin 请通过以下方式下载本系列文章的Github示例代码: git clone https://github.com/davenkin/gradle-learning.git 和Maven一样,