skynet源码分析之socketchannel

请求回应模式是与外部交互最常用的模式之一。通常协议设计方式有两种:1.每个请求包对应一个回应包,有tcp保证时序,先请求的先回应,但不必收到回应才发送下一个请求,redis的协议就是这种类型;2.每个请求带一个唯一的session标识,回应包也带这个标识。这样每个请求不一定都需要回应,且不用遵循先请求先回应的时序。mongodb的协议就是这种类型。skynet提供socketchannel库封装内部细节,支持上面两种模式。详情参考官方wiki https://github.com/cloudwu/skynet/wiki/SocketChannel

调用socketchannel.channel创建一个channel对象,必须提供ip地址(可以是域名)和端口。采用第一种还是第二种模式依据是否提供response参数,redis没有提供说明用的第一种模式,mongo提供了(第13行)说明用第二种模式。

 1 -- lualib/skynet/db/redis.lua
 2  local channel = socketchannel.channel {
 3      host = db_conf.host,
 4      port = db_conf.port or 6379,
 5      auth = redis_login(db_conf.auth, db_conf.db),
 6      nodelay = true,
 7  }
 8
 9  -- lualib/skynet/db/mongo.lua
10  obj.__sock = socketchannel.channel {
11      host = obj.host,
12      port = obj.port,
13      response = dispatch_reply,
14      auth = mongo_auth(obj),
15      backup = backup,
16      nodelay = true,
17  }
18
19  -- lualib/skynet/socketchannel.lua
20  function socket_channel.channel(desc)
21      local c = {
22          __host = assert(desc.host),
23          __port = assert(desc.port),
24          __backup = desc.backup,
25          __auth = desc.auth,
26          __response = desc.response,     -- It‘s for session mode
27          __request = {}, -- request seq { response func or session }     -- It‘s for order mode
28          __thread = {}, -- coroutine seq or session->coroutine map
29          __result = {}, -- response result { coroutine -> result }
30          __result_data = {},
31          __connecting = {},
32          __sock = false,
33          __closed = false,
34          __authcoroutine = false,
35          __nodelay = desc.nodelay,
36      }
37
38      return setmetatable(c, channel_meta)
39  end

创建完对象后,可以手动调用connect连接对端,如果不connect,在第一次发送请求的时候会尝试去连接。最终调用到connect_once,

第7行,调用socket库api连接对端

第11行,fork一个协程专门处理收到回应包

15-21行,如果是模式1,收到回应包后的处理函数是dispatch_by_order,模式2则是dispatch_by_session

 1 -- lualib/skynet/socketchannel.lua
 2 local function connect_once(self)
 3     if self.__closed then
 4         return false
 5     end
 6     assert(not self.__sock and not self.__authcoroutine)
 7     local fd,err = socket.open(self.__host, self.__port)
 8     ...
 9
10     self.__sock = setmetatable( {fd} , channel_socket_meta )
11     self.__dispatch_thread = skynet.fork(dispatch_function(self), self)
12     ...
13 end
14
15 local function dispatch_function(self)
16     if self.__response then
17         return dispatch_by_session
18     else
19         return dispatch_by_order
20     end
21 end

接下来先介绍发送请求包的流程,之后再介绍如何处理回应包。调用者通过channel:request发送请求包,该接口有三个参数:参数request请求包数据;参数response在模式1下是一个function用来接收回应包,模式2下是一个唯一的session值;参数padding可选,表示将巨大消息拆分成多个小包发送出去。

第2行,检测是否已连接,如果未连接,会尝试去连接

第8行,调用socket库把发送请求包。

第13-16行,不需要回应直接返回。

第18,23,35-48行,保存当前co。如果是模式2,保留session-co映射关系在self.__thread里(38行);如果是模式1,保留response函数在self.__request里,co在self.__threaad里(41,42行)。

43-46行,如果有暂停的co在等待回应包,重启它。

