条件锁condition与Queue()

在学习之前你应该先了解锁和队列基础

import queue
import time
import random
import threading
import asyncio
import   logging
# from queue import Empty
logging.basicConfig(level = logging.INFO,format = ‘%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: \n %(message)s‘)
log= logging.getLogger()
# queue.qsize 不可以限制,作为应答式condition
q_init = queue.Queue(5)

async def jobs(item):
    time.sleep(random.randint(1,3))
    status = random.randint(0, 1)
    if status == 0:
        return ("success",item)
    else:
        return ("failed",item)

async def do_work(item):
    logging.info("do something %s,time start %s" % (item, time.asctime()))
    a =await jobs(item)
    return a

def async_runner(checker):
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(do_work(checker))
    loop.run_until_complete(asyncio.wait([task]))
    st = task.result()
    return st

def worker_consumer(q_init,cond):
        # checker=None
        # while checker !="stop":
        while True:
            try:
                if cond.acquire():
                    if not q_init.empty():
                        cond.notify()
                        checker = q_init.get()
                        if checker == "stop":
                            cond.release()
                            break
                        st = async_runner(checker)
                        if st[0] in ["success", "failed"]:
                            logging.info("%s task finished status is %s" % (st[1], st[0]))
                            logging.info("大王我们吃完了%s鹌鹑蛋,再来点吧" % checker)
                            q_init.task_done()
                    cond.release()
            except Exception:
                 logging.debug("queue is empty ")

def producer(cond,q_init):
        item = 1
        while True:
            if cond.acquire():
                if q_init.qsize() <5:
                    q_init.put(item)
                    logging.info("孩儿们,加了%s鹌鹑蛋通知下可以吃啦" % item)
                    cond.notify()
                else:
                    cond.wait()
                cond.release()
                item += 1
                if item >= 11:
                   break
        q_init.join()
        for  i in range(thread_num):
            q_init.put("stop")

if __name__ == ‘__main__‘:
        # 消费者>生产者
        thread_num=5
        cond=threading.Condition()
        producer = [threading.Thread(target=producer,args=(cond,q_init)) for i in range(1)]
        consumer = [threading.Thread(target=worker_consumer, args=(q_init,cond)) for i in range(thread_num)]
        for p in producer:
                p.start()
        for k in consumer:
                k.start()

        for m in consumer:
            m.join()

  结果:

2019-12-21 12:50:01,781  - INFO -->producer  at line 70:
 孩儿们,加了1鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70:
 孩儿们,加了2鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70:
 孩儿们,加了3鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,781  - INFO -->producer  at line 70:
 孩儿们,加了4鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,782  - INFO -->producer  at line 70:
 孩儿们,加了5鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:01,785  - INFO -->do_work  at line 22:
 do something 1,time start Sat Dec 21 12:50:01 2019
2019-12-21 12:50:04,787  - INFO -->worker_consumer  at line 57:
 1 task finished status is failed
2019-12-21 12:50:04,787  - INFO -->worker_consumer  at line 58:
 大王我们吃完了1鹌鹑蛋,再来点吧
2019-12-21 12:50:04,788  - INFO -->do_work  at line 22:
 do something 2,time start Sat Dec 21 12:50:04 2019
2019-12-21 12:50:06,789  - INFO -->worker_consumer  at line 57:
 2 task finished status is failed
2019-12-21 12:50:06,789  - INFO -->worker_consumer  at line 58:
 大王我们吃完了2鹌鹑蛋,再来点吧
2019-12-21 12:50:06,791  - INFO -->do_work  at line 22:
 do something 3,time start Sat Dec 21 12:50:06 2019
2019-12-21 12:50:08,793  - INFO -->worker_consumer  at line 57:
 3 task finished status is failed
2019-12-21 12:50:08,793  - INFO -->worker_consumer  at line 58:
 大王我们吃完了3鹌鹑蛋,再来点吧
2019-12-21 12:50:08,794  - INFO -->do_work  at line 22:
 do something 4,time start Sat Dec 21 12:50:08 2019
2019-12-21 12:50:09,795  - INFO -->worker_consumer  at line 57:
 4 task finished status is failed
2019-12-21 12:50:09,795  - INFO -->worker_consumer  at line 58:
 大王我们吃完了4鹌鹑蛋,再来点吧
2019-12-21 12:50:09,796  - INFO -->do_work  at line 22:
 do something 5,time start Sat Dec 21 12:50:09 2019
2019-12-21 12:50:12,798  - INFO -->worker_consumer  at line 57:
 5 task finished status is success
2019-12-21 12:50:12,798  - INFO -->worker_consumer  at line 58:
 大王我们吃完了5鹌鹑蛋,再来点吧
2019-12-21 12:50:12,799  - INFO -->producer  at line 70:
 孩儿们,加了7鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70:
 孩儿们,加了8鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70:
 孩儿们,加了9鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,799  - INFO -->producer  at line 70:
 孩儿们,加了10鹌鹑蛋通知下可以吃啦
2019-12-21 12:50:12,801  - INFO -->do_work  at line 22:
 do something 7,time start Sat Dec 21 12:50:12 2019
2019-12-21 12:50:14,802  - INFO -->worker_consumer  at line 57:
 7 task finished status is success
