python第五十三天--进程,协程.select.异步I/O...

进程:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 import multiprocessing,threading,time
 5
 6 def run(name):
 7     t=threading.Thread(target=run2)#创建新线程
 8     t.start()
 9     print(‘进程[%s],打印中...‘%name)
10     time.sleep(1)
11
12 def run2():
13     print(threading.get_ident())#打印线程ID
14     time.sleep(2)
15
16
17 if __name__ == ‘__main__‘:
18     for i in range(10):
19         p=multiprocessing.Process(target=run,args=(‘第[%s]个进程‘%i,))#创建新进程
20         p.start()

进程号:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4
 5 import  multiprocessing,threading,time,os
 6 from  multiprocessing import Process#从 multprocessing 打开 Process
 7 def info_l(file):
 8     print(‘当前模块名:‘,__name__)
 9     print(‘父进程ID:‘,os.getppid())
10     print(‘进程ID:‘,os.getpid())
11     print(‘\n\n‘)
12
13 def f(name):
14     print(‘查看:‘,name)
15     info_l(‘相关列表‘)
16
17
18 if __name__ == ‘__main__‘:
19     info_l(‘主进程相关列表‘)
20     p=Process(target=f,args=(‘当前进程‘,))
21     p.start()
22     p.join()

进程锁:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4
 5 from multiprocessing import Process,Lock # Lock 进程锁通讯中间件
 6
 7 def f(lock,i):
 8     lock.acquire()#进程锁
 9     print(‘第[%s]个进程‘%i)
10     lock.release()#解锁
11 if __name__ ==‘__main__‘:
12     lock=Lock()#进程锁对象
13     for i in range(10):
14         p=Process(target=f,args=(lock,i)).start()

进程之间通讯:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4
 5 import multiprocessing,queue,threading
 6 from multiprocessing import Process,Queue # Queue 进程通讯中间件
 7
 8
 9 ‘‘‘  线程 之间共享队列
10 def f():
11     q.put([1,None,‘加入数据‘])#队列
12 if __name__ ==‘__main__‘:
13     q=queue.Queue()#线程队列
14     #q=Queue()#进程队列
15     p=threading.Thread(target=f,)#创建线程
16     p.start()
17     print(q.get())#输出,取出的
18     p.join()
19 ‘‘‘
20 def f(q):#存入q对象
21     q.put([1,None,‘加入数据‘])#队列
22 if __name__ ==‘__main__‘:
23     q=Queue()#进程队列
24     p=Process(target=f,args=(q,))#创建进程
25     p.start()
26     print(q.get())#输出,取出的
27     p.join()

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 import os
 5 from multiprocessing import Process,Pipe,Manager # Pipe 管道 进程通讯中间件
 6
 7
 8 def f(d,l):
 9     d[os.getpid()]=os.getpid()#修改字典
10     l.append(os.getpid())#添加列表内容
11     print(d)
12     print(l)
13
14
15 if __name__ ==‘__main__‘:
16     with Manager() as manager:
17         d=manager.dict()#创建一个进程之间可修改的字典
18         l=manager.list(range(5))#创建一个进程之间可修改的列表
19         p_list=[]#join使用
20         for i in range(10):
21             p=Process(target=f,args=(d,l))#创建进程传入数据,
22             p.start()
23             p_list.append(p)
24         for r in p_list:#等待进程完成
25             r.join()
26         print(d)
27         print(l)

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4
 5 from multiprocessing import Process,Pipe # Pipe 管道 进程通讯中间件
 6
 7 def f(conn):#存入conn对象
 8     conn.send([‘子进程发送信息‘,‘....‘])
 9     print(‘收到父进程的信息:‘,conn.recv())
10     conn.close()
11 if __name__ ==‘__main__‘:
12     parent_conn,child_conn=Pipe()#生成一个管道,返回两个值,管理双端
13     p=Process(target=f,args=(child_conn,))#创建进程
14     p.start()
15     print(‘收到子进程的信息:‘,parent_conn.recv())
16     parent_conn.send([‘父进程发送信息‘])
17     p.join()

进程池:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #__author__=2017/6/23
 5 import time,os
 6 from multiprocessing import Process,Lock,Pool # Pool 进程池通讯中间件
 7
 8 def Foo(i):
 9     print(‘第[%s]个进程,ID:‘%i,os.getpid())
10     time.sleep(3)
11     return i+100
12 def Bar(arg):
13     print(‘回调>>>>:‘,arg,os.getpid())
14 if __name__ ==‘__main__‘:
15     #pool=Pool(processes=5)#定义一个进程池 表示允许进程池同时放入5个进程
16     pool=Pool(5)#定义一个进程池 表示允许进程池同时放入5个进程
17     for i in range(10):
18         #pool.apply(func=Foo,args=(i,))#使用进程池创建进程  串行
19         #pool.apply_async(func=Foo,args=(i,))#使用进程池创建进程 并行
20         pool.apply_async(func=Foo,args=(i,),callback=Bar)#回调
21
22     print(‘结束‘)
23     #pool.join()
24     pool.close()#一定要先关闭进程池
25     pool.join()#后进行join

