ZeroMQ-PyZmq Tornado Event Loop

MQ Poller can be used to serve and communicate with multiple sockets. How ever, with MQ Poller, you end up with explicit blocks (under if loop) for handling the sockets. Each socket registered with MQ Poller has to have an explicit “if block” to handle it.

PyZmq includes the tornado ioloop and adapts its IOStream class into ZMQStream for handling poll events on MQ sockets. You can register callbacks to receive and send data.

Before you do this, you must have tornado module installed:

pip install tornado

We will be redoing the previous program to take advantage of the ZMQStream and Tornado ioloop.

# -*- coding: utf-8 -*
import zmq
import time
import sys
import random
from  multiprocessing import Process

‘‘‘
You must first install PyZMQ’s IOLoop
‘‘‘
from zmq.eventloop import ioloop, zmqstream
ioloop.install()

# We have left the command server and the topic publisher same as before.
def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 
        
        
def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1)

# Message handlers are separated from the worker logic. Also note, that we stop the event loop once the worker receives the “Exit” command.
def getcommand(msg):
 print "Received control command: %s" % msg
 if msg[0] == "Exit":
  print "Received exit command, client will stop receiving messages"
  should_continue = False
  ioloop.IOLoop.instance().stop()
        
def process_message(msg):
 print "Processing ... %s" % msg
 

# Here, you can see that we use ZMQStream class to register callbacks. 
# The callbacks are the handlers that we had written earlier. 
# The “If blocks” in previous program has been converted to callbacks registered with tornado event loop. 
# There are no explicit socket handling blocks here.
def client(port_push, port_sub):    
 context = zmq.Context()
 socket_pull = context.socket(zmq.PULL)
 socket_pull.connect ("tcp://localhost:%s" % port_push)
 stream_pull = zmqstream.ZMQStream(socket_pull)
 stream_pull.on_recv(getcommand)
 print "Connected to server with port %s" % port_push
 
 socket_sub = context.socket(zmq.SUB)
 socket_sub.connect ("tcp://localhost:%s" % port_sub)
 socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
 stream_sub = zmqstream.ZMQStream(socket_sub)
 stream_sub.on_recv(process_message)
 print "Connected to publisher with port %s" % port_sub
 ioloop.IOLoop.instance().start()
 print "Worker has stopped processing messages."

if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()

In the output, you should notice that client has exited prior to the publishers which keeps publishing without any subscribers to process these messages:

(D:\anaconda) C:\Users\admin\Desktop\opt>python pyzmq_stream_poller.py
Running server on port:  5556
Running server on port:  5558
8 server#2753
Connected to server with port 5556
Connected to publisher with port 5558
Received control command: [‘Continue‘]
8 server#2753
Received control command: [‘Continue‘]
8 server#2753
Received control command: [‘Continue‘]
8 server#2753
Received control command: [‘Continue‘]
9 server#2753
Processing ... [‘9 server#2753‘]
Received control command: [‘Continue‘]
9 server#2753
Processing ... [‘9 server#2753‘]
Received control command: [‘Continue‘]
8 server#2753
Received control command: [‘Exit‘]
Received exit command, client will stop receiving messages
Worker has stopped processing messages.
8 server#2753
8 server#2753
8 server#2753
时间: 2024-08-05 13:00:16

ZeroMQ-PyZmq Tornado Event Loop的相关文章

【朴灵评注】JavaScript 运行机制详解:再谈Event Loop

PS: 我先旁观下大师们的讨论,得多看书了~ 别人说的:“看了一下不觉得评注对到哪里去,只有吹毛求疵之感. 比如同步异步介绍,本来就无大错:比如node图里面的OS operation,推敲一下就可以猜到那是指同步操作(自然不走event loop了):至于watcher啥的,显然只是实现上的特色,即使用同一个queue实现也未尝不可” [原帖: http://www.ruanyifeng.com/blog/2014/10/event-loop.html 作者:阮一峰] 一年前,我写了一篇<什么

【repost】JavaScript 运行机制详解:再谈Event Loop

一年前,我写了一篇<什么是 Event Loop?>,谈了我对Event Loop的理解. 上个月,我偶然看到了Philip Roberts的演讲<Help, I'm stuck in an event-loop>.这才尴尬地发现,自己的理解是错的.我决定重写这个题目,详细.完整.正确地描述JavaScript引擎的内部运行机制.下面就是我的重写. 进入正文之前,插播一条消息.我的新书<ECMAScript 6入门>出版了(版权页,内页1,内页2),铜版纸全彩印刷,非常

JavaScript 运行机制详解:再谈Event Loop

原文地址:http://www.ruanyifeng.com/blog/2014/10/event-loop.html 一年前,我写了一篇<什么是 Event Loop?>,谈了我对Event Loop的理解. 上个月,我偶然看到了Philip Roberts的演讲<Help, I'm stuck in an event-loop>.这才尴尬地发现,自己的理解是错的.我决定重写这个题目,详细.完整.正确地描述JavaScript引擎的内部运行机制.下面就是我的重写. 进入正文之前,

[Javascript] Task queue &amp; Event loop.

Javascript with Chorme v8 engine works like this : For Chorme engine, v8, it has call stack. And all the async opreations functions are stay in webapis. So everytime  you call 'setTimeout()' or http call, it will always call webapis. So, in the pictu

数据密集型 和 cpu密集型 event loop

Node.js在官网上是这样定义的:“一个搭建在Chrome JavaScript运行时上的平台,用于构建高速.可伸缩的网络程序.Node.js采用的事件驱动.非阻塞I/O模型使它既轻量又高效,是构建运行在分布式设备上的数据密集型实时程序的完美选择.”Web站点早已不仅限于内容的呈现,很多交互性和协作型环境也逐渐被搬到了网站上,而且这种需求还在不断地增长.这就是所谓的数据密集型实时(data-intensive real-time)应用程序,比如在线协作的白板,多人在线游戏等,这种web应用程序

Javascript Event Loop

Event Loop 是一个很重要的概念,指的是计算机系统的一种运行机制. JavaScript语言就采用这种机制,来解决单线程运行带来的一些问题. 本文参考C. Aaron Cois的<Understanding The Node.js Event Loop>,解释什么是Event Loop,以及它与JavaScript语言的单线程模型有何关系. 想要理解Event Loop,就要从程序的运行模式讲起.运行以后的程序叫做"进程"(process),一般情况下,一个进程一次

事件轮询 event loop

Understanding the node.js event loop The first basic thesis of node.js is that I/O is expensive: So the largest waste with current programming technologies comes from waiting for I/O to complete. There are several ways in which one can deal with the

Eclipse编辑java文件报Unhandled event loop exception错误的解

本人Eclipse版本是"eclipse-jee-kepler-SR2-win32-x86_64" 昨天因为换电脑,所以重装了一下软件,装好eclipse之后eclipse是可以使用的(换电脑之前也一直是用这个版本).后来又装了其他一些软件, 为了工作效率,只有边装软件边coding.后来发现再次在Eclipse中编辑java文件就一直会弹出错误提示框.如图. 点开图下面的"Unhandled event loop exception"查看具体的错误,详细错误如下.

[Node.js] Use nodejs-dashboard event loop delay with hrtime()

In this lesson, you will learn how to use the Formidable nodejs-dashboard event loop delay to identify expensive operations in your code. An example application with excessive synchronous file system write operations is used as well as the provided j