1.概述
Openstack中有一个叫Launcher的概念,即专门用来启动服务的,这个类被放在了oslo_service这个包里面。Launcher分为两种,一种是ServiceLauncher,另一种为ProcessLauncher。ServiceLauncher用来启动单进程的服务,而ProcessLauncher用来启动有多个worker子进程的服务。
2.ServiceLauncher
ServiceLauncher继承自Launcher,启动服务的一个重要成员就是launcher_service,ServiceLauncher没有对该成员函数进行任何改写。
def launch_service(self, service, workers=1): """Load and start the given service. :param service: The service you would like to start, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: This param makes this method compatible with ProcessLauncher.launch_service. It must be None, 1 or omitted. :returns: None """ if workers is not None and workers != 1: raise ValueError(_("Launcher asked to start multiple workers")) _check_service_base(service) service.backdoor_port = self.backdoor_port self.services.add(service)
laucher_service就是将服务添加到self.services成员里面,services成员的类型是class Services,看看它的add方法。
class Services(object): def __init__(self): self.services = [] self.tg = threadgroup.ThreadGroup() self.done = event.Event() def add(self, service): """Add a service to a list and create a thread to run it. :param service: service to run """ self.services.append(service) self.tg.add_thread(self.run_service, service, self.done) @staticmethod def run_service(service, done): """Service start wrapper. :param service: service to run :param done: event to wait on until a shutdown is triggered :returns: None """ try: service.start() except Exception: LOG.exception(‘Error starting thread.‘) raise SystemExit(1) else: done.wait()
Services这个类的初始化很简单,即创建一个ThreadGroup,ThreadGroup其实是eventlet的GreenPool,Openstack利用eventlet实现并发。add方法,将self.run_service这个方法放入pool中,而service就是它的参数。run_service方法很简单,就是调用service的start方法,这样就完成了服务的启动。
3.ProcessLauncher
ProcessLauncher直接继承于Object,所以其对launch_service方法进行了实现。
class ProcessLauncher(object): def launch_service(self, service, workers=1): """Launch a service with a given number of workers. :param service: a service to launch, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: a number of processes in which a service will be running """ _check_service_base(service) wrap = ServiceWrapper(service, workers) LOG.info(‘Starting %d workers‘, wrap.workers) while self.running and len(wrap.children) < wrap.workers: self._start_child(wrap)
lauch_service除了接受service参数以外,还需要接受一个workers参数,即子进程的个数,然后调用_start_child启动多个子进程。
def _start_child(self, wrap): if len(wrap.forktimes) > wrap.workers: # Limit ourselves to one process a second (over the period of # number of workers * 1 second). This will allow workers to # start up quickly but ensure we don‘t fork off children that # die instantly too quickly. if time.time() - wrap.forktimes[0] < wrap.workers: LOG.info(‘Forking too fast, sleeping‘) time.sleep(1) wrap.forktimes.pop(0) wrap.forktimes.append(time.time()) pid = os.fork() if pid == 0: self.launcher = self._child_process(wrap.service) while True: self._child_process_handle_signal() status, signo = self._child_wait_for_exit_or_signal( self.launcher) if not _is_sighup_and_daemon(signo): self.launcher.wait() break self.launcher.restart() os._exit(status) LOG.debug(‘Started child %d‘, pid) wrap.children.add(pid) self.children[pid] = wrap return pid
_start_child只是简单的调用了一个os.fork(),然后子进程开始运行,子进程调用_child_process。
def _child_process(self, service): self._child_process_handle_signal() # Reopen the eventlet hub to make sure we don‘t share an epoll # fd with parent and/or siblings, which would be bad eventlet.hubs.use_hub() # Close write to ensure only parent has it open os.close(self.writepipe) # Create greenthread to watch for parent to close pipe eventlet.spawn_n(self._pipe_watcher) # Reseed random number generator random.seed() launcher = Launcher(self.conf, restart_method=self.restart_method) launcher.launch_service(service) return launcher
_child_process其实很简单,创建一个Launcher,调用Laucher.launch_service方法。前面介绍过ServiceLauncher继承自Launcher,也是调用的launch_service方法,将服务启动,因此接下来的步骤与前面一致,最终都将调用service.start方法启动服务。