协程:

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    10:10
 6 #__author__=‘Administrator‘
 7
 8 import time
 9 import queue
10 def consumer(name):#消费者函数
11     print(‘[%s]消费产品中.......‘%name)
12     while True:
13         new_b=yield #跳转点
14         print(‘[%s] 消费 [%s]‘%(name,new_b))
15 def producer():#生产者函数
16     r=con.__next__()
17     r2=con2.__next__()
18     n=0
19     while n<10:
20         n+=1
21         con.send(n)#发送给消费者
22         con2.send(n)
23         print(‘\033[32;1m[生产者]\033[0m生产产品[%s]‘%n)
24
25 if __name__==‘__main__‘:
26     con=consumer(‘消费者A‘)
27     con2=consumer(‘消费者B‘)
28     p=producer()

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    10:31
 6 #__author__=‘Administrator‘
 7 #import greenlet
 8 from greenlet import greenlet
 9
10 def test1():
11     print(‘函数一: 12‘)
12     ger2.switch()#进行协程切换
13     print(‘函数一: 34‘)
14     ger2.switch()
15
16 def test2():
17     print(‘函数二: 56‘)
18     ger1.switch()
19     print(‘函数二: 78‘)
20     ger1.switch()
21
22 ger1=greenlet(test1)#创建协程
23 ger2=greenlet(test2)
24 ger1.switch()

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    10:47
 6 #__author__=‘Administrator‘
 7
 8 import gevent
 9 def func1():
10     print(‘func1 reading....‘)
11     gevent.sleep(2)
12     print(‘func1 reading two.. end‘)
13
14 def func2():
15     print(‘func2 reading....‘)
16     gevent.sleep(0)
17     print(‘func2 reading two.. end‘)
18 def func3():
19     print(‘func3 reading....‘)
20     gevent.sleep(1)
21     print(‘func3 reading two.. end‘)
22
23 gevent.joinall([
24     gevent.spawn(func1),
25     gevent.spawn(func2),
26     gevent.spawn(func3)
27 ])

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    14:03
 6 #__author__=‘Administrator‘
 7 from urllib import request
 8 import gevent,time
 9 from gevent import monkey
10 monkey.patch_all()#对所有的I/O操作进行标记
11
12 def f(url):#爬取网页
13     print(‘网址: ‘,url)
14     resp=request.urlopen(url)#打开网页
15     data=resp.read()#读取网页
16     print(‘网址:[%s]的网页数据大小:[%s]‘%(url,len(data)))
17
18 urls=[‘https://www.python.org/‘,
19       ‘https://hao.360.cn/‘,
20       ‘https://www.yahoo.com/‘]
21
22 time_start=time.time()#同步 串行开始时间
23 for url in urls:
24     f(url)
25 print(‘同步时长:‘,time.time()-time_start)
26
27 time_start_asy=time.time()#异步 并行开始时间
28 gevent.joinall([
29     gevent.spawn(f,‘https://www.python.org/‘),
30     gevent.spawn(f,‘https://hao.360.cn/‘),
31     gevent.spawn(f,‘https://www.yahoo.com/‘)
32 ])
33 print(‘异域步时长:‘,time.time()-time_start_asy)

协程socket_server 实现并发

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    14:42
 6 #__author__=‘Administrator‘
 7
 8 import sys
 9 import socket
10 import time
11 import gevent
12
13 from gevent import socket,monkey
14 monkey.patch_all()
15
16
17 def server(port):
18     s = socket.socket()#socket 对象
19     s.bind((‘0.0.0.0‘, port))#服务端,bind IP 端口
20     s.listen(500)
21     print(‘监听中....‘)
22     while True:
23         cli, addr = s.accept()
24         gevent.spawn(handle_request, cli)#创建一个新协程来
25
26
27
28 def handle_request(conn):
29     try:
30         while True:
31             data = conn.recv(1024)
32             print("recv:", data)
33             conn.send(data)
34             if not data:
35                 conn.shutdown(socket.SHUT_WR)
36
37     except Exception as  ex:
38         print(ex)
39     finally:
40         conn.close()
41 if __name__ == ‘__main__‘:
42     server(8001)

select :

socket_server

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    19:34
 6 #__author__=‘Administrator‘
 7
 8 import select,socket,sys ,queue
 9
10 s=socket.socket()#实例化一个连接对象
11 s.setblocking(0)#设置成非阻塞
12 server_addr=(‘localhost‘,9500)#设置绑定的 IP 端口
13 s.bind(server_addr)#连接对象绑定IP 端口
14 s.listen(100)#队列  可连接数量
15 inputs=[s,]#首先要监测本身
16
17 outputs=[]#发送列表
18
19 meg_queues={} #发送 连接对象的队列集合  字典
20
21 while True:
22     print(‘监听中......‘)
23     readable,writeable,exeptional=select.select(inputs,outputs,inputs)#生成select 对象,返回三个列表 连接,发关,错误
24
25     for i in readable: #i为一个socket
26         if i is s:#如果i 是s 表示有新 连接 进来
27             conn,client_addr=i.accept()#建立一个新连接
28             print(‘接入一个新连接...‘,client_addr)
29             conn.setblocking(0)#也设成非阻塞
30             inputs.append(conn)#加入select,的连接列表,避免出现阻塞
31             meg_queues[conn]=queue.Queue()#创建一个队列  添加到字典
32         else:
33             try:
34                 data=i.recv(1024)#如果不是新连接就收数据
35             except Exception as e:
36                 print(e)
37             if data: #如果数据不为空
38                 print(‘[%s] 发来的数据 [%s]‘%(i.getpeername,data))
39                 meg_queues[i].put(data)#当前连接的消息队列加入数据
40                 if i not in outputs:#如果当前连接没有在发送列表内,就加入发送列表
41                     outputs.append(i)
42             else:
43                 print(‘客户端已经断开了....‘)#开始清理工作
44                 if i in outputs:#在发送列表
45                     outputs.remove(i)#在发送列表内删除
46                 inputs.remove(i)#在连接列表内删除
47                 del meg_queues[i]#在队列字典内删除
48
49     for w in writeable:#循环发送列表
50         try:
51             msg=meg_queues[w].get_nowait()#取出队列中的数据,判断
52         except queue.Empty:#如果数据为空
53             outputs.remove(w)##从发送列表内删除
54         else:
55             w.send(msg)#发送
56
57     for e in exeptional:#循环错误列表
58         print(‘连接[%s]出错!‘%e.getpeername)
59         inputs.remove(e)##从发送列表内删除
60         if e in outputs:#在发送列表
61             outputs.remove(e)#在发送列表内删除
62         e.close()
63         del meg_queues[e]#在队列字典内删除

socket_client

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    19:23
 6 #__author__=‘Administrator‘
 7
 8 import socket
 9
10 server_addr=(‘localhost‘,9500)#设置绑定的 IP 端口
11 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
12 s.connect(server_addr)
13 while True:
14     msg = bytes(input(">>:"),encoding="utf8")#定义一个数据 消息
15     if msg:
16         s.sendall(msg)#发送数据
17     else:
18         print(‘不能为空‘)
19         continue
20     data = s.recv(1024)#收数据(读数据)
21     #print(data)
22
23     print(‘Received‘, repr(data.decode()))
24 s.close()

selectors :

 1 #!usr/bin/env python
 2 #-*-coding:utf-8-*-
 3 # Author calmyan
 4 #python
 5 #2017/6/24    21:58
 6 #__author__=‘Administrator‘
 7 import selectors
 8 import socket
 9
10 sel = selectors.DefaultSelector()#生成一个创建一个selectors对象
11
12 def accept(sock, mask):
13     conn, addr = sock.accept()  # 建立新连接
14     print(‘accepted‘, conn, ‘from‘, addr)
15     conn.setblocking(False)#设成非阻塞
16     sel.register(conn, selectors.EVENT_READ, read)#注册 连接,回调函数 read
17
18 def read(conn, mask):
19     data = conn.recv(1024)  # 接收数据
20     if data:#不为空
21         print(‘echoing‘, repr(data), ‘to‘, conn)
22         conn.send(data)  # 发送数据
23     else:#如果为空
24         print(‘closing‘, conn)
25         sel.unregister(conn)#取消注册
26         conn.close()#关闭连接
27
28 server_addr=(‘localhost‘,9501)#设置绑定的 IP 端口
29 sock = socket.socket()#创建一个sock对象
30 sock.bind(server_addr)#绑定IP 端口
31 sock.listen(100)
32 print(‘监听中...‘)
33 sock.setblocking(False)#非阻塞
34 sel.register(sock, selectors.EVENT_READ, accept)#注册连接  返调函数为accept
35
36 while True:
37     events = sel.select()#默认为阻塞模式
38     for key, mask in events:#如果有连接,接入
39         callback = key.data#新建连接句柄
40         callback(key.fileobj, mask)

事件驱动与异步IO

服务器处理模型的程序时,有以下几种模型:

(1)每收到一个请求,创建一个新的进程,来处理该请求;

(2)每收到一个请求,创建一个新的线程,来处理该请求;

(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求

第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。

第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。

第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。

综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

时间: 2024-08-04 22:29:23

python第五十三天--进程,协程.select.异步I/O...的相关文章

Python实现基于协程的异步爬虫

一.课程介绍 1. 课程来源 本课程核心部分来自<500 lines or less>项目,作者是来自 MongoDB 的工程师 A. Jesse Jiryu Davis 与 Python 之父 Guido van Rossum.项目代码使用 MIT 协议,项目文档使用 http://creativecommons.org/licenses/by/3.0/legalcode 协议. 课程内容在原文档基础上做了稍许修改,增加了部分原理介绍,步骤的拆解分析及源代码注释. 2. 内容简介 传统计算机

15.python并发编程(线程--进程--协程)

一.进程:1.定义:进程最小的资源单位,本质就是一个程序在一个数据集上的一次动态执行(运行)的过程2.组成:进程一般由程序,数据集,进程控制三部分组成:(1)程序:用来描述进程要完成哪些功能以及如何完成(2)数据集:是程序在执行过程中所需要使用的一切资源(3)进程控制块:用来记录进程外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志.3.进程的作用:是想完成多任务并发,进程之间的内存地址是相互独立的二.线程:1.定义:最小的执行单位,线程的出现是为了

python学习笔记-(十四)进程&amp;协程

一. 进程 1. 多进程multiprocessing multiprocessing包是Python中的多进程管理包,是一个跨平台版本的多进程模块.与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程.该进程可以运行在Python程序内部编写的函数.该Process对象与Thread对象的用法类似. 创建一个Process实例,可用start()方法启动. join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步.

Python的线程&amp;进程&amp;协程[0] -&gt; 基本概念

基本概念 / Basic Concept 0 简介与动机 / Why Multi-Thread/Multi-Process/Coroutine 在多线程(multithreaded, MT)编程出现之前,计算机程序的执行是由单个步骤序列组成的,该序列在主机的CPU中按照同步顺序执行.即无论任务多少,是否包含子任务,都要按照顺序方式进行. 然而,假定子任务之间相互独立,没有因果关系,若能使这些独立的任务同时运行,则这种并行处理方式可以显著提高整个任务的性能,这便是多线程编程. 而对于Python而

Python开发【第九篇】:协程、异步IO

协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是协程,协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切换回来的时候,恢复先前保存的寄存器上下文和栈.因此,协程能保留上一次调用的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法,进入上一次离开时所处逻辑流的位置. 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返

11.python并发入门(part12 初识协程)

一.协程的简介. 协程,又被称为微线程,虽然是单进程,单线程,但是在某种情况下,在python中的协程执行效率会优于多线程. 这是因为协程之间的切换和线程的切换是完全不一样的!协程的切换是由程序自身控制的(程序的开发者使用yield去进行控制,协程和协程之间的切换是可控制的,想什么时候切换就什么时候切换). 当使用多线程时,开的线程越多,协程的优势就越明显. 协程的另一个优点,就是无需锁机制,因为协程只有一个进程,和线程,不存在多线程或者多进程之间访问公共资源的冲突,所以说,在协程中无需加锁,如

python协程与异步I/O

协程 首先要明确,线程和进程都是系统帮咱们开辟的,不管是thread还是process他内部都是调用的系统的API,而对于协程来说它和系统毫无关系; 协程不同于线程的是,线程是抢占式的调度,而协程是协同式的调度,也就是说,协程需要自己做调度. 他就和程序员有关系,对于线程和进程来说,调度是由CPU来决定调度的; 对于协程来说,程序员就是上帝,你想让谁执行到哪里他就执行到哪里; 协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续).协程,

python基础----迭代器、生成器、协程函数

一.什么是迭代器协议 1.迭代器协议是指:对象必须提供一个next方法,执行该方法要么返回迭代中的下一项,要么就引起一个StopIteration异常,以终止迭代 (只能往后走不能往前退) 2.可迭代对象:实现了迭代器协议的对象(如何实现:对象内部定义一个__iter__()方法) 3.协议是一种约定,可迭代对象实现了迭代器协议,python的内部工具(如for循环,sum,min,max函数等)使用迭代器协议访问对象. 二,为什么要用迭代器 优点: 1:迭代器提供了一种不依赖于索引的取值方式,

python并发之多进程、多线程、协程和异步

一.多线程 二.协程(又称微线程,纤程) 协程,与线程的抢占式调度不同,它是协作式调度.协程在python中可以由generator来实现. 首先要对生成器和yield有一个扎实的理解. 调用一个普通的python函数,一般是从函数的第一行代码开始执行,结束于return语句.异常或者函数执行(也可以认为是隐式地返回了None). 一旦函数将控制权交还给调用者,就意味着全部结束.而有时可以创建能产生一个序列的函数,来“保存自己的工作”,这就是生成器(使用了yield关键字的函数). 能够“产生一