Learn Riak Core Step By Step
riak core 是 riak的主要组成部分,主要负责分布式的部分,虽然官方有自己的存储后端,但是我们也可以使用其他的后端存储。
Partitioning
& Distributing Work
riak core 在每个节点上都是使用master/worker配置,这样作为一个工作单元来执行,riak core的worker为vnode worker, 在每个节点上由riak_core_sup生成,vnode worker对应的模块为riak_core_vnode
。
如:
节点1:
Node: ‘mfmn2@127.0.0.1‘, Process: <0.80.0> [{registered_name,riak_core_vnode_sup}, {current_function,{gen_server,loop,6}}, {initial_call,{proc_lib,init_p,5}}, {status,waiting}, {message_queue_len,0}, {messages,[]}, {links,[<0.148.0>,<0.152.0>,<0.154.0>,<0.155.0>,<0.153.0>,<0.150.0>, <0.151.0>,<0.149.0>,<0.140.0>,<0.144.0>,<0.146.0>,<0.147.0>, <0.145.0>,<0.142.0>,<0.143.0>,<0.141.0>,<0.136.0>,<0.138.0>, <0.139.0>,<0.137.0>,<0.77.0>,<0.135.0>]}, {dictionary,[{‘$ancestors‘,[riak_core_sup,<0.76.0>]}, {‘$initial_call‘,{supervisor_pre_r14b04,init,1}}]}, {trap_exit,true}, {error_handler,error_handler}, {priority,normal}, {group_leader,<0.75.0>}, {total_heap_size,3571}, {heap_size,2584}, {stack_size,9}, {reductions,4359}, {garbage_collection,[{min_bin_vheap_size,46368}, {min_heap_size,233}, {fullsweep_after,10}, {minor_gcs,6}]}, {suspending,[]}]
节点2:
Node: ‘mfmn1@127.0.0.1‘, Process: <0.80.0> [{registered_name,riak_core_vnode_sup}, {current_function,{gen_server,loop,6}}, {initial_call,{proc_lib,init_p,5}}, {status,waiting}, {message_queue_len,0}, {messages,[]}, {links,[<0.183.0>,<0.274.0>,<0.337.0>,<0.375.0>,<0.387.0>,<0.371.0>, <0.310.0>,<0.326.0>,<0.226.0>,<0.262.0>,<0.210.0>,<0.218.0>, <0.153.0>,<0.165.0>,<0.177.0>,<0.171.0>,<0.159.0>,<0.135.0>, <0.147.0>,<0.141.0>,<0.123.0>,<0.129.0>,<0.77.0>]}, {dictionary,[{‘$ancestors‘,[riak_core_sup,<0.76.0>]}, {‘$initial_call‘,{supervisor_pre_r14b04,init,1}}]}, {trap_exit,true}, {error_handler,error_handler}, {priority,normal}, {group_leader,<0.75.0>}, {total_heap_size,1974}, {heap_size,987}, {stack_size,9}, {reductions,8777}, {garbage_collection,[{min_bin_vheap_size,46368}, {min_heap_size,233}, {fullsweep_after,10}, {minor_gcs,2}]}, {suspending,[]}]
节点3:
Node: ‘mfmn3@127.0.0.1‘, Process: <0.80.0> [{registered_name,riak_core_vnode_sup}, {current_function,{gen_server,loop,6}}, {initial_call,{proc_lib,init_p,5}}, {status,waiting}, {message_queue_len,0}, {messages,[]}, {links,[<0.152.0>,<0.167.0>,<0.179.0>,<0.185.0>,<0.182.0>,<0.173.0>, <0.176.0>,<0.170.0>,<0.161.0>,<0.164.0>,<0.158.0>,<0.155.0>, <0.137.0>,<0.143.0>,<0.149.0>,<0.146.0>,<0.140.0>,<0.128.0>, <0.134.0>,<0.131.0>,<0.125.0>,<0.77.0>]}, {dictionary,[{‘$ancestors‘,[riak_core_sup,<0.76.0>]}, {‘$initial_call‘,{supervisor_pre_r14b04,init,1}}]}, {trap_exit,true}, {error_handler,error_handler}, {priority,normal}, {group_leader,<0.75.0>}, {total_heap_size,3194}, {heap_size,2584}, {stack_size,9}, {reductions,4507}, {garbage_collection,[{min_bin_vheap_size,46368}, {min_heap_size,233}, {fullsweep_after,10}, {minor_gcs,10}]}, {suspending,[]}]
3个节点的vnode worker 加起来刚好22 + 23 + 22 - 3 = 64
(mfmn3@127.0.0.1)3> supervisor:count_children(riak_core_vnode_sup). [{specs,1},{active,21},{supervisors,0},{workers,21}] (mfmn3@127.0.0.1)4>
减去3的原因。
从上面那张图可以看出, riak_core_vnode_master负责与vnode的通信,这些vnode都是fsm,如:
获取当前节点的所有vnode:
(mfmn3@127.0.0.1)8> riak_core_vnode_master:all_nodes(mfmn_vnode). [<0.173.0>,<0.179.0>,<0.185.0>,<0.143.0>,<0.155.0>, <0.164.0>,<0.182.0>,<0.170.0>,<0.161.0>,<0.140.0>,<0.146.0>, <0.152.0>,<0.158.0>,<0.176.0>,<0.167.0>,<0.149.0>,<0.137.0>, <0.125.0>,<0.128.0>,<0.131.0>,<0.134.0>] (mfmn3@127.0.0.1)9>
这也再次证明了该节点的vnode个数为21.
向某个vnode发出Ping请求
这个例子是try-try-try
的例子:
前面会携带一个Pid,而这个Pid是根据hash在ets表中索引出来的,这个Pid是vnode 的Pid,根据这个Pid可以索引到具体的vnode,最后Mod:handle_command是用户的回调函数。
如果master没有找到对应的vnode,那么他会新建一个vnode:
get_vnode(Idx, State=#state{vnode_mod=Mod}) -> case idx2vnode(Idx, State) of no_match -> {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx), MonRef = erlang:monitor(process, Pid), add_vnode_rec(#idxrec{idx=Idx,pid=Pid,monref=MonRef}, State), Pid; X -> X end.
因为vnode下面是存储后端,所以只要定位到vnode就可以访问后端存储。
下面是try try try
对应的代码:
start(_StartType, _StartArgs) -> case mfmn_sup:start_link() of {ok, Pid} -> ok = riak_core:register([{vnode_module, mfmn_vnode}]), ok = riak_core_ring_events:add_guarded_handler(mfmn_ring_event_handler, []), ok = riak_core_node_watcher_events:add_guarded_handler(mfmn_node_event_handler, []), ok = riak_core_node_watcher:service_up(mfmn, self()), {ok, Pid}; {error, Reason} -> {error, Reason} end. stop(_State) -> ok.
启动master, 注册vnode, 铁添加mfmn_ring_event_handler,mfmn_node_event_handler...
下面就讲解一下try try try
的例子,高手勿喷!!!
第二个例子
Riak Core, The vnode
这是一个Real Time Stastics
,简称RTS
--实时统计应用。
这个系统要解决的两个问题是解析记录和分发记录,这交给entry vnode
处理;第二个时接收实时统计,交给stat
处理。
vnode
- ###什么是vnode
- vnode是一个虚拟节点,和物理节点不一样
- 一个虚拟节点对应一个
erlang
process - 一个虚拟节点是一个behaviour - gen_fsm behaviour
- 一个虚拟节点处理进来的请求
- 一个虚拟节点可能会存储数据,这些数据可以被以后检索到
- 很多虚拟节点会运行在同一个物理节点上
- 每个虚拟机都有一个主虚拟节点,他主要用于和它的所有存活的节点保持联系
如你所见,虚拟节点要处理很多东西,不过Basho已经帮我们处理掉,我们只要实现所提供的
vnode
即可。用户只要理解输入和输出,然后定义所需的回调函数即可。
behaviour -
生命周期
init/1和termiante/2回调函数是虚拟节点的生命边缘,既是起止和终止位置,当一个连接到vnode的进程崩溃,那么handle_exit/3将会被调用。
- ###init([Index]) -> Result
?Index :: int() >= 0 Result :: {ok, State} State :: term()
rts注册了3种vnode -- rts_vnode
、rts_entry_vnode
、 rts_stat_vnode
、
对应的master vnode 分别为:rts_vnode_master
、rts_entry_vnode_master
、rts_stat_vnode_master
。
每种vnode负责不同服务.
rts提供了http的接口方便用户录入数据,其中rts_wm_entry:process_post的数据结构如下
Client: "progski" Entry: "0.0.0.0 - - [21/Mar/2011:18:18:19 +0000] \"GET /blog/2011/aol_meet_riak.html HTTP/1.1\" 200 5865 \"http://www.google.com/\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.151 Safari/534.16\""
Client是?gunzip -c progski.access.log.gz | ./replay
传递过来的.
progski
然后,根据rts_entry来索引可用vnode,
PrefList = riak_core_apl:get_apl(DocIdx, 1, rts_entry), [IdxNode] = PrefList, rts_entry_vnode:entry(IdxNode, Client, Entry).
发送命令:
riak_core_vnode_master:command(IdxNode, {entry, Client, Entry}, ?MASTER).
其中?MASTER为rts_entry_vnode_master
.
如图:
最后交给stat vnode 进行统计。