rocketmq的消费者集群消费消息,实现负载均衡

package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer1 {

public Consumer1() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}

class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer1 c1 = new Consumer1();
System.out.println("c1 start..");

}
}

//==========================================================

package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer2 {

public Consumer2() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}

class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer2 c2 = new Consumer2();
System.out.println("c2 start..");

}
}

原文地址:https://www.cnblogs.com/zhangzhiqin/p/10351557.html

时间: 2024-10-10 04:04:01

rocketmq的消费者集群消费消息,实现负载均衡的相关文章

集群、分布式、负载均衡区别与联系

1.Linux集群主要分成三大类( 高可用集群, 负载均衡集群,科学计算集群)(下面只介绍负载均衡集群) 负载均衡集群(Load Balance Cluster) 负载均衡系统:集群中所有的节点都处于活动状态,它们分摊系统的工作负载.一般Web服务器集群.数据库集群和应用服务器集群都属于这种类型. 负载均衡集群一般用于相应网络请求的网页服务器,数据库服务器.这种集群可以在接到请求时,检查接受请求较少,不繁忙的服务器,并把请求转到这些服务器上.从检查其他服务器状态这一点上看,负载均衡和容错集群很接

分布式进阶(十九) 基于集群的动态反馈负载均衡策略

基于集群的动态反馈负载均衡策略 基于动态反馈机制的集群负载均衡算法研究 目前应用最为广泛的集群计算技术主要分为三大类:高可用性集群技术.高性能计算集群技术和负载均衡集群技术. 德国的CarlAdamPetri于1962年在他的博士论文<自动机通信>中提出了Petri网的概念,它是一种适合于描述异步.并发.分布式系统的图形数学工具. 动态WRR调度算法 这是一个目前普遍使用的调度算法,算法在WRR的基础上加入了根据服务器端的负载信息周期性地调整服务器性能权值的过程.其基本思想是:根据CPU利用率

keepalived实现WEB服务集群的高可用负载均衡

要求:利用keepalived实现WEB服务集群的高可用负载均衡,即在lvs_dr模型中扮演director的角色,对来自用户的请求按照一定的算法分配至后端服务器中:且keepalived服务器可实现高可用. keepalived是vrrp协议的一种实现,专门为lvs设计,以实现对lvs高可用就集群中的dirctor进行冗余以及对realserver进行健康检测.当一台主机(MASTER)出现问题后,keepalived能够将VIP自动的分配到备用的主机上(BACKUP),从而实现了自身(dir

集群、分布式、负载均衡

集群和负载均衡的概念 负载均衡LVS集群详解 大型电子商务网站架构之--分布式可扩展数据库架构 负载均衡有两种:软负载lvs;硬负载f5. 水平切分数据库: 可以降低单台机器的负载,同时最大限度的降低了了宕机造成的损失. 负载均衡: 有效的降低了单台 机器的访问负载,降低了宕机的可能性: 集群: 解决了数据库宕机带来的单点数据库不能访问的问题: 读写分离: 最大限度了提高了应用中读取 (Read)数据的速度和并发量. 1.Linux集群主要分成三大类( 高可用集群, 负载均衡集群,科学计算集群)

LVS集群中的IP负载均衡技术

章文嵩 ([email protected]) 转自LVS官方参考资料 2002 年 4 月 本文在分析服务器集群实现虚拟网络服务的相关技术上,详细描述了LVS集群中实现的三种IP负载均衡技术(VS/NAT.VS/TUN和VS/DR)的工作原理,以及它们的优缺点. 1.前言在 前面文章中,讲述了可伸缩网络服务的几种结构,它们都需要一个前端的负载调度器(或者多个进行主从备份).我们先分析实现虚拟网络服务的主要技术,指出 IP负载均衡技术是在负载调度器的实现技术中效率最高的.在已有的IP负载均衡技术

使用ansible快速配置RHCS 集群 实现WEB站负载均衡高可用(手记)

什么是RHCS RHCS是Red Hat Cluster Suite的缩写,也就是红帽子集群套件,RHCS是一个能够提供高可用性.高可靠性.负载均衡.存储共享且经济廉价的集群工具集合,它将集群系统中三大集群架构融合一体,可以给web应用.数据库应用等提供安全.稳定的运行环境. 更确切的说,RHCS是一个功能完备的集群应用解决方案,它从应用的前端访问到后端的数据存储都提供了一个行之有效的集群架构实现,通过RHCS提供的这种解决方案,不但能保证前端应用持久.稳定的提供服务,同时也保证了后端数据存储的

LVS集群十种调度算法及负载均衡理论

一.LVS概念 LVS(Linux Virtual Server):Linux 虚拟server: LVS是个负载均衡设备.它不提供不论什么服务.用户请求到这里的时候.它是将客户需求转发至后端真正提供服务的服务.所以说后端的服务称作real server: LVS分为两段,前一段称为ipvsadm(管理集群服务的命令行工具),后面一段叫做ipvs(内核模块): [提示]LVS和iptables不能同一时候使用: 二.LVS类型 LB(Load Balancing):负载均衡集群 特性:为了添加能

Linux集群之高可用负载均衡lvs+keepalived

LVS简介 LVS介绍 LVS是Linux Virtual Server的缩写,意即Linux虚拟服务器,是一个虚拟的服务器集群系统,属于4层负载均衡 ipvs和ipvsadm的关系 我们使用配置LVS的时候,不能直接配置内核中的ipvs,需要使用ipvs的管理工具ipvsadm进行管理 LVS术语 LVS转发原理 LVS负载均衡器接受所有入站请求,并根据调度算法决定哪个realserver处理该请求 LVS调度算法 轮询(rr):按照请求顺序轮流分发到后端RS 加权轮询(wrr):权值高的获得

集群之LVS(负载均衡)详解

提高服务器响应能力的方法 scale on  在原有服务器的基础上进行升级或者直接换一台新的性能更高的服务器. scale out  横向扩展,将多台服务器并发向外响应客户端的请求.优点:成本低,扩展架构比较简单. 集群(Cluster),通俗地讲就是按照某种组织方式将几台电脑组织起来完成某种特定任务的这样一种架构. 三种集群类型: LB,Load Balancing 负载均衡:在一定程度上能够实现高可用的目的. HA,High Availability 高可用:实时在线,能够及时响应客户端请求