python 之路12 RabbitMQ Python 操作mysql

1. RabbitMQ简介

  rabbitmq服务类似于mysql、apache服务,只是提供的功能不一样。rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信。

2.安装RabbitMQ Ubuntu 14.04

 sudo apt-get install rabbitmq-server

  安装好后,rabbitmq服务就已经启动好了。接下来看下python编写Hello World!的实例。实例的内容就是从send.py发送“Hello World!”到rabbitmq,receive.py  从rabbitmq接收send.py发送的信息  

rabbitmq消息发送流程(来源rabbitmq官网

其中P表示produce,生产者的意思,也可以称为发送者,实例中表现为send.py;C表示consumer,消费者的意思,也可以称为接收者,实例中表现为receive.py;中间红色的表示队列的意思,实例中表现为hello队列。

python使用rabbitmq服务,可以使用现成的类库pika、txAMQP或者py-amqplib,这里选择了pika。

4.安装pika

  sudo apt-get install python3-pip

  sudo pip3 install pika

5.发布 连接到rabbitmq服务器,因为是在本地测试,所以就用localhost就可以了。

 import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(    host = ‘localhost‘))  #链接本地rabbitmq

chan = conn.channel()  #创建一个通信频道

chan.queue_declare(queue=‘wyx‘)  #声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。

chan.basic_publish(   #发布    exchange=‘‘,       routing_key=‘wyx‘, #路由键, 相当于字典里面的key    body=‘hello wyx‘  #发送的内容  相当于字典里面的value)

print(‘sent hello wyx‘)

6.订阅 
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host = ‘localhost‘)) #链接本地rabbitmq

chan = conn.channel() #创建链接频道

chan.queue_declare(‘wyx‘) #如果有wyx这个消息队列,则不创建,否则创建
def callback(ch,method,properties,body):    print(‘revice %r‘ % body)  #接收发布者消息函数,并打印出内容

chan.basic_consume(callback,                    queue=‘wyx‘,                   no_ack=True)

print(‘wait‘)chan.start_consuming()  


7.fanout类型 RabbitMQ  上面的演示,消息是依次发送给绑定到该队列的接收端。如果要广播出去,就要使用交换机,本篇演示了交换机的工作方式也就是exchange的fanout类型

8.发布者

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(    host=‘localhost‘)) #链接RabbitMQchan = conn.channel()  #创建频道

