twisted的defer模式和线程池

Reference: http://www.cnblogs.com/mumuxinfei/p/4528910.html

前言:
  最近帮朋友review其模块服务代码, 使用的是python的twisted网络框架. 鉴于之前并没有使用过, 于是决定好好研究一番.
  twisted的reactor模型很好的处理了网络IO事件, 以及定时任务触发. 但包处理后的业务逻辑操作, 需要根据具体的场景来决定.
  本文将讲述twisted如何实现half-sync/half-async的模式, 其线程池和defer模式是如何设计和使用的.

场景构造:
  twisted服务接受业务请求, 后端需要访问mysql. 由于mysql的接口是同步的, 如果安装twisted默认的方式处理话, 其业务操作(mysql)会阻塞reactor的IO事件循环. 这大大降低了twisted的服务能力. 
  为了解决该类问题, twisted支持线程池. 把业务逻辑和IO事件分离, IO操作依旧是异步的, 而业务逻辑则采用线程池来处理.

  

工作线程池:
  在具体讲述defer模式之前, 先谈谈reactor自带的线程池, 这也符合使用half-sync/half-async模式的直观理解.
  先来构造下一个基础样例代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

#! /usr/bin/python

#-*- coding: UTF-8 -*-

from twisted.internet import reactor

from twisted.internet import protocol

from twisted.protocols.basic import LineReceiver

import time

class DemoProtocol(LineReceiver):

               

    def lineReceived(self, line):

        # 进行数据包的处理

        reactor.callInThread(self.handle_request, line)

    

    def handle_request(self, line):

        """

            hanlde_request:

                进行具体的业务逻辑处理

        """

        # 边使用sleep(1)来代替模拟

        time.sleep(1)

        # 借助callFromThread响应结果

        reactor.callFromThread(self.write_response, line)

    

    def write_response(self, result):

        self.transport.write("ack:" + str(result) + "\r\n")

class DemoProtocolFactory(protocol.Factory):

    def buildProtocol(self, addr):

        return DemoProtocol()

    

reactor.listenTCP(9090, DemoProtocolFactory())

reactor.run()

  DemoProtocol在收到一行消息, 需要处理一个业务需耗时一秒, 于是其调用callInThread来借助reactor的线程池来执行.
  其callInThread的函数定义如下:


1

2

def callInThread(self, _callable, *args, **kwargs):

        self.getThreadPool().callInThread(_callable, *args, **kwargs)

  从中, 我们可以印证之前的观点, 借助线程池来完成耗时阻塞的业务工作.
  再来看一下callFromThread的函数定义:


1

2

3

4

def callFromThread(self, f, *args, **kw):

        assert callable(f), "%s is not callable" % (f,)

        self.threadCallQueue.append((f, args, kw))

        self.wakeUp()

  其作用是把回调放入主线程(也是reactor主事件循环)的待执行队列中, 并及时唤醒reactor.
  我们把写入响应的操作放入主循环中, 是为了让IO集中在主循环中进行, 避免潜在的线程不安全的问题.

defer模式:
  直接使用reactor的线程池, 非常容易实现half-sync/half-async的模式, 也让IO和业务逻辑隔离. 但reactor设计之初, 更倾向于隐藏其内部的线程池. 于是其引入了defer模式.
  让我们实现与上等同的代码片段:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

#! /usr/bin/python

#-*- coding: UTF-8 -*-

from twisted.internet import reactor

from twisted.internet import protocol

from twisted.protocols.basic import LineReceiver

from twisted.internet.threads import deferToThread

import time

class DemoProtocol(LineReceiver):

               

    def lineReceived(self, line):

        # 进行数据包的处理

        deferToThread(self.handle_request, line).addCallback(self.write_response)

    

    def handle_request(self, line):

        """

            hanlde_request:

                进行具体的业务逻辑处理

        """

        # 边使用sleep(1)来代替模拟

        time.sleep(1)

        return line

    

    def write_response(self, result):

        self.transport.write("ack:" + str(result) + "\r\n")

    

class DemoProtocolFactory(protocol.Factory):

    def buildProtocol(self, addr):

        return DemoProtocol()

    

reactor.listenTCP(9090, DemoProtocolFactory())

reactor.run()

  使用defer后, 代码更加的简洁. 其defer对象, 其实借用了线程池. 
  threads.deferToThread定义如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

def deferToThread(f, *args, **kwargs):

    from twisted.internet import reactor

    return deferToThreadPool(reactor, reactor.getThreadPool(),

                             f, *args, **kwargs)

