【rabbitmq】Queueingconsumer被废止后老代码如何做的解决方案

amqp-client 3.x之前的rabbitmq版本有个消费者的写法是借助于Queueingconsumer的:

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicQos(1);

channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [X] Received ‘" + message + "‘");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

break;

}

这个应该是5.x之前的经典写法。但是在4.x的版本QueueingConsumer被标记废止5.x被移除。移除的原因是什么呢?

原来QueueingConsumer内部用LinkedBlockingQueue来存放消息的内容,而LinkedBlockingQueue:一个由链表结构组成的有界队列,照先进先出的顺序进行排序 ,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度,也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了,在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题(比如因为网络问题只能向rabbitmq生产不能消费,消费者恢复网络之后就会有大量的数据涌入,出现内存问题,oom fgc等)。

而且最上面的写法很不合理不符合事件驱动,什么时候停止while循环也不能写的很优雅,所以在更高的版本直接被移除。取而代之的是DefaultConsumer,你可以通过扩展DefaultConsumer来实现消费者:

消费的代码:(RabbitMqMessageConsumer是对DefaultConsumer的扩展)

RabbitMqMessageConsumer rpcMessageConsumer = new RabbitMqMessageConsumer(channel,cores);
channel.basicQos(cores);
channel.basicConsume(QUEUE_NAME, true, rpcMessageConsumer);

RabbitMqMessageConsumer代码:

public class RabbitMqMessageConsumer extends DefaultConsumer{
      public RabbitMqMessageConsumer(Channel channel) {
          super(channel);
     }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
  //TODO someting
 }
}

其中handleDelivery是处理消息的逻辑。

高版本的解决方案给出了,那么回到我们题目的问题,老代码是按照4.x之前的写的,由于某种原因升级到了5.x了如何做?釜底抽薪的办法就是按照上面的事件驱动的方式重写消费者。折中的办法呢(不想改变老代码的逻辑和结构)。

我就碰到了这样的问题,老代码写了很多的轮子,导致这块代码很难重写。那就只能按照原来QueueingConsumer的写法继续做。解决思路如下:

首先消费的过程还是按照最开始那样:

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicQos(1);

channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [X] Received ‘" + message + "‘");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

break;

}

然后实现自己的QueueingConsumer(QueueingConsumer已经移除):

public class QueueingConsumer extends DefaultConsumer{
  private LinkedBlockingQueue<Delivery> queue;
  public QueueingConsumer(Channel channel) {
    super(channel);
    queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>();
  }
  public QueueingConsumer(Channel channel,int size) {
    super(channel);
    queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>(size);
  }
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
    Delivery delivery = new Delivery();
    delivery.setBody(body);
    delivery.setProperties(properties);
    delivery.setEnvelope(envelope);
    try {
      queue.put(delivery);
    } catch (InterruptedException e) {
      LogUtils.error(e);
    }
  }
  public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
    return queue.take();
  }
  public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
    return queue.poll(timeout, TimeUnit.MILLISECONDS);
  }
  public class Delivery{
    private BasicProperties properties;
    private byte[] body;
    private Envelope envelope;
    public BasicProperties getProperties() {
      return properties;
    }
    public void setProperties(BasicProperties properties) {
      this.properties = properties;
    }
    public byte[] getBody() {
      return body;
    }
    public void setBody(byte[] body) {
      this.body = body;
    }
    public Envelope getEnvelope() {
      return envelope;
    }
    public void setEnvelope(Envelope envelope) {
      this.envelope = envelope;
    }

  }
}

这样你就能在不修改之前的老代码的情况下升级版本了,当然最好还是重写,这个只能起到个过度

原文地址:https://www.cnblogs.com/nfsnyy/p/12264590.html

时间: 2024-10-19 14:05:37

【rabbitmq】Queueingconsumer被废止后老代码如何做的解决方案的相关文章

框架页面尽可以这么用(后置代码中控制框架)

下面是框架页: <%@ Page CodeBehind="Frameset.aspx.cs" Language="c#" AutoEventWireup="false" Inherits="IbatisTest.Web.Frameset" %><!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Frameset//EN"><HTML>

我动了十年的老代码…

