一般讨论socket的并发安全性,都是指线程的安全性。。。而且绝大多数的情况下socket都不是线程安全的。。
当然一些框架可能会对socket进行一层封装,让其成为线程安全的。。。例如java的netty框架就是如此,将socket封装成channel,然后让channel封闭到一个线程中,那么这个channel的所有的读写都在它所在的线程中串行的进行,那么自然也就是线程安全的了。。。。。
其实很早看Gevent的源码的时候,就已经看过这部分的东西了,当时就已经知道gevent的socket不是协程安全的,也就是说gevnet的socket不能在不同的协程中同时读取或者写。。。。
例如我们不能同时在两个协程中调用socket.recv方法。。。。
不过好像自己现在已经忘了,那就再看看,顺便写篇博客记录下来,以防以后又忘记了,还找不到资料
那么为什么呢。。?我们来分析一下源代码吧,这里就拿send方法来分析,先来看看gevent的send方法的定义:
#这里发送数据不会保证发送的数据都发送完,而是能发送多少发送多少 #然后返回发送的数据大小 def send(self, data, flags=0, timeout=timeout_default): sock = self._sock if timeout is timeout_default: timeout = self.timeout try: return sock.send(data, flags) except error: #EWOULDBLOCK 当前操作可能会阻塞 ,对于非阻塞的socket,也就是说明缓冲区已经写满了 #那么这里要做的事情就是等待当前的write_event事件,然后再写数据 ex = sys.exc_info()[1] if ex.args[0] != EWOULDBLOCK or timeout == 0.0: raise sys.exc_clear() self._wait(self._write_event) try: return sock.send(data, flags) except error: ex2 = sys.exc_info()[1] if ex2.args[0] == EWOULDBLOCK: return 0 raise
也就是说,如果当前没办反发送,那么就会调用_wait方法来等待_write_event事件,
#等待某一个watcher,可以是read或者write事件,这里可以带有超时 def _wait(self, watcher, timeout_exc=timeout('timed out')): """Block the current greenlet until *watcher* has pending events. If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed. By default *timeout_exc* is ``socket.timeout('timed out')``. If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``. """ assert watcher.callback is None, 'This socket is already used by another greenlet: %r' % (watcher.callback, ) if self.timeout is not None: #在等待之前,先挂起timeout,如果超市了,将会执行timeout #如果超时先结束,那么会返回到当前协程抛出异常 timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False) else: timeout = None try: self.hub.wait(watcher) #在hub上面等待这个watcher,在这个里面会切换到hub的运行,然后等到watcher有反应了再切换回来 finally: if timeout is not None: timeout.cancel()
其实这里很简单,就是在hub上面等待当前socket的写事件的watcher
#当在调用gevent.sleep的时候如果传入了大于零的时间,将会用这里来处理 #watcher是一个在loop上面注册的事件,可能是读,写或者定时 #用于在loop上面注册watcher,然后将当前协程切换出去 def wait(self, watcher): waiter = Waiter() #首先创建一个waiter对象 unique = object() watcher.start(waiter.switch, unique) #当watcher超时的时候将会调用waiter的switch方法这样就可以切换回来当前的协程 try: result = waiter.get() #调用waiter的get方法,主要是让将当前调用sleep的greenlet切换出去,然后切换到hub的运行 assert result is unique, 'Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique) finally: watcher.stop()
这个应该很简单吧。。创建一个waiter事件,然后调用watcher对象的start方法,回调方法就设置成当前waiter对象的switch方法,然后调用get方法,将当前的协程切换出去。。。。
那么来来IO类型的watcher的start方法吧:
#I/Owatcher的定义 cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]: WATCHER_BASE(io) #通过这个宏定义了一些基本的属性,例如libev的 watcher 引用等 #这个其实其实就是启动watcher #这里的callback一般情况下都是waiter对象的switch方法,这样,当有IO事件之后就可以回到之前的协程了 def start(self, object callback, *args, pass_events=False): CHECK_LOOP2(self.loop) if callback is None: raise TypeError('callback must be callable, not None') self.callback = callback if pass_events: self.args = (GEVENT_CORE_EVENTS, ) + args else: self.args = args LIBEV_UNREF libev.ev_io_start(self.loop._ptr, &self._watcher) #在libev的loop上面启动这个io watcher PYTHON_INCREF ACTIVE
嗯,这个是cython的代码,所以稍微别扭一些。。。,那么接下来再来看看libev的ev_io_start函数的实现吧:
void noinline ev_io_start (EV_P_ ev_io *w) EV_THROW { int fd = w->fd; if (expect_false (ev_is_active (w))) return; assert (("libev: ev_io_start called with negative fd", fd >= 0)); assert (("libev: ev_io_start called with illegal event mask", !(w->events & ~(EV__IOFDSET | EV_READ | EV_WRITE)))); EV_FREQUENT_CHECK; ev_start (EV_A_ (W)w, 1); //将监视器设置为active //判断当前的anfds的大小是否足够放入新的fd,如果不够的话,那么需要重新分配 array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); //将这个watcher放到当前fd的wather队列的头部 wlist_add (&anfds[fd].head, (WL)w); /* common bug, apparently */ assert (("libev: ev_io_start called with corrupted watcher", ((WL)w)->next != (WL)w)); fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); //将该fd放到需要改变的数组,在合适的时候将会在loop上修改 w->events &= ~EV__IOFDSET; EV_FREQUENT_CHECK; }
这里主要就是激活当前的watcher对象,然后将这个watcher对象放到当前文件描述符的watcher链表的头部。。。嗯。。也就是wlist_add方法要做的事情。。。其实看到这里就知道socket肯定不是协程安全的了。。。
嗯,相信大家也一定懂了。。。。不懂的话。。就再看看代码就知道了。。。。。