def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):

    = defer.Deferred()

    def onResult(success, result):

        if success:

            reactor.callFromThread(d.callback, result)

        else:

            reactor.callFromThread(d.errback, result)

    threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)

    return d

  这边我们可以发现deferToThread, 就是间接调用了callInThread函数, 另一方面, 对其回调函数的执行结果, 进行了onCallback, 以及onErrback的调用. 这些回调函数在主线程中运行.
  defer模式简化了程序编写, 也改变了人们开发的思维模式.

测试回顾:
  使用telnet进行测试, 结果正常.
  
  另一方面, twisted的线程池, 其默认是采用延迟初始化的方式.
  服务开启时, 只有主线程一个, 随着请求的到来, 其按需产生更多的worker thread.
  而其线程池默认为10. 我们可以借助suggestThreadPoolSize方法来修改.

时间: 2024-09-30 10:44:16

twisted的defer模式和线程池的相关文章

JAVA 生产者租塞模式的线程池 ThreadPoolExecutor

package com.dubbo.analyzer.executor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 任务执行者<br/> * 当线程不够

Java多线程设计模式(4)线程池模式

前序: Thread-Per-Message Pattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作.它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现.该线程模式主要包括三个部分: 1,Request参与者(委托人),也就是消息发送端或者命令请求端 2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程. 3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动. 由于常规调用一个方法后,必须等待该方法完全执行完毕

Linux高性能server规划——处理池和线程池

进程池和线程池 池的概念 由于server的硬件资源"充裕".那么提高server性能的一个非常直接的方法就是以空间换时间.即"浪费"server的硬件资源.以换取其执行效率.这就是池的概念. 池是一组资源的集合,这组资源在server启动之初就全然被创建并初始化,这称为静态资源分配. 当server进入正是执行阶段.即開始处理客户请求的时候.假设它须要相关的资源,就能够直接从池中获取,无需动态分配.非常显然,直接从池中取得所需资源比动态分配资源的速度要快得多.由于

MySQL详解(7)-----------MySQL线程池总结(一)

线程池是Mysql5.6的一个核心功能,对于服务器应用而言,无论是web应用服务还是DB服务,高并发请求始终是一个绕不开的话题.当有大量请求并发访问时,一定伴随着资源的不断创建和释放,导致资源利用率低,降低了服务质量.线程池是一种通用的技术,通过预先创建一定数量的线程,当有请求达到时,线程池分配一个线程提供服务,请求结束后,该线程又去服务其他请求. 通过这种方式,避免了线程和内存对象的频繁创建和释放,降低了服务端的并发度,减少了上下文切换和资源的竞争,提高资源利用效率.所有服务的线程池本质都是位

Linux高性能服务器编程——进程池和线程池

进程池和线程池 池的概念 由于服务器的硬件资源"充裕",那么提高服务器性能的一个很直接的方法就是以空间换时间,即"浪费"服务器的硬件资源,以换取其运行效率.这就是池的概念.池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配.当服务器进入正是运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配.很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的.当

MySQL线程池总结

线程池是Mysql5.6的一个核心功能,对于服务器应用而言,无论是web应用服务还是DB服务,高并发请求始终是一个绕不开的话题.当有大量请求并发访问时,一定伴随着资源的不断创建和释放,导致资源利用率低,降低了服务质量.线程池是一种通用的技术,通过预先创建一定数量的线程,当有请求达到时,线程池分配一个线程提供服务,请求结束后,该线程又去服务其他请求. 通过这种方式,避免了线程和内存对象的频繁创建和释放,降低了服务端的并发度,减少了上下文切换和资源的竞争,提高资源利用效率.所有服务的线程池本质都是位

详解tomcat的连接数与线程池

前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xml 中写到过:Connector的主要功能,是接收连接请求,创建Request和Response对象用于和请求端交换数据:然后分配线程让Engine(也就是Servlet容器)来处理这个请求,并把产生的Request和Response对象传给Engine.当Engine处理完请求后,也会通过Conn

详解 Tomcat 的连接数与线程池

前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xml 中写到过:Connector的主要功能,是接收连接请求,创建Request和Response对象用于和请求端交换数据:然后分配线程让Engine(也就是Servlet容器)来处理这个请求,并把产生的Request和Response对象传给Engine.当Engine处理完请求后,也会通过Conn

【Tomcat】详解tomcat的连接数与线程池

前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xml 中写到过:Connector的主要功能,是接收连接请求,创建Request和Response对象用于和请求端交换数据:然后分配线程让Engine(也就是Servlet容器)来处理这个请求,并把产生的Request和Response对象传给Engine.当Engine处理完请求后,也会通过Conn