Day12 线程池、RabbitMQ和SQLAlchemy

1、with实现上下文管理

#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

#with实现上下文管理import contextlib@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):    """    用于记录线程中正在等待的线程数    :param self:    :param state_list:    :param worker_thread:    :return:    """    #第二步    state_list.append(worker_thread)    try:        #第三步        yield    finally:        #执行第五步        state_list.remove(worker_thread)

free_list = []current_thread = "wang"#第一步执行with,调用worker_state方法with worker_state(free_list, current_thread):    #第四步    print(123)    print(456)    #执行完成出去的瞬间

2、with实现socket上下文

#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

import contextlibimport socket

@contextlib.contextmanagerdef context_socket(host, port):    sk = socket.socket()    sk.bind((host, port))    sk.listen(5)    try:        yield sk    finally:        sk.close()

with context_socket(‘127.0.0.1‘, 8888) as sock:    print(sock)

3、Redis连接池

import redispool = redis.ConnectionPool(host=‘192.168.195.128‘, port=6379)r = redis.Redis(connection_pool=pool)pipe = r.pipeline(transaction=True)r.set(‘name‘, ‘wanghuafeng‘)r.set(‘role‘, ‘haha‘)t = pipe.execute()print(r.get(‘role‘))

4、Redis发布、订阅

import redis

class RedisHelper:    def __init__(self):        self.__conn = redis.Redis(host=‘192.168.195.128‘)        self.chan_sub = ‘FM98.8‘        self.chan_pub = ‘FM98.8‘

def public(self, msg):        self.__conn.publish(self.chan_pub, msg)        return True

def subscribe(self):        pub = self.__conn.pubsub()        pub.subscribe(self.chan_sub)        pub.parse_response()        return pub
 
#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

from monitor.s22 import RedisHelper

obj = RedisHelper()redis_sub = obj.subscribe()

while True:    msg = redis_sub.parse_response()    print(msg)
 
#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

from monitor.s22 import RedisHelper

obj = RedisHelper()obj.public(‘helo‘)

5、RabbitMQ

#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

import pika

# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(    host=‘localhost‘))channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,                      routing_key=‘hello‘,                      body=‘Hello World!‘)print(" [x] Sent ‘Hello World!‘")connection.close()
 
#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: wanghuafeng

import pika

# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(    host=‘localhost‘))channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):    print(" [x] Received %r" % body)

channel.basic_consume(callback,                      queue=‘hello‘,                      no_ack=True)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)channel.start_consuming()
时间: 2024-08-28 09:26:26

Day12 线程池、RabbitMQ和SQLAlchemy的相关文章

Python操作Redis、Memcache、RabbitMQ、SQLAlchemy

Python操作 Redis.Memcache.RabbitMQ.SQLAlchemy redis介绍:redis是一个开源的,先进的KEY-VALUE存储,它通常被称为数据结构服务器,因为键可以包含string(字符串).hash(哈希).list(链表).set(集合)和zset(有序集合),这些数据类型都支持push/pop.add/remove及取交集和并集及更丰富的操作,redis支持各种不同方式的排序.为了保证效率,数据都是缓存在内存中,它也可以周期性的把更新的数据写入磁盘或者把修改

Tornado 线程池应用

Tornado是一个异步框架,在异步操作的时候能提升程序的处理性能.但是如果在程序中碰到同步的逻辑,由于GIL的关系,会直接卡死,导致性能急剧下降. 目前对于mongodb以及redis都有比较不错的异步框架,但是对于Mysql,目前的异步框架都不是很成熟. 在实际应用中,由于一开始不是特别了解,在用了Tornado框架的同时,采用了Sqlalchemy来处理Mysql数据.但是由于这部分Mysql操作是同步的,在并发量上去的时候,不能及时返回,大量请求被拒绝. 由于替换Sqlalchemy会造

基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言 并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕.异步和削峰的目的. Kafka.ActiveMQ.RabbitMQ

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

内存池、进程池、线程池

首先介绍一个概念"池化技术 ".池化技术 一言以蔽之就是:提前保存大量的资源,以备不时之需以及重复使用. 池化技术应用广泛,如内存池,线程池,连接池等等.内存池相关的内容,建议看看Apache.Nginx等开源web服务器的内存池实现. 起因:由于在实际应用当中,分配内存.创建进程.线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作.           因此,当程序中需要频繁的进行内存申请释放,进程.线程创建销毁等操作时,通常会使用内存池.进程池.

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

记5.28大促压测的性能优化—线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

线程池的创建

package com.newer.cn; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test1 { public static void main(String[] args) { // 创建线程池的方式 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. E