后台程序处理(二) python threading - queue 模块使用

由于协程没办法完成(一)中所说的任务模式

接下来就尝试一下使用线程和队列来实现一下这个功能

在实现之前,我们先明确一个问题——python的线程是伪并发的。同一时间只能有一个线程在运行。具体怎样的运作方式由解释器决定

然后回顾一下上一章遇到的问题——return以后,需要另外一个线程去检测之前的操作是否执行成功

因此程序的流程设计应该是这样的:

 1 # 大致流程步骤如下
 2 # 1.获取参数(接口被访问时触发)
 3 request_data = request.form
 4 # 2.根据参数查询内容
 5 target = Target.query.filter_by(id=request_data).first()
 6 # 3.将结果插入队列
 7 ans_queue.put(target)
 8 # 4.激活线程
 9 thread.set()
10 # 5.将结果从队列中取出
11 ans_queue.get()
12 # 6.处理结果
13 check()
14 # 7.将线程休眠(阻塞)
15 thread.event.clear()

这样设计的考虑主要是以下几点:

1.简单

2.入队可以保证消息按时间顺序被处理

3.出队可以保证当队列不为空时,检查线程会执行到队列为空为止。免去不必要的唤醒检查。然后在有消息入队时被重新激活

4.其实我们的设计正常来说不会出现3中的检查情况。基本上队列一旦有消息入队,线程就会启动并清空队列

5.入队可以保证消息的完整和独立性,每次请求得到的数据入队后,队列中都是一列数组。处理逻辑更清晰

6.队列中的数据不出栈是不可见的

7.我就是宁愿用全局队列也不想用全局变量

实际接口代码和线程代码如下:

A.队列和线程代码

  1 # 消息队列
  2 lock_queue = Queue()
  3
  4
  5 def check_kill(event):
  6     while True:
  7         # check queue
  8         if lock_queue.empty() is True:
  9             event.clear()
 10         # wait event
 11         if event.is_set() is not True:
 12             event.wait()
 13         # do some work
 14         sids, serials, minutes, hosts, insts, opasses, ospasses = [], [], [], [], [], [], []
 15
 16         # get data until queue empty or datas more than 10
 17         if lock_queue.empty() is not True:
 18             data = lock_queue.get()
 19             for i in data:
 20                 sid, serial, minute, host, inst, opass, ospass = i.split(‘,‘)
 21                 sids.append(sid)
 22                 serials.append(serial)
 23                 minutes.append(minute)
 24                 hosts.append(host)
 25                 insts.append(inst)
 26                 opasses.append(opass)
 27                 ospasses.append(ospass)
 28
 29         # init the command
 30         kill_command = ‘kill -9‘
 31
 32         # each time we deal less or equal 10 check
 33         for i in range(len(minutes)):
 34             current = datetime.datetime.now().minute
 35             if current >= int(minutes[i]):
 36                 passtime = current - int(minutes[i])
 37             else:
 38                 passtime = current + 60 - int(minutes[i])
 39
 40             print("passtime is", passtime)
 41             if (5 - passtime) >= 0:
 42                 time.sleep((5 - passtime)*60)
 43
 44             # split piece of list
 45             sql_sids, sids = sids[0], sids[1:]
 46             sql_serials, serials = serials[0], serials[1:]
 47             sql_hosts, hosts = hosts[0], hosts[1:]
 48             sql_insts, insts = insts[0], insts[1:]
 49             sql_opass, opasses = opasses[0], opasses[1:]
 50             sql_ospass, ospasses = ospasses[0], ospasses[1:]
 51
 52             print("data", sql_hosts, sql_insts, sql_serials, sql_sids)
 53             # create cursor
 54
 55             try:
 56                 conn = sqlite3.connect(‘data-dev.sqlite‘)
 57                 c = conn.cursor()
 58                 cu = c.execute("select ouser,oport,osport,osuser from tool_target where host=‘%s‘ and inst=‘%s‘" % (sql_hosts, sql_insts))
 59
 60                 result = cu.fetchall()
 61
 62                 ouser = result[0][0]
 63                 opass = sql_opass
 64                 str_conn = (sql_hosts
 65                             + ‘:‘
 66                             + str(result[0][1])
 67                             + ‘/‘
 68                             + sql_insts)
 69                 odb = cx_Oracle.connect(ouser, opass, str_conn)
 70                 cursor = odb.cursor()
 71
 72                 # select to find if lock exist
 73                 sql = ‘‘‘select b.spid, a.sid, a.serial#, a.event from v$session a, v$process b
 74                         where a.sid = %s and a.serial# = %s ‘‘‘ % (sql_sids, sql_serials)
 75
 76                 cursor.execute(sql)
 77                 answer = cursor.fetchall()
 78                 print("answer is", answer)
 79                 kill_command += ‘ ‘ + answer[0][0]
 80
 81                 s = paramiko.SSHClient()
 82                 s.load_system_host_keys()
 83                 s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
 84                 s.connect(sql_hosts, result[0][2], result[0][3], sql_ospass)
 85                 stdin, stdout, stderr = s.exec_command(kill_command)
 86                 stdout.read()
 87                 print(‘------------------------‘)
 88                 s.close()
 89                 cursor.close()
 90                 odb.close()
 91                 c.close()
 92                 conn.close()
 93             except:
 94                 pass
 95
 96
 97 txkill_ready = threading.Event()
 98 t1 = threading.Thread(target=check_kill, args=(txkill_ready,), name=‘t1‘)
 99 t1.start()
