SpringBoot 整合 ActiveMq

  消息队列,用来处理开发中的高并发问题,通过线程池、多线程高效的处理并发任务。

  首先,需要下载一个ActiveMQ的管理端:我本地的版本是 activemq5.15.8,打开activemq5.15.8\bin\win64\wrapper.exe客户端,可以根据localhost:端口号,访问ActiveMQ的管理界面。默认的用户名、密码都是admin。

  (一)pom 文件中添加 ActiveMq 依赖

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-core</artifactId>
     <version>5.7.0</version></dependency><dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
     <version>5.7.0</version></dependency>

  (二)创建线程池,通过线程池创建、管理线程,这样有利于线程的使用,避免了每次都要创建、关闭线程,浪费资源。

  需要注意这里需要把当前类、函数添加到 IOC 容器中,后面将会用到。

    [email protected] 在项目启动的时候,会加载当前类,构造bean。也可以使用@Component

    [email protected]("taskExecutor"),定义了当前 bean 的名称,默认不添加名称的话,后面如果调用,则会根据对象的类型进行类型匹配。

package com.common.utils.threadPool;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程:在执行并发任务的时候,为提高效率,所以启用线程,但是需要创建、销毁
 * java 线程池,使用线程池,避免了每次使用都要创建一个线程从而影响了效率。而是在完成任务后,并不被销毁,还可以继续执行其他任务
 * @since 21:28 2019/4/9
 * @author hanyf
 */

@Configuration
public class ThreadPoolConfig {

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //线程池核心池的大小,默认线程池为0,需要等待任务去创建线程,如线程数量大于了核心池大小,就会将到达的线程放到【缓存队列中】
        taskExecutor.setCorePoolSize(50);
        //设置线程池能创建的最大线程数量
        taskExecutor.setMaxPoolSize(60);
        //线程没有执行任务时保存多长时间后终止
        //默认情况,只有线程数量>coreThreadNum 才会起作用,直到线程数量<coreThreadNum时结束
        taskExecutor.setKeepAliveSeconds(6*60);
        //缓存队列
        taskExecutor.setQueueCapacity(20);
        taskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.err.println("线程池拒绝策略,正在执行...");
            }
        });
        return taskExecutor;
    }
}

ThreadPoolConfig

  (三)写一个公用的线程池调用工具类,方便以后调用;

    1.构造方法,在具体调用的时候,我们需要传入一个已定义好的线程池对象(引用)。

package com.common.utils.threadPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * 线程的具体调用方法
 * @since 9:03 2019/4/10
 * @author hanyf
 */
public class ThreadPoolUtils {
    public static final Logger log = LoggerFactory.getLogger(ThreadPoolUtils.class);
    public ThreadPoolTaskExecutor executor;

    public ThreadPoolUtils(ThreadPoolTaskExecutor threadPoolTaskExecutor){
        this.executor = threadPoolTaskExecutor;
    }

    public void start(){
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("ThreadPoolUtils====>currentThreadPoolSize【"+ executor.getPoolSize()+"】,ActiveCount【"+executor.getActiveCount()+"】");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

ThreadPoolUtils

  (四)配置ActiveMQ:先创建好Listener,用来监听队列的变化。在接下来的ActiveMQ对象构建中会指定队列的关联监听器,所以现在先创建好。

package com.common.utils.activeMQ.listener;

import com.common.utils.activeMQ.producer.DefaultProducerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 监听器,用来监听队列,并执行
 * @since 16:31 2019/4/10
 * @author hanyf
 */
public class DefaultMessageListener implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(DefaultMessageListener.class);
    @Override
    public void onMessage(Message message) {
        TextMessage m = (TextMessage)message;
        try {
            System.out.println(m.getText());
            log.info("接收到消息啦...==========>>>"+m.getText());
        } catch (JMSException e) {
            e.printStackTrace();
            log.error("监听器错误...==========>>>"+e.getMessage());
        }
    }
}

  (五)配置ActiveMQ :队列定义好队列名称、配置信息、绑定监听器

package com.common.utils.activeMQ;

import com.common.utils.activeMQ.listener.DefaultMessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 *  activeMQ 的相关配置
 * @since 9:36 2019/4/10
 * @author hanyf
 */
@Component
public class ActiveMQConfig {
    public static final String QUEUETEST = "hyfTestQueue";

    /**
     *  创建 activemq 的连接工厂对象
     * @since 15:24 2019/4/10
     * @author hanyf
     * @params  * @param null
     * @return
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory(){
        return new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD
                ,ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
    }

    /**
     *      PooledConnectionFactory 对象是用来管理 jms template在发送消息过程中,每次都要创建connection、session、producer 对象,将会耗费性能,
     *  使用 PooledConnectionFactory 来缓存 connection、session、producer 对象
     * @since 15:38 2019/4/10
     * @author hanyf
     * @params  * @param null
     * @return
     */
    @Bean
    public PooledConnectionFactory pooledConnectionFactory(){
        PooledConnectionFactory pooledConnectionFactory= new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(this.activeMQConnectionFactory());
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
        pooledConnectionFactory.setMaxConnections(300);
        pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
        return pooledConnectionFactory;
    }

    /**
     * Spring 基于JMS监听器(有三种)
     * @since 10:45 2019/4/10
     * @author hanyf
     * @params  * @param null
     * @return
     */
    @Bean
    public DefaultMessageListenerContainer hyfTestQueueListener(){
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        //监听的队列名称
        listenerContainer.setDestination(new ActiveMQQueue(QUEUETEST));
        listenerContainer.setConnectionFactory(this.activeMQConnectionFactory());
        //具体监听操作类
        listenerContainer.setMessageListener(new DefaultMessageListener());
        return listenerContainer;
    }

}

ActiveMQConfig

