Gevent的socket协程安全性分析

一般讨论socket的并发安全性,都是指线程的安全性。。。而且绝大多数的情况下socket都不是线程安全的。。

当然一些框架可能会对socket进行一层封装,让其成为线程安全的。。。例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了。。。。。

其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不是协程安全的,也就是说gevnet的socket不能在不同的协程中同时读取或者写。。。。

例如我们不能同时在两个协程中调用socket.recv方法。。。。

不过好像自己现在已经忘了,那就再看看,顺便写篇博客记录下来,以防以后又忘记了,还找不到资料

那么为什么呢。。?我们来分析一下源代码吧,这里就拿send方法来分析,先来看看gevent的send方法的定义:

    #这里发送数据不会保证发送的数据都发送完,而是能发送多少发送多少
    #然后返回发送的数据大小
    def send(self, data, flags=0, timeout=timeout_default):
        sock = self._sock
        if timeout is timeout_default:
            timeout = self.timeout
        try:
            return sock.send(data, flags)
        except error:
            #EWOULDBLOCK 当前操作可能会阻塞 ,对于非阻塞的socket,也就是说明缓冲区已经写满了
            #那么这里要做的事情就是等待当前的write_event事件,然后再写数据
            ex = sys.exc_info()[1]
            if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
                raise
            sys.exc_clear()
            self._wait(self._write_event)
            try:
                return sock.send(data, flags)
            except error:
                ex2 = sys.exc_info()[1]
                if ex2.args[0] == EWOULDBLOCK:
                    return 0
                raise

也就是说,如果当前没办反发送,那么就会调用_wait方法来等待_write_event事件,

    #等待某一个watcher,可以是read或者write事件,这里可以带有超时
    def _wait(self, watcher, timeout_exc=timeout('timed out')):
        """Block the current greenlet until *watcher* has pending events.

        If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
        By default *timeout_exc* is ``socket.timeout('timed out')``.

        If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
        """
        assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, )
        if self.timeout is not None: #在等待之前,先挂起timeout,如果超市了,将会执行timeout
            #如果超时先结束,那么会返回到当前协程抛出异常
            timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
        else:
            timeout = None
        try:
            self.hub.wait(watcher)  #在hub上面等待这个watcher,在这个里面会切换到hub的运行,然后等到watcher有反应了再切换回来
        finally:
            if timeout is not None:
                timeout.cancel()

其实这里很简单,就是在hub上面等待当前socket的写事件的watcher

    #当在调用gevent.sleep的时候如果传入了大于零的时间,将会用这里来处理
    #watcher是一个在loop上面注册的事件,可能是读,写或者定时
    #用于在loop上面注册watcher,然后将当前协程切换出去
    def wait(self, watcher):
        waiter = Waiter() #首先创建一个waiter对象
        unique = object()
        watcher.start(waiter.switch, unique) #当watcher超时的时候将会调用waiter的switch方法这样就可以切换回来当前的协程
        try:
            result = waiter.get() #调用waiter的get方法,主要是让将当前调用sleep的greenlet切换出去,然后切换到hub的运行
            assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique)
        finally:
            watcher.stop()

这个应该很简单吧。。创建一个waiter事件,然后调用watcher对象的start方法,回调方法就设置成当前waiter对象的switch方法,然后调用get方法,将当前的协程切换出去。。。。

那么来来IO类型的watcher的start方法吧:

#I/Owatcher的定义
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:

    WATCHER_BASE(io)  #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等

    #这个其实其实就是启动watcher
    #这里的callback一般情况下都是waiter对象的switch方法,这样,当有IO事件之后就可以回到之前的协程了
    def start(self, object callback, *args, pass_events=False):
        CHECK_LOOP2(self.loop)
        if callback is None:
            raise TypeError('callback must be callable, not None')
        self.callback = callback
        if pass_events:
            self.args = (GEVENT_CORE_EVENTS, ) + args
        else:
            self.args = args
        LIBEV_UNREF
        libev.ev_io_start(self.loop._ptr, &self._watcher)  #在libev的loop上面启动这个io watcher
        PYTHON_INCREF

    ACTIVE

嗯,这个是cython的代码,所以稍微别扭一些。。。,那么接下来再来看看libev的ev_io_start函数的实现吧:

