由于一篇Blog写太长无法发表,这里面我们将继续分析下dist.c中的setnode_3这个函数的作用和net_kernel得到连接成功之后又进行了什么操作。
BIF_RETTYPE setnode_3(BIF_ALIST_3) { BIF_RETTYPE ret; Uint flags; unsigned long version; Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; Port *pp = NULL; /* Prepare for success */ ERTS_BIF_PREP_RET(ret, am_true); /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || is_not_internal_port(BIF_ARG_2) || (erts_this_node->sysname == am_Noname)) { goto badarg; } if (!is_tuple(BIF_ARG_3)) goto badarg; tp = tuple_val(BIF_ARG_3); if (*tp++ != make_arityval(4)) goto badarg; if (!is_small(*tp)) goto badarg; flags = unsigned_val(*tp++); if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0) goto badarg; ic = *(++tp); oc = *(++tp); if (!is_atom(ic) || !is_atom(oc)) goto badarg; /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ if (!(DFLAG_EXTENDED_REFERENCES & flags)) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " "which is not able to handle extended references.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; } /* * Arguments seem to be in order. */ /* get dist_entry */ dep = erts_find_or_insert_dist_entry(BIF_ARG_1); if (dep == erts_this_dist_entry) goto badarg; else if (!dep) goto system_limit; /* Should never happen!!! */ //通过Port的ID获取Port的结构 pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_PORT_SFLGS_INVALID_LOOKUP); erts_smp_de_rwlock(dep); if (!pp || (erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_EXITING)) goto badarg; if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) goto badarg; //如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同 //那么直接进入结束阶段 if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) goto done; /* Already set */ if (dep->status & ERTS_DE_SFLG_EXITING) { /* Suspend on dist entry waiting for the exit to finish */ ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); erts_smp_mtx_lock(&dep->qlock); erts_proclist_store_last(&dep->suspended, plp); erts_smp_mtx_unlock(&dep->qlock); goto yield; } ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); /* * Dist-ports do not use the "busy port message queue" functionality, but * instead use "busy dist entry" functionality. */ { ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } //更新Port所关联的dist pp->dist_entry = dep; dep->version = version; dep->creation = 0; ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); #if 1 dep->send = (pp->drv_ptr->outputv ? dist_port_commandv : dist_port_command); #else dep->send = dist_port_command; #endif ASSERT(dep->send); #ifdef DEBUG erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize == 0); erts_smp_mtx_unlock(&dep->qlock); #endif //更新dist_entry的cid erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); erts_smp_de_rwunlock(dep); dep = NULL; /* inc of refc transferred to port (dist_entry field) */ //增加远程节点的数量 inc_no_nodes(); //发送监控信息到调用的进程 send_nodes_mon_msgs(BIF_P, am_nodeup, BIF_ARG_1, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, NIL); done: if (dep && dep != erts_this_dist_entry) { erts_smp_de_rwunlock(dep); erts_deref_dist_entry(dep); } if (pp) erts_port_release(pp); return ret; yield: ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); goto done; badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); goto done; system_limit: ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT); goto done; }
setnode_3首先是将,得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的Port进行了关联。
接着将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION,这样在一个Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port,在一个Port被销毁的时候,是否要调用dist.c中的erts_do_net_exits来告诉Erts远程节点已经掉线了。当这些都顺利的完成了之后,会在这个Erts内部广播nodeup这个消息,那么nodeup的接收者又是谁呢?nodeup的接收者是那些通过process_flag函数设置了monitor_nodes标记的进程,当然monitor_nodes这选项文档中是没有的。如果我们想监听nodeup事件,只能通过net_kernel:monitors函数来完成。
我们上次说到负责连接远程节点的进程会通知net_kernel进程,让我们接着看下net_kernel收到消息做了什么。
handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}}, State) -> case {Immediate, ets:lookup(sys_dist, Node)} of {true, [Conn]} when Conn#connection.state =:= pending, Conn#connection.owner =:= SetupPid -> ets:insert(sys_dist, Conn#connection{state = up, address = Address, waiting = [], type = Type}), SetupPid ! {self(), inserted}, reply_waiting(Node,Conn#connection.waiting, true), {noreply, State}; _ -> SetupPid ! {self(), bad_request}, {noreply, State} end;
更新ets中的状态,同时发送消息给所有等待的进程,告诉他们远程连接已经成功了,你们可以继续进行后续操作了。
这个时候你会惊奇的发现,心跳在什么地方呢?不急,我们再回头看下net_kernel的init函数
init({Name, LongOrShortNames, TickT}) -> process_flag(trap_exit,true), case init_node(Name, LongOrShortNames) of {ok, Node, Listeners} -> process_flag(priority, max), Ticktime = to_integer(TickT), Ticker = spawn_link(net_kernel, ticker, [self(), Ticktime]), {ok, #state{name = Name, node = Node, type = LongOrShortNames, tick = #tick{ticker = Ticker, time = Ticktime}, connecttime = connecttime(), connections = ets:new(sys_dist,[named_table, protected, {keypos, 2}]), listen = Listeners, allowed = [], verbose = 0 }}; Error -> {stop, Error} end.
net_kernel首先创建了一个ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有远程连接的进程,让他们进行一次心跳。当我们改变了一个节点的心跳时间的时候,我们会开启一个aux_ticker进程帮助我们进行过度,直到所有节点都知道了我们改变了心跳周期为止,当所有节点都知道我们改变了心跳周期,这个aux_ticker进程也就结束了它的历史性任务,安静的退出了。
那么是如何发现远程节点退出的,当然是TCP数据传输发生了故障Port被清理掉了,这个可参见dist.c中的erts_do_net_exits。
当这些都完成了,我们将继续回到global模块和global_group模块中去分析下nodeup的时候,两个节点是如何同步他们的全局名字的。