了解saltstack的通信协议zeromq(二)

上文讨论了PAIR/PAIR,REQ/REP两种模式,现在看看PUB/SUB和PUSH/PULL模式。

PUB/SUB:发布订阅模式,跟我们订阅新闻类似的,采用异步IO,多对多模式,如果没有订阅,服务端发送的消息直接丢弃掉。

pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
        port =  sys.argv[1]
        int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
        topic = random.randrange(9999,10005)
        messagedata = random.randrange(1,215) - 80
        print "%d %d" % (topic, messagedata)
        socket.send("%d %d" % (topic, messagedata))
        time.sleep(1)

sub_client.py

import sys
import time
import zmq

port = "5556"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect("tcp://localhost:%s" % port)

#socket.set(zmq.UNSUBSCRIBE, messagedata)
topicfilter = "10001"
socket.set(zmq.SUBSCRIBE, topicfilter)

#Process 5 updates
total_value = 0
#for update_nbr in range (5):
while True:
        string = socket.recv()
        topic, messagedata = string.split()
#       total_value += int(messagedata)
        print topic, messagedata
        time.sleep(1)

zmq.SUBCRIBE是用来指明订阅某种消息,这里订阅的是出现10001的信息

PUSH/PULL:任务分发模式,主要用于分布式计算的,将很多个任务分发到worker,然后worker将计算结果发送到结果收集器。

producer.py

