[Python网络编程]浅析守护进程后台任务的设计与实现

在做基于B/S应用中,经常有需要后台运行任务的需求,最简单比如发送邮件。在一些如防火墙,WAF等项目中,前台只是为了展示内容与各种参数配置,后台守护进程才是重头戏。所以在防火墙配置页面中可能会经常看到调用cgi,但真正做事的一般并不是cgi,比如说执行关机命令,他们的逻辑如下:

(ps:上图所说的前台界面包含通常web开发中的后端,不然也没有socket一说)

为什么要这么设计

你可能疑惑为什么要这么设计,我觉得理由如下:

首先有一点说明,像防火墙等基本上都运行在类Linux平台上

1.安全问题  cgi一般也就拥有www权限,但执行关键等命令需要root,所以需要让后台守护进程去干

2.一般类似防火墙的后台守护进程是C/C++写的,在消息格式上很方便处理,如填充一个消息结构体发送出去,后台进程只需要强制转换为定义的结构体,就轻松获得传递的参数值。

那可不可以去掉中间的cig模块,直接发送消息给后台守护进程呢?

我觉得是可以的,本文的重点也是实现这个方案。

如何实现

由于最近一直在windows下,所以我们的守护进程是运行在windows下的,但其实windows并没有守护进程的概念,相对应的是服务的概念。这里需要安装pywin32包。

class MgrService(win32serviceutil.ServiceFramework):
    """
    Usage: ‘python topmgr.py install|remove|start|stop|restart‘
    """
    #服务名
    _svc_name_ = "Mgr"
    #服务显示名称
    _svc_display_name_ = "Daemon Mgr"
    #服务描述
    _svc_description_ = "Daemon Mgr"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)

    def SvcDoRun(self):
        self.ReportServiceStatus(win32service.SERVICE_START_PENDING)
        INFO("mgr startting...")
        self.ReportServiceStatus(win32service.SERVICE_RUNNING)
        self.start()
        # 等待服务被停止
        INFO("mgr waitting...")
        win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
        INFO("mgr end")

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        INFO("mgr stopping...")
        self.stop()
        INFO("mgr stopped")
        # 设置事件
        win32event.SetEvent(self.hWaitStop)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)

    def start(self): pass

    def stop(self): pass

很简单,这样就实现了windows中的服务,也就是说脱离终端,运行于后台。INFO等函数只是简单的记录作用,可直接忽略。

我们要实现自己的后台程序,只需要继承MgrService,并提供start,stop方法就可以了。

由于我们是通过socket来传递消息的,所以在start方法中要监听端口,等待连接,处理连接,这个大家都很擅长。在这里我选择了

单线程,基于协程,底层使用libev(libevent)--- gevent这个高性能网络库。对gevent有兴趣的童鞋可以看看深度分析gevent运行流程

class Engine(MgrService):
    rbufsize = -1
    wbufsize = 0

    def start(self):
        INFO(‘wait connection‘)
        self.server = StreamServer((HOST, PORT), self.msg_handle)
        self.server.serve_forever()

    def msg_handle(self,socket,address):
        try:
            rfile = socket.makefile(‘rb‘, self.rbufsize)
            wfile = socket.makefile(‘wb‘, self.wbufsize)
            headers = Message(rfile).dict

            INFO(‘get a connection from:%s,headers:%s‘ % (str(address), headers))

            if ‘module‘ in headers and headers[‘module‘] in MODULES:
                MODULES[headers[‘module‘]].handle(wfile, headers)
        except Exception:
            ERROR(‘msg_handle exception,please check‘)

    def stop(self):
        if hasattr(self, server):
            self.server.stop()

当有新连接到来,由msg_handle处理,首先读取发送来的消息,消息格式使用了最简单的http的格式,即(键名:键值)的格式,你要问我为什么采用这个格式,哈哈,格式简单,python有现成的库解析。

考虑到后期模块可能很多,所以我们的处理流程自动根据消息的模块参数,调用对应模块的handle方法。
上面代码的那个MODULES是个全局变量,当你添加一个模块的时候需要注册到MODULES中,我提供了module_register方法。

MODULES = {           # module: handle module class
}

def module_register(module_name, handle_class):
    if module_name in MODULES:
        WARN(‘duplicate module_name:‘ + module_name)
    else:
        MODULES[module_name] = handle_class

到这里一切都很自然,但貌似只假设模块有handle方法,自己写一个模块还是很费事,你需要自己去想怎么调用,最有返回什么格式的数据,这都是一件头疼的事情,所以最好提供一个基类模块。

