网络编程之python zeromq学习系列之一

    简介:

      zeromq中间件,他是一个轻量级的消息中间件,传说是世界上最快的消息中间件,为什么这么说呢?
    因为一般的消息中间件都需要启动消息服务器,但是zeromq这厮尽然没有消息服务器,他压根没有消息中间件的架子,但是这并不能掩盖他的强大。
    通过和activemq,rabbitmq对比,显然功能上没有前两者这么强大,他不支持消息的持久化,但是有消息copy功能,他也不支持崩溃恢复,而且由于他太快了,可能客户端还没启动,服务端的消息就已经发出去了,这个就容易丢消息了,但是zeromq自由他的办法,就先说这么多了。先来看看怎么在python中引入这个强大的利器。
    我自己之所以,学习体会一下,主要原因,是想在练习过程中体会其中的应用原理及逻辑,最好是能感知到其中的设计思想,为以后,自己做东西积攒点经验.
    另外最近也比较关注自动化运维的一些东西.网上说saltstack本身就用的zeromq做消息队列.所以更引起了我的兴趣.
    安装:
    我的操作系统是ubuntu 14.04的 python zeromq 环境安装参考这里的官网

    下面测试:

    一,C/S模式:
    server 端代码:
        #!/usr/bin/env python
        # coding:utf8
        #author: [email protected]

        import zmq
        #调用zmq相关类方法,邦定端口
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(‘tcp://*:10001‘)

        while True:
            #循环接受客户端发来的消息
            msg = socket.recv()
            print "Msg info:%s" %msg
            #向客户端服务器发端需要执行的命令
            cmd_info = raw_input("client cmd info:").strip()
            socket.send(cmd_info)

        socket.close()

    client 端代码:
      import zmq
        import time
        import commands

        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        socket.connect(‘tcp://127.0.0.1:10001‘)

        def execut_cmd(cmd):
            s,v = commands.getstatusoutput(cmd)
            return v

        while True:
            #获取当前时间
            now_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())

            socket.send("now time info:[%s] request execution command:‘\n‘,%s"%(now_time,result))
            recov_msg = socket.recv()
            #调用execut_cmd函数,执行服务器发过来的命令
            result = execut_cmd(recov_msg)
            print recov_msg,‘\n‘,result,
            time.sleep(1)
            #print "now time info:%s cmd status:[%s],result:[%s]" %(now_time,s,v)
            continue

        socket.close()
      注意:此模式是经典的应答模式,不能同时send多个数据,
        这种模式说是主要用于远程调用和任务分配,但我愚笨,还是理解不透.后面有时间,再回过来好好看看,

        测试:
        req端
        # python zmq-server-cs-v01.py
        rep端
        # python  zmq-client-cs-v01.py
      
    二,发布订阅模式(pub/sub)

        pub 发布端代码如下:

        #!/usr/bin/env python
        # coding:utf8
        #author: [email protected]

        import itertools
        import sys,time,zmq

        def main():
            if len(sys.argv) != 2:
                print ‘Usage: publisher‘
                sys.exit(1)
            bind_to = sys.argv[1]
            all_topics = [‘sports.general‘,‘sports.football‘,‘sports.basketball‘,‘stocks.general‘,‘stocks.GOOG‘,‘stocks.AAPL‘,‘weather‘]

            ctx = zmq.Context()
            s = ctx.socket(zmq.PUB)
            s.bind(bind_to)

            print "Starting broadcast on topics:"
            print "%s" %all_topics
            print "Hit Ctrl-c to stop broadcasting."
            print "waiting so subscriber sockets can connect...."

            print
            time.sleep(1)
            msg_counter = itertools.count()

            try:
                for topic in itertools.cycle(all_topics):
                msg_body = str(msg_counter.next())
                #print msg_body,
                print ‘Topic:%s,msg:%s‘ %(topic,msg_body)
                s.send_multipart([topic,msg_body])
                #s.send_pyobj([topic,msg_body])
                time.sleep(0.1)
            except KeyboardInterrupt:

                pass

            print "Wating for message queues to flush"

            time.sleep(0.5)
            s.close()
            print "Done"

        if __name__ == "__main__":
        main()

        sub  端代码:

            #!/usr/bin/env python
            # coding:utf8
            #author: [email protected]

            import zmq
            import time,sys

            def main():

            if len(sys.argv) < 2:
                print "Usage: subscriber [topic topic]"
                sys.exit(1)

            connect_to = sys.argv[1]
            topics = sys.argv[2:]

            ctx = zmq.Context()
            s = ctx.socket(zmq.SUB)
            s.connect(connect_to)

            #manage subscriptions

            if not topics:
                print "Receiving messages on ALL topics...."
                s.setsockopt(zmq.SUBSCRIBE,‘‘)
            else:
                print "Receiving messages on topics: %s..." %topics

                for t in topics:
                s.setsockopt(zmq.SUBSCRIBE,t)

                print
            try:
                while True:
                topics,msg = s.recv_multipart()
                print ‘Topic:%s,msg:%s‘ %(topics,msg)
            except KeyboardInterrupt:
                pass
            print "Done...."

            if __name__ == "__main__":
            main()

     注意:
     这里的发布与订阅角色是绝对的,即发布者无法使用recv,订阅者不能使用send,官网还提供了一种可能出现的问题:当订阅者消费慢于发布,
     此时就会出现数据的堆积,而且还是在发布端的堆积(有朋友指出是堆积在消费端,或许是新版本改进,需要读者的尝试和反馈,thx!),显然,
     这是不可以被接受的。至于解决方案,或许后面的"分而治之"就是吧

     测试:
     pub端: 发布端 
     #python zmq-server-pubsub-v02.py  tcp://127.0.0.1:10001
     sub端:订阅端
     #python zmq-server-cs-v01.py  tcp://127.0.0.1:10001 sports.football
     
     三,push/pull 分而治之模式.
     
     任务发布端代码
     
     #!/usr/bin/env python
        # coding:utf8
        #author: [email protected]

        import zmq
        import random
        import time

        context = zmq.Context()
        #socket to send messages on
        sender = context.socket(zmq.PUSH)
        sender.bind(‘tcp://*:5557‘)

        print ‘Press Enter when the workers are ready:‘
        _ = raw_input()
        print "Sending tasks to workers...."

        #The first messages is "0" and signals start to batch

        sender.send(‘0‘)

        #Initialize random mumber generator

        random.seed()

        #send 100 tasks

        total_msec = 0
        for task_nbr in range(100):
            #Random workload from 1 to 100 msecs
            #print task_nbr,
            workload = random.randint(1,100)
            total_msec += workload
            sender.send(str(workload))
            print "Total expected cost:%s msec:%s workload:%s" %(total_msec,task_nbr,workload)

        work端代码如下:

        #!/usr/bin/env python
        # coding:utf8
        #author: [email protected]

        import sys,time,zmq
        import commands

        context = zmq.Context()
        #socket to receive messages on

        receiver = context.socket(zmq.PULL)
        receiver.connect(‘tcp://127.0.0.1:5557‘)

        #Socket to send messages to

        sender = context.socket(zmq.PUSH)
        sender.connect("tcp://127.0.0.1:5558")

        #Process tasks forever

        while True:
            s = receiver.recv()

            #Simple progress indicator for the viewer
            print s,
            sys.stdout.write("%s ‘\t‘ "%s)
            sys.stdout.flush()

            #Do the work
            time.sleep(int(s)*0.001)
            #Send results to sink
            sender.send(s)

    pull端代码如下:
            #!/usr/bin/env python
            # coding:utf8
            #author: [email protected]

            import sys
            import time
            import zmq

            context = zmq.Context()

            #Socket to receive messages on

            receiver = context.socket(zmq.PULL)
            receiver.bind("tcp://*:5558")

            #Wait for start of batch

            s = receiver.recv()

            #Start our clock now
            tstart = time.time()

            #Process 100 confirmations
            total_msec = 0

            for task_nbr in range(100):
            s = receiver.recv()

            if task_nbr % 10 == 0:
                print task_nbr,
                print s,
                sys.stdout.write(‘:‘)

            else:
                print s,
                #print task_nbr,
                sys.stdout.write(‘.‘)

            #Calculate and report duration of batch
            tend = time.time()
            print "Total elapsed time:%d msec "%((tend-tstart)*1000)

    注意点:
    这种模式与pub/sub模式一样都是单向的,区别有两点:
    1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
    2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到
    这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的
    那个"堵塞问题"的一个解决策略吧)

    其实所谓的分就是pull端去抢push端发出来的任务.谁抢着算谁的.

    测试:
     #python zmq-server-pushpull-v03.py
     #python zmq-work-pushpull-v03.py
     #python zmq-client-pushpull-v03.py
     
时间: 2024-12-24 17:41:42

网络编程之python zeromq学习系列之一的相关文章

学习编程之Python篇(一)

第一次接触编程,你将面对两大难题: 1.  对所要使用的编程语言的语法和语义不甚了了. 2.  不知道如何通过编程来解决问题. 作为一名新手,你会尝试同时来解决这两个难题:一边熟悉编程语言的语法语义,一边考虑如何靠编程解决问题.这是一个循序渐进的过程,万事开头难,务必保持耐心,切勿操之过急. 学习编程其实没有什么捷径可走,最好的方法就是反复操练,聆听规则,讨论方法,都不如真正做点什么. 在掌握了一些编程语言的语法语义之后,接下来的难题就是怎样才能写出好的程序.那么,我们首先来看看什么是好的程序.

学习编程之Python篇(二)

学习编程与学习踢球.学习演奏并无差别,最佳方式就是不断练习,所以我们鼓励你敲些代码,看看会发生什么,如果这些代码头一次不起作用,没关系,再来,看看你能否把它们纠正过来. 首先是一个简单的快速入门程序,让我们通过了解这个程序的细节,来熟悉Python. 第一项任务:给定半径,计算一个圆的周长和面积. 程序分解: 1.  提示用户输入半径: 2.  应用数学公式,根据获得的半径,得出周长和面积: 3.  输出结果. 代码1.1 运行程序的最简单方法是在IDLE编辑器里打开它,然后选择Run->Run

unix下网络编程之I/O复用(三)

poll函数 在上文unix下网络编程之I/O复用(二)中已经介绍了select函数的相关使用,本文将介绍另一个常用的I/O复用函数poll.poll提供的功能与select类似,不过在处理流设备时,它能够提供额外的信息. poll函数原型: 1 2 3 #include<poll.h>    int poll (struct pollfd * fdarray , unsigned long nfds , int timeout);    //返回:就需描述字的个数,0--超时,-1--出错

Python爬虫学习系列教程

Python爬虫学习系列教程 大家好哈,我呢最近在学习Python爬虫,感觉非常有意思,真的让生活可以方便很多.学习过程中我把一些学习的笔记总结下来,还记录了一些自己实际写的一些小爬虫,在这里跟大家一同分享,希望对Python爬虫感兴趣的童鞋有帮助,如果有机会期待与大家的交流. Python版本:2.7 一.爬虫入门 1. Python爬虫入门一之综述 2. Python爬虫入门二之爬虫基础了解 3. Python爬虫入门三之Urllib库的基本使用 4. Python爬虫入门四之Urllib库

网络编程之socket

网络编程之socket socket:在网络编程中的一个基本组件,也称套接字. 一个套接字就是socket模块中的socket类的一个实例. 套接字包括两个: 服务器套接字和客户机套接字 套接字的实例化需要3个参数: 1.地址簇:socket.AF_INET 2. 流:socket.SOCK_STREAM 3.使用的协议: 默认为0 服务器套接字:以下简称socket_server 客户端套接字:以下简称socket_client 地址:address=('127.0.0.1',8000) so

linux网络编程之shutdown() 与 close()函数详解

linux网络编程之shutdown() 与 close()函数详解 参考TCPIP网络编程和UNP: shutdown函数不能关闭套接字,只能关闭输入和输出流,然后发送EOF,假设套接字为A,那么这个函数会关闭所有和A相关的套接字,包括复制的:而close能直接关闭套接字. 1.close()函数 [cpp] view plain copy print? <span style="font-size:13px;">#include<unistd.h> int 

java网络编程之UDP实例

package Socket; import java.net.DatagramPacket; import java.net.InetAddress; public class Dgram { public static DatagramPacket toDatagram(String s, InetAddress destIA, int destPort) { byte[] buf = new byte[s.length() + 1]; s.getBytes(0, s.length(), b

java网络编程之TCP实例

Dgram类 package Socket; import java.net.DatagramPacket; import java.net.InetAddress; public class Dgram { public static DatagramPacket toDatagram(String s, InetAddress destIA, int destPort) { byte[] buf = new byte[s.length() + 1]; s.getBytes(0, s.leng

扯谈网络编程之Tcp SYN flood洪水攻击

简介 TCP协议要经过三次握手才能建立连接: (from wiki) 于是出现了对于握手过程进行的攻击.攻击者发送大量的FIN包,服务器回应(SYN+ACK)包,但是攻击者不回应ACK包,这样的话,服务器不知道(SYN+ACK)是否发送成功,默认情况下会重试5次(tcp_syn_retries).这样的话,对于服务器的内存,带宽都有很大的消耗.攻击者如果处于公网,可以伪造IP的话,对于服务器就很难根据IP来判断攻击者,给防护带来很大的困难. 攻与防 攻击者角度 从攻击者的角度来看,有两个地方可以