void noinline
ev_io_start (EV_P_ ev_io *w) EV_THROW
{
  int fd = w->fd;

  if (expect_false (ev_is_active (w)))
    return;

  assert (("libev: ev_io_start called with negative fd", fd >= 0));
  assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE))));

  EV_FREQUENT_CHECK;

  ev_start (EV_A_ (W)w, 1);   //将监视器设置为active
  //判断当前的anfds的大小是否足够放入新的fd,如果不够的话,那么需要重新分配
  array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero);
  //将这个watcher放到当前fd的wather队列的头部
  wlist_add (&anfds[fd].head, (WL)w);

  /* common bug, apparently */
  assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w));

  fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY);   //将该fd放到需要改变的数组,在合适的时候将会在loop上修改
  w->events &= ~EV__IOFDSET;

  EV_FREQUENT_CHECK;
}

这里主要就是激活当前的watcher对象,然后将这个watcher对象放到当前文件描述符的watcher链表的头部。。。嗯。。也就是wlist_add方法要做的事情。。。其实看到这里就知道socket肯定不是协程安全的了。。。

嗯,相信大家也一定懂了。。。。不懂的话。。就再看看代码就知道了。。。。。

时间: 2024-10-05 05:06:10

Gevent的socket协程安全性分析的相关文章

循环开协程情况分析

循环开协程情况分析 代码 package main import( "fmt" "runtime" "sync" ) //for循环只是用来创建协程而已,当协程创建完成之后,就不再受for控制 func main(){ runtime.GOMAXPROCS(1) wg := sync.WaitGroup{} wg.Add(60) for a:=0;a<10;a++{ fmt.Println("这里先运行",a) go f

这个对协程的分析不错

协程诞生解决的是低速IO和高速的CPU的协调问题,解决这类问题主要有三个有效途径: 异步非阻塞网络编程(libevent.libev.redis.Nginx.memcached这类) 协程(golang.gevent) "轻量级线程",相当于是在语言层面做抽象(Erlang) 对比之下协程的编程难度较低,不要求编程人员要有那么高的抽象思维能力.再加上golang在这方面优秀的实践,协程目前的前途还是一片光明的. 当然还有一点,我们要承认无论你状态机.callback设计得多么精妙,现实

基于协程的Python网络库gevent

import gevent def test1(): print 12 gevent.sleep(0) print 34 def test2(): print 56 gevent.sleep(0) print 78 gevent.joinall([ gevent.spawn(test1), gevent.spawn(test2), ]) 解释下,"gevent.spawn()"方法会创建一个新的greenlet协程对象,并运行它."gevent.joinall()"

Gevent的协程实现原理

之前之所以看greenlet的代码实现,主要就是想要看看gevent库的实现代码...然后知道了gevent的协程是基于greenlet来实现的...所以就又先去看了看greenlet的实现... 这里就不说greenlet的具体实现了,关键就是栈数据的复制拷贝,栈指针的位移... 因为gevent带有自己的I/O以及定时循环,所以它对greenlet又加了一层的扩展... 这里我们用如下的代码来举例子,然后再来具体的分析gevent是如何扩展greenlet的吧: import gevent

协程,greenlet原生协程库, gevent库

yield表达式 在了解协程之前,需要先了解一下生成器中的yield,它不仅可以当做生成器,还能当做一个表达式来使用(yield) def func(): x = (yield) print(x) x = (yield) g = func() print(next(g)) # 这是第一个yield,就暂停了 g.send('hello world') # 恢复暂停位置,将第一个yield赋值, # x = hello world,然后又执行到yield,暂停 --> None hello wor

python中socket、进程、线程、协程、池的创建方式

一.TCP-socket 服务端: import socket tcp_sk = socket.socket() tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) tcp_sk.bind(('127.0.0.1',8000)) tcp_sk.listen() conn,addr = tcp_sk.accept() conn.send('你好'.encode('utf-8')) print(conn.recv(1024).deco

Python线程和协程-day10

写在前面 上课第10天,打卡: 感谢Egon老师细致入微的讲解,的确有学到东西! 一.线程 1.关于线程的补充 线程:就是一条流水线的执行过程,一条流水线必须属于一个车间: 那这个车间的运行过程就是一个进程: 即一个进程内,至少有一个线程: 进程是一个资源单位,真正干活的是进程里面的线程: 线程是一个执行单位: 多线程:一个车间内有多条流水线,多个流水线共享该车间的资源: 一个进程内有多个线程,多线程共享一个进程的资源: 线程创建的开销要远远小于创建进程的开销: 进程之间更多的是一种竞争关系:

Day29:协程

一.协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 1.1 yield与协程 import time """ 传统的生产者-消

Day41:协程

一.协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 1.1 yield与协程 import time """ 传统的生产者-消