ZeroMQ-Poller

In this program, we will create a command server that tells when the worker should exit. Workers subscribes to a topic published by a publisher and prints it. It exits when it receives “Exit” message from the command server.

# zmqpolling.py

import zmq
import time
import sys
import random
from  multiprocessing import Process

# PUSH server that sends command to workers to continue working or exit.
def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1)

# Publisher that publishes for topics “8”,”9”,”10” in random order.
def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)   

# Worker that works on messages received for topic “9”. 
# We setup zmq poller to poll for messages on the socket connection to both command server and publisher.
def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    # Initialize poll set
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)

    # We poll the sockets to check if we have messages to recv and work on it. Worker continues working until it receives exit condition.
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

# Finally, we fire up all the processes.
if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()
# result:

(D:\anaconda) C:\Users\admin\Desktop\opt>python zmqpolling.py
Running server on port:  5556
Running server on port:  5558
8 server#8364
Connected to server with port 5556
Connected to publisher with port 5558
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Exit
Recieved exit command, client will stop recieving messages
8 server#8364
9 server#8364
9 server#8364
时间: 2024-10-18 21:23:50

ZeroMQ-Poller的相关文章

消息队列库——ZeroMQ

消息队列库--ZeroMQ ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字. ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间. ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信.消息队列.线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信. 主线程与I/O线程: I

zeromq源码分析笔记之线程间收发命令(2)

在zeromq源码分析笔记之架构说到了zmq的整体架构,可以看到线程间通信包括两类,一类是用于收发命令,告知对象该调用什么方法去做什么事情,命令的结构由command_t结构体确定:另一类是socket_base_t实例与session的消息通信,消息的结构由msg_t确定.命令的发送与存储是通过mailbox_t实现的,消息的发送和存储是通过pipe_t实现的,这两个结构都会详细说到,今天先说一下线程间的收发命令. zeromq的线程可分为两类,一类是io线程,像reaper_t.io_thr

ZeroMq LRU算法中间件

前一段时间2014北京PyCon大会吐槽颇多,所以我就到InfoQ上找了找2013的大会视频,对网络射击手游High Noon 2基于Python的服务器架构的视频挺感兴趣,尤其是游戏服务器中的0 downtime,原理他们底层不是原生的socket, 而是基于ZeroMq的socket,由于ZeroMq的短线自动重连可以满足游戏服务器的热启动,不需要代码层面的热启动,热更新,当更新代码完成后直接重启服务器,之前未处理的请求会继续处理.瞬间觉得非常高大上,于是最近一段时间回家一直研究ZeroMq

ZeroMQ研究与应用分析

转自: http://www.cnblogs.com/rainbowzc/p/3357594.html  1 ZeroMQ概述 ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字.ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间. 2 系统架构 2.1总体架构 ZeroMQ几乎所有的I/O操作都是异步的,主线程不会被阻塞.ZeroMQ

ZeroMQ(java)之负载均衡

我们在实际的应用中最常遇到的场景如下: A向B发送请求,B向A返回结果.... 但是这种场景就会很容易变成这个样子: 很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外一个问题: B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题...也就变成了负载均衡的问题了... 其实最好的负载均衡解决方案也很简单: 绝大多数的任务都是独立的,这里中间层可以将A发送过来的请求

ZeroMQ(java)中对IO的封装(StreamEngine)

哎,各种各样杂七杂八的事情...好久没有看代码了,其实要搞明白一个与IO相关的框架,最好的办法就是把它的I/0的读写两个过程搞清楚...例如在netty中,如果能将eventLoop的运行原理搞清楚,然后摸清楚整个I/O读写两个过程,那么也就差不太多了.... 这次来看看ZeroMQ(java)中如何来处理I/O的,先来看看一个类型的定义,IOObject类型,这个类型应该扮演的是工具类的形象,前面看过在ZeroMQ中所谓的IO线程的定义,那么IOObject就是用于直接与IO线程交互的,或者说

ZeroMQ(java)中的数据流SessionBase与SocketBase

前面的文章中已经比较的清楚了ZeroMQ(java)中如何在底层处理IO, 通过StreamEngine对象来维护SelectableChannel对象以及IO的事件回调,然后通过Poller对象来维护Selector对象,然后用IOObject对象来具体的管理SelectableChannel对象在Poller上面的注册,以及事件回调,他们之间的关系可以用下面的图形来简单的描述一下: 对于接收到的数据,首先由StreamEngine进行处理,其实它会调用内部的decoder将字节数据转化为Ms

zeroMQ研究(转)

偶尔一个机会,了解了下zeroMQ消息队列. 1  ZeroMQ概述 ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字.ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间. 2  系统架构 2.1 总体架构 ZeroMQ几乎所有的I/O操作都是异步的,主线程不会被阻塞.ZeroMQ会根据用户调用zmq_init函数时传入的接口参数,创

ZeroMQ(java)中组件间数据传输(Pipe的实现)

在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了....涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧.. 整个分层的结构大概就是这样吧,其中poller与StreamEngin是怎么交互的,这个就不说饿了吧,然后Session这个怎么与session之间交互呢,这个以后再说吧,其实在streamEngin里面有自己的session引用..反正这里没啥意思.

ZeroMQ with producer-consumer

</step00> Make sure what you need ! Let's see the map below: </step01> If your data centre send many many data to you by socket ... Let's see the map again .. </step02> So you may know how to use the ZeroMQ deal with your data ... We ass