skynet源码分析之cluster集群模式

比起slave/harbor集群模式,skynet提供了用的更为广泛的cluster集群模式,参考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster。cluster模式利用socketchannel库(http://www.cnblogs.com/RainRill/p/8892648.html) 与其他skynet进程进行交互,每个请求包带一个唯一的session值,对端回应包附带session值。cluster集群模式tcp通道是单向的,即skynet进程1(集群中的节点)通过tcp通道向进程2发送请求包,进程2回应包也走这一通道。但是,进程2向进程1发送请求包及进程1的回应包则是另一条tcp通道。

每个集群节点都有一份完整的cluster配置,会启动一个clusterd的服务,调用loadconfig加载配置。

第11-19行,加载配置文件(也可以手动传入配置table tmp)

第20-24行,保存节点名-地址映射关系

 1 -- service/clusterd.lua
 2  skynet.start(function()
 3      loadconfig()
 4      skynet.dispatch("lua", function(session , source, cmd, ...)
 5          local f = assert(command[cmd])
 6          f(source, ...)
 7      end)
 8  end)
 9
10  local function loadconfig(tmp)
11      if tmp == nil then
12          tmp = {}
13          if config_name then
14              local f = assert(io.open(config_name))
15              local source = f:read "*a"
16              f:close()
17              assert(load(source, "@"..config_name, "t", tmp))()
18          end
19      end
20      for name,address in pairs(tmp) do
21          ...
22          node_address[name] = address
23          ...
24      end
25  end

以skynet进程1的A服务向skynet进程2的B服务发送请求包及回应为例,说明cluster的工作流程:

对于进程2,配置了 db = "127.0.0.1:2528",启动后调用cluster.open "db"。

第4行,给clusterd服务发送消息。

第12-15行,启动一个gate服务,然后通知gate服务监听配置的地址。gate调用socket.listen监听外部socket连接。

第20行,watchdog就是clusterd服务的地址。

 1 -- lualib/skynet/cluster.lua
 2 function cluster.open(port)
 3     if type(port) == "string" then
 4         skynet.call(clusterd, "lua", "listen", port)
 5     else
 6         skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
 7     end
 8 end
 9
10 -- service/clusterd.lua
11 function command.listen(source, addr, port)
12     local gate = skynet.newservice("gate")
13     ...
14     skynet.call(gate, "lua", "open", { address = addr, port = port })
15     skynet.ret(skynet.pack(nil))
16 end
17
18 -- servcice/gate.lua
19 function handler.open(source, conf)
20     watchdog = conf.watchdog or source
21 end

对于进程1,调用cluster.call(db, "A", ...),给节点名为db(进程2)的A服务发送请求,最终调用到send_request

第9行,请求包带上唯一的sesssion值

第11行,按cluster定义的模式打包数据

第15行,获取socketchannel对象,如果第一次请求,会先创建socketchannel对象,并建立tcp连接

第16行,调用socketchannel的request接口发送请求包

 1 -- lualib/skynet/cluster.lua
 2 function cluster.call(node, address, ...)
 3     -- skynet.pack(...) will free by cluster.core.packrequest
 4     return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
 5 end
 6
 7 -- service/clusterd.lua
 8 local function send_request(source, node, addr, msg, sz)
 9     local session = node_session[node] or 1
10     -- msg is a local pointer, cluster.packrequest will free it
11     local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
12     node_session[node] = new_session
13
14     -- node_channel[node] may yield or throw error
15     local c = node_channel[node]
16
17     return c:request(request, session, padding)
18 end
19
20 function command.req(...)
21     local ok, msg = pcall(send_request, ...)
22     if ok then
23         ...
24         skynet.ret(msg)
25     end
26 end

创建socket对象时提供了response参数(第6行),所以是采用带session值的请求-回应模式。

第11行,协程阻塞在socket.read上,此时暂停co,等待回应包

 1 -- service/clusterd
 2     local host, port = string.match(address, "([^:]+):(.*)$")
 3     c = sc.channel {
 4         host = host,
 5         port = tonumber(port),
 6         response = read_response,
 7         nodelay = true,
 8     }
 9
10 local function read_response(sock)
11     local sz = socket.header(sock:read(2))
12     local msg = sock:read(sz)
13     return cluster.unpackresponse(msg)      -- session, ok, data, padding
14 end

对于进程2,gate服务收到进程1的tcp连接请求后,

第8行,给clusterd服务发送消息

第17-18行,clusterd收到后,新建一个clusteragent服务。注:clusteragent是skynet最近新加的。参考https://blog.codingnow.com/2018/04/skynet_cluster.html#more

第24-28行,clusteragent服务专门处理进程1的cluster模式的请求。每个cluster节点连接都新建一个cluseteragent服务去处理请求包。

 1 -- service/gate.lua
 2 function handler.connect(fd, addr)
 3     local c = {
 4         fd = fd,
 5         ip = addr,
 6     }
 7     connection[fd] = c
 8     skynet.send(watchdog, "lua", "socket", "open", fd, addr)
 9 end
10
11 -- service/clusterd.lua
12 function command.socket(source, subcmd, fd, msg)
13     if subcmd == "open" then
14         skynet.error(string.format("socket accept from %s", msg))
15         -- new cluster agent
16         cluster_agent[fd] = false
17         local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
18         cluster_agent[fd] = agent
19         ...
20 end
21
22 -- service/clusterdagent.lua
23 skynet.start(function()
24     skynet.register_protocol {
25         name = "client",
26         id = skynet.PTYPE_CLIENT,
27         unpack = cluster.unpackrequest,
28         dispatch = dispatch_request,
29     }
30     ...
31 end

当gate服务收到请求包后,转发给对应的clusteragent服务(第7行),

 1 -- service/gate.lua
 2 function handler.message(fd, msg, sz)
 3     -- recv a package, forward it
 4     local c = connection[fd]
 5     local agent = c.agent
 6     if agent then
 7         skynet.redirect(agent, c.client, "client", fd, msg, sz)
 8     else
 9         skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
10     end
11 end

clusteragent服务消息分发函数dispatch_request,

第7-9行,如果是push请求,不需要回应,send给目的服务(B服务)后直接返回即可

第11行,如果是call请求,需要回应,给目的服务(B服务)发送消息,然后等待B服务处理完返回。

第14-21行,将消息打包成回应包,通过tcp返回给请求端(skynet进程1)。

进程1收到回应后,重启协程,返回结果给请求服务(A服务)。这就是cluster模式的调用流程。

 1 -- service/clusteragent.lua
 2 local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
 3     if cluster.isname(addr) then
 4         addr = register_name[addr]
 5     end
 6     if addr then
 7         if is_push then
 8             skynet.rawsend(addr, "lua", msg, sz)
 9             return  -- no response
10         else
11             ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
12         end
13     if ok then
14         response = cluster.packresponse(session, true, msg, sz)
15         if type(response) == "table" then
16             for _, v in ipairs(response) do
17                 socket.lwrite(fd, v)
18             end
19         else
20             socket.write(fd, response)
21         end
22      ...
23 end

原文地址:https://www.cnblogs.com/RainRill/p/8900334.html

时间: 2024-10-24 10:06:15

skynet源码分析之cluster集群模式的相关文章

skynet源码分析:服务

skynet是为多人在线游戏打造的轻量级服务端框架,使用c+lua实现.使用这套框架的一个好处就是,基本只需要lua,很少用到c做开发,一定程度上提高了开发效率. skynet的例子是怎么调用的 服务器: simpledb.lua: skynet.register "SIMPLEDB" 向skynet里注册一个服务 agent.lua: skynet.call("SIMPLEDB", "text", text) 调用相应的服务 main.lua:

Redis源码解析:25集群(一)握手、心跳消息以及下线检测

Redis集群是Redis提供的分布式数据库方案,通过分片来进行数据共享,并提供复制和故障转移功能. 一:初始化 1:数据结构 在源码中,通过server.cluster记录整个集群当前的状态,比如集群中的所有节点:集群目前的状态,比如是上线还是下线:集群当前的纪元等等.该属性是一个clusterState类型的结构体.该结构体的定义如下: typedef struct clusterState { clusterNode *myself; /* This node */ ... int sta

skynet源码分析1:开篇明义

skynet是云风基于actor模型实现的一个服务器框架,核心七千多行c代码,并提供了一个lua binding.写得比较简明,用起来比较爽快,很合我的胃口,再加之决定在公司最近的一个项目上skynet,所以就决定精读一遍源码,将所思所想所得记录于此,以便用起来心安理得. skynet的实现基于如下几条actor行为: 能创建,销毁其它actor,actor间为平级关系. 能发送消息给其它actor,能接收其它actor的消息. actor间的交互只能通过消息传递. 能处理自己的消息. 在sky

redis源码分析(六)--cluster消息

Redis集群消息 作为支持集群模式的缓存系统,Redis集群中的各个节点需要定期地进行通信,以维持各个节点关于其它节点信息的实时性与一致性.如前一篇文章介绍的,Redis在专用的端口监听集群其它节点的连接,将集群内部的的通信与客户端的通信区分开来,任意两个节点之间建立了两个tcp连接,形成一条全双工的通道.这篇文章将从集群消息方面进行介绍,主要介绍消息的格式.种类与不同场景下的消息处理. 1. 消息格式 首先,Redis集群通信使用的消息可分为消息头与消息体两部分:消息头包含了发送消息的节点的

mqtt协议-broker之moqutte源码研究六之集群

moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下.在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的. 一.拦截器io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAc

Redis cluster集群模式的原理

redis cluster redis cluster是Redis的分布式解决方案,在3.0版本推出后有效地解决了redis分布式方面的需求 自动将数据进行分片,每个master上放一部分数据 提供内置的高可用支持,部分master不可用时,还是可以继续工作的 支撑N个redis master node,每个master node都可以挂载多个slave node 高可用,因为每个master都有salve节点,那么如果mater挂掉,redis cluster这套机制,就会自动将某个slave

Java源码分析:深入探讨Iterator模式

作者:兄弟连 java.util包中包含了一系列重要的集合类.本文将从分析源码入手,深入研究一个集合类的内部结构,以及遍历集合的迭代模式的源码实现内幕. 下面我们先简单讨论一个根接口Collection,然后分析一个抽象类AbstractList和它的对应Iterator接口,并仔细研究迭代子模式的实现原理. 本文讨论的源代码版本是JDK 1.4.2,因为JDK 1.5在java.util中使用了很多泛型代码,为了简化问题,所以我们还是讨论1.4版本的代码. 集合类的根接口Collection

skynet源码分析5:lua绑定之地基

前面四篇已经涵盖了skynet的c层核心,剩下的timer,socket模块本身和actor模型没什么关系,且比较独立,最后再看吧.光用skynet的c接口,是很难在这上面写业务逻辑的,所以要找一种更爽快的方式来使用.官方推荐的是lua,利用lua的协程对skynet的消息分发做了封装,使得actor之间的异步消息通信有同步一样的操作感,并且做了一些的扩展模块来方便使用.lua简洁实用的风格我个人也很钟意. 要想做一个lua binding来使用,要有两个必要条件: 根据skynet的模块契约实

skynet源码分析3:消息调度

消息调度在框架中分为两个层次,一个c层的分配,一个是lua层的分发.本文阐述的是c层,从两个方面来说: 工作线程的控制 信箱的调度 与调度相关的代码实现在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三个文件中,整体上是一个m:n的调度器. 工作线程的控制 框架运行后,会启动固定的线程来轮流调度sc(skynet_context),线程数由配置文件中的thread字段定义,默认是4个.那