  (六)配置ActiveMQ:创建 Producer,具体的消息发送对象,需要使用线程池来发送信息

package com.common.utils.activeMQ.producer;

import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.*;
import java.util.concurrent.Executor;

/**
 * 自己创建的默认生产者模板
 *  生产者 :顾名思义就是用来生产消息的,
 * 在这里使用并发的方式来进行发送、生产消息
 * @since 11:04 2019/4/10
 * @author hanyf
 */
@Component
public class DefaultProducerTemplate {

    private static final Logger log = LoggerFactory.getLogger(DefaultProducerTemplate.class);

    @Resource
    private PooledConnectionFactory pooledConnectionFactory;

    @Resource(name = "taskExecutor")
    public Executor threadExecutor;

    public boolean send(String destinationName,int priority,String message){
        try{
            threadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    Connection connection = null;
                    Session session = null;

                    long start = System.currentTimeMillis();
                    try {
                        //从连接池工厂获取一个连接
                        connection = pooledConnectionFactory.createConnection();
                        //第一个参数:非事务类型   第二个参数:表示消息的确认类型
                        session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
                        //创建发送的 mq 目标队列
                        Destination destination = session.createQueue(destinationName);
                        //创建 producer
                        MessageProducer producer = session.createProducer(destination);
                        //是否持久化
                        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                        //优先级
                        producer.setPriority(priority);

                        Message _m = session.createTextMessage(message);
                        producer.send(_m);

                        log.info("producer send =============>>>【"+message+"】,duration:"+ (System.currentTimeMillis()-start)+"ms" );
                    } catch (JMSException e) {
                        e.printStackTrace();
                        log.error("producer send =============>>>" + e.getMessage());
                    }
                    finally {
                        try{
                            if(connection != null)connection.close();
                            if (session != null) session.close();
                        }
                        catch (Exception e){
                            e.printStackTrace();
                            log.error("producer send =============>>>" + e.getMessage());
                        }
                    }
                }
            });
        }
        catch (Exception e){
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

DefaultProducerTemplate

  (七)配置ActiveMQ:调用呐,自己编写一个调用类:引入线程池、producer

@Resource(name = "taskExecutor")
    private ThreadPoolTaskExecutor executor;

@Resource
    private DefaultProducerTemplate template;

//具体的调用类
@RequestMapping("doThread.json")public MessageOutput doThread(@ModelAttribute QuartzJobManage manage){    int n = 40;    while(n>0){        new ThreadPoolUtils(this.executor).start();        n--;    }    return new MessageOutput("200");}

结果:

  之前

  发送:

  ActiveMQ管理端:

Over。。。

原文地址:https://www.cnblogs.com/mysouler/p/10795204.html

时间: 2024-10-08 20:12:28

SpringBoot 整合 ActiveMq的相关文章

Web项目容器集成ActiveMQ &amp; SpringBoot整合ActiveMQ

集成tomcat就是随项目启动而启动tomcat,最简单的方法就是监听器监听容器创建之后以Broker的方式启动ActiveMQ. 1.web项目中Broker启动的方式进行集成 在这里采用Listener监听ServletContext创建和销毁进行Broker的启动和销毁. 0.需要的jar包: 1.listener实现ServletContextListener接口 package cn.qlq.listener; import javax.servlet.ServletContextEv

解决Springboot整合ActiveMQ发送和接收topic消息的问题

环境搭建 1.创建maven项目(jar) 2.pom.xml添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <dependencies> &l

SpringBoot整合ActiveMQ发送邮件

虽然ActiveMQ以被其他MQ所替代,但仍有学习的意义,本文采用邮件发送的例子展示ActiveMQ 1. 生产者1.1 引入maven依赖1.2 application.yml配置1.3 创建配置类ConfigQueue1.4 创建生产者类Producer1.5 启动类AppProducer2. 消费者2.1 引入maven依赖2.2 application.yml配置2.3 创建消费者类Consumer2.4 启动类AppConsumer3. 启动截图3.1 生产者截图3.2 消费者截图3.

SpringBoot整合ActiveMQ

先建工程 .. .. .. .. ..先看一下最终目录结构(实际上核心就是两个类,但是其他的多写写还是没有坏处的) 消息实体类 package com.example.demo.domain; import java.io.Serializable; import java.util.Date; public class Message implements Serializable { private int id; private String from; private String to

SpringBoot整合ActiveMQ实现持久化

点对点(P2P) 结构 创建生产者和消费者两个springboot工程 导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 生产者 步骤一:application.properties文件 spring.activemq.brok

springboot整合activemq小demo

直接上干货... 1.首先配置pom.xml文件如下: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.

springboot 整合ActiveMq

pom.xml <!-- 配置ActiveMQ启动器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 创建消息队列 //创建队列 @Bean public Queue queue(){ return new Acti

SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)

https://www.cnblogs.com/xuyiqing/p/10851859.html https://www.cnblogs.com/leeSmall/p/8721556.html https://www.cnblogs.com/linyufeng/p/9885645.html 原文地址:https://www.cnblogs.com/418836844qqcom/p/11540020.html

SpringBoot整合ActiveMQ开启持久化

1.开启队列持久化 只需要添加三行代码 jmsTemplate.setDeliveryMode(2); jmsTemplate.setExplicitQosEnabled(true); jmsTemplate.setDeliveryPersistent(true); 2. 开启主题持久化,启动类添加如下配置 @Bean(name = "topicListenerFactory") public JmsListenerContainerFactory<DefaultMessageL