ZeroMQ-Push/Pull

Push and Pull sockets let you distribute messages to multiple workers, arranged in a pipeline. A Push socket will distribute sent messages to its Pull clients evenly. This is equivalent to producer/consumer model but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.

Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes

# producer.py
# Producers are created with ZMQ.PUSH socket types. Producer is bound to well known port to which consumers can connect too.
import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)    

    zmq_socket.bind("tcp://127.0.0.1:5557")
    # Start your result manager and workers before you start your producers
    for num in xrange(20000):
        work_message = { ‘num‘ : num }
        zmq_socket.send_json(work_message)

producer()
# consumer.py
# Consumer are created with ZMQ.PULL socket types to pull requests from producer and uses a push socket to connect and push result to result collector.

import time
import zmq
import random

def consumer():
    consumer_id = random.randrange(1,10005)
    print "I am consumer #%s" % (consumer_id)
    context = zmq.Context()
    # recieve work
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    # send work
    consumer_sender = context.socket(zmq.PUSH)
    consumer_sender.connect("tcp://127.0.0.1:5558")
    
    while True:
        work = consumer_receiver.recv_json()
        data = work[‘num‘]
        result = { ‘consumer‘ : consumer_id, ‘num‘ : data}
        if data%2 == 0: 
            consumer_sender.send_json(result)

consumer()
# resultcollector.py
# result collector are created with ZMQ.PULL socket type and act as consumer of results from intermediate consumers. They also are bound to well known port so that intermedia# te consumer can connect to it.

import time
import zmq
import pprint

def result_collector():
    context = zmq.Context()
    results_receiver = context.socket(zmq.PULL)    results_receiver.bind("tcp://127.0.0.1:5558")    collecter_data = {}
    for x in xrange(1000):
        result = results_receiver.recv_json()
        if collecter_data.has_key(result[‘consumer‘]):
            collecter_data[result[‘consumer‘]] = collecter_data[result[‘consumer‘]] + 1
        else:
            collecter_data[result[‘consumer‘]] = 1
        if x == 999:
            pprint.pprint(collecter_data)

result_collector()
# running it:

python resultcollector.py
python consumer.py
python consumer.py
python producer.py