100 # txkill_ready.set()

B.接口代码

 1 @main.route(‘/txlock/startkillurl‘, methods=[‘POST‘])
 2 def start_kill_url():
 3     if request.method == ‘POST‘:
 4         cmd = request.form.getlist(‘list‘)[0]
 5         host = request.form.getlist(‘host‘)[0]
 6         inst = request.form.getlist(‘inst‘)[0]
 7         # print(len(cmd))
 8         # cmd.replace("\n", "")
 9         # cmd.replace("\t", "")
10         # print(len(cmd))
11
12         tooltarget = ToolTarget.query.filter_by(host=host, inst=inst).first()
13         ouser = tooltarget.ouser
14         opass = ToolTarget.de_rsa(pwd=tooltarget.opass)
15         ospass = ToolTarget.de_rsa(pwd=tooltarget.ospass)
16         str_conn = (tooltarget.host
17                     + ‘:‘
18                     + str(tooltarget.oport)
19                     + ‘/‘
20                     + tooltarget.inst)
21         odb = cx_Oracle.connect(ouser, opass, str_conn)
22         cursor = odb.cursor()
23
24         # add into queue
25         c = re.findall(‘\d*,\d*‘, cmd)
26         d = [i+‘,‘+str(datetime.datetime.now().minute)+‘,‘+host+‘,‘+inst+‘,‘+opass+‘,‘+ospass for i in c]
27         # data example : [‘15,5,17‘, ‘16,23,17‘, ‘14,5,17‘, ‘142,1,17‘]
28         lock_queue.put(d)
29         txkill_ready.set()
30
31         try:
32             cursor.execute(cmd)
33             # pass
34         except:
35             return "执行失败,关闭弹窗后会自动刷新列表"
36     return "执行成功,关闭弹窗后会自动刷新列表"
时间: 2024-08-14 23:32:25

后台程序处理(二) python threading - queue 模块使用的相关文章

Python队列queue模块

Python中queue模块常用来处理队列相关问题 队列常用于生产者消费者模型,主要功能为提高效率和程序解耦 1. queue模块的基本使用和相关说明 # -*- coding:utf-8 -*- # Author:Wong Du ''' 队列常用于生产者消费者模型, 主要功能为提高效率和程序解耦 ''' import queue """实例化队列对象不同规则的三种方法""" q1 = queue.Queue(maxsize=2) # 先入先出

python threading queue

import queue,threading l=threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) global que self.queue = que def run(self): while True: if self.queue.empty(): break item = self.queue.get() if l.acquire(1

二. python的os模块

一 .os模块 os 模块包括了普遍操作系统的功能 1.name获取操作系统类型 import os print(os.name) # nt 代表是window模块 2.environ 获取操作系统中的环境变量 import os # 获取操作系统中的环境变量 print(os.environ) 3.get获取指定环境变量 print(os.environ.get("PROGRAMFILES")) # C:\Program Files 4.curdir获取当前目录 import os

Python之queue模块

一.queue——同步的队列类 queue模块实现了多生产者,多消费者的队列.当 要求信息必须在多线程间安全交换,这个模块在 线程编程时非常有用 .Queue模块实现了所有要求的锁机制.  说了半天就是Queue模块主要是多线程,保证线程安全使用的. 这个类实现了三种类型的queue,区别仅仅在于进去和取出的位置.在一个FIFO(First In,First Out)队列中,先加先取.在一个LIFO(Last In First Out)的队列中,最后加的先出来(操作起来跟stack一样).pri

Python的Queue模块

1 NAME 2 Queue - A multi-producer, multi-consumer queue. 3 4 CLASSES 5 Queue 6 LifoQueue 7 PriorityQueue 8 exceptions.Exception(exceptions.BaseException) 9 Empty 10 Full 11 12 class Empty(exceptions.Exception) 13 | Exception raised by Queue.get(block

【[email protected]】queue模块-生产者消费者问题

python通过queue模块来提供线程间的通信机制,从而可以让线程分项数据. 个人感觉queue就是管程的概念 一个生产者消费者问题 1 from random import randint 2 from threading import Thread 3 from queue import Queue 4 from time import sleep 5 6 7 def writeq(queue): 8 print('starting put queue...') 9 queue.put('

python threading模块使用 以及python多线程操作的实践(使用Queue队列模块)

今天花了近乎一天的时间研究python关于多线程的问题,查看了大量源码 自己也实践了一个生产消费者模型,所以把一天的收获总结一下. 由于GIL(Global Interpreter Lock)锁的关系,纯的python代码处理一般逻辑的确无法活动性能上的极大提升,但是在处理需要等待外部资源返回或多用户的应用程序中,多线程仍然可以作为一个比较好的工具来进行使用. python提供了两个模块thread和threading 来支持python的多线程操作.通俗的讲一般现在我们只使用threading

Python学习心得(七) 深入理解threading多线程模块

Python提供了多个模块来支持多线程编程,包括thread.threading和queue模块等.thread模块提供了基本的线程和锁定支持:而threading模块提供了更高级别.功能更全面的线程管理.queue模块,用户可以创建一个队列数据结构,用于在多线程之间进行共享. 核心提示:避免使用thread模块 推荐使用更高级别的threading模块,原因如下: 1.threading模块更加先进,有更好的线程支持,并且thread模块中的一些属性会和threading模块有冲突: 2.低级

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q