第24行,暂停当前co,等待对方回应包。当收到回应包时,回应处理函数会重启它。

25-32行,返回结果给调用者。

 1 function channel:request(request, response, padding)
 2     assert(block_connect(self, true))       -- connect once
 3     local fd = self.__sock[1]
 4
 5     if padding then
 6         ...
 7     else
 8         if not socket_write(fd , request) then
 9             sock_err(self)
10         end
11     end
12
13     if response == nil then
14         -- no response
15         return
16     end
17
18     return wait_for_response(self, response)
19 end
20
21 local function wait_for_response(self, response)
22     local co = coroutine.running()
23     push_response(self, response, co)
24     skynet.wait(co)
25
26     local result = self.__result[co]
27     self.__result[co] = nil
28     local result_data = self.__result_data[co]
29     self.__result_data[co] = nil
30     ...
31
32     return result_data
33 end
34
35 local function push_response(self, response, co)
36     if self.__response then
37         -- response is session
38         self.__thread[response] = co
39     else
40         -- response is a function, push it to __request
41         table.insert(self.__request, response)
42         table.insert(self.__thread, co)
43         if self.__wait_response then
44             skynet.wakeup(self.__wait_response)
45             self.__wait_response = nil
46         end
47     end
48 end

对于模式1的回应处理函数dispatch_by_order,

第4行,调用pop_response获取第一个未回应的请求包的response和co

