1. RabbitMQ简介
rabbitmq服务类似于mysql、apache服务,只是提供的功能不一样。rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信。
2.安装RabbitMQ Ubuntu 14.04
sudo apt-get install rabbitmq-server
安装好后,rabbitmq服务就已经启动好了。接下来看下python编写Hello World!的实例。实例的内容就是从send.py发送“Hello World!”到rabbitmq,receive.py 从rabbitmq接收send.py发送的信息
rabbitmq消息发送流程(来源rabbitmq官网)
其中P表示produce,生产者的意思,也可以称为发送者,实例中表现为send.py;C表示consumer,消费者的意思,也可以称为接收者,实例中表现为receive.py;中间红色的表示队列的意思,实例中表现为hello队列。
python使用rabbitmq服务,可以使用现成的类库pika、txAMQP或者py-amqplib,这里选择了pika。
4.安装pika
sudo apt-get install python3-pip
sudo pip3 install pika
5.发布 连接到rabbitmq服务器,因为是在本地测试,所以就用localhost就可以了。
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters( host = ‘localhost‘)) #链接本地rabbitmq chan = conn.channel() #创建一个通信频道 chan.queue_declare(queue=‘wyx‘) #声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。 chan.basic_publish( #发布 exchange=‘‘, routing_key=‘wyx‘, #路由键, 相当于字典里面的key body=‘hello wyx‘ #发送的内容 相当于字典里面的value) print(‘sent hello wyx‘) 6.订阅
import pika conn = pika.BlockingConnection(pika.ConnectionParameters(host = ‘localhost‘)) #链接本地rabbitmq chan = conn.channel() #创建链接频道 chan.queue_declare(‘wyx‘) #如果有wyx这个消息队列,则不创建,否则创建
def callback(ch,method,properties,body): print(‘revice %r‘ % body) #接收发布者消息函数,并打印出内容 chan.basic_consume(callback, queue=‘wyx‘, no_ack=True) print(‘wait‘)chan.start_consuming()
7.fanout类型 RabbitMQ 上面的演示,消息是依次发送给绑定到该队列的接收端。如果要广播出去,就要使用交换机,本篇演示了交换机的工作方式也就是exchange的fanout类型 8.发布者
import pika conn = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) #链接RabbitMQchan = conn.channel() #创建频道 chan.exchange_declare( #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建 exchange=‘logs‘, type = ‘fanout‘)mess = ‘wyx‘ #需要发布的消息 chan.basic_publish( exchange=‘logs‘, #exchage名字 routing_key=‘‘, #存放的键 body=mess) print(‘hah %s‘ % mess)conn.close() 9.订阅者
import pika conn = pika.BlockingConnection(pika.ConnectionParameters( host=‘127.0.0.1‘, #链接redis)) chan = conn.channel() #创建频道,根据链接创建 chan.exchange_declare(exchange=‘logs‘,type=‘fanout‘) #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建 result = chan.queue_declare(exclusive=True) #随机创建一个队列queue_name = result.method.queue #队列名字 chan.queue_bind(exchange=‘logs‘,queue=queue_name) #把频道和所有队列绑定 print(‘wait‘) def callback(ch,method,properties,body): #body为接受的消息 print(‘haaj %r‘ % body) chan.basic_consume(callback,queue=queue_name,no_ack=True) #no_ack 是否做持久话,False为做持久化,True为不做持久化 chan.start_consuming() 10. 远程结果返回 在发布端执行一个命令,订阅者执行命令,并且返回结果
11.发布者
#!/usr/bin/env python3#coding=utf8import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) #链接本地rabbitmq self.channel = self.connection.channel() #创建频道 #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求,并声明返回队列 self.channel.basic_publish(exchange=‘‘, routing_key=‘exec_queue‘, properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return self.response center = Center() #实例化#Center类,自动执行__init__函数 print( " 请输入想要执行的命令")mess = input(‘Please enter the command you want to execute‘).strip()response = center.request(mess) #执行结果 print(" [.] 执行结果 %r" % (response,) ) 12 订阅者
#!/usr/bin/env python3#coding=utf8import pikaimport subprocess #连接rabbitmq服务器connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘))channel = connection.channel() #定义队列channel.queue_declare(queue=‘exec_queue‘)print( ‘等待执行命令‘) #执行命令,并returndef exec_cmd(n): t = subprocess.Popen(n,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) stdout = t.stdout.read() stderr = t.stderr.read() if stdout: return stdout else: return stderr #定义接收到消息的处理方法def request(ch, method, properties, body): print( " [.] 执行命令 (%s)" % (body,)) body = str(body,encoding=‘utf-8‘) response = exec_cmd(body) #将计算结果发送回控制中心 ch.basic_publish(exchange=‘‘, routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1)channel.basic_consume(request, queue=‘exec_queue‘) channel.start_consuming() 13. 使用MySQLdb操作mysql数据库,并连接数据库
#!/usr/bin/python3 # -*- coding: UTF-8 -*- import MySQLdb
# 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # 使用execute方法执行SQL语句 cursor.execute("SELECT VERSION()") # 使用 fetchone() 方法获取一条数据库。 data = cursor.fetchone() print("Database version : %s " % data) # 关闭数据库连接 db.close() 14.创建数据库表
import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # 如果数据表已经存在使用 execute() 方法删除表。 cursor.execute("DROP TABLE IF EXISTS EMPLOYEE") # 创建数据表SQL语句 sql = """CREATE TABLE EMPLOYEE ( FIRST_NAME CHAR(20) NOT NULL, LAST_NAME CHAR(20), AGE INT, SEX CHAR(1), INCOME FLOAT )""" cursor.execute(sql) # 关闭数据库连接 db.close() 15.数据库插入操作
import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 插入语句 sql = """INSERT INTO EMPLOYEE(FIRST_NAME, LAST_NAME, AGE, SEX, INCOME) VALUES (‘Mac‘, ‘Mohan‘, 20, ‘M‘, 2000)""" try: # 执行sql语句 cursor.execute(sql) # 提交到数据库执行 db.commit() except: # Rollback in case there is any error db.rollback() # 关闭数据库连接 db.close() 16.数据库查询操作
import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 查询语句 sql = "SELECT * FROM EMPLOYEE WHERE INCOME > ‘%d‘" % (1000) try: # 执行SQL语句 cursor.execute(sql) # 获取所有记录列表 results = cursor.fetchall() for row in results: fname = row[0] lname = row[1] age = row[2] sex = row[3] income = row[4] # 打印结果 print "fname=%s,lname=%s,age=%d,sex=%s,income=%d" % (fname, lname, age, sex, income ) except: print("Error: unable to fecth data") # 关闭数据库连接 db.close()
17.数据库更新操作
import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 更新语句 sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = ‘%c‘" % (‘M‘) try: # 执行SQL语句 cursor.execute(sql) # 提交到数据库执行 db.commit() except: # 发生错误时回滚 db.rollback() # 关闭数据库连接 db.close() 18.删除操作
import MySQLdb # 打开数据库连接 db = MySQLdb.connect("localhost","testuser","test123","TESTDB" ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 删除语句 sql = "DELETE FROM EMPLOYEE WHERE AGE > ‘%d‘" % (20) try: # 执行SQL语句 cursor.execute(sql) # 提交修改 db.commit() except: # 发生错误时回滚 db.rollback() # 关闭连接 db.close() 19.事物机制,事务机制可以确保数据一致性。
# SQL删除记录语句 sql = "DELETE FROM EMPLOYEE WHERE AGE > ‘%d‘" % (20) try: # 执行SQL语句 cursor.execute(sql) # 向数据库提交 db.commit() except: # 发生错误时回滚 db.rollback()
时间: 2024-10-20 21:53:04