class Module(object):
    SECRE_KEY = "YI-LUO-KEHAN"
    MODULE_NAME = "BASE_MODULE"
    PREFIX = "do_"  # method prefix

    def __init__(self, wfile, headers):
        self.wfile = wfile
        self.headers = headers

    def __getattr__(self, name):
        try:
            return self.headers[name]
        except Exception:
            ERROR("%s has no attr:%s,please check" %(self.MODULE_NAME, name))            

    @classmethod
    def handle(cls, wfile, headers):
        module_obj = cls(wfile, headers)
        module_obj.schedule_default()

    def verify(self):
        if hmac.new(self.SECRE_KEY, self.MODULE_NAME).hexdigest() == self.signature:
            return True
        else:
            WARN("client verify failed,signature:%s" % str(self.signature))

    def schedule_default(self):
        err_code = 0
        if self.verify() and self.action:
            func_name = self.PREFIX + self.action
            try:
                getattr(self, func_name)()
            except AttributeError:
                err_code = 1
                ERROR("%s has no method:%s" %(self.MODULE_NAME, func_name))
            except Exception:
                err_code = 2
                ERROR("module:%s,method:%s,exception" % (self.MODULE_NAME, func_name))
        else:
            err_code = 3

        if err_code:
            self.send_error({‘err_code‘:err_code})

    def send_success(self, msg=‘‘):
        data = {‘success‘:True,‘msg‘:msg}
        self.wfile.write(json.dumps(data))

    def send_error(self, msg=‘‘):
        data = {‘success‘:False,‘msg‘:msg}
        self.wfile.write(json.dumps(data))

在基类模块中我们提供了默认的处理流程,即根据消息中action,调用do_action方法,并提供了一个简单但很有效的认证方法,通过消息的signature字段,可能有些简陋,但没关系,你可以定义自己的认证方法。

下面该写我们自己的模块了,

TASK = {}  # task_id: pid
class ScanModule(Module):
    MODULE_NAME = "SCAN_MODULE"

    def do_start(self):
        self.send_success(‘start ok‘)
        DEBUG(‘------------task start------------‘)
        task_ids = [int(task_id) for task_id in self.task_ids.split(‘,‘) if int(task_id) not in TASK]

        for task_id in task_ids:
            try:
                cmd = ‘python scan.py -t %s‘ % task_id
                DEBUG(cmd)
                self.sub = Popen(cmd, shell=True, cwd=CWD)
                pid = int(self.sub.pid)
                TASK[task_id] = pid
                INFO(‘%s start a new task,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid))
            except Exception:
                ERROR(‘%s start a new task,task_id:%s failed‘ % (self.MODULE_NAME, task_id))

    def do_stop(self):
        self.send_success(‘stop ok‘)
        DEBUG(‘------------task stop------------‘)
        task_ids = [int(task_id) for task_id in self.task_ids.split(‘,‘) if int(task_id) in TASK]

        for task_id in task_ids:
            pid = TASK.pop(task_id)
            try:
                INFO(‘%s stop a new task,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid))
                call([‘taskkill‘, ‘/F‘, ‘/T‘, ‘/PID‘, str(pid)])
            except Exception:
                ERROR(‘%s taskkill a task failed,task_id:%s,pid:%s‘ %(self.MODULE_NAME, task_id, pid))

module_register(ScanModule.MODULE_NAME, ScanModule)

上面实现了一个简单的扫描模块,支持两个action,start,stop。

start很简单,调用gevent的subprocess.Popen运行子进程,并记录pid,stop则使用taskkill直接杀掉该进程。

这里有两点需要注意:

1.不要用原生的subprocess模块,因为原生的subprocess是阻塞的,这可能导致主处理逻辑也阻塞,不能服务更多的请求

最后别忘了调用module_register注册相应模块。

2.方法一开始最好就返回结果,因为前台很可能在等待返回。所以说as soon as possible

下面提供一个客户端用于测试,client.py

#!/usr/bin/env python
#-*-encoding:UTF-8-*-

import hmac
import gevent
from gevent import monkey
monkey.patch_socket()

addr = (‘localhost‘, 6667)

def send_request(module_name,request_headers):
    SECRE_KEY = "YI-LUO-KEHAN"
    socket = gevent.socket.socket()
    socket.connect(addr)
    request_headers[‘module‘] = module_name
    request_headers[‘signature‘] = hmac.new(SECRE_KEY, module_name).hexdigest()
    h = ["%s:%s" %(k, v) for k,v in request_headers.iteritems()]
    h.append(‘\n‘)
    request = ‘\n‘.join(h)
    socket.send(request)
    print socket.recv(8192)
    socket.close()

if __name__ =="__main__":
    import sys
    if sys.argv[1] == ‘start‘:
        send_request(‘SCAN_MODULE‘,{‘action‘:‘start‘,‘task_ids‘:‘1‘})
    else:
        send_request(‘SCAN_MODULE‘,{‘action‘:‘stop‘,‘task_ids‘:‘1‘})

我们来简单的测试一下:

注意:由于要注册到服务,cmd需要管理员权限

至于start中调用的scan.py随便写一个就可以

截图如下,我们看到成功!!!

本文代码已放到github,https://github.com/Skycrab/pymgr

感兴趣的童鞋可以参考,请大家多提意见。

[Python网络编程]浅析守护进程后台任务的设计与实现

时间: 2024-10-17 00:19:15

[Python网络编程]浅析守护进程后台任务的设计与实现的相关文章

《网络编程》守护进程