第6行,调用response函数,response函数调用socket库的readline/read(24行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回

第11-16行,返回结果保存在self.__result_data

第17行,重启调用者发送请求包的co,把结果返回给调用者(上面代码的26-32行),至此完成一次与对端请求回应交互

 1 -- lualib/skynet/socketchannel.lua
 2 local function dispatch_by_order(self)
 3     while self.__sock do
 4         local func, co = pop_response(self)
 5         ...
 6         local ok, result_ok, result_data, padding = pcall(func, self.__sock)
 7         if ok then
 8             if padding and result_ok then
 9                 ...
10             else
11                 self.__result[co] = result_ok
12                 if result_ok and self.__result_data[co] then
13                     table.insert(self.__result_data[co], result_data)
14                 else
15                     self.__result_data[co] = result_data
16                 end
17                 skynet.wakeup(co)
18             end
19         end
20 end
21
22 -- lualib/skynet/db/redis.lua
23 local function read_response(fd)
24     local result = fd:readline "\r\n"
25     local firstchar = string.byte(result)
26     local data = string.sub(result,2)
27     return redcmd[firstchar](fd,data)
28 end

对于模式2的回应处理函数dispatch_by_session,

第6行,调用response函数,response函数会调用socket库的readline/read(30行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回回应包(回应包包含唯一的session)

第8行,通过session获取对应的co

第13-21行,接下来处理跟上面一样,保存回应包内容,重启co。

 1  -- lualib/skynet/socketchannel.lua
 2  local function dispatch_by_session(self)
 3      local response = self.__response
 4      -- response() return session
 5      while self.__sock do
 6          local ok , session, result_ok, result_data, padding = pcall(response, self.__sock)
 7          if ok and session then
 8              local co = self.__thread[session]
 9              if co then
10                  if padding and result_ok then
11                      ...
12                  else
13                      self.__thread[session] = nil
14                      self.__result[co] = result_ok
15                      if result_ok and self.__result_data[co] then
16                          table.insert(self.__result_data[co], result_data)
17                      else
18                          self.__result_data[co] = result_data
19                      end
20                      skynet.wakeup(co)
21                  end
22              else
23                  self.__thread[session] = nil
24                  skynet.error("socket: unknown session :", session)
25              end
26  end
27
28  -- lualib/skynet/db/mongo.lua
29  local function dispatch_reply(so)
30      local len_reply = so:read(4)
31      local reply     = so:read(driver.length(len_reply))
32      ...
33      return reply_id, succ, result
34  end

原文地址:https://www.cnblogs.com/RainRill/p/8892648.html

时间: 2024-10-27 09:27:47

skynet源码分析之socketchannel的相关文章

skynet源码分析:服务

skynet是为多人在线游戏打造的轻量级服务端框架,使用c+lua实现.使用这套框架的一个好处就是,基本只需要lua,很少用到c做开发,一定程度上提高了开发效率. skynet的例子是怎么调用的 服务器: simpledb.lua: skynet.register "SIMPLEDB" 向skynet里注册一个服务 agent.lua: skynet.call("SIMPLEDB", "text", text) 调用相应的服务 main.lua:

skynet源码分析1:开篇明义

skynet是云风基于actor模型实现的一个服务器框架,核心七千多行c代码,并提供了一个lua binding.写得比较简明,用起来比较爽快,很合我的胃口,再加之决定在公司最近的一个项目上skynet,所以就决定精读一遍源码,将所思所想所得记录于此,以便用起来心安理得. skynet的实现基于如下几条actor行为: 能创建,销毁其它actor,actor间为平级关系. 能发送消息给其它actor,能接收其它actor的消息. actor间的交互只能通过消息传递. 能处理自己的消息. 在sky

skynet源码分析之cluster集群模式

比起slave/harbor集群模式,skynet提供了用的更为广泛的cluster集群模式,参考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster.cluster模式利用socketchannel库(http://www.cnblogs.com/RainRill/p/8892648.html) 与其他skynet进程进行交互,每个请求包带一个唯一的session值,对端回应包附带session值.cluster集群模式tcp通道是单向的,

skynet源码分析5:lua绑定之地基

前面四篇已经涵盖了skynet的c层核心,剩下的timer,socket模块本身和actor模型没什么关系,且比较独立,最后再看吧.光用skynet的c接口,是很难在这上面写业务逻辑的,所以要找一种更爽快的方式来使用.官方推荐的是lua,利用lua的协程对skynet的消息分发做了封装,使得actor之间的异步消息通信有同步一样的操作感,并且做了一些的扩展模块来方便使用.lua简洁实用的风格我个人也很钟意. 要想做一个lua binding来使用,要有两个必要条件: 根据skynet的模块契约实

skynet源码分析3:消息调度

消息调度在框架中分为两个层次,一个c层的分配,一个是lua层的分发.本文阐述的是c层,从两个方面来说: 工作线程的控制 信箱的调度 与调度相关的代码实现在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三个文件中,整体上是一个m:n的调度器. 工作线程的控制 框架运行后,会启动固定的线程来轮流调度sc(skynet_context),线程数由配置文件中的thread字段定义,默认是4个.那

skynet 框架snax源码分析1---变量注入

skynet为了简化服务的编写,推出了snax框架,源码里也有一个例子pingserver.这是snax原创文章的第一篇,所以先就分析snax框架里的interface.lua源码,它的实现应用了一个闭包中的upvalue注入技巧. 凡是框架都得遵循框架的约定,snax有两个大的约定,一是约定了一组预置的接口init/exit/hotfix:二是accept/response这两组用来编写服务的接口.本文,并不涉及这些,而是谈accept/response是如何注入给snax服务的. snax框

NIO byteBUffer 讲解 及Mina 源码分析

1.传统的socket: 阻塞式通信模式 tcp连接: 与服务器连接时 .必须等到连接成功后 才返回 . udp连接: 客户端发送数据 ,必须等到发送成功后返回 . 每建立一个 Scoket连接时, 同事创建一个新线程对该 Socket进行单独通信(采用阻塞式通信 ) 这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果 对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况 2.1NIO 设计背后的基石:反应器模式,用于事

细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处.文中引用了一些网上或书里的资料,如有不妥之处请告之. 本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分.Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路. 目录 4 RPC服务器(org.apache.hadoop,ipc.Server) 4.1 服务器初始化 4 RPC服务器(org.apache.hadoop,ipc.Serve

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务