老代码的禁忌- 如果你恨一个程序员,就让他去维护年久失修的老代码吧! 尽管这样,我们对老代码仍是恨之厌之烦之远之却不可弃之,它已经老而成精有了生命. 每当有新人来时,它都会- 禁忌就这样传承了下来.我们知道它有很多问题,可是没人敢去动它. 于是,故事开始了- 新版51CTO家园,哪里不一样了? 呵呵,我们下手,还能让它有一样的地方吗?! 像小编这么清新脱俗的画风,那新版51CTO家园也是带有清新气息的! But!如果你只看到了清新脱俗的UI设计,那我只能说:你将错过整个银河系- 我们还有- 精准

十年的老代码,你敢动?

你入职一家新单位,被告知需要维护一个老产品,经理找质管给你开通了svn权限,告诉你迁出哪个分支--就是那个十年前已经定型的分支,就是那个超过6代程序员维护过的分支--然后告诉你说,就在这个分支上改,添加一个新接口,以便支持H5 Video. 于是你开始看代码,云山雾罩,各种痛苦,完全搞不懂业务逻辑和代码的关系,也闹不明白这块代码为什么这么写那块代码是几个意思.你战战兢兢如履薄冰思前想后寸步难行. 你去问进来5个多月还没转正的老同事,他告诉你他也不懂,让你凑合着加个新接口实现了功能就行.加了新功能

.NET在后置代码中输入JS提示语句(背景不会变白)

来源:http://niunan.iteye.com/blog/248256 Page.ClientScript.RegisterStartupScript(Page.GetType(), "message", "<script language='javascript' defer>alert('加入暂存架成功!');</script>"); 类似于AJAX的效果,页面不刷新!! .NET在后置代码中输入JS提示语句(背景不会变白)

我的Git教程 之 解决 git clone后无代码

解决 git clone 后无代码 前言:这个教程只适用于像我一样大致理解Git的原理,但是不太记得住Git命令的同学使用.所以具体原理只会提一下,具体可以参见Pro Git. 在另一篇 简明的教程 里提到获取Git库有两种方式,一种是直接在工作目录下创建一个新的Git库,另一种是从已有的库中克隆,即使用git clone. 其中使用第二种方法可能出现目录为空,即没有代码的现象. (1)分析原因 在Git Bash中,切换到目标目录.然后使用 $ ls -a 查看如果能看到.git目录,说明克隆

webpack: 简单分析 webpack 打包后的代码

编译后代码 /******/ (function(modules) { // webpackBootstrap /******/ // The module cache /******/ var installedModules = {}; /******/ /******/ // The require function /******/ function __webpack_require__(moduleId) { /******/ /******/ // Check if module

java虚拟机jvm启动后java代码层面发生了什么?

java虚拟机jvm启动后java代码层面发生了什么? 0000 我想验证的事情 java代码在被编译后可以被jdk提供的java命令进行加载和运行, 在我们的程序被运行起来的时候,都发生了什么事情, 下面就来探究下这个问题, 这个问题被拆成了两个问题, 第一个问题用来确定发生了哪些事情, 第二个问题用来确定这些事情是如何进行的. java进程里面都发生了哪些活动? 这些活动在java代码(反编译或者是源码)级别有所体现吗? 0001 寻找验证的方式 当我在探究上面两个问题时, 我想了很多方式去

mysql连接的空闲时间超过8小时后 MySQL自动断开该连接解决方案 详细出处参考:http://www.jb51.net/article/32284.htm

MySQL 的默认设置下,当一个连接的空闲时间超过8小时后,MySQL 就会断开该连接,而 c3p0 连接池则以为该被断开的连接依然有效.在这种情况下,如果客户端代码向 c3p0 连接池请求连接的话,连接池就会把已经失效的连接返回给客户端,客户端在使用该失效连接的时候即抛出异常 解决这个问题的办法有三种: 1. 增加 MySQL 的 wait_timeout 属性的值. 修改 /etc/mysql/my.cnf文件,在 [mysqld] 节中设置: # Set a connection to w

python 将中文转拼音后填充到url做参数并写入excel

闲着没事写了个小工具,将中文转拼音后填充到url做参数并写如excel 一.先看下演示,是个什么东西 二.代码 代码用到一个中文转拼音的库,库是网上下的,稍微做了下修改,已经找不原来下载的地址了,然后需要装个pywin32库,用来写excel表格的,下面看代码. #!/usr/bin/env python # coding=utf-8# Author: ca0gu0 from lib.chinese2pinyin import search from time import sleep impo