RabbitMQ 安装和配置

Rabbitmq集群高可用

RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡。

Rabbit模式大概分为以下三种:单一模式、普通模式、镜像模式

单一模式:最简单的情况,非集群模式。

及单实例服务。

普通模式:默认的集群模式。

queue创建之后,如果没有其它policy,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。

当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。

所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。

该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。

如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,队列数据就丢失了。

镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。

该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。

所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。具体如下:

队列通过策略来使能镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何slave,因此会运行得更快。

为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项

了解集群中的基本概念:

RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。

一个rabbitmq集 群中可以共享 user,vhost,queue,exchange等,所有的数据和状态都是必须在所有节点上复制的,一个例外是,那些当前只属于创建它的节点的消息队列,尽管它们可见且可被所有节点读取。rabbitmq节点可以动态的加入到集群中,一个节点它可以加入到集群中,也可以从集群环集群会进行一个基本的负载均衡。
集群中有两种节点:
1 内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
2 磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。

思路:

那么具体如何实现RabbitMQ高可用,我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。

rabbitmq 安装

1.配置epel 源 on node1-2
rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm           
#http://mirrors.yun-idc.com/epel/5/x86_64/epel-release-5-4.noarch.rpm           
wget -O /etc/yum.repos.d/epel-erlang.repo 

 
 #yum install erlang xmlto
 
 #wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.3/rabbitmq-server-3.5.3-1.noarch.rpm  
 #rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm
 
 
 #/etc/init.d/rabbitmq-server restart
 
 # rabbitmqctl delete_user guest
 #rabitmqctl add_user admin password
 #rabbitmqctl set_user_tags admin administrator
 #rabbitmqctl add_vhost web
 #rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
 #rabbitmq restart
 #sudo rabbitmq-plugins enable rabbitmq_management
 
 #node2
  # rabbitmqctl delete_user guest
 #rabitmqctl add_user admin password
 #rabbitmqctl set_user_tags admin administrator
 #rabbitmqctl add_vhost web
 #rabbitmqctl set_permissions -p web admin ".*" ".*" ".*"
 #rabbitmq restart
 #sudo vim /var/lib/rabbitmq/.erlang.cookie #保持2台node 上的文件一直,权限一样
 -r-------- 1 rabbitmq rabbitmq 21 8月  14 10:21 /var/lib/rabbitmq/.erlang.cookie
 
 将node1和node2 组成集群
 node2 上面执行如下命令
 #rabbitmqctl stop_app
 #rabbitmqctl join_cluster --disk [email protected]  
 #rabbitmqctl start_app
 #rabbitmqctl cluster_status
 
 Cluster status of node ‘[email protected]‘ ...
[{nodes,[{disc,[‘[email protected]‘]},
         {ram,[‘[email protected]‘]}]},
 {running_nodes,[‘[email protected]‘,‘[email protected]‘]},
 {cluster_name,<<"[email protected]">>},
 {partitions,[]}]
 
 
 #sudo rabbitmq-plugins enable rabbitmq_management
 
 #sudo rabbitmqctl stop_app
  sudo rabbitmqctl change_cluster_node_type ram
  sudo rabbitmqctl start_app
 
 
 二,配置keepalived 集群
 
 node1
 ! Configuration File for keepalived

global_defs {
   router_id LVS_TALARIS_RMQ_SVR
}

vrrp_instance VI_1 {
    state BACKUP
    interface eth0
    virtual_router_id 47
    priority 160
    advert_int 1
    nopreempt
#    lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay  5
    authentication {
        auth_type PASS
        auth_pass [email protected]
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node2
! Configuration File for keepalived

global_defs {
   router_id LVS_TALARIS_RMQ_SVR
}

vrrp_instance VI_1 {
    state MASTER
    interface eth0
    virtual_router_id 47
    priority 150
    advert_int 1
    nopreempt
#    lvs_sync_daemon_interface eth0
    dont_track_primary
    garp_master_delay  5
    authentication {
        auth_type PASS
        auth_pass [email protected]
    }
    virtual_ipaddress {
        1.1.1.1
    }
}

node1 haproxy:
global
    maxconn 100000
    log /dev/log local0 notice

defaults
    mode tcp
    log global

    option redispatch
    retries 3

    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

listen stats :4966
    mode http
    stats enable
    stats uri /

frontend rmq_lb
    bind :27010
    default_backend rmq_lb

backend rmq_lb
  balance roundrobin
  server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
  server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
  
  
  node2 haproxy:
  
  global
    maxconn 100000
    log /dev/log local0 notice

defaults
    mode tcp
    log global

    option redispatch
    retries 3

    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

listen stats :4966
    mode http
    stats enable
    stats uri /

frontend rmq_lb
    bind :27010
    default_backend rmq_lb

backend rmq_lb
  balance roundrobin
  server node1_5672 node1:5672 weight 1 maxconn 4096 check inter 1000
  server node2_5672 node2:5672 weight 1 maxconn 4096 check inter 1000
  
  
  
  #node1 supervisor
  [program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#node2 supervisor
[program:rmq_lb]
command=/opt/sbin/haproxy -f /opt/etc/haproxy/rmq_lb.cfg
autostart=true
autorestart=unexpected
startsecs=3
startretries=3
stopsignal=TERM
stopwaitsecs=5
user=nobody
stopasgroup=true
killasgroup=true
stdout_logfile=syslog
stderr_logfile=syslog

#supervisor monitor
[program:rmqmonitor]
command=ruby /opt/sbin/rmq.watcher.rb  -h statsd
directory=/opt/
autostart=true
user=root
redirect_stderr=true
stdout_logfile=/data/log/rmqmonitor/rmqmonitor.log

#cat rmq.watcher.rb
#!/usr/bin/env ruby
#
# Author: Jeff Vier <[email protected]>

require ‘rubygems‘
require ‘digest‘
require ‘find‘
require ‘fileutils‘
require ‘json‘
require ‘socket‘
require ‘resolv‘

require ‘optparse‘

options = {
  :prefix => Socket.gethostname,
  :interval => 10,
  :host => ‘127.0.0.1‘,
  :port => 8125,
  :queues => false
}
OptionParser.new do |opts|
  opts.banner = "Usage: #{$0} [options]"

  opts.on(‘-P‘, ‘--prefix [STATSD_PREFIX]‘, "metric prefix (default: #{options[:prefix]})") { |prefix|   options[:prefix] = "#{prefix}" }
  opts.on(‘-i‘, ‘--interval [SEC]‘,"reporting interval (default: #{options[:interval]})")   { |interval| options[:interval] = interval }
  opts.on(‘-h‘, ‘--host [HOST]‘,   "statsd host (default: #{options[:host]})")              { |host|     options[:host] = host }
  opts.on(‘-p‘, ‘--port [PORT]‘,   "statsd port (default: #{options[:port]})")              { |port|     options[:port] = port }
  opts.on(‘-q‘, ‘--[no-]queues‘,   "report queue metrics (default: #{options[:queues]})")   { |queues|   options[:queues] = queues }
end.parse!

###############################################################
# Typical StatsD class, pasted to avoid an external dependency:
# Stolen from https://github.com/bvandenbos/statsd-client

class Statsd

  Version = ‘0.0.8‘

  class << self

    attr_accessor :host, :port

    def host_ip_addr
      @host_ip_addr ||= Resolv.getaddress(host)
    end

    def host=(h)
      @host_ip_addr = nil
      @host = h
    end

    # +stat+ to log timing for
    # +time+ is the time to log in ms
    def timing(stat, time = nil, sample_rate = 1)
      if block_given?
        start_time = Time.now.to_f
        yield
        time = ((Time.now.to_f - start_time) * 1000).floor
      end
      send_stats("#{stat}:#{time}|ms", sample_rate)
    end

    def gauge(stat, value, sample_rate = 1)
      send_stats("#{stat}:#{value}|g", sample_rate)
    end

    # +stats+ can be a string or an array of strings
    def increment(stats, sample_rate = 1)
      update_counter stats, 1, sample_rate
    end

    # +stats+ can be a string or an array of strings
    def decrement(stats, sample_rate = 1)
      update_counter stats, -1, sample_rate
    end

    # +stats+ can be a string or array of strings
    def update_counter(stats, delta = 1, sample_rate = 1)
      stats = Array(stats)
      send_stats(stats.map { |s| "#{s}:#{delta}|c" }, sample_rate)
    end

    private

    def send_stats(data, sample_rate = 1)
      data = Array(data)
      sampled_data = []

      # Apply sample rate if less than one
      if sample_rate < 1
        data.each do |d|
          if rand <= sample_rate
            sampled_data << "#{d}|@#{sample_rate}"
          end
        end
        data = sampled_data
      end

      return if data.empty?

      raise "host and port must be set" unless host && port

      begin
        sock = UDPSocket.new
        data.each do |d|
          sock.send(d, 0, host_ip_addr, port)
        end
      rescue => e
        puts "UDPSocket error: #{e}"
      ensure
        sock.close
      end
      true
    end
  end
end

################################################################################

include FileUtils # allows use of FileUtils methods without the FileUtils:: prefix ie: mv_f(file, file2) or rm_rf(dir)

STDOUT.sync = true # don‘t buffer STDOUT
Statsd.host = options[:host]
Statsd.port = options[:port]

unless system ‘which rabbitmqadmin‘
  raise "unable to locate the rabbitmqadmin command"
end

loop do
  overview = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 show overview -f raw_json`)
  prefix = "rabbitmq.#{options[:prefix]}.overview.object_totals"
  Statsd.gauge("#{prefix}.channels", overview[0][‘object_totals‘][‘channels‘])
  Statsd.gauge("#{prefix}.connections", overview[0][‘object_totals‘][‘connections‘])
  Statsd.gauge("#{prefix}.consumers", overview[0][‘object_totals‘][‘consumers‘])
  Statsd.gauge("#{prefix}.exchanges", overview[0][‘object_totals‘][‘exchanges‘])
  Statsd.gauge("#{prefix}.messages", overview[0][‘queue_totals‘][‘messages‘])
  Statsd.gauge("#{prefix}.queues", overview[0][‘object_totals‘][‘queues‘])

  if options[:queues]
    queues = JSON.parse(`rabbitmqadmin --username=user_monitor --password=Monitor_2015 list queues -f raw_json`)
    queues.each do |queue|
      if queue.key?(‘name‘)
        prefix = "#{options[:prefix]}.queues.#{queue[‘name‘]}"
        Statsd.gauge("#{prefix}.active_consumers", queue[‘active_consumers‘])
        Statsd.gauge("#{prefix}.consumers", queue[‘consumers‘])
        Statsd.gauge("#{prefix}.memory", queue[‘memory‘])
        Statsd.gauge("#{prefix}.messages", queue[‘messages‘])
        Statsd.gauge("#{prefix}.messages_ready", queue[‘messages_ready‘])
        Statsd.gauge("#{prefix}.messages_unacknowledged", queue[‘messages_unacknowledged‘])
        Statsd.gauge("#{prefix}.avg_egress_rate", queue[‘backing_queue_status‘][‘avg_egress_rate‘])   if queue[‘backing_queue_status‘]
        Statsd.gauge("#{prefix}.avg_ingress_rate", queue[‘backing_queue_status‘][‘avg_ingress_rate‘]) if queue[‘backing_queue_status‘]
        if queue.key?(‘message_stats‘)
          Statsd.gauge("#{prefix}.ack_rate", queue[‘message_stats‘][‘ack_details‘][‘rate‘])                  if queue[‘message_stats‘][‘ack_details‘]
          Statsd.gauge("#{prefix}.deliver_rate", queue[‘message_stats‘][‘deliver_details‘][‘rate‘])          if queue[‘message_stats‘][‘deliver_details‘]
          Statsd.gauge("#{prefix}.deliver_get_rate", queue[‘message_stats‘][‘deliver_get_details‘][‘rate‘])  if queue[‘message_stats‘][‘deliver_get_details‘]
          Statsd.gauge("#{prefix}.publish_rate", queue[‘message_stats‘][‘publish_details‘][‘rate‘])          if queue[‘message_stats‘][‘publish_details‘]
        end
      end
    end
  end

  sleep options[:interval]
end

# 客户端,
将keepalived VIP 配置成域名
然后客户端连接 域名:haproxy 端口

zabbix 监控rmq 
UserParameter=rmq.messageTotal[*],/etc/zabbix/scripts/rmqmessage.py -u "$1" -p "$2" -H "$3"

cat rmqmessage.py

#!/usr/bin/env python
#coding:utf-8
import requests,re
from requests.auth import HTTPBasicAuth
from optparse import OptionParser

def RmqMessage():
   usage="usage: %prog [options] arg"
   parser=OptionParser(usage)
   parser.add_option("-u", "--user",action="store",type="string",help="USERNAME")
   parser.add_option("-p", "--passwd",action="store",type="string",help="PASSWORD")
   parser.add_option("-H", "--hostname",action="store",type="string",help="HOSTNAME")
   (options,args) = parser.parse_args()
   user=options.user
   passwd=options.passwd

   hostname=options.hostname
   url="http://%s.elenet.me:15672/api/overview" %(hostname)
   if re.match(r"^\d+.",hostname):
   	url="http://%s:15672/api/overview" %(hostname)
   try:
	r = requests.get(url, auth=HTTPBasicAuth(user, passwd))
	result=r.json()
	QmessageTotal=result[‘queue_totals‘][‘messages‘]
	print QmessageTotal
	return QmessageTotal
   except Exception, e:
	return -1
RmqMessage()
时间: 2024-11-03 22:42:40

RabbitMQ 安装和配置的相关文章

RabbitMQ安装和配置

RabbitMQ: MQ:message queue.MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ

CentOS7 erlang RabbitMQ 安装并且配置远程访问

对于安装RabbitMQ,我也是着实废了一番力气,版本下载很多,但是都存在编译问题,编译不通过,报错找不到错误原因,甚至error都是***这样的存在. 其他的依赖, 我没有测试过,因为我环境中存在: Python,simplejson,安装 介于RabbitMQ是依赖erlang语言. erlang安装比较重要,版本问题,编译问题,不能存在任何问题,否则RabbitMQ是绝对安装不了的,即便是二进制包也不能使用 可以使用yum安装,我找了很久,用下面的方式成功 1.下载源码wget http:

AMQP之RabbitMQ安装与配置

刚开始接触RabbitMQ,今天尝试安装,具体流程如下,参照了一些网上同行的经验,环境如下图: rabbitmq版本:3.1.5 下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5.tar.gz 文件下载目录:/home/gao/server以下简称为当前目录 准备工作:安装依赖环境 yum install build-essential openssl openssl-devel

RabbitMQ安装,配置

安装(centos系统) 第一步: 下载rabbitmq安装包 第二步: 安装erlang 1) 安装Erlang Solutions仓库到你的系统(目的在于让你可以使用yum安装到最新版本的erlang, 如果不设置, yum安装的erlang版本通常太低)  wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm  rpm -Uvh erlang-solutions-1.0-1.noarch.r

RabbitMQ安装与配置

1.安装 Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang. erlang:http://www.erlang.org/download.html rabbitmq:http://www.rabbitmq.com/download.html 注意:1.默认安装的Rabbit MQ 监听端口是:5672 2.配置 1. 安装完以后erlang需要手动设置ERLANG_HOME 的系统变量. 输入:set ERLANG_HOME=C:\P

RabbitMQ学习系列(二): RabbitMQ安装与配置

1.安装 Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang. erlang:http://www.erlang.org/download.html rabbitmq:http://www.rabbitmq.com/download.html 注意: 1.现在先别装最新的 3.6.3 ,本人在安装完最新的版本,queue 队列有问题,降到了 3.6.2 就解决了. 2.默认安装的Rabbit MQ 监听端口是:5672 2.配置 1.

RabbitMQ安装、配置

RabbitMQ是最流行开源消息系统,已经超过35000个RabbitMQ生产部署在各种规模的企业.RabbitMQ是轻量级.易部署自建机房或云上.它支持多种消息协议,RabbitMQ支持分布式部署满足高可用.高扩展的需求.RabbitMQ能运行在各种操作系统.云环境并且提供各种开发工具支持多种开发语言.这篇文章是个人生产环境部署的操作,各位大神可以根据自己环境做调整,欢迎各位的批评与建议.[温馨提示:在复制命令时注意先放在编辑器里面格式化下] 第一步:下载正确的软件 Socat下载连接: ht

linux(ubuntu)下的rabbitmq安装与配置

今天突然对rabbitMQ来了兴趣,就在虚拟机上装了个玩玩(虚拟机上安装的ubuntu 14.04 ,可以输入lsb_release -a查看版本信息). 只要是linux内核的,应该基本安装方式都大同小异.下面是最简便的一种安装方式,适合一休哥这种懒人使用(使用源码或者其他方式都要 先手动安装一大堆依赖): APT安装: ?1.添加以下安装源到/etc/apt/sources.list中: deb http://www.rabbitmq.com/debian/ testing main 2.执

Ubuntu Rabbitmq 安装与配置

原文链接:http://blog.csdn.net/rickey17/article/details/72756766 添加源 新增公钥(不加会有警告) 更新源 安装rabbitmq-server echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-rel