# Results shows the distribution of transmitted result to result collector:
(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{5892: 1000}

(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{1223: 1000}

(D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py
{5892: 1000}

# consumer-1
(D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py
I am consumer #5892

# consumer-2
(D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py
I am consumer #1223

# producer
(D:\anaconda) C:\Users\admin\Desktop\opt>python producer.py
时间: 2024-07-30 03:59:45

ZeroMQ-Push/Pull的相关文章

第二章-第二题(练习使用git的add/commit/push/pull/fetch/clone等基本命令)--梁绍楠

题目描述: 每人自己建立一个HelloWorld项目,练习使用git的add/commit/push/pull/fetch/clone等基本命令.比较项目的新旧版本的差别. 使用步骤: (1)创建版本库 选择一个合适的地方,创建一个空目录HelloWorld.而后通过git init把这个目录变成Git可以管理的仓库(目录下会多出了一个.git目录,该目录是git跟踪管理版本库的,勿轻易修改): 编辑hello文件,内容如下: (2)将文件hello放到git仓库 首先,需要设置用户名.邮箱信息

第二章-第二题(练习使用git的add/commit/push/pull/fetch/clone等基本命令)-By郭青云(未完待续)

题目描述: 每人自己建立一个HelloWorld项目,练习使用git的add/commit/push/pull/fetch/clone等基本命令.比较项目的新旧版本的差别. 使用步骤: 未完待续...... 参考文件:http://blog.csdn.net/u012575819/article/details/50553501

第二章-第二题(每人自己建立一个HelloWorld项目,练习使用git的add/commit/push/pull/fetch/clone等基本命令。比较项目的新旧版本的差别。)--by侯伟婷

第二题:每人自己建立一个HelloWorld项目,练习使用git的add/commit/push/pull/fetch/clone等基本命令.比较项目的新旧版本的差别. 下面我将自己的练习结果和个人感受记录如下: 第一步:安装Git,设置自己的账号和邮箱,参见Git教程-廖雪峰的官方网站,网址如下参考资料1所示. 第二步:在Git中新建repository,名叫HelloWorld,并进行初始化,如图所示. 第三步:在HelloWorld版本库中新建了helloWorld.txt文件,用以练习G

4,MongoDB 之 $关键字 及 $修改器 $set $inc $push $pull $pop MongoDB

我们在之前的 MongoDB 之 手把手教你增删改查 MongoDB - 2 中提到过 $set 这个系统关键字,用来修改值的对吧 但是MongoDB中类似这样的关键字有很多, $lt $gt $lte $gte 等等,这么多我们也不方便记,这里我们说说几个比较常见的 一.查询中常见的 等于 大于 小于 大于等于 小于等于 等于 : 在MongoDB中什么字段等于什么值其实就是 " : " 来搞定 比如 "name" : "路飞学城" 大于 :

git push/pull 到远端

git push/pull origin 本地分支:远端希望创建的分支 本地远程分支名相同时,可以省略 git pull/push origin  分支名 origin:   git为你默认创建了一个指向远端代码库的origin git remote -v 原文地址:https://www.cnblogs.com/mu-zhang/p/11509946.html

09_EGIT插件的安装,Eclipse中克隆(clone),commit,push,pull操作演示

?? 1 下载EGIT,下载地址:http://www.eclipse.org/egit/download/ 最终的下载地址: http://www.eclipse.org/downloads/download.php?file=/egit/updates/org.eclipse.egit.repository-4.0.1.201506240215-r.zip&mirror_id=105 2 安装EGIT插件 3 Eclipse中使用GIT,先准备GIT相关的文件夹等. 创建一个仓库 4 Ecl

Linux下搭建github环境并push pull代码

一.准备工作 1.有一个github账号,没有的话请到到github.com注册 2.已经安装了git 通过下面的命令检查是否已经安装了git $ git --version ###检查是否安装了git,如果没有安装就执行下一条命令 $ sudo apt-get install git ###安装git的命令 二.搭建github环境 1.命令生成ssh key公钥 $ ssh-keygen -t rsa -C "your email address" ###比如我的邮箱是[email

git clone, push, pull, fetch 的用法

Git是目前最流行的版本管理系统,学会Git几乎成了开发者的必备技能. Git有很多优势,其中之一就是远程操作非常简便.本文详细介绍5个Git命令,它们的概念和用法,理解了这些内容,你就会完全掌握Git远程操作. git clone git remote git fetch git pull git push 本文针对初级用户,从最简单的讲起,但是需要读者对Git的基本用法有所了解.同时,本文覆盖了上面5个命令的几乎所有的常用用法,所以对于熟练用户也有参考价值. git 一.git clone

CentOS下git push/pull无需输入密码

最近因为工作需要,新申请了一台服务器,配置好git后,发现每次git push和git pull时,都需要输入密码,非常麻烦.网上找了很多资料,不是语焉不详,就是有错误. 经过几个小时的摸索,终于确定解决方法如下: 环境:机器A:CentOS 7,登录用户名root机器B:CentOS 7,登录用户名QiaoYL 解决问题的核心思路是用SSH,实现在机器A上,使用非当前用户免密登录机器B,步骤如下:1. 在机器A上,执行如下命令,生成密钥对,后续操作按提示进行即可:ssh-keygen -t r

【转】adb push&pull bug --- Permission denied----不错

原文网址:http://blog.csdn.net/hengkong_horse/article/details/8708076 问题背景: adb push  E:\\rrr.txt  /data/ 显示如下: Failed to copy 'E:\\rrr.txt' to '/data/': Permission denied 解决方法: 1.登陆手机shell 命令: Adb shell 2.查看/data权限 命令: ls -l 显示如下: drwxrwx--x system   sys