上下文管理
import contextlib # 上下文管理 @contextlib.contextmanager def worker_state(state_list, worker_thread): """ :param state_list: :param worker_thread: :return: """ state_list.append(worker_thread) # 2. 进入执行函数体 try: yield # 3. 遇到yield,弹出函数执行with代码块 finally: state_list.remove(worker_thread) # 5. 执行,然后结束 free_list = [] current_thread = ‘alex‘ with worker_state(free_list, current_thread): # 1. 首先进入with, 进入worker_state函数 print(123) # 4. 执行此代码后,回到函数
# 上下文管理,实现自动关闭socket import contextlib import socket @contextlib.contextmanager def context_socket(host,port): sk = socket.socket() # 2. 执行函数体内容 sk.bind((host,port)) sk.listen(5) try: yield sk # 3. yield 返回with代码块执行 finally: sk.close() # 5. 继续执行,结束 with context_socket(‘127.0.0.1‘, 8888) as sock: # 1. 执行context_socket函数 print(sock) # 4. 执行with代码块后,返回函数
redis的发布与订阅
# 发布 import redis class RedisHelper: def __init__(self): self.__conn = redis.Redis(host=‘192.168.11.87‘) def public(self,msg,chan): self.__conn.publish(chan,msg) # 调用发布函数,向订阅者发布 return True def subscribe(self,chan): pub = self.__conn.pubsub() pub.subscribe(chan) pub.parse_response() return pub if __name__ == ‘__main__‘: obj = RedisHelper() obj.public(‘this is test‘, ‘fm8888.7‘)
# 订阅 import publish # 导入为上代码的类 obj = publish.RedisHelper() data = obj.subscribe(‘fm8888.7‘) print(data.parse_response())
rabbitmq基本使用
############ 基础使用 # 生产者 import pika # 创建一个连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.31.98‘)) # 创建一个频道 channel = connection.channel() # 定义一个队列 channel.queue_declare(queue=‘rock‘) channel.basic_publish(exchange=‘‘, routing_key=‘rock‘, # 往这个队列发消息 body=‘hello world!‘) print(" [x] Sent ‘hello world‘") connection.close() print(‘ [*] Waiting for messages. To exit press CTRL+c ‘) channel.start_consuming() ================================================================================================================= ############ 消费者 import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.31.98‘)) # 创建频道 channel = connection.channel() # 定义一个队列 channel.queue_declare(queue=‘rock‘) def callback(ch, method, properties,body): # ch: channel method: 队列名字 properties: 连接上rabbiatmq基本属性 body: 取出队列的内容 print(‘[x] Received %r‘ % body) channel.basic_consume(callback,queue=‘rock‘,no_ack=True) # 当我在队列rock中取到数据时, 我就会执行callback函数,并且我还会给callback函数传入4个参数 print(‘ [*] Waiting for messages. To exit press CTRL+c ‘) channel.start_consuming()
rabbitmq: 消息不丢失
######## 生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.11.87‘)) channel = connection.channel() channel.exchange_declare(exchange=‘rock_now‘, type=‘direct‘) severity = ‘info‘ message = ‘123‘ channel.basic_publish(exchange=‘rock_now‘, routing_key=severity, body=message, propperties=pika.BasicProperties( delivery_mode=2, # 生产者设置持久化,保证生产者发送队列时,宕机不会丢失 ) ) print(" [x] Sent %r:%r" % (severity,message)) connection.close() ======================================== ######## 消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.11.87‘)) channel = connection.channel() channel.exchange_declare(exchange=‘rock_now‘, type=‘direct‘) # 指定关键字,接收有此关键字的队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = [‘error‘, ‘info‘, ‘warning‘] for severity in severities: channel.queue_bind(exchange=‘rock_now‘, queue=queue_name, routing_key=severity) print(‘ [*] Waiting for messages. To exit press CTRL+c ‘) def callback(ch, method, properties,body): print(‘[x] Received %r‘ % body) channel.basic_consume(callback, queue=queue_name, no_ack=False) # 消费者在接受队里消息后,发送ack确认接收 channel.start_consuming()
rabbitmq: 获取顺序
# 消费者 # 因为是获取,只针对消费者 import pika # 当有多个消费者,共同取数据时,默认python是“按照顺序取” # 也就是,如果有三个消费者,去共同取队列数据, # 第一个消费者,取得是1,4,7,10..., 第二个消费者,取得是2,5,8,11..., 第三个消费者,取的是3,6,9,12... # 虽然,消费者之间并不会因为顺序而阻塞, 但是各个消费者还是会按照他们的先后顺序取跳着取数据 # 下边,我们修改这种默认的配置 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.31.98‘)) channel = connection.channel() # make message persistent channel.queue_declare(queue=‘hello‘) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print(‘ok‘) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) # 此设置为多个消费者,不跳着取值 channel.basic_consume(callback, queue=‘hello‘, no_ack=False) print(‘ [*] Waiting for messages. To exit press CTRL+C‘) channel.start_consuming()
rabbitmq: fanout类型exchange
######### 生产者 import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.31.98‘)) # 创建频道 channel = connection.channel() # 创建exchang, fanout 类型的exchange发送或接受所有在此exchange中的队列 channel.exchange_declare(exchange=‘rock‘, type=‘fanout‘) message = ‘rock now‘ channel.basic_publish(exchange=‘rock‘, # 往这个exchange组里所有队列发送 routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close() ======================================== ######## 消费者 import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.11.87‘)) # 创建频道 channel = connection.channel() # 创建exchang, fanout 类型的exchange发送或接受所有在此exchange中的队列 channel.exchange_declare(exchange=‘rock‘, type=‘fanout‘) # 随机创建队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # 绑定 # 这里当我们运行多次此程序时,我们随机创建的队列肯定不一致,而我们绑定的是一个exchange, # 所以,这里我们运行多次此程序,然后运行一次生产者,即可 channel.queue_bind(exchange=‘rock‘, queue=queue_name) print(‘ [*] Waiting for messages. To exit press CTRL+c ‘) def callback(ch,method,properties,body): print(‘ [x] %r‘ % body ) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
rabbitmq: direct类型exchange
######## 消费者 import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.11.87‘)) # 创建频道 channel = connection.channel() # 创建exchange, direct类型的exchange可使程序根据关键字的信息发送或接收相应队列的内容 channel.exchange_declare(exchange=‘rock_now‘, type=‘direct‘) # 指定关键字,接收有此关键字的队列 # 创建随机队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = [‘error‘, ‘info‘, ‘warning‘] for severity in severities: channel.queue_bind(exchange=‘rock_now‘, queue=queue_name, routing_key=severity) # 指定关键字,找对应关键字队列接收 print(‘ [*] Waiting for messages. To exit press CTRL+c ‘) def callback(ch, method, properties,body): print(‘[x] Received %r‘ % body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming() ======================================== ######## 生产者 import pika # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘192.168.11.87‘)) # 创建频道 channel = connection.channel() # 绑定exchang channel.exchange_declare(exchange=‘rock_now‘, type=‘direct‘) severity = ‘info‘ message = ‘123‘ channel.basic_publish(exchange=‘rock_now‘, routing_key=severity, # 指定关键字,发送到关键字对应的队列 body=message) print(" [x] Sent %r:%r" % (severity,message)) connection.close()
rabbitmq: topic类型exchange
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
发送者路由值 队列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
在使用上,参考关键字匹配,只不过是传入值带*或#,topic会模糊匹配,而关键字不会~
pymysql 模块
#基础操作 import pymysql # 创建连接 conn = pymysql.connect(host=‘192.168.31.98‘, port=3306, user=‘root‘, passwd=‘123qwe‘, db=‘test‘) # 创建游标 cursor = conn.cursor() # 执行SQL,并返回收影响行数 effect_row = cursor.execute("update hosts set host = ‘1.1.1.2‘") # 执行SQL,并返回受影响行数 effect_row = cursor.execute("update hosts set host = ‘1.1.1.2‘ where nid > %s", (1,)) # 支持字符串拼接 # 执行SQL,并返回受影响行数 effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) # 支持多个字符串拼接 # 提交,不然无法保存新建或者修改的数据 conn.commit() # 关闭游标 cursor.close() # 关闭连接 conn.close()
# 自增ID import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘) cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close() # 获取最新自增ID new_id = cursor.lastrowid # 在上代码中,我们使用executemany,添加多条后,获取最新的自增ID为最后一个
# 获取查询数据 import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘) cursor = conn.cursor() cursor.execute("select * from hosts") # 获取第一行数据 row_1 = cursor.fetchone() # 获取前n行数据,设置为3,则获取3行数据 # row_2 = cursor.fetchmany(3) # 获取所有数据 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
# 移动游标 ----------------------------------------------------------------- cursor.scroll(1,mode=‘relative‘) # 相对当前位置移动 cursor.scroll(2,mode=‘absolute‘) # 相对绝对位置移动 ----------------------------------------------------------------- import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘) cursor = conn.cursor() cursor.execute("select * from hosts") # 获取所有数据 row_3 = cursor.fetchall() # 获取所有数据后,我将游标上移2位 cursor.scroll(-2,mode=‘relative‘) # 重新获取所有数据,就从游标处开始获取所有 row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close()
# fetch 数据类型 import pymysql conn = pymysql.connect(host=‘127.0.0.1‘, port=3306, user=‘root‘, passwd=‘123‘, db=‘t1‘) # 游标设置为字典类型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()") result = cursor.fetchone() conn.commit() cursor.close() conn.close()
SQLAchemy (ORM)使用:创建与删除
######### 创建表 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine # 创建连接 engine = create_engine("mysql+pymysql://root:[email protected]:3306/test", max_overflow=5) Base = declarative_base() # 规定这么写 # 创建单表 class Users(Base): # 所有你自定义的类必须继承你刚刚创建的base类 __tablename__ = ‘users‘ # 表名定义 # 表中创建三个列 id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint(‘id‘, ‘name‘, name=‘uix_id_name‘), # 联合索引 Index(‘ix_id_name‘, ‘name‘, ‘extra‘), ) Base.metadata.create_all(engine) # 这里,Base会找到所有它的子类,根据子类执行创建表
######### 一对多创建表 # 一对多 class Favor(Base): __tablename__ = ‘favor‘ # 表名定义 nid = Column(Integer, primary_key=True) # 自增ID caption = Column(String(50), default=‘red‘, unique=True) # 字符串最多50,unique=True不允许重复,default默认值red class Person(Base): __tablename__ = ‘person‘ nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # ForeignKey("favor.nid"),与创建favor表的nid列做一个外键 Base.metadata.create_all(engine)
######## 多对多建表 # 多对多 class ServerToGroup(Base): # 第三张表存下两个表的关系 __tablename__ = ‘servertogroup‘ nid = Column(Integer, primary_key=True, autoincrement=True) # 这里同时外键两个表的ID列,这样建立了下两个表(类)的关系 server_id = Column(Integer, ForeignKey(‘server.id‘)) group_id = Column(Integer, ForeignKey(‘group.id‘)) class Group(Base): __tablename__ = ‘group‘ id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Server(Base): __tablename__ = ‘server‘ id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) Base.metadata.create_all(engine)
# 创建与删除 Base.metadata.create_all(engine) # 创建所有表 Base.metadata.drop_all(engine) # 删除所有表
SQLAchemy (ORM)使用:操作表
# 增 engine = create_engine("mysql+pymysql://root:[email protected]:3306/test", max_overflow=5) Base = declarative_base() # 规定这么写 class Users(Base): # 所有你自定义的类必须继承你刚刚创建的base类 __tablename__ = ‘users‘ # 表名定义 # 表中创建三个列 id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint(‘id‘, ‘name‘, name=‘uix_id_name‘), # 联合索引 Index(‘ix_id_name‘, ‘name‘, ‘extra‘), ) obj = Users(name="alex0", extra=‘sb‘) # 需要在哪个表增加数据,就将哪个表的类封装一个对象 seeion.add(obj) # 添加 seeion.commit() # 提交
时间: 2024-09-26 22:31:15