2019-12-21 12:50:14,802  - INFO -->worker_consumer  at line 58:
 大王我们吃完了7鹌鹑蛋,再来点吧
2019-12-21 12:50:14,804  - INFO -->do_work  at line 22:
 do something 8,time start Sat Dec 21 12:50:14 2019
2019-12-21 12:50:17,805  - INFO -->worker_consumer  at line 57:
 8 task finished status is success
2019-12-21 12:50:17,805  - INFO -->worker_consumer  at line 58:
 大王我们吃完了8鹌鹑蛋,再来点吧
2019-12-21 12:50:17,806  - INFO -->do_work  at line 22:
 do something 9,time start Sat Dec 21 12:50:17 2019
2019-12-21 12:50:20,808  - INFO -->worker_consumer  at line 57:
 9 task finished status is failed
2019-12-21 12:50:20,808  - INFO -->worker_consumer  at line 58:
 大王我们吃完了9鹌鹑蛋,再来点吧
2019-12-21 12:50:20,810  - INFO -->do_work  at line 22:
 do something 10,time start Sat Dec 21 12:50:20 2019
2019-12-21 12:50:23,810  - INFO -->worker_consumer  at line 57:
 10 task finished status is failed
2019-12-21 12:50:23,810  - INFO -->worker_consumer  at line 58:
 大王我们吃完了10鹌鹑蛋,再来点吧

  

原文地址:https://www.cnblogs.com/SunshineKimi/p/12076618.html

时间: 2024-07-30 21:05:28

条件锁condition与Queue()的相关文章

递归锁+条件锁+互斥锁-04-多线程

1 // 2 // ViewController.m 3 // 05-递归锁(recursive)+条件锁(condition) 4 // 5 // Created by mac on 16/4/20. 6 // Copyright © 2016年 mac. All rights reserved. 7 // 8 /* 9 10 3). 互斥锁 11 NSLock *_lock; 12 13 3)NSLock :不能多次调用,会产生死锁 14 15 2016-04-20 16:06:44.600

经典笔试题:线程通信(使用重入锁(ReentrantLock)和条件队列(Condition)实现线程间通信)

经典笔试题: 1.自定义容器,提供新增元素(add)和获取元素数量(size)方法.2.启动两个线程.线程1向容器中新增10个数据.线程2监听容器元素数量,当容器元素数量为5时,线程2输出信息并终止. package com.gaopeng.programming.test2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.conc

条件锁

ReentrantLock类有一个方法newCondition用来生成这个锁对象的一个条件(ConditionObject)对象,它实现了Condition接口.Condition提供了线程通讯的一套机制await和signal等线程间进行通讯的方法.. 1.适用场景 当某线程获取了锁对象,但因为某些条件没有满足,需要在这个条件上等待,直到条件满足才能够往下继续执行时,就需要用到条件锁. 这种情况下,线程主动在某条件上阻塞,当其它线程发现条件发生变化时,就可以唤醒阻塞在此条件上的线程. 2.使用

13.Java5条件阻塞Condition的应用

1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 5 /** 6 * Java5条件阻塞Condition的应用 7 * Condition的功能类似在传统线程技术中的Object.wait()和Object.notify()功能. 8 * 在等待Condition

条件锁类

#include <pthread.h> class CTestLock { public: CTestLock() { pthread_mutex_init(&mutex_t_, NULL); pthread_cond_init(&cond_t_, NULL); } ~CTestLock() { pthread_mutex_destroy(&mutex_t_); pthread_cond_destroy(&cond_t_); } int Lock() { in

NSConditionLock 条件锁

有时候不是简单的需要 加锁/解锁, 而是需要根据一定条件满足后进行 加锁/解锁. 以一个生产中与消费者的例子,介绍条件锁的用法. static NSInteger CONDITION_NO_DATA //条件一: 没有数据 static NSInteger CONDITION_HAS_DATA //条件二: 有数据 //初始化锁时,指定一个默认的条件 NSConditionLock *lock = [[NSConditionLock alloc] initWithCondition:CONDIT

多线程(十一、AQS原理-ReentrantLock的条件队列Condition)

1.Condition介绍 1.1 Condition是对线程的wait,notify的增强 1.2 在ReentrantLock中他的实现类是AQS中的ConditionObject,实现了Condition接口,利用AQS的节点,实现了条件队列. 2.案例分析 2.1 说明:Thread-1获取锁,然后await,释放锁:Thread-2获得锁,唤醒Thread-1,释放锁:Thread-1重新获取锁,释放锁. 2.2 代码 2.2.1 Thread-1 import java.util.c

条件阻塞Condition的应用

Condition的功能类似在传统线程技术中的Object.wait和Object.notity的功能. 例子:生产者与消费者 import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.lock

Linux组件封装(二) 条件变量Condition的封装

声明代码如下: 1 #ifndef CONDITION_H 2 #define CONDITION_H 3 4 #include <pthread.h> 5 #include "noncopyable.h" 6 7 class MutexLock; 8 9 10 class Condition : NonCopyable 11 { 12 public: 13 Condition(MutexLock &mutex); 14 ~Condition(); 15 16 vo