PYTHON之路(九)

queue队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).

exception queue.Empty
Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full
Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)
Equivalent to put(item, False).

Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() block直到queue被消费完毕

生产者消费者模型

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
count = 0
while count <20:
time.sleep(random.randrange(3))
q.put(count)
print(‘Producer %s has produced %s baozi..‘ %(name, count))
count +=1
def Consumer(name):
count = 0
while count <20:
time.sleep(random.randrange(4))
if not q.empty():
data = q.get()
print(data)
print(‘\033[32;1mConsumer %s has eat %s baozi...\033[0m‘ %(name, data))
else:
print("-----no baozi anymore----")
count +=1
p1 = threading.Thread(target=Producer, args=(‘A‘,))
c1 = threading.Thread(target=Consumer, args=(‘B‘,))
p1.start()
c1.start()

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,queue
import time

def consumer(n):
while True:
print("\033[32;1mconsumer [%s]\033[0m get task: %s" % (n,q.get()))
time.sleep(1)
q.task_done()
def producer(n):
count = 1
while True:
#time.sleep(1)
#if q.qsize() <3:
print("prodcer [%s] produced a new task : %s" %(n,count))
q.put(count)
count +=1
q.join() #queue is emtpy
print("all taks has been cosumed by consumers...")

q = queue.Queue()
c1 = threading.Thread(target=consumer,args=[1,])
c2 = threading.Thread(target=consumer,args=[2,])
c3 = threading.Thread(target=consumer,args=[3,])
p = threading.Thread(target=producer,args=["XiaoYu",])
p2 = threading.Thread(target=producer,args=["LiuYao",])
c1.start()
c2.start()
c3.start()
p.start()
p2.start()

#################################################################################

http://www.cnblogs.com/alex3714/articles/5248247.html

协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

无需线程上下文切换的开销
无需原子操作锁定及同步的开销
方便切换控制流,简化编程模型
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:

无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
使用yield实现协程操作例子 

import time
import queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1)

def producer():

r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n +=1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n )

if __name__ == ‘__main__‘:
con = consumer("c1")
con2 = consumer("c2")
p = producer()

Gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
pip3 install gevent

import gevent

def foo():
print(‘Running in foo‘)
gevent.sleep(0) #block
print(‘Explicit context switch to foo again‘)

def bar():
print(‘Explicit context to bar‘)
gevent.sleep(0) #block
print(‘Implicit context switch back to bar‘)

gevent.joinall([
gevent.spawn(foo), #gevent.spawn(fun,param) -- > 启动协程,类似让单线程下的任务异步进行
gevent.spawn(bar),
])
输出:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

if change to foo() gevent.sleep(0) change to gevent.sleep(1) , result would be
Running in foo
Explicit context to bar
Implicit context switch back to bar #--- > sleep 1 second, then print the later sentence
Explicit context switch to foo again

同步与异步的性能区别

import gevent

def task(pid):
"""
Some non-deterministic task
"""
print(‘Task %s start‘ %pid)
gevent.sleep(0.5)
print(‘Task %s done‘ % pid)

def synchronous():
for i in range(1,10):
task(i)

def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads)

print(‘Synchronous:‘)
synchronous()

print(‘Asynchronous:‘)
asynchronous()
上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

遇到IO阻塞时会自动切换任务

from gevent import monkey; monkey.patch_all() #monkey.patch_all()有点类似黑语法,使得程序变成非阻塞 gevent.spawn() + monkey.patch_all()即是异步非阻塞
import gevent
from urllib.request import urlopen

