rabbitMQ消息队列
rabbitmqctl list_queues 查看目前有多少队列,每个队列有多少消息未处理
rabbitMQ默认采用轮询机制依次给每个接收端发送消息
为了确保一条消息被完整处理完,需要接收端主动向服务端发送确认信号
def callback(ch,method,properties,body):
print("-->",ch,method,properties)
print(" [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag) #接收端想发送端发送消息处理完毕的确认信号
消息持久化
收发端在声明队列时需要指定durable
=
True来使队列持久化,同时消息发送端在发送消息时还需要指定
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
)来使消息持久化
import pika,time
start_time =time.time()
connection =pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel =connection.channel()
channel.queue_declare(queue="hello2",durable=True) #接收端申明队列时也需要设置durable为True
channel.basic_publish(exchange ="",
routing_key ="hello",
body ="hello world",
properties =pika.BasicProperties(delivery_mode=2,))
print(" [x] Sent ‘Hello World!‘")
connection.close()
print(time.time()-start_time)
消息合理分发
在接收端接收消息的函数前定义channel.basic_qos(prefetch_count
=
1
),即可让发送端在接收端处理完一条消息后再发送新消息过来否则不再发送新的消息过来
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue ="hello")
# no_ack =True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
队列转发器
fanout广播转发器
发送端
‘‘‘申明一个转发器并定义转发器类型为广播,名字为logs‘‘‘
channel.exchange_declare(exchange="logs",type="fanout")
msg ="hello world"
‘‘‘广播模式不许要申明消息队列,但是还是要指定routing_key的值‘‘‘
channel.basic_publish(exchange="logs",routing_key="",body=msg)
print(" [x] Sent %r" % msg)
connection.close()
接收端
channel.exchange_declare(exchange="logs",type="fanout")
‘‘‘不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除‘‘‘
result =channel.queue_declare(exclusive=True)
queue_name =result.method.queue
‘‘‘绑定转发器‘‘‘
channel.queue_bind(exchange="logs",queue=queue_name)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch,method,properties,body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
direct有选择接收消息转发器
发送端
channel.exchange_declare(exchange=‘direct_logs‘,
type=‘direct‘)
severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
接收端
channel.exchange_declare(exchange=‘direct_logs‘,
type=‘direct‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange=‘direct_logs‘,
queue=queue_name,
routing_key=severity)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
topic更细致的消息过滤转发器
发送端
channel.exchange_declare(exchange=‘topic_logs‘,
type=‘topic‘)
routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
接收端
channel.exchange_declare(exchange=‘topic_logs‘,
type=‘topic‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange=‘topic_logs‘,
queue=queue_name,
routing_key=binding_key)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
*RPC
send端
import pika
import time
connection =
pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘rpc_queue‘)
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n
= int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange=‘‘,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
queue=‘rpc_queue‘)
print(" [x] Awaiting RPC
requests")
channel.start_consuming()
receive端
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
python操作redis
统计在线用户数及查询指定用户是否在线
setbit n5 55 1 将Key值n5的values值得二进制位第55位设为1,55代表用户ID
getbit n5 55 取key值n5的values值得二进制位第55位的值
基础操作
string操作
import redis
pool =redis.ConnectionPool(host=‘localhost‘,port=‘6379‘)
r =redis.Redis(connection_pool=pool)
# ‘‘‘设置key为name,value为alex的键值对,key已存在则为修改,
# ex,过期时间(秒)
# px,过期时间(毫秒)
# nx,如果设置为True,则只有name不存在时,当前set操作才执行
# xx,如果设置为True,则只有name存在时,岗前set操作才执行‘‘‘
# r.set(‘name‘,‘alex‘,ex=3)
# ‘‘‘按字节截取值的一部分返回,前面指定起始字节位置,后面设置结束位置。在Unicode中一个英文字符一个字节,一个中文3个字节‘‘‘
# print(r.getrange(‘name‘,1,2))
# ‘‘‘修改值的对应字节位置的部分,修改的比原来的长时向后覆盖‘‘‘
# r.setrange(‘name‘,0,‘wa‘)
# ‘‘‘设置新值并返回原来的值‘‘‘
# r.getset(‘name‘,‘jack‘)
# ‘‘‘设置值,只有name不存在时,执行设置操作(添加)‘‘‘
# r.setnx(‘name‘,‘jack‘)
# ‘‘‘获取name对应的值‘‘‘
# r.get(‘name‘)
# ‘‘‘等于r.set(‘name‘,‘alex‘,ex=3)‘‘‘
# r.setex("age",22,4)
# ‘‘‘等于r.setex("age",22,4)‘‘‘
# r.psetex(‘job‘,3,‘python‘)
# ‘‘‘批量设置和批量获取‘‘‘
# r.mset(name=‘alex‘,age=22)
# print(r.mget(‘name‘,‘age‘))
# ‘‘‘设置值的对应二进制位,获取值的对应二进制位‘‘‘
# r.setbit(‘n5‘,1000,1)
# print(r.getbit(‘n5‘,1000))
# ‘‘‘统计值的二进制位为1的个数‘‘‘
# r.bitcount(‘n5‘)
# ‘‘‘返回值的字节长度‘‘‘
# print(r.strlen(‘name‘))
# ‘‘‘将key的值自增指定的数值,如果不指定数值就自增1,key值不存在就创建key=amount的记录‘‘‘
# print(r.incr(‘n2‘,amount=3))
# ‘‘‘作用类似incr,value和amount都要为浮点数‘‘‘
# print(r.incrbyfloat(‘k1‘,amount=2.0))
# ‘‘‘自减指定的数值‘‘‘
# print(r.decr(‘k2‘,amount=5))
# ‘‘‘在指定的key的值后添加字符,返回总的字节数‘‘‘
# print(r.append(‘name‘,‘good‘))
hash操作
‘‘‘name对应的hash中设置一个键值对(不存在,则创建;否则,修改)‘‘‘
r.hset(‘name‘,‘k1‘,‘alex‘)
‘‘‘ 在name对应的hash中获取根据key获取value‘‘‘
print(r.hget(‘name‘,‘k1‘))
‘‘‘批量设置键值对‘‘‘
r.hmset(‘info‘,{‘k1‘:‘alex‘,‘k2‘:‘jack‘,‘k3‘:‘tom‘,‘n1‘:‘faker‘,‘n2‘:‘huni‘})
‘‘‘批量获取指定key的值‘‘‘
print(r.hmget(‘info‘,[‘name‘,‘age‘]))
‘‘‘获取name对应hash的所有键值‘‘‘
print(r.hgetall(‘info‘))
‘‘‘获取name对应的hash中键值对的个数‘‘‘
print(r.hlen(‘info‘))
‘‘‘获取name对应的hash中所有的key的值‘‘‘
print(r.hkeys(‘info‘))
‘‘‘获取name对应的hash中所有的value的值‘‘‘
print(r.hvals(‘info‘))
‘‘‘检查name对应的hash中是否存在当前传入的key‘‘‘
print(r.hexists(‘info‘,‘job‘))
‘‘‘将name对应的hash中指定key的键值对删除‘‘‘
r.hdel(‘info‘,‘name‘)
‘‘‘自增name对应的hash中的指定key的值,不存在则创建key=amount,类似incr‘‘‘
print(r.hincrby(‘test‘,‘k7‘,amount=2))
‘‘‘自增name对应的hash中的指定key的值,不存在则创建key=amount,类似incrbyfloat‘‘‘
print(r.hincrbyfloat(‘test1‘,‘k9‘,amount=2.0))
‘‘‘ 在name对应的hash存放的键值对中,匹配指定条件的key输出其key-value值‘‘‘
print(r.hscan(‘info‘,0,match=‘n*‘))
‘‘‘作用类似hscan,只是将结果以生成器形式返回,用循环的方式取值‘‘‘
a =r.hscan_iter(‘info‘,match=‘k*‘)
for item in a:
print(item)
list操作
‘‘‘向列表list放入元素,新元素会放在旧元素的左侧,一次可以放多个元素‘‘‘
r.lpush(‘list‘,‘alex‘,‘jack‘)
‘‘‘根据指定的索引值在列表中取值‘‘‘
print(r.lindex(‘list‘,0))
‘‘‘将列表中满足指定索引范围的值取出‘‘‘
print(r.lrange(‘list‘,0,80))
print(r.lrange(‘list2‘,0,80))
‘‘‘只有当指定的列表存在时才会将值存入‘‘‘
r.lpushx(‘list‘,‘tom‘)
‘‘‘输出指定列表的元素个数‘‘‘
print(r.llen(‘list‘))
‘‘‘在列表的指定元素的前面后或后面插入新的元素‘‘‘
r.linsert(‘list‘,‘before‘,‘jack‘,‘tom‘)
‘‘‘将列表中指定索引值的位置重新赋值‘‘‘
r.lset(‘list‘,4,‘faker‘)
‘‘‘在列表中从前往后删除三个与指定的值相同的元素,num为负数时为从后往前删除‘‘‘
r.lrem(‘list‘,‘alex‘,num=3)
‘‘‘将列表中从左向右的第一个元素删除并返回操作后列表中从左向右的第一个元素,rpop为从右向左操作‘‘‘
print(r.lpop(‘list‘))
‘‘‘将列表中不在指定索引范围的元素都删除‘‘‘
r.ltrim(‘list‘,2,4)
‘‘‘将第一个列表最右边的元素取出添加到第二个列表的最左边‘‘‘
r.rpoplpush(‘list‘,‘list2‘)
‘‘‘将传入的每个列表中的元素都排序,按传入的列表顺序将列表中的元素从左向右依次删除,
所有列表的元素都删完后将在指定时间内阻塞,等待新值存入列表,时间到了就退出‘‘‘
print(r.blpop((‘list,list2‘),timeout=10))
‘‘‘将第一个列表右侧的一个元素删除并添加到第二个列表的左侧‘‘‘
r.brpoplpush(‘list‘,‘list2‘)
set操作
无序集合
sadd(name,values)
# name对应的集合中添加元素
scard(name)
获取name对应的集合中元素个数
sdiff(keys, *args)
在第一个name对应的集合中且不在其他name对应的集合的元素集合
sdiffstore(dest, keys, *args)
# 获取第一个name对应的集合中且不在其他name对应的集合,再将其新加入到dest对应的集合中
sinter(keys, *args)
# 获取多一个name对应集合的并集
sinterstore(dest, keys, *args)
# 获取多一个name对应集合的并集,再讲其加入到dest对应的集合中
sismember(name, value)
# 检查value是否是name对应的集合的成员
smembers(name)
# 获取name对应的集合的所有成员
smove(src, dst, value)
# 将某个成员从一个集合中移动到另外一个集合
spop(name)
# 从集合的右侧(尾部)移除一个成员,并将其返回
srandmember(name, numbers)
# 从name对应的集合中随机获取
numbers 个元素
srem(name, values)
# 在name对应的集合中删除某些值
sunion(keys, *args)
# 获取多一个name对应的集合的并集
sunionstore(dest,keys, *args)
# 获取多一个name对应的集合的并集,并将结果保存到dest对应的集合中
sscan(name, cursor=0, match=None,
count=None)
sscan_iter(name, match=None, count=None)
# 同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大
有序集合
zadd(name, *args, **kwargs)
# 在name对应的有序集合中添加元素
# 如:
# zadd(‘zz‘, ‘n1‘, 1, ‘n2‘, 2)
# 或
# zadd(‘zz‘, n1=11, n2=22)
zcard(name)
# 获取name对应的有序集合元素的数量
zcount(name, min, max)
# 获取name对应的有序集合中分数 在 [min,max] 之间的个数
zincrby(name, value, amount)
# 自增name对应的有序集合的 name
对应的分数
r.zrange( name, start, end, desc=False,
withscores=False, score_cast_func=float)
# 按照索引范围获取name对应的有序集合的元素
# 参数:
#
name,redis的name
#
start,有序集合索引起始位置(非分数)
#
end,有序集合索引结束位置(非分数)
#
desc,排序规则,默认按照分数从小到大排序
#
withscores,是否获取元素的分数,默认只获取元素的值
#
score_cast_func,对分数进行数据转换的函数
# 更多:
#
从大到小排序
#
zrevrange(name, start, end, withscores=False, score_cast_func=float)
# 按照分数范围获取name对应的有序集合的元素
#
zrangebyscore(name, min, max, start=None, num=None, withscores=False,
score_cast_func=float)
#
从大到小排序
#
zrevrangebyscore(name, max, min, start=None, num=None, withscores=False,
score_cast_func=float)
zrank(name, value)
# 获取某个值在 name对应的有序集合中的排行(从
0 开始)
# 更多:
#
zrevrank(name, value),从大到小排序
zrem(name,
values)
# 删除name对应的有序集合中值是values的成员
# 如:zrem(‘zz‘,
[‘s1‘, ‘s2‘])
zremrangebyrank(name, min, max)
# 根据排行范围删除
zremrangebyscore(name, min, max)
# 根据分数范围删除
zscore(name, value)
# 获取name对应有序集合中 value
对应的分数
zinterstore(dest, keys, aggregate=None)
# 获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为: SUM
MIN MAX
zunionstore(dest, keys, aggregate=None)
# 获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作
# aggregate的值为: SUM
MIN MAX
zscan(name, cursor=0, match=None,
count=None, score_cast_func=float)
zscan_iter(name, match=None,
count=None,score_cast_func=float)
# 同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作
其他操作
delete(*names)
# 根据删除redis中的任意数据类型
exists(name)
# 检测redis的name是否存在
keys(pattern=‘*‘)
# 根据模型获取redis的name
# 更多:
#
KEYS * 匹配数据库中所有 key 。
#
KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
#
KEYS h*llo 匹配 hllo 和 heeeeello 等。
#
KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo
expire(name ,time)
# 为某个redis的某个name设置超时时间
rename(src, dst)
# 对redis的name重命名为
move(name, db))
# 将redis的某个值移动到指定的db下
randomkey()
# 随机获取一个redis的name(不删除)
type(name)
# 获取name对应值的类型
scan(cursor=0, match=None, count=None)
scan_iter(match=None, count=None)
# 同字符串操作,用于增量迭代获取key
管道
import redis,time
pool = redis.ConnectionPool(host=‘localhost‘, port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
pipe.set(‘n3‘, ‘alex‘)
time.sleep(10)
pipe.set(‘r3‘, ‘sb‘)
pipe.execute()
订阅发布
monitor
import redis
class RedisHelper:
def __init__(self):
self.__conn = redis.Redis(host=‘localhost‘)
self.chan_sub = ‘fm104.5‘
self.chan_pub = ‘fm104.5‘
def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True
def subscribe(self):
pub = self.__conn.pubsub()
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub
send端
from monitor import RedisHelper
obj = RedisHelper()
obj.public(‘hello‘)
receive端
from monitor import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
msg = redis_sub.parse_response()
print(msg)
sadd(name,values)
1 |
|
scard(name)
1 |
|
sdiff(keys, *args)
1 |
|
sdiffstore(dest, keys, *args)
1 |
|
sinter(keys, *args)
1 |
|
sinterstore(dest, keys, *args)
1 |
|
sismember(name, value)
1 |
|
smembers(name)
1 |
|
smove(src, dst, value)
1 |
|
spop(name)
1 |
|
srandmember(name, numbers)
1 |
|
srem(name, values)
1 |
|
sunion(keys, *args)
1 |
|
sunionstore(dest,keys, *args)
1 |
|
sscan(name, cursor=0, match=None, count=None)
sscan_iter(name, match=None, count=None)
1 |
|