import time
import zmq
import random

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind(‘tcp://*:5557‘)

# sync start of batch
# be sure all worker connect success
sink = context.socket(zmq.PUSH)
sink.connect(‘tcp://0.0.0.0:5558‘)
print ‘Press Enter when the workers are ready:‘
_ = raw_input()
print ‘Sending tasks to workers...‘
sink.send(b‘0‘)

for task_nbr in xrange(1000000):
        workload = random.randint(1,10)
        sender.send_string(u‘%i‘ % workload)

for i in range(10):
        sender.send_string(u‘0‘)
time.sleep(1)

consumer.py

import sys
import time
import zmq

context = zmq.Context()

# Socket to recevie messages on
receiver = context.socket(zmq.PULL)
receiver.connect(‘tcp://localhost:5557‘)

# socket to send messages
sender = context.socket(zmq.PUSH)
sender.connect(‘tcp://localhost:5558‘)

while True:
        a_str = receiver.recv_string()
        num = int(a_str)
        if num % 2 == 0 or a_str == u‘0‘:
                sender.send_string(a_str)

result.py

import sys
import time
import zmq

context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()
sum = 0
flag = 0
# Start our clock now
tstart = time.time()
while True:
        a_str = receiver.recv_string()
        num = int(a_str)
        sum += num
        if a_str == ‘0‘:
                flag += 1
        if flag == 10:
                break

tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

这个结果并不精确,分别是启动1个、2个、3个、4个consumer.py进程的测试结果,说明计算缩短了时间。

Queue,Forwarder,Streamer分别是REQ/REP、PUB/SUB、PUSH/PULL的代理,用于代理不同网段的机器。

关于代理的用法,这里不讲述。请参考下面地址。

参考地址:

http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/pyzmq.html

https://github.com/anjuke/zguide-cn

http://zguide.zeromq.org/

了解saltstack的通信协议zeromq(二),布布扣,bubuko.com

时间: 2025-01-04 21:50:00

了解saltstack的通信协议zeromq(二)的相关文章

了解saltstack的通信协议zeromq(一)

学了saltstack有一段时间了,说实话,对于一个python爱好者来说salt源代码真是一个宝藏啊.于是乎去看了源代码,发现所有问题都卡在了底层通信上,在看saltstack之前都不知道有一个这么好的zeromq通信协议.现在就来记录关于zeromq的学习笔记. zmq是什么:zmq是基于之前协议(tcp,ipc,inproc)开发的并发框架,采用异步IO非阻塞方式,多对多的通信模式,无需在意服务端和客户端的启动顺序.它包含四种通信模式:PAIR/PAIR,REP/REQ,PUB/SUB,P

基于STM32之UART串口通信协议(二)发送

一.前言 1.简介 在上一篇UART详解中,已经有了关于UART的详细介绍了,也有关于如何使用STM32CubeMX来配置UART的操作了,而在该篇博客,主要会讲解一下如何实现UART串口的发送功能. 2.UART简介 嵌入式开发中,UART串口通信协议是我们常用的通信协议之一,全称叫做通用异步收发传输器(Universal Asynchronous Receiver/Transmitter). 3.准备工作 在UART详解中已经有了详细的说明,在这里就不说明了. 注: 建议每次编写好一个相关功

Saltstack 基础模块记录二

一. Archive模块 功能:实现系统层面的压缩包调用,支持gzip.gunzip.rar.tar.unrar.unzip等 示例: 1.采用gunzip解压sourcefile.txt.gz包 salt '*' archive.gunzip sourcefile.txt.gz 2.采用gzip压缩sourcefile.txt文件 salt '*' archive.gzip sourcefile.txt 3.API调用: client.cmd('*','archive.gunzip',['so

【saltstack学习系列之二】salt-minion端修改主机名

删除minion-id和pki目录cd /etc/saltrm -rf minion_id pki/ 修改/etc/sysconfig/network文件vim /etc/sysconfig/networkHOSTNAME=Admin2-saltstack.littlebee.com 保存退出 登出系统重新进入 重启salt-minion服务/etc/init.d/salt-minion restart .在master端把新的key加入进去salt-key -ya Admin2-saltsta

串口通信协议数据处理二

该部分主要针对主动发送的连续仪表数据进行处理.只处理固定长度的数据. 在称重系统中,主要针对金钟和托利多的仪表进行处理. 串口的数据接收有两种方式,一种是通过事件触发方式,通过监听DataReceived事件:另一只就是通过循环主动查询BytesToRead属性,来判断是否有数据需处理.主动查询程序控制更方便,采用主动查询. 要进行通讯数据的处理,首先需要接收从com口获取的数据.接收到的数据需要一个内存空间进行存储.考虑到接收到的数据是连续数据,且数据接收后要进行后期的处理,处理后就要对数据进

Saltstack 服务器基本安装

Salt介绍 Salt是一个基础平台管理工具 Salt是一个配置管理系统,能够维护预定义状态的远程节点 Salt是一个分布式远程执行系统,用来在远程节点上执行命令和查询数据 Salt核心功能 使命令发送到远程系统是并行的而不是串行的 使用安全加密的协议 使用最小最快的网络载荷 提供简单的编程接口 Salt有点 Saltstack是用python语音编写 相当于设备是轻量级别的 Saltstack通讯采用ZEROMQ实现使得它很快速 Saltstack是开源的 通过python可以自己写模块 Sa

saltstack python api 调用

一.Python安装saltstack pip install salt 二.接口调用实例 1.test.ping import salt.client client = salt.client.LocalClient() ret = client.cmd('*','test.ping') print ret 2.cmd模块 1)功能:实现远程命令的调用执行 2)实例: 获取所有主机的内存使用情况 salt '*' cmd.run "free -m" client.cmd('SN201

saltstack 安装及遇到问题

需求:经常对线上服务器做一些修改配置文件,搭建等操作,很麻烦 系统环境是RHEL5 安装过程和很多网友一样,先安装epel rpm -Uvh http://mirror.pnl.gov/epel/5/x86_64/epel-release-5-4.noarch.rpm  yum update yum install salt-minion yum install salt-master 安装过程巨简单,装好后,因为saltstack 是用zeromq来进行通信的,minion运行的时候log报错

SaltStack使用

一. SaltStack基本使用方法 二. SaltStack的命令行工具 三. SaltStack的远程执行命令 四. SaltStack的模块 Grains Grains 是minion启动的时候采集的系统静态,包括CPU,操作系统,文件系统,硬盘等等 显示所有Grains信息分类:salt '*' grains.ls 显示所有静态信息:salt '*' grains.items Grains应用场景 1. 获取系统信息 2. 用于分类查找minion 3. 与其他模块结合完成更灵活的min