In this program, we create a command server that tells the worker when it should exit. The worker subscribes to a topic published by a publisher and prints every message it receives on that topic. It exits when it receives an “Exit” message from the command server.
# zmqpolling.py
"""Poll several ZeroMQ sockets from a single worker process.

Three processes are started:

* ``server_push`` -- a PUSH command server that tells the worker to keep
  going ("Continue") or to stop ("Exit").
* ``server_pub`` -- a PUB server that publishes messages under a randomly
  chosen topic ("8" or "9").
* ``client`` -- a worker that polls both connections, processes the
  messages published for topic "9" and stops on the "Exit" command.

Text is exchanged with ``send_string``/``recv_string`` and every ``print``
call takes a single pre-formatted argument, so the listing runs on Python 3
without relying on Python-3-only syntax.  The format strings reproduce the
output of the original ``print "text ", value`` statements, which is why
some of them contain two consecutive spaces.
"""
import zmq
import time
import sys
import random
from multiprocessing import Process


def server_push(port="5556"):
    """Run the PUSH command server.

    Sends "Continue" six times, one second apart, then sends "Exit" once
    and returns.

    Args:
        port: TCP port to bind on, as a string or number.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    try:
        socket.bind("tcp://*:%s" % port)
        print("Running server on port:  %s" % port)
        for _ in range(6):
            socket.send_string("Continue")
            time.sleep(1)
        socket.send_string("Exit")
    finally:
        # Release the socket and context explicitly instead of relying on
        # garbage collection at process exit.
        socket.close()
        context.term()


def server_pub(port="5558"):
    """Run the publisher.

    Publishes ten messages, one per second.  Each message is
    "<topic> server#<publisher_id>" where the topic is picked at random.

    Args:
        port: TCP port to bind on, as a string or number.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    try:
        socket.bind("tcp://*:%s" % port)
        publisher_id = random.randrange(0, 9999)
        print("Running server on port:  %s" % port)
        for _ in range(10):
            # randrange excludes the upper bound: the topic is 8 or 9.
            topic = random.randrange(8, 10)
            messagedata = "server#%s" % publisher_id
            payload = "%d %s" % (topic, messagedata)
            print(payload)
            socket.send_string(payload)
            time.sleep(1)
    finally:
        socket.close()
        context.term()


def client(port_push, port_sub):
    """Run the worker that handles messages published for topic "9".

    A poller watches the connection to the command server and the
    connection to the publisher.  The worker keeps processing published
    messages until the command server sends "Exit".

    Args:
        port_push: Port of the PUSH command server on localhost.
        port_sub: Port of the publisher on localhost.
    """
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_sub = context.socket(zmq.SUB)
    try:
        socket_pull.connect("tcp://localhost:%s" % port_push)
        print("Connected to server with port %s" % port_push)

        socket_sub.connect("tcp://localhost:%s" % port_sub)
        # Bytes literal: accepted by setsockopt on both Python 2 and 3.
        # The subscription is a prefix filter on the message.
        socket_sub.setsockopt(zmq.SUBSCRIBE, b"9")
        print("Connected to publisher with port %s" % port_sub)

        # Initialize poll set
        poller = zmq.Poller()
        poller.register(socket_pull, zmq.POLLIN)
        poller.register(socket_sub, zmq.POLLIN)

        # Work on requests from both server and publisher until the exit
        # command arrives.
        should_continue = True
        while should_continue:
            socks = dict(poller.poll())

            if socks.get(socket_pull) == zmq.POLLIN:
                message = socket_pull.recv_string()
                print("Recieved control command: %s" % message)
                if message == "Exit":
                    print("Recieved exit command, client will stop recieving messages")
                    should_continue = False

            if socks.get(socket_sub) == zmq.POLLIN:
                payload = socket_sub.recv_string()
                # Split on the first whitespace run only, so message data
                # that itself contains spaces does not break the unpacking.
                topic, messagedata = payload.split(None, 1)
                print("Processing ...  %s %s" % (topic, messagedata))
    finally:
        socket_sub.close()
        socket_pull.close()
        context.term()


# Finally, we fire up all the processes.
if __name__ == "__main__":
    # Now we can run a few servers
    server_push_port = "5556"
    server_pub_port = "5558"
    processes = [
        Process(target=server_push, args=(server_push_port,)),
        Process(target=server_pub, args=(server_pub_port,)),
        Process(target=client, args=(server_push_port, server_pub_port)),
    ]
    for process in processes:
        process.start()
    # Wait for every child so the launcher exits only after they finish.
    for process in processes:
        process.join()
# result: (D:\anaconda) C:\Users\admin\Desktop\opt>python zmqpolling.py Running server on port: 5556 Running server on port: 5558 8 server#8364 Connected to server with port 5556 Connected to publisher with port 5558 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Exit Recieved exit command, client will stop recieving messages 8 server#8364 9 server#8364 9 server#8364
时间: 2024-10-18 21:23:50