RabbitMQ Cluster High Availability
RabbitMQ is written in Erlang, and clustering is very convenient because Erlang is a natively distributed language; RabbitMQ itself, however, does not provide load balancing.
RabbitMQ deployments roughly fall into three modes: single-node mode, normal mode, and mirrored mode.
Single-node mode: the simplest case, not a cluster at all, i.e. a single-instance service.
Normal mode: the default cluster mode.
After a queue is created, if no policy matches it, the queue behaves as a normal clustered queue. The message bodies exist on only one node: nodes A and B hold the same metadata (the queue definition), but the queue's contents are stored in a single copy on the node that declared the queue (node A). If node A goes down, run ./rabbitmqctl list_queues on node B and you will find the queue is gone, although the declared exchange still exists.
When messages sit in the queue on node A and a consumer pulls from node B, RabbitMQ temporarily transfers messages between A and B, taking the message bodies out of A and delivering them to the consumer through B.
Consumers should therefore connect to every node and consume from each of them; in other words, for one logical queue you need to create a physical queue on multiple nodes. Otherwise, no matter whether the consumer connects to A or B, the data always leaves through A, which becomes a bottleneck.
The problem with this mode is that when node A fails, node B cannot fetch the messages on A that have not yet been consumed.
If message persistence was enabled, consumption can only resume after node A comes back; if not, the queue data is lost.
Mirrored mode: the queues that need it are made into mirrored queues that exist on multiple nodes. This is RabbitMQ's HA solution.
This mode solves the problem above. The essential difference from normal mode is that message bodies are actively synchronized between the mirror nodes, instead of being pulled across nodes on demand when a consumer fetches them.
The side effects are also obvious: besides lowering performance, if there are many mirrored queues and a large volume of incoming messages, the synchronization traffic will consume a large share of the cluster's network bandwidth.
So the mode is suited to scenarios with high reliability requirements. To make a queue mirrored, first define a policy; when a client then declares a queue, the RabbitMQ cluster decides, by matching the queue name against the policy, whether it becomes a normal clustered queue or a mirrored queue. In detail:
Queues are mirrored by means of policies. A policy can be changed at any time, and RabbitMQ will move queues to the new policy as quickly as it can. Non-mirrored and mirrored queues differ: the former has no extra mirroring infrastructure and no slaves, and therefore runs faster.
To make queues mirrored, create a policy that matches them and set two keys, "ha-mode" and the optional "ha-params". ha-params takes different values depending on ha-mode; the options are described below.
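ha-mode = all: ha-params is not used; the queue is mirrored to every node in the cluster, and when a new node joins, the queue is mirrored to it as well.
ha-mode = exactly: ha-params is a count; the queue is mirrored to that many nodes (if the cluster has fewer nodes, it is mirrored to all of them).
ha-mode = nodes: ha-params is a list of node names; the queue is mirrored to the nodes in that list.

The setup later in this article builds the cluster but does not show the policy step itself, so here is a minimal sketch of applying a mirroring policy through the management HTTP API, the same API the monitoring scripts below use. The host node1, port 15672, vhost web, user admin/password and the policy name ha-all are assumptions borrowed from the rest of this article; running rabbitmqctl set_policy -p web ha-all "^" '{"ha-mode":"all"}' on any cluster node achieves the same thing.

#!/usr/bin/env python
# Sketch: apply a queue-mirroring policy via the RabbitMQ management API.
# Assumed values (adjust to your environment): management plugin on node1:15672,
# vhost "web", user admin/password, policy name "ha-all".
import json
import requests
from requests.auth import HTTPBasicAuth

mgmt = "http://node1:15672"
vhost = "web"            # use %2F here if you target the default "/" vhost
policy = "ha-all"        # hypothetical policy name

body = {
    "pattern": "^",                    # match every queue name in the vhost
    "definition": {"ha-mode": "all"},  # mirror to all nodes
    "apply-to": "queues",
    "priority": 0,
}

r = requests.put("%s/api/policies/%s/%s" % (mgmt, vhost, policy),
                 data=json.dumps(body),
                 headers={"content-type": "application/json"},
                 auth=HTTPBasicAuth("admin", "password"))
r.raise_for_status()                   # the API returns 201/204 on success

Once such a policy exists, every queue whose name matches the pattern is mirrored, and the cluster keeps the mirrors in sync as nodes join or leave.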
Basic concepts in a cluster:
A RabbitMQ cluster has two kinds of nodes: RAM nodes and disk nodes. As the names suggest, a RAM node keeps all of its data in memory, while a disk node keeps it on disk. Note, as mentioned above, that if message persistence is enabled when publishing, the data is still safely written to disk even on a RAM node.
A RabbitMQ cluster shares users, vhosts, queues, exchanges and so on; all of this data and state must be replicated across all nodes. The one exception is message queues, which currently live only on the node that created them, even though they are visible to and reachable from every node. Nodes can join a cluster dynamically and can leave it again, and the cluster performs a basic form of load balancing.
There are two kinds of nodes in a cluster:
1. RAM node: keeps its state only in memory (with one exception: the persistent content of durable queues is still written to disk).
2. Disk node: keeps its state in memory and on disk.
Because a RAM node avoids writing to disk, it performs better than a disk node. Within a cluster, a single disk node to hold the state is enough.
If a cluster contains only RAM nodes, you must not stop all of them, or all state, messages and so on will be lost.
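To illustrate the persistence point above, the sketch below declares a durable queue and publishes a persistent message, so the message is written to disk even if the queue's home node is a RAM node. The pika client and all names (host node1, vhost web, user admin/password, queue orders) are assumptions for illustration, not part of this article's setup.

#!/usr/bin/env python
# Sketch: durable queue + persistent message. The broker stores the message on
# disk even when the queue lives on a RAM node. pika and every name below are
# assumptions, not taken from the setup described in this article.
import pika

credentials = pika.PlainCredentials("admin", "password")
params = pika.ConnectionParameters(host="node1", port=5672,
                                   virtual_host="web", credentials=credentials)
connection = pika.BlockingConnection(params)
channel = connection.channel()

channel.queue_declare(queue="orders", durable=True)       # queue definition survives restarts
channel.basic_publish(exchange="",
                      routing_key="orders",
                      body="hello",
                      properties=pika.BasicProperties(delivery_mode=2))  # 2 = persistent message
connection.close()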
Approach:
So how do we actually make RabbitMQ highly available? We first build a normal cluster, then configure mirrored mode on top of it to get HA, and put a reverse proxy in front of the RabbitMQ cluster; producers and consumers access the cluster through the reverse proxy.
1. RabbitMQ installation

Configure the EPEL repo on node1 and node2:

rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
# alternate mirror: http://mirrors.yun-idc.com/epel/5/x86_64/epel-release-5-4.noarch.rpm
wget -O /etc/yum.repos.d/epel-erlang.repo
yum install erlang xmlto
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.3/rabbitmq-server-3.5.3-1.noarch.rpm
rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm
/etc/init.d/rabbitmq-server restart

On node1:
rabbitmqctl delete_user guest
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl add_vhost web
rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
/etc/init.d/rabbitmq-server restart
sudo rabbitmq-plugins enable rabbitmq_management

On node2:
rabbitmqctl delete_user guest
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl add_vhost web
rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
/etc/init.d/rabbitmq-server restart

sudo vim /var/lib/rabbitmq/.erlang.cookie
# keep this file identical on both nodes, with the same owner and permissions:
-r-------- 1 rabbitmq rabbitmq 21 Aug 14 10:21 /var/lib/rabbitmq/.erlang.cookie

Join node1 and node2 into a cluster; run the following on node2:

rabbitmqctl stop_app
rabbitmqctl join_cluster --disc [email protected]
rabbitmqctl start_app
rabbitmqctl cluster_status
Cluster status of node '[email protected]' ...
[{nodes,[{disc,['[email protected]']},
         {ram,['[email protected]']}]},
 {running_nodes,['[email protected]','[email protected]']},
 {cluster_name,<<"[email protected]">>},
 {partitions,[]}]

sudo rabbitmq-plugins enable rabbitmq_management

To turn node2 into a RAM node:
sudo rabbitmqctl stop_app
sudo rabbitmqctl change_cluster_node_type ram
sudo rabbitmqctl start_app

2. Configure the keepalived cluster

node1:
! Configuration File for keepalived
global_defs {
    router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 47
    priority 160
    advert_int 1
    nopreempt
    # lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay 5
    authentication {
        auth_type PASS
        auth_pass [email protected]
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node2:
! Configuration File for keepalived
global_defs {
    router_id LVS_TALARIS_RMQ_SVR
}
vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 47
    priority 150
    advert_int 1
    nopreempt
    # lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay 5
    authentication {
        auth_type PASS
        auth_pass [email protected]
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node1 haproxy:
global
    maxconn 100000
    log /dev/log local0 notice

defaults
    mode tcp
    log global
    option redispatch
    retries 3
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

listen stats :4966
    mode http
    stats enable
    stats uri /

frontend rmq_lb
    bind :27010
    default_backend rmq_lb

backend rmq_lb
    balance roundrobin
    server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
    server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000

node2 haproxy: identical to the node1 configuration above.

node1 supervisor:
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

node2 supervisor: identical to the node1 configuration above.

supervisor entry for the monitor script:
[program:rmqmonitor]
command=ruby /opt/sbin/rmq.watcher.rb -h statsd
directory=/opt/
autostart=true
user=root
redirect_stderr=true
stdout_logfile=/data/log/rmqmonitor/rmqmonitor.log

cat rmq.watcher.rb
#!/usr/bin/env ruby
#
# Author: Jeff Vier <[email protected]>

require 'rubygems'
require 'digest'
require 'find'
require 'fileutils'
require 'json'
require 'socket'
require 'resolv'
require 'optparse'

options = { :prefix => Socket.gethostname, :interval => 10, :host => '127.0.0.1', :port => 8125, :queues => false }

OptionParser.new do |opts|
  opts.banner = "Usage: #{$0} [options]"
  opts.on('-P', '--prefix [STATSD_PREFIX]', "metric prefix (default: #{options[:prefix]})") { |prefix| options[:prefix] = "#{prefix}" }
  opts.on('-i', '--interval [SEC]', "reporting interval (default: #{options[:interval]})") { |interval| options[:interval] = interval }
  opts.on('-h', '--host [HOST]', "statsd host (default: #{options[:host]})") { |host| options[:host] = host }
  opts.on('-p', '--port [PORT]', "statsd port (default: #{options[:port]})") { |port| options[:port] = port }
  opts.on('-q', '--[no-]queues', "report queue metrics (default: #{options[:queues]})") { |queues| options[:queues] = queues }
end.parse!

###############################################################
# Typical StatsD class, pasted to avoid an external dependency:
# Stolen from https://github.com/bvandenbos/statsd-client
class Statsd
  Version = '0.0.8'

  class << self
    attr_accessor :host, :port

    def host_ip_addr
      @host_ip_addr ||= Resolv.getaddress(host)
    end

    def host=(h)
      @host_ip_addr = nil
      @host = h
    end

    # +stat+ to log timing for
    # +time+ is the time to log in ms
    def timing(stat, time = nil, sample_rate = 1)
      if block_given?
        start_time = Time.now.to_f
        yield
        time = ((Time.now.to_f - start_time) * 1000).floor
      end
      send_stats("#{stat}:#{time}|ms", sample_rate)
    end

    def gauge(stat, value, sample_rate = 1)
      send_stats("#{stat}:#{value}|g", sample_rate)
    end

    # +stats+ can be a string or an array of strings
    def increment(stats, sample_rate = 1)
      update_counter stats, 1, sample_rate
    end

    # +stats+ can be a string or an array of strings
    def decrement(stats, sample_rate = 1)
      update_counter stats, -1, sample_rate
    end

    # +stats+ can be a string or array of strings
    def update_counter(stats, delta = 1, sample_rate = 1)
      stats = Array(stats)
      send_stats(stats.map { |s| "#{s}:#{delta}|c" }, sample_rate)
    end

    private

    def send_stats(data, sample_rate = 1)
      data = Array(data)
      sampled_data = []
      # Apply sample rate if less than one
      if sample_rate < 1
        data.each do |d|
          if rand <= sample_rate
            sampled_data << "#{d}|@#{sample_rate}"
          end
        end
        data = sampled_data
      end
      return if data.empty?
      raise "host and port must be set" unless host && port
      begin
        sock = UDPSocket.new
        data.each do |d|
          sock.send(d, 0, host_ip_addr, port)
        end
      rescue => e
        puts "UDPSocket error: #{e}"
      ensure
        sock.close if sock
      end
      true
    end
  end
end
################################################################################

include FileUtils # allows use of FileUtils methods without the FileUtils:: prefix ie: mv_f(file, file2) or rm_rf(dir)

STDOUT.sync = true # don't buffer STDOUT

Statsd.host = options[:host]
Statsd.port = options[:port]

unless system 'which rabbitmqadmin'
  raise "unable to locate the rabbitmqadmin command"
end

loop do
  overview = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 show overview -f raw_json`)
  prefix = "rabbitmq.#{options[:prefix]}.overview.object_totals"
  Statsd.gauge("#{prefix}.channels", overview[0]['object_totals']['channels'])
  Statsd.gauge("#{prefix}.connections", overview[0]['object_totals']['connections'])
  Statsd.gauge("#{prefix}.consumers", overview[0]['object_totals']['consumers'])
  Statsd.gauge("#{prefix}.exchanges", overview[0]['object_totals']['exchanges'])
  Statsd.gauge("#{prefix}.messages", overview[0]['queue_totals']['messages'])
  Statsd.gauge("#{prefix}.queues", overview[0]['object_totals']['queues'])

  if options[:queues]
    queues = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 list queues -f raw_json`)
    queues.each do |queue|
      if queue.key?('name')
        prefix = "#{options[:prefix]}.queues.#{queue['name']}"
        Statsd.gauge("#{prefix}.active_consumers", queue['active_consumers'])
        Statsd.gauge("#{prefix}.consumers", queue['consumers'])
        Statsd.gauge("#{prefix}.memory", queue['memory'])
        Statsd.gauge("#{prefix}.messages", queue['messages'])
        Statsd.gauge("#{prefix}.messages_ready", queue['messages_ready'])
        Statsd.gauge("#{prefix}.messages_unacknowledged", queue['messages_unacknowledged'])
        Statsd.gauge("#{prefix}.avg_egress_rate", queue['backing_queue_status']['avg_egress_rate']) if queue['backing_queue_status']
        Statsd.gauge("#{prefix}.avg_ingress_rate", queue['backing_queue_status']['avg_ingress_rate']) if queue['backing_queue_status']
        if queue.key?('message_stats')
          Statsd.gauge("#{prefix}.ack_rate", queue['message_stats']['ack_details']['rate']) if queue['message_stats']['ack_details']
          Statsd.gauge("#{prefix}.deliver_rate", queue['message_stats']['deliver_details']['rate']) if queue['message_stats']['deliver_details']
          Statsd.gauge("#{prefix}.deliver_get_rate", queue['message_stats']['deliver_get_details']['rate']) if queue['message_stats']['deliver_get_details']
          Statsd.gauge("#{prefix}.publish_rate", queue['message_stats']['publish_details']['rate']) if queue['message_stats']['publish_details']
        end
      end
    end
  end

  sleep options[:interval]
end

Client side: publish the keepalived VIP as a DNS name and have clients connect to that name on the haproxy port (27010 above); a minimal connection sketch is given at the end of this article.

Monitoring RabbitMQ with Zabbix:

UserParameter=rmq.messageTotal[*],/etc/zabbix/scripts/rmqmessage.py -u "$1" -p "$2" -H "$3"

cat rmqmessage.py
#!/usr/bin/env python
#coding:utf-8
import requests, re
from requests.auth import HTTPBasicAuth
from optparse import OptionParser

def RmqMessage():
    usage = "usage: %prog [options] arg"
    parser = OptionParser(usage)
    parser.add_option("-u", "--user", action="store", type="string", help="USERNAME")
    parser.add_option("-p", "--passwd", action="store", type="string", help="PASSWORD")
    parser.add_option("-H", "--hostname", action="store", type="string", help="HOSTNAME")
    (options, args) = parser.parse_args()
    user = options.user
    passwd = options.passwd
    hostname = options.hostname
    url = "http://%s.elenet.me:15672/api/overview" % (hostname)
    if re.match(r"^\d+\.", hostname):
        url = "http://%s:15672/api/overview" % (hostname)
    try:
        r = requests.get(url, auth=HTTPBasicAuth(user, passwd))
        result = r.json()
        QmessageTotal = result['queue_totals']['messages']
        print QmessageTotal
        return QmessageTotal
    except Exception, e:
        return -1

RmqMessage()
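Finally, a minimal sketch of the client side just described: rmq.example.com stands for the DNS name pointing at the keepalived VIP (1.1.1.1 above) and 27010 is the haproxy frontend port; the pika client, the vhost, the credentials and the queue name are all assumptions, not part of the original setup.

#!/usr/bin/env python
# Sketch: a consumer that reaches the RabbitMQ cluster through keepalived VIP + haproxy.
# "rmq.example.com" is a placeholder for the DNS name mapped to the VIP; 27010 is the
# haproxy frontend port configured above. pika and all other names are assumptions.
import pika

credentials = pika.PlainCredentials("admin", "password")
params = pika.ConnectionParameters(host="rmq.example.com", port=27010,
                                   virtual_host="web", credentials=credentials,
                                   heartbeat=30)   # stay under haproxy's 50s client/server timeouts

connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="orders", durable=True)

def handle(ch, method, properties, body):
    print("got %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="orders", on_message_callback=handle)
channel.start_consuming()

Whichever RabbitMQ node haproxy picks, the consumer sees the same mirrored queue; if the active proxy node dies, keepalived moves the VIP to the other node and the client only needs to reconnect.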