ActiveMQ 入门helloworld

1.下载安装ActiveMQ

官网下载地址:http://activemq.apache.org/download.html

ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,我选择了Windows 版本下进行开发。

下载完压缩包后,直接解压:

目录:

bin存放的是脚本文件

conf存放的是基本配置文件

data存放的是日志文件

docs存放的是说明文档

examples存放的是简单的实例

lib存放的是activemq所需jar包

webapps用于存放项目的目录

2.启动ActiveMQ

可以在命令行中用命令启动,我这里就直接鼠标点击运行脚本启动了

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。

admin:http://127.0.0.1:8161/admin/

我们在浏览器打开链接之后输入账号密码(默认账号密码都是admin),类似于访问tomcat。

登陆成功后页面显示:

ActiveMQ启动就完成了,关闭直接点击关闭脚本或用命令管就可以了。

3.不多bb,直接上helloworld代码

创建一个maven项目,最好是用jdk1.8,我用jdk1.7会报错,pom.xml依赖如下:

<dependencies>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.15.0</version>
      </dependency>
    </dependencies>

3.1创建生产者类:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 *
 * @author cjf
 * 生产者
 */
public class Producter {

//ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

AtomicInteger count = new AtomicInteger(0);
    //链接工厂
    ConnectionFactory connectionFactory;
    //链接对象
    Connection connection;
    //事务管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

public void init(){
        try {
            //创建一个链接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //从工厂中创建一个链接
            connection  = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

public void sendMessage(String disname){
        try {
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3.2,创建消费者类:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 *
 * @author cjf
 * 消费者
 */
public class Comsumer {

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

ConnectionFactory connectionFactory;

Connection connection;

Session session;

ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.运行ActiveMQ项目,进行测试

4.1,生产者生产消息:

package com.iflytek;
/**
 *
 * @author cjf
 * 测试生产者生产消息
 */
public class TestProducter {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        TestProducter testMq = new TestProducter();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

@Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果:

4.2,测试消费者消费消息:

package com.iflytek;
/**
 *
 * @author cjf
 * activeMQ测试消费者消费消息
 */
public class TestConsumer {
    public static void main(String[] args){
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

private class ConsumerMq implements Runnable{
        Comsumer comsumer;
        public ConsumerMq(Comsumer comsumer){
            this.comsumer = comsumer;
        }

@Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果:

5.ActiveMQ的helloworld就写完了,更多MQ其他的知识自己再好好学,我也是个菜鸡刚看了2天这东西,求关注啊,可以交流一下。

时间: 2024-11-06 09:44:01

ActiveMQ 入门helloworld的相关文章

Hibernate4.3.5入门HelloWorld

本文给出一个简单的Hibernate4.3.5入门实例,配置方式采用XML文件方式(这种方式已经不是主流了,目前越来越多采用Annotation方式映射POJO实体) 代码结构如下图所示:主要用到hibernate-release-4.3.5.Final\lib\required下的jar 在这里只展示出主要的代码 1.HibernateUtil.java 1 package com.wangp.hibernate.helper; 2 3 import org.hibernate.Session

ActiveMQ 入门Nodejs版

ActiveMQ 入门下载与安装 官方下载地址 解压,运行bin/win[32|64]/activemq[.bat] 启动服务 环境信息 控制台: http://localhost:8161 默认端口:8161 服务地址: host: localhost port: 61613 代码例子 基本信息: 语言:Node.js 客户端:stompjs 消息发布者: 复制代码 Queue消息消费者 // Consumer_queue.js var Stomp = require('stompjs');

CodeIgniter入门——HelloWorld

原文:CodeIgniter入门--HelloWorld CodeIgniter(CI)是一套给PHP网站开发者使用的应用程序开发框架和工具包. 初次接触,来一个HelloWorld~~~ ^_^ 准备工作: 一.下载CI 官方网站:http://ellislab.com/codeigniter CodeIgniter中国:http://codeigniter.org.cn/ 可以从上面下载相关版本以及文档,我在这里下载使用的2.2.0版本. 二.安装CI 1.首先你得有php运行环境.如果是w

ActiveMQ入门系列二:入门代码实例(点对点模式)

在上一篇<ActiveMQ入门系列一:认识并安装ActiveMQ(Windows下)>中,大致介绍了ActiveMQ和一些概念,并下载.安装.启动他,还访问了他的控制台页面. 这篇,就用代码实例说下如何实现消息的生产和消费. 一.理论基础 同RabbitMQ一样,ActiveMQ中也是有两种模式: 点对点模式(Point to Point,简写为PTP) 发布/订阅模式(Publish & Subscribe,简写为Pub & Sub) 通过上一篇我们知道了制造消息的应用叫生产

ActiveMQ入门系列三:发布/订阅模式

在上一篇<ActiveMQ入门系列二:入门代码实例(点对点模式)>中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub & Sub),详细介绍了点对点模式并用代码实例进行说明,今天就介绍下发布/订阅模式. 一.理论基础 发布/订阅模式的工作示意图: 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息. 和点对点方式不同,发布到topic的消息会被所有订阅者消费. 当生产者发布消息,不管是否有消费者,都不会保存消息. 一定要先

Springboot2.x入门——helloWorld

Springboot2.x入门--helloWorld 一.简介 1.1 Springboot简介 Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 1.2 Springboot2.x比较1.x 新版本代码无

Node.js开发入门—HelloWorld再分析

在Node.js开发入门(1)我们用http模块实现了一个简单的HelloWorld网站,这次我们再来仔细分析下代码,了解更多的细节. 先看看http版本的HelloWorld代码: 代码就是这么简单: // 引入http模块 var http = require("http"); // 创建server,指定处理客户端请求的函数 http.createServer( function(request, response) { response.writeHead(200, {&quo

8086汇编语言入门-HelloWorld

附件下载:  http://pan.baidu.com/s/1i5R9qO9    密码:rfgk 80x86微处理器汇编语言编程.学习任何编程语言都免不了要跨越HelloWorld这道坎,面向机器的汇编语言与面向过程/对象的高级语言不同,编码过程中对寄存器.内存的分配等细节都需要编程人员去关心:而高级语言程序如C语言.C++,即使你不知道printf具体是怎么实现的,只要掌握使用方法即可(封装成模块).C语言的helloworld程序简单到只有一行执行语句:   printf("Hello,W

C语言编程入门——HelloWorld!

 将我的C语言学习过程记录下来,供大家学习交流,适合C语言入门者学习,希望能对大家有帮助. 推荐学习教程:<C语言程序设计> 作者:谭浩强 推荐学习视频:C语言教程 作者:郝斌(链接) 我在学完之后,记下了自己的学习心得,大家可在我的博客中查看(链接) Hello World ,中文意思:你好,世界.世界上的第一个程序就是Hello World,由Brian Kernighan创作. 大多数程序员去学一门语言,写的第一个程序都是HelloWorld! C语言版 Hello World #