chan.exchange_declare(  #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建    exchange=‘logs‘,    type = ‘fanout‘)mess = ‘wyx‘ #需要发布的消息

chan.basic_publish(    exchange=‘logs‘,  #exchage名字    routing_key=‘‘,  #存放的键    body=mess)

print(‘hah %s‘ % mess)conn.close()

9.订阅者
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(    host=‘127.0.0.1‘,  #链接redis))

chan = conn.channel()  #创建频道,根据链接创建

chan.exchange_declare(exchange=‘logs‘,type=‘fanout‘)  #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建

result = chan.queue_declare(exclusive=True)  #随机创建一个队列queue_name  = result.method.queue   #队列名字

chan.queue_bind(exchange=‘logs‘,queue=queue_name)  #把频道和所有队列绑定

print(‘wait‘)

def callback(ch,method,properties,body):  #body为接受的消息    print(‘haaj %r‘ % body)

chan.basic_consume(callback,queue=queue_name,no_ack=True)  #no_ack 是否做持久话,False为做持久化,True为不做持久化

chan.start_consuming()

10. 远程结果返回  在发布端执行一个命令,订阅者执行命令,并且返回结果
11.发布者
#!/usr/bin/env python3#coding=utf8import pika

class Center(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(                host=‘localhost‘))  #链接本地rabbitmq

        self.channel = self.connection.channel()  #创建频道

        #定义接收返回消息的队列        result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,                                   no_ack=True,                                   queue=self.callback_queue)

    #定义接收到返回消息的处理方法    def on_response(self, ch, method, props, body):        self.response = body

    def request(self, n):        self.response = None        #发送计算请求,并声明返回队列        self.channel.basic_publish(exchange=‘‘,                                   routing_key=‘exec_queue‘,                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue,                                         ),                                   body=str(n))        #接收返回的数据        while self.response is None:            self.connection.process_data_events()        return self.response

center = Center()  #实例化#Center类,自动执行__init__函数

print( " 请输入想要执行的命令")mess = input(‘Please enter the command you want to execute‘).strip()response = center.request(mess)  #执行结果

print(" [.] 执行结果 %r" % (response,) )

12 订阅者
#!/usr/bin/env python3#coding=utf8import pikaimport  subprocess

#连接rabbitmq服务器connection = pika.BlockingConnection(pika.ConnectionParameters(        host=‘localhost‘))channel = connection.channel()

#定义队列channel.queue_declare(queue=‘exec_queue‘)print( ‘等待执行命令‘)

#执行命令,并returndef exec_cmd(n):    t = subprocess.Popen(n,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)    stdout = t.stdout.read()    stderr = t.stderr.read()    if stdout:        return stdout    else:        return stderr

#定义接收到消息的处理方法def request(ch, method, properties, body):    print( " [.] 执行命令 (%s)"  % (body,))    body = str(body,encoding=‘utf-8‘)    response = exec_cmd(body)

    #将计算结果发送回控制中心    ch.basic_publish(exchange=‘‘,                     routing_key=properties.reply_to,                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)channel.basic_consume(request, queue=‘exec_queue‘)

channel.start_consuming()

13. 使用MySQLdb操作mysql数据库,并连接数据库  
#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")

# 使用 fetchone() 方法获取一条数据库。
data = cursor.fetchone()

print("Database version : %s " % data)

# 关闭数据库连接
db.close()

14.创建数据库表
import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# 如果数据表已经存在使用 execute() 方法删除表。
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")

# 创建数据表SQL语句
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,
         SEX CHAR(1),
         INCOME FLOAT )"""

cursor.execute(sql)

# 关闭数据库连接
db.close()

15.数据库插入操作
import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES (‘Mac‘, ‘Mohan‘, 20, ‘M‘, 2000)"""
try:
   # 执行sql语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # Rollback in case there is any error
   db.rollback()

# 关闭数据库连接
db.close()

16.数据库查询操作
import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE        WHERE INCOME > ‘%d‘" % (1000)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 获取所有记录列表
   results = cursor.fetchall()
   for row in results:
      fname = row[0]
      lname = row[1]
      age = row[2]
      sex = row[3]
      income = row[4]
      # 打印结果
      print "fname=%s,lname=%s,age=%d,sex=%s,income=%d" %              (fname, lname, age, sex, income )
except:
   print("Error: unable to fecth data")

# 关闭数据库连接
db.close()
17.数据库更新操作
import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1
                          WHERE SEX = ‘%c‘" % (‘M‘)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 提交到数据库执行
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()

# 关闭数据库连接
db.close()

18.删除操作
import MySQLdb

# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > ‘%d‘" % (20)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 提交修改
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()

# 关闭连接
db.close()

19.事物机制,事务机制可以确保数据一致性。
# SQL删除记录语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > ‘%d‘" % (20)
try:
   # 执行SQL语句
   cursor.execute(sql)
   # 向数据库提交
   db.commit()
except:
   # 发生错误时回滚
   db.rollback()
 
 

  

 

时间: 2024-10-20 21:53:04

python 之路12 RabbitMQ Python 操作mysql的相关文章

Python不归路_字符编码操作

文件操作补充 上篇随笔中写了文件操作的几个方法,其中truncate()方法遗漏,truncate()方法作用是截取内容,f.truncate()不带参数会清空文件内容,带参数表示截取从零到参数的位置 字符编码 在<Python不归路_零基础学习二>中我们已经学习了一些编码的知识,比如ASCII一共有255个符号,Unicode中,中文字符占两个字节,英文占一个字节,utf-8是unicode的优化方案,中文字节占三个字符.不同字符编码之间需要相互转化才能正常读取.encode和decode,

Python之路之安装Python

本篇文章我们就来谈一谈如何安装Python. 首先当然是要去下载啦!你可以去Python的官网(python.org)下载完整的Python.假如你是一名新手,想学Python试试看,你可以先用Python的在线版本.如图,你点击箭头,就可以进入Python的在线版本.但在线版本有很多不便,比如说在线版的Python就无法保存文件,这就非常蛋疼了.所以在你熟悉了Python的代码结构及使用方式之后,就可以下载Python本地版了.本地版主流的有两个版本,分别是Python2.7和Python3.

python之路一,python基本语法

python基本语法: 1.python基本语句结构: 首先,在其他的语言中,比如java,c++,c#等,没写完一行语句之后,都需要在语句的末尾加一个分号,表示该语句结束,但是在python中,我们是不需要在每一行的末尾增加分号的,python默认每一行为一条语句,当然我们加上也并不会报错,这一点跟js有些像.比如如下代码会顺利执行 1 print("hello world") 2 print("hello world") 3 print("hello

【Python之路】第二十篇--MySQL(二)

视图 视图是一个虚拟表(非真实存在),其本质是[根据SQL语句获取动态的数据集,并为其命名], 用户使用时只需使用[名称]即可获取结果集,并可以将其当作表来使用. 1.创建视图 --格式:CREATE VIEW 视图名称 AS SQL语句 create view v1 as select nid,name from tb1 where nid > 4 2.删除视图 --格式:DROP VIEW 视图名称 deop view v1 3.修改视图 -- 格式:ALTER VIEW 视图名称 AS S

【Python之路Day18】Python Web框架之 Django 进阶操作

Django 进阶篇 一.django的Model基本操作和增.删.改.查. 注:默认使用了sqlite3数据库 如果想使用其他数据库,请在settings里修改 1.创建数据库: 1.创建model类 在app01(或者你的app下)下models.py写入以下内容: from django.db import models # Create your models here. # 这个类是用来生成数据库表的,这个类必须集成models.Model class UserInfo(models.

python之路3:文件操作和函数基础

文件操作 字符编码解码 函数基础 内置函数 一.文件操作 对文件操作流程 打开文件,得到文件句柄并赋值给一个变量 通过句柄对文件进行操作 关闭文件 打开文件的模式有: r,只读模式(默认). w,只写模式.[不可读:不存在则创建:存在则删除内容:] a,追加模式.[可读:不存在则创建:存在则只追加内容:] "+" 表示可以同时读写某个文件 r+,可读写文件.[可读:可写:可追加] w+,写读 a+,追加可写 "U"表示在读取时,可以将 \r \n \r\n自动转换成

【python之路19】文件操作

一.打开文件 文件句柄 = open('文件路径', '模式') 打开文件时,需要指定文件路径和以何等方式打开文件,打开后,即可获取该文件句柄,日后通过此文件句柄对该文件操作. 打开文件的模式有: r ,只读模式[默认] w,只写模式[不可读:不存在则创建:存在则清空内容:] x, 只写模式[不可读:不存在则创建,存在则报错] a, 追加模式[可读:   不存在则创建:存在则只追加内容:] #!usr/bin/env python # -*- coding:utf-8 -*- f = open(

Python之路 day2 文件基础操作

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #Author:ersa 4 ''' 5 #f,文件句柄;模式 a : append 追加文件内容 6 f = open("yesterday2",'a',encoding="utf-8") 7 8 f.write("\nWhen i was yount i listen to the radio\n") 9 f.write("I lo

Python之路10-字符串操作

name = "my name is jiachen" #首字母大写 print (name.capitalize()) #统计字母出现次数 print (name.count('a')) #居中打印 print (name.center(50,'-')) #字符串装换成bytes类型 print (name.encode('utf-8')) #判断以什么结尾 print (name.endswith('en')) #将tab转多少个空格 print (name.expandtabs(