前言 守护进程是在后台运行并独立于所有终端控制的进程.守护进程没有控制终端源于它们通常是由系统初始化脚本启动,但是也有可能从某个终端由用户在 shell 提示符下键入命令行启动,这种启动方式的守护进程必须亲自脱离与控制终端的关联,从而避免与作业控制.终端会话管理.终端产生信号等发生任何不期望的交互,也可以避免在后台运行的守护进程非预期地输出到终端.有关作业控制.终端控制的内容可参考文章<作业控制.终端控制 和 守护进程> 由于守护进程没有控制终端,当守护进程出错时,必须通过某种输出函数输出错误

python网络编程基础(线程与进程、并行与并发、同步与异步)

python网络编程基础(线程与进程.并行与并发.同步与异步) 目录 线程与进程 并行与并发 同步与异步 线程与进程 进程 前言 进程的出现是为了更好的利用CPU资源使到并发成为可能. 假设有两个任务A和B,当A遇到IO操作,CPU默默的等待任务A读取完操作再去执行任务B,这样无疑是对CPU资源的极大的浪费.聪明的老大们就在想若在任务A读取数据时,让任务B执行,当任务A读取完数据后,再切换到任务A执行.注意关键字切换,自然是切换,那么这就涉及到了状态的保存,状态的恢复,加上任务A与任务B所需要的

Python 网络编程

Python 提供了两个级别访问的网络服务.: 低级别的网络服务支持基本的 Socket,它提供了标准的 BSD Sockets API,可以访问底层操作系统Socket接口的全部方法. 高级别的网络服务模块 SocketServer, 它提供了服务器中心类,可以简化网络服务器的开发. 什么是 Socket? Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯. socket()函数 Pyt

ios开发 网络编程浅析(一)

iphone包含了很多框架和库,从底层的套接字到不同层次的封装,可以方便地给程序添加网络功能. (1)BSD套接字.最底层的套接字,这是Unix网络开发常用的API.如果从其他系统移植程序,而程序用的是BSD套接字,那么网络部分可以继续使用这些API. (2)CFNetwork framework .CFNetwork 也是比较底层的, 是对BSD套接字的一个扩展 .它是一个C语言的库,它是基于BSD套接字,提供了对网络协议的抽象.这些抽象使得用户更容易地操作套接字.处理网络的各种连接..它集成

[python] 网络编程之套接字Socket、TCP和UDP通信实例

很早以前研究过C#和C++的网络通信,参考我的文章: C#网络编程之Tcp实现客户端和服务器聊天 C#网络编程之套接字编程基础知识 C#网络编程之使用Socket类Send.Receive方法的同步通讯 Python网络编程也类似.同时最近找工作笔试面试考察Socket套接字.TCP\UDP区别比较多,所以这篇文章主要精简了<Python核心编程(第二版)>第16章内容.内容包括:服务器和客户端架构.套接字Socket.TCP\UDP通信实例和常见笔试考题. 最后希望文章对你有所帮助,如果有不

PHP高级编程之守护进程,实现优雅重启

PHP高级编程之守护进程 http://netkiller.github.io/journal/php.daemon.html Mr. Neo Chen (陈景峰), netkiller, BG7NYT 中国广东省深圳市龙华新区民治街道溪山美地 518131 +86 13113668890 +86 755 29812080 <[email protected]> 版权 ? 2014 http://netkiller.github.io 版权声明 转载请与作者联系,转载时请务必标明文章原始出处和

Python 网络编程(二)

Python 网络编程 上一篇博客介绍了socket的基本概念以及实现了简单的TCP和UDP的客户端.服务器程序,本篇博客主要对socket编程进行更深入的讲解 一.简化版ssh实现 这是一个极其简单的仿ssh的socket程序,实现的功能为客户端发送命令,服务端接收到客户端的命令,然后在服务器上通过subrocess模块执行命令,如果命令执行有误,输出内容为空,则返回"command error"的语句给客户端,否则将命令执行的结果返回给客户端 服务端 1 2 3 4 5 6 7 8

python网络编程——socket进阶篇(select/poll/epoll)

原 生socket客户端在与服务端建立连接时,即服务端调用accept方法时是阻塞的,同时服务端和客户端在收数据(调用recv)时也是阻塞的.原生 socket服务端在同一时刻只能处理一个客户端请求,即服务端不能同时与多个客户端进行通信,实现并发,导致服务端资源闲置(此时服务端只占据 I/O,CPU空闲). 现在的需求是:我们要让多个客户端连接至服务器端,而且服务器端需要处理来自多个客户端请求.很明显,原生socket实现不了这种需求,此时我们该采用什么方式来处理呢? 解决方法:采用I/O多路复

python 网络编程 (二)---tcp

异常 python的socket模块实际上定义了4种可能出现的异常: 1)与一般I/O 和通信问题有关的socket.error; 2)与查询地址信息有关的socket.gaierror; 3)与其他地址错误有关的socket.herror; 4)与在一个socket上调用settimeout()后,处理超时有关的socket.timeout; import socket, sys, time host = sys.argv[1] textport = sys.argv[2] filename