消息队列 (2) java实现简单的RabbtMQ

假设有如下问题:

  1.如果消费者连接中断,这期间我们应该怎么办?

  2.如何做到负载均衡?

  3.如何有效的将数据发送到相关的接收者?就是怎么样过滤

  4.如何保证消费者收到完整正确的数据

  5.如何让优先级高的接收者先收到数据

一、"Hello RabbitMQ"

如图:P代表生产者,C代表消费者,红色部分为消息队列

二、项目开始

  1.首先创建一个maven项目,然后导入rabbitMQjar包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>testRabbit</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
    </dependencies>
</project>

  2.创建消费者Producer

public class Producer {
    public final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //设置RabbitMQ相关信息
        factory.setHost("localhost");

        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //声明一个队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送消息到队列中
        String message = "hello rabbitMQ";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
        System.out.println("Producer Send : " + message);

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

queueDeclare第一个参数表示队列名称,第二个参数为是否持久化(true表示是,队列将在服务器重启时生存),第三个参数为是否独占队列(创建者可以使用的私有队列,断开后自动删除),第四个参数为当所有消费者客户端连接断开时是否自动删除队列,第五个参数为队列的其他参数。

basicPublish第一个参数为交换机名称,第二个参数为队列映射的路由key,第三个参数为消息的其他属性,第四个参数为发送消息的主体。

  3.创建消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    public final static String QUEUE_NAME = "rabbitMQ.test";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("localhost");
        //创建一个新的连接
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //声明要关注的队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("客户端等待接受消息");

        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要哪个频道的信息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer comsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String (body,"UTF-8");
                System.out.println("客户端接收:"+message);
            }
        };

        //自动回答队列应答 -- RabbitMQ中的消息确认机制
         channel.basicConsume(QUEUE_NAME,true,comsumer);

    }
}

该方法用于获取生产者发送的消息,其中envelope主要存放生产者相关的信息(比如交换机、路由key等)body是消息实体。

运行结果如下:

三、实现任务分发

一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这样就需要采用分布机制了。

新建一个生产者NewTask

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class NewTask {
    public final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String [] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

        //分发消息
        for(int i = 0;i<10;i++){
            String message = "hello RabbitMQ "+ i;
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            System.out.println("NewTask send :" + message);
        }

        channel.close();
        connection.close();

    }
}

然后创建2个工作者work1和work2代码一样

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Work1 等待接受消息");

        //每次从队列获取的数量
        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("Worker1 接受到消息:"+message);
                try{
                    //throw new Exception();
                    doWork(message);
                }catch (Exception ex){
                    channel.abort();
                }finally {
                    System.out.println("Work1 完成了");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };

        boolean autoAck=false;
        //消息消费完成确认
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

    }
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暂停1秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

channel.basicQos(1);保证一次只分发一个,autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当时不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生产者,最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。

两个都不抛出异常时:

其中一个设置为异常时,会把消息都发送给另一个正常的。等待异常的程序重启后,才会继续给它发送。

原文地址:https://www.cnblogs.com/baidawei/p/9172433.html

时间: 2024-10-10 13:00:09

消息队列 (2) java实现简单的RabbtMQ的相关文章

python 实现多个线程间消息队列传递,一个简单的列子

#-*-coding:utf8-*-"""Producer and consumer models: 1. There are many producers and consumers at the same time, but they complement each other. Implemented by message queuing to achieve at the same time production and consumpion processing.

.net微软消息队列(msmq)简单案例

1.首先我们需要安装消息队列服务,它是独立的消息记录的服务,并保存在硬盘文件中. 我们添加名为:DMImgUpload的私有消息队列. 2.定义消息队列的连接字符串建议采用IP: (1)FormatName:DIRECT=OS:Pac_gzf-PC\Private$\DMImgUpload (2)FormatName:DIRECT=TCP:192.168.1.105\Private$\DMImgUpload string queuePath="FormatName:DIRECT=TCP:192.

Java Design Demo -简单的队列-异步多任务队列(java android)

简单的单线程队列 -- 工作的时候遇到劣质打印机.给打印机发消息,打印机就会打印,如果在打印机还在打印的时候,就 再发消息打印,就会出现消息丢失.所以需要给上一个任务一些处理的间隔时间. 单线程的消息队列示例 [java] view plaincopyprint? package demo1; import java.util.LinkedList; public class Main { /** * @param args */ private static Thread thread; pr

消息队列概念与认知

本文是-消息队列学习的概念与介绍篇.目的是能够对消息队列能够有一个简单的了解和大体的认知. 参考/学习资料整理(好东西要学会分享 ) B站上的黑马ActiveMQ的视频教程 Hollis公众号上的消息队列文章 架构之家公众号上的消息队列文章 JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目) CS-Notes(技术面试必备基础知识) JCSprout(处于萌芽阶段的 Java 核心知识库) 一个在线绘图的工具 一.消息队列简介 消息队列 MQ(message qu

消息队列状态:struct msqid_ds

Linux的消息队列(queue)实质上是一个链表, 它有消息队列标识符(queue ID). msgget创建一个新队列或打开一个存在的队列; msgsnd向队列末端添加一条新消息; msgrcv从队列中取消息, 取消息是不一定遵循先进先出的, 也可以按消息的类型字段取消息. 1. 标识符(des)和键(key): 消息队列, 信号量和共享存储段, 都属于内核中的IPC结构, 它们都用标识符来描述. 这个标识符是一个非负整数, 与文件描述符不同的是, 创建时并不会重复利用通过删除回收的整数,

进程间通信——XSI IPC之消息队列

进程间通信XSI IPC有3种:消息队列.共享内存.信号量.它们之间有很多相似之处,但也有各自的特殊的地方.消息队列作为其中比较简单的一种,它会有些什么东西呢,来一起探讨探讨.. 消息队列结构 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法. 每个数据块都被认为是一个类型,接受进程接收的数据块可以有不同的类型值. 我们可以通过发送消息来避免命名管道的同步和阻塞问题. 消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出. 命名管道:

使用java实现阿里云消息队列简单封装

一.前言 最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉. 现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪.日志.监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发

Java操作simple简单消息队列

1.进入官网 进入get start 然后进入Tutorials 发现简单消息队列 2. 原文地址:https://www.cnblogs.com/juncaoit/p/8570703.html

挨踢部落坐诊第四期:Java消息队列的应用场景和作用

挨踢部落是为核心开发者提供深度技术交流,解决开发需求,资源共享的服务社群.基于此社群,我们邀请了业界技术大咖对开发需求进行一对一突破,解除开发过程中的绊脚石.以最专业.最高效的答复为开发者解决开发难题. 消息队列 话题关键词:消息队列.索引.App.路由.接口 部落阵容:51CTO管理团队: 面向对象:移动开发者.IT运维.数据分析师 参与方式:加入51CTO开发者QQ交流群(群号370892523(已满).请加312724475),有任何技术问题,在群里提问,或发给群主小官. 活动详情: 重庆