def f(url):
print(‘GET: %s‘ % url)
resp = urlopen(url)
data = resp.read()
print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
gevent.spawn(f, ‘https://www.python.org/‘),
gevent.spawn(f, ‘https://www.yahoo.com/‘),
gevent.spawn(f, ‘https://github.com/‘),
])

输出
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
46971 bytes received from https://www.python.org/.
25179 bytes received from https://github.com/.
488123 bytes received from https://www.yahoo.com/.

如果没有money.patch_all(),输出结果
GET: https://www.python.org/
46971 bytes received from https://www.python.org/.
GET: https://www.yahoo.com/
474672 bytes received from https://www.yahoo.com/.
GET: https://github.com/
25179 bytes received from https://git

通过gevent实现单线程下的多socket并发

server side

import sys
import socket
import time
import gevent

from gevent import socket,monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind((‘0.0.0.0‘, port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(s):
try:
while True:
data = s.recv(1024)
print("recv:", data)
s.send(data)
if not data:
s.shutdown(socket.SHUT_WR)

except Exception as ex:
print(ex)
finally:

s.close()
if __name__ == ‘__main__‘:
server(8001)

client side   

import socket

HOST = ‘localhost‘ # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"),encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
#print(data)

print(‘Received‘, repr(data))
s.close()

###########################################################################################################

首先列一下,sellect、poll、epoll三者的区别
select
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

其实主要就是两个部分,内核如何通知就绪的文件描述符,用户空间如何获取就绪的文件描述符。

####################################################################################################

http://www.cnblogs.com/wupeiqi/articles/5095821.html

paramiko
MySQLdb

基于用户名密码连接:

import paramiko

# 创建SSH对象
ssh = paramiko.SSHClient()
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname=‘c1.salt.com‘, port=22, username=‘wupeiqi‘, password=‘123‘)

# 执行命令
stdin, stdout, stderr = ssh.exec_command(‘df‘)
# 获取命令结果
result = stdout.read()

# 关闭连接
ssh.close()

SSHClient 封装 Transport

import paramiko

transport = paramiko.Transport((‘hostname‘, 22))
transport.connect(username=‘wupeiqi‘, password=‘123‘)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command(‘df‘)
print stdout.read()

transport.close()

基于密钥

import paramiko

private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘)

transport = paramiko.Transport((‘hostname‘, 22))
transport.connect(username=‘wupeiqi‘, pkey=private_key)

ssh = paramiko.SSHClient()
ssh._transport = transport

stdin, stdout, stderr = ssh.exec_command(‘df‘)

transport.close()

SFTPClient

用于连接远程服务器并执行上传下载

基于用户名密码上传下载

import paramiko

transport = paramiko.Transport((‘hostname‘,22))
transport.connect(username=‘wupeiqi‘,password=‘123‘)

sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘)
# 将remove_path 下载到本地 local_path
sftp.get(‘remove_path‘, ‘local_path‘)

transport.close()

基于公钥密钥上传下载

import paramiko

private_key = paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘)

transport = paramiko.Transport((‘hostname‘, 22))
transport.connect(username=‘wupeiqi‘, pkey=private_key )

sftp = paramiko.SFTPClient.from_transport(transport)
# 将location.py 上传至服务器 /tmp/test.py
sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘)
# 将remove_path 下载到本地 local_path
sftp.get(‘remove_path‘, ‘local_path‘)

transport.close()

################################################################################################

MySQLdb

python2.7
apt-get install python-mysqldb

python3.4
pip3 install pymysql

delete

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘,db=‘newdb‘)

cur = conn.cursor()
b=(2,4)
#reCount = cur.execute(‘create table new (id int(10),name varchar(20))‘)
for a in b:
reCount1 = cur.execute(‘delete from new where id in (%s)‘,a)
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)

insert many

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘,db=‘newdb‘)

cur = conn.cursor()
li = [(3,‘rose‘),(4,‘lize‘)]
#reCount = cur.execute(‘create table new (id int(10),name varchar(20))‘)
reCount1 = cur.executemany(‘insert into new(id,name) values(%s,%s)‘,li)
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)

insert

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘,db=‘newdb‘)

cur = conn.cursor()

#reCount = cur.execute(‘create table new (id int(10),name varchar(20))‘)
reCount1 = cur.execute(‘insert into new(id,name) values(%s,%s)‘,(1,‘jack‘))
reCount2 = cur.execute(‘insert into new values(%(id)s,%(name)s)‘,{‘id‘:2,‘name‘:‘alex‘})
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)
print(reCount2)

general

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘)

cur = conn.cursor()

reCount = cur.execute(‘create database aa‘)
cur.execute(‘use aa;create table aaa (id int(1))‘)
conn.commit()
cur.close()
conn.close()
print(reCount)

select

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘,db=‘mysql‘)

cur = conn.cursor()

reCount = cur.execute(‘select user,host from user‘)
print(cur.fetchone())
print(cur.fetchmany(2))
cur.scroll(-1,mode=‘relative‘)
print(cur.fetchone())
cur.scroll(0,mode=‘absolute‘)
print(cur.fetchone())
result = cur.fetchall()
cur.close()
conn.close()
print(reCount)
for i in result:
print(i[0],i[1])

