# 定义一个WorkerController,用于执行业务代码
class WorkerController(object):
def __init__(self):
pass
def do_something(self, params):
print("do something")
# 定义一个AsyncWorker,继承Thread,重写run方法,run方法中调用WorkerController的do_something方法
from threading import Thread
from worker_controller import WorkerController
WORKER_CONTROLLER = WorkerController()
class AsyncWorker(Thread):
ASYNC_WORKER_INFO = dict()
def __init__(self, params, ticket_id):
Thread.__init__(self)
self.params = params
self.ticket_id = ticket_id
self.daemon = True
self.start()
def run(self):
AsyncWorker.ASYNC_WORKER_INFO[self.ticket_id] = {
# 保存一些业务信息,之后轮询的时候,可以作为输出返回
"some": "information",
"status": "running"
}
WORKER_CONTROLLER.do_something(self.params)
# 开始任务
@app.route('/do-something', methods=['GET'])
def do_something():
params = request.args.get('params', '').strip()
# 生成一个任务id,用于轮询,例如ticket id
ticket_id = genearteMD5(str(int(round(time.time() * 1000))))
# 实例化一个AsyncWorker
AsyncWorker(params=params, ticket_id=ticket_id)
# 将ticket_id返回
return jsonify({"ticket_id": ticket_id})
# 轮询任务
@app.route("/check-status", methods=['GET'])
def check_status():
ticket_id = request.args.get("ticket_id")
logs = AsyncWorker.ASYNC_WORKER_INFO[str(ticket_id)]
return jsonify(logs)
如何在前端进行异步轮询呢?以angularjs的interval方法为例:https://www.cnblogs.com/CheeseZH/p/12444034.html
原文地址:https://www.cnblogs.com/CheeseZH/p/12444086.html
时间: 2024-10-31 08:36:54