背景
Javashop电商系统的消息总线使用的事rabbitmq,在订单创建、静态页生成、索引生成等等业务中大量采用异步消息系统,这个对于mq高可用的要求有两个重要的考量:
1、集群化
2、可扩容
3、冗灾
冗灾就要实现rabbitmq的持久化,要考虑到rabbitmq宕机的情况,当rabbitmq因不可抗因素挂掉了,这时有一些消息还没来得及被消费,当我们再恢复了rabbitmq的运行后,这些消息应该同时被恢复,可以再次被消费。
本文着重讨论rabbitmq的k8s的持久化部署方案,当然提供在方案也支持了集群及扩容。
思路
1、数据的存储
在k8s中的持久化部署不可避免的要用到持久卷,我们采用nfs方式的持久卷来存储es数据。
持久卷的详细介绍请见这里:
https://kubernetes.io/docs/concepts/storage/persistent-volumes/
2、多节点的权限问题
rabbit的数据目录默认只允许一个节点访问,但在k8s上采用了持久卷,所有节点的数据都存储在这个卷上,这会导致rabbitmq的数据目录访问权限问题:
1 {{failed_to_cluster_with, 2 [[email protected]], 3 "Mnesia could not connect to any nodes."}, 4 {rabbit,start,[normal,[]]}}
我们通过指定节点名称的方式来解决,稍后可以在配置文件中看到具体的配置项。
部署过程
一、pv(持久卷的建立)
先要建立nfs服务器
对于持久卷的结构规划如下:
1 /nfs/data/mqdata
根据如上规划建立nfs服务:
1 #master节点安装nfs 2 yum -y install nfs-utils 3 #创建nfs目录 4 mkdir -p /nfs/data/{mqdata,esmaster,esdata} 5 #修改权限 6 chmod -R 777 /nfs/data/ 7 8 #编辑export文件 9 vim /etc/exports 10 11 粘贴如下内容: 12 /nfs/data/mqdata *(rw,no_root_squash,sync) 13 14 #配置生效 15 exportfs -r 16 #查看生效 17 exportfs 18 19 #启动rpcbind、nfs服务 20 systemctl restart rpcbind && systemctl enable rpcbind 21 systemctl restart nfs && systemctl enable nfs 22 23 #查看 RPC 服务的注册状况 24 rpcinfo -p localhost 25 26 #showmount测试,这里的ip输入master节点的局域网ip 27 showmount -e <your ip>
如果成功可以看到可被挂载的目录:
1 # showmount -e 172.17.14.73 2 Export list for 172.17.14.73: 3 /nfs/data/esmaster * 4 /nfs/data/mqdata *
接下来,要在每一个节点上安装nfs服务以便使k8s可以挂载nfs目录
1 #所有node节点安装客户端 2 yum -y install nfs-utils 3 systemctl start nfs && systemctl enable nfs
这样就为k8s的持久卷做好了准备。
建立持久卷
有了nfs的准备,我就可以建立持久卷了:
我们分享了javashop内部使用的yaml仓库供大家参考:
https://gitee.com/enation/rabbitmq-on-kubernetes
在您的k8s maseter节点服务器上 clone我们准备好的yaml文件
https://gitee.com/enation/rabbitmq-on-kubernetes.git
修改根目录中的pv.yaml
修改其中的server配置为nfs服务器的IP:
1 nfs: 2 server: 192.168.1.100 #这里请写nfs服务器的ip
通过下面的命令建立持久卷:
1 kubectl create -f pv.yaml
通过以下命令查看持久卷是否建立成功:
1 kubectl get pv
部署rabbitmq
在k8s master节点上执行下面的命令创建namespace:
1 kubectl create namespace ns-rabbitmq
执行下面的命令创建rabbitmq集群(执行整个目录的所有配置文件)
1 kubectl create -f rabbitmq/
通过以上部署我们建立了一个ns-rabbitmq的namespace,并在其中创建了相应的pvc、角色账号,有状态副本集以及服务。
镜像
使用的是javashop自己基于rabbitmq:3.8做的,加入了延迟消息插件,其他没有变化。
服务
我们默认开启了对外nodeport端口,对应关系:
31672->15672
30672->5672
k8s内部可以通过下面的服务名称访问:
rabbitmq.ns-rabbitmq:15672
rabbitmq.ns-rabbitmq:5672
等待容器都启动成功后验证。
验证
使用附带程序校验
- 发送消息(注释掉接收消息)
- 观察mq的队列中有消息堆积
- 删除mq的副本集
- 恢复mq副本集
- 接收消息
关键技术点
1、集群发现:
使用rabbitmq提供的k8s对等发现插件:rabbitmq_peer_discovery_k8s
2、映射持久卷
映射到:/var/lib/rabbitmq/mnesia
3、自定义数据目录
1 - name: RABBITMQ_MNESIA_BASE 2 value: /var/lib/rabbitmq/mnesia/$(MY_POD_NAME)
其中MY_POD_NAME是读取的容器名称,通过有状态副本集保证唯一性的绑定:
1 - name: MY_POD_NAMESPACE 2 valueFrom: 3 fieldRef: 4 fieldPath: metadata.namespace
附带验证程序
1 private static CachingConnectionFactory connectionFactory; 2 private static void initConnectionFactory() { 3 connectionFactory = new CachingConnectionFactory(); 4 connectionFactory.setHost("localhost"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setUsername("guest"); 7 connectionFactory.setPassword("guest"); 8 } 9 public static void main(String[] args) { 10 initConnectionFactory(); 11 //发送消息 12 send(); 13 //接收消息 14 receive(); 15 } 16 private static void receive() { 17 AmqpTemplate template = new RabbitTemplate(connectionFactory); 18 String foo = (String) template.receiveAndConvert("myqueue"); 19 System.out.println("get message : "+ foo); 20 } 21 private static void send() { 22 AmqpAdmin admin = new RabbitAdmin(connectionFactory); 23 admin.declareQueue(new Queue("myqueue",true)); 24 AmqpTemplate template = new RabbitTemplate(connectionFactory); 25 template.convertAndSend("myqueue", "foo"); 26 }
欢迎关注Javashop技术分享公众号,观看更多的视频讲解:
原文地址:https://www.cnblogs.com/javashop-docs/p/12345476.html