update

import pymysql
#import MySQLdb

conn = pymysql.connect(host=‘127.0.0.1‘,user=‘root‘,passwd=‘123456‘,db=‘newdb‘)

cur = conn.cursor()
b=(2,4)
#reCount = cur.execute(‘create table new (id int(10),name varchar(20))‘)
reCount1 = cur.execute(‘update new set id = %s‘,(b[0],))
conn.commit()
cur.close()
conn.close()
#print(reCount)
print(reCount1)

时间: 2024-07-28 18:34:01

PYTHON之路(九)的相关文章

python之路九

paramiko paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接 ssh执行命令: import paramikossh = paramiko.SSHClient()ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())ssh.connect(hostname='192.168.155.8',port=22,username='root',password='1989

Python之路【第十九篇】:爬虫

Python之路[第十九篇]:爬虫 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本.另外一些不常使用的名字还有蚂蚁.自动索引.模拟程序或者蠕虫. Requests Python标准库中提供了:urllib.urllib2.httplib等模块以供Http请求,但是,它的 API 太渣了.它是为另一个时代.另一个互联网所创建的.它需要巨量的工作,甚至包括各种方法覆盖,来完成最简单的任务. import

Python之路【第二篇】:Python基础(一)

Python之路[第二篇]:Python基础(一) 入门知识拾遗 一.作用域 对于变量的作用域,执行声明并在内存中存在,该变量就可以在下面的代码中使用. 1 2 3 if 1==1:     name = 'wupeiqi' print  name 下面的结论对吗? 外层变量,可以被内层变量使用 内层变量,无法被外层变量使用 二.三元运算 1 result = 值1 if 条件 else 值2 如果条件为真:result = 值1如果条件为假:result = 值2 三.进制 二进制,01 八进

python之路--模块--景丽洋

python之路--常用模块 阅读目录 认识模块 什么是模块 模块的导入和使用 常用模块一 collections模块 时间模块 random模块 os模块 sys模块 序列化模块 re模块 常用模块二 hashlib模块 configparse模块 logging模块 认识模块 返回顶部 什么是模块 什么是模块? 常见的场景:一个模块就是一个包含了python定义和声明的文件,文件名就是模块名字加上.py的后缀. 但其实import加载的模块分为四个通用类别: 1 使用python编写的代码(

Python之路【目录】

目录 Python之路[第一篇]:Python简介和入门 Python之路[第二篇]:Python基础(一) Python之路[第三篇]:Python基础(二) Python之路[第四篇]:模块 Python之路[第五篇]:面向对象及相关 Python之路[第六篇]:Socket Python之路[第七篇]:线程.进程和协程 Python之路[第八篇]:堡垒机实例以及数据库操作 Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy P

Python之路【第十七篇】:Django【进阶篇 】

Python之路[第十七篇]:Django[进阶篇 ] Model 到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞: 创建数据库,设计表结构和字段 使用 MySQLdb 来连接数据库,并编写数据访问层代码 业务逻辑层去调用数据访问层执行数据库操作 import MySQLdb def GetList(sql): db = MySQLdb.connect(user='root', db='wupeiqidb', passwd='1234', host='localhost')

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

七日Python之路--第十二天(Django Web 开发指南)

<Django Web 开发指南>.貌似使用Django1.0版本,基本内容差不多,细读无妨.地址:http://www.jb51.net/books/76079.html (一)第一部分 入门 (1)内置数字工厂函数 int(12.34)会创建一个新的值为12的整数对象,而float(12)则会返回12.0. (2)其他序列操作符 连接(+),复制(*),以及检查是否是成员(in, not in) '**'.join('**')   或  '***%s***%d' % (str, int)

七日Python之路--第九天

众所周知,代码这东西不是看出来的.程序这东西只哟一个标准. 下面找点开源的东西看看,学习一下大婶们的犀利编码...... 推荐一下: 虽然有点老了:http://www.iteye.com/topic/405150,还有就是GitHub上面搜索一下Django就能出来很多,当然还有OSChina.只是有个问题,就是Django版本不同,具体的内容可能会有些不同,但大概还是相同的.领略即可,然后书写自己的代码. 首要的还是官方文档. 看着还是有些难度的.偶然发现一个不错的Blog:http://w