让我们聊聊Erlang的节点互联(二)

由于一篇Blog写太长无法发表,这里面我们将继续分析下dist.c中的setnode_3这个函数的作用和net_kernel得到连接成功之后又进行了什么操作。

BIF_RETTYPE setnode_3(BIF_ALIST_3)
{
    BIF_RETTYPE ret;
    Uint flags;
    unsigned long version;
    Eterm ic, oc;
    Eterm *tp;
    DistEntry *dep = NULL;
    Port *pp = NULL;

    /* Prepare for success */
    ERTS_BIF_PREP_RET(ret, am_true);

    /*
     * Check and pick out arguments
     */

    if (!is_node_name_atom(BIF_ARG_1) ||
		is_not_internal_port(BIF_ARG_2) ||
		(erts_this_node->sysname == am_Noname)) {
		 goto badarg;
    }

    if (!is_tuple(BIF_ARG_3))
		 goto badarg;
    tp = tuple_val(BIF_ARG_3);
    if (*tp++ != make_arityval(4))
		 goto badarg;
    if (!is_small(*tp))
		 goto badarg;
    flags = unsigned_val(*tp++);
    if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
		 goto badarg;
    ic = *(++tp);
    oc = *(++tp);
    if (!is_atom(ic) || !is_atom(oc))
		 goto badarg;

    /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
    if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
		 erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
		 erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
		 if (BIF_P->common.u.alive.reg)
			  erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
		 erts_dsprintf(dsbufp,
					   " attempted to enable connection to node %T "
					   "which is not able to handle extended references.\n",
					   BIF_ARG_1);
		 erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
		 goto badarg;
    }

    /*
     * Arguments seem to be in order.
     */

    /* get dist_entry */
    dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
    if (dep == erts_this_dist_entry)
		 goto badarg;
    else if (!dep)
		 goto system_limit; /* Should never happen!!! */
//通过Port的ID获取Port的结构
    pp = erts_id2port_sflgs(BIF_ARG_2,
			    BIF_P,
			    ERTS_PROC_LOCK_MAIN,
			    ERTS_PORT_SFLGS_INVALID_LOOKUP);
    erts_smp_de_rwlock(dep);

    if (!pp || (erts_atomic32_read_nob(&pp->state)
		& ERTS_PORT_SFLG_EXITING))
		 goto badarg;

    if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
		 goto badarg;
//如果当前cid和传入的Port的ID相同,且port的sist_entry和找到的dep相同
//那么直接进入结束阶段
    if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
		 goto done; /* Already set */

    if (dep->status & ERTS_DE_SFLG_EXITING) {
		 /* Suspend on dist entry waiting for the exit to finish */
		 ErtsProcList *plp = erts_proclist_create(BIF_P);
		 plp->next = NULL;
		 erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
		 erts_smp_mtx_lock(&dep->qlock);
		 erts_proclist_store_last(&dep->suspended, plp);
		 erts_smp_mtx_unlock(&dep->qlock);
		 goto yield;
    }

    ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));

    if (pp->dist_entry || is_not_nil(dep->cid))
		 goto badarg;

    erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);

    /*
     * Dist-ports do not use the "busy port message queue" functionality, but
     * instead use "busy dist entry" functionality.
     */
    {
		 ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
		 erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
    }
//更新Port所关联的dist
    pp->dist_entry = dep;

    dep->version = version;
    dep->creation = 0;

    ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);

#if 1
    dep->send = (pp->drv_ptr->outputv
		 ? dist_port_commandv
		 : dist_port_command);
#else
    dep->send = dist_port_command;
#endif
    ASSERT(dep->send);

#ifdef DEBUG
    erts_smp_mtx_lock(&dep->qlock);
    ASSERT(dep->qsize == 0);
    erts_smp_mtx_unlock(&dep->qlock);
#endif
//更新dist_entry的cid
    erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);

    if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
		 create_cache(dep);

    erts_smp_de_rwunlock(dep);
    dep = NULL; /* inc of refc transferred to port (dist_entry field) */
//增加远程节点的数量
    inc_no_nodes();
//发送监控信息到调用的进程
    send_nodes_mon_msgs(BIF_P,
			am_nodeup,
			BIF_ARG_1,
			flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
			NIL);
 done:

    if (dep && dep != erts_this_dist_entry) {
		 erts_smp_de_rwunlock(dep);
		 erts_deref_dist_entry(dep);
    }

    if (pp)
		 erts_port_release(pp);

    return ret;

 yield:
    ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
			 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
    goto done;

 badarg:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
    goto done;

 system_limit:
    ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
    goto done;
}

setnode_3首先是将,得到的远程节点的名字放入dist的hash表中,并且将这个表项和连接到远程节点的Port进行了关联。

接着将和远程节点进行连接的Port标记为ERTS_PORT_SFLG_DISTRIBUTION,这样在一个Port出现Busy的时候我们能区分出是普通的Port还是远程连接的Port,在一个Port被销毁的时候,是否要调用dist.c中的erts_do_net_exits来告诉Erts远程节点已经掉线了。当这些都顺利的完成了之后,会在这个Erts内部广播nodeup这个消息,那么nodeup的接收者又是谁呢?nodeup的接收者是那些通过process_flag函数设置了monitor_nodes标记的进程,当然monitor_nodes这选项文档中是没有的。如果我们想监听nodeup事件,只能通过net_kernel:monitors函数来完成。

我们上次说到负责连接远程节点的进程会通知net_kernel进程,让我们接着看下net_kernel收到消息做了什么。

handle_info({SetupPid, {nodeup,Node,Address,Type,Immediate}},
	    State) ->
    case {Immediate, ets:lookup(sys_dist, Node)} of
	{true, [Conn]} when Conn#connection.state =:= pending,
			    Conn#connection.owner =:= SetupPid ->
	    ets:insert(sys_dist, Conn#connection{state = up,
						 address = Address,
						 waiting = [],
						 type = Type}),
	    SetupPid ! {self(), inserted},
	    reply_waiting(Node,Conn#connection.waiting, true),
	    {noreply, State};
	_ ->
	    SetupPid ! {self(), bad_request},
	    {noreply, State}
    end;

更新ets中的状态,同时发送消息给所有等待的进程,告诉他们远程连接已经成功了,你们可以继续进行后续操作了。

这个时候你会惊奇的发现,心跳在什么地方呢?不急,我们再回头看下net_kernel的init函数

init({Name, LongOrShortNames, TickT}) ->
    process_flag(trap_exit,true),
    case init_node(Name, LongOrShortNames) of
	{ok, Node, Listeners} ->
	    process_flag(priority, max),
	    Ticktime = to_integer(TickT),
	    Ticker = spawn_link(net_kernel, ticker, [self(), Ticktime]),
	    {ok, #state{name = Name,
			node = Node,
			type = LongOrShortNames,
			tick = #tick{ticker = Ticker, time = Ticktime},
			connecttime = connecttime(),
			connections =
			ets:new(sys_dist,[named_table,
					  protected,
					  {keypos, 2}]),
			listen = Listeners,
			allowed = [],
			verbose = 0
		       }};
	Error ->
	    {stop, Error}
    end.

net_kernel首先创建了一个ticker进程,它专门负责发心跳给net_kernel进程,然后net_kernel进程会遍历所有远程连接的进程,让他们进行一次心跳。当我们改变了一个节点的心跳时间的时候,我们会开启一个aux_ticker进程帮助我们进行过度,直到所有节点都知道了我们改变了心跳周期为止,当所有节点都知道我们改变了心跳周期,这个aux_ticker进程也就结束了它的历史性任务,安静的退出了。

那么是如何发现远程节点退出的,当然是TCP数据传输发生了故障Port被清理掉了,这个可参见dist.c中的erts_do_net_exits。

当这些都完成了,我们将继续回到global模块和global_group模块中去分析下nodeup的时候,两个节点是如何同步他们的全局名字的。

时间: 2024-10-07 09:45:11

让我们聊聊Erlang的节点互联(二)的相关文章

HDSF主要节点讲解(二)工作原理

HDFS(Hadoop Distributed File System )Hadoop分布式文件系统.是根据google发表的论文翻版的.论文为GFS(Google File System)Google 文件系统(中文,英文). HDFS有很多特点: ① 保存多个副本,且提供容错机制,副本丢失或宕机自动恢复.默认存3份. ② 运行在廉价的机器上. ③ 适合大数据的处理.多大?多小?HDFS默认会将文件分割成block,64M为1个block.然后将block按键值对存储在HDFS上,并将键值对的

IPFS系列 多节点搭建 二

IPFS系列 多节点搭建 二 上一篇介绍了IPFS的分布式点对点超媒体传输协议的背景和安装介绍,本篇将继续指导搭建多节点的IPFS私有网络 文件服务.如果没还没开始搭建IPFS节点的小伙伴, 请戳此链接查看上篇文章介绍 : https://www.cnblogs.com/sumingk/articles/9250757.html 按照上一篇文章介绍,在搭建一个IPFS节点,本系列教程使用了 两个Ubuntu 16.04 服务器 + 一台win10主机搭建的三个ipfs节点. 一.IPFS 配置文

ERLANG远端节点奔溃导致发消息进程堵消息问题探源

问题描述:在生产环境中出现一例性能问题,A和B两个结点运行在两台服务器上,A与B互联,A不断向B发送消息.B结点所在机器发生宕机,导致A结点中发送消息的进程赌消息. 追踪过程:通过erlang:process_info(erlang:whereis(Pid))发现current_function一直是gen:do_call/4.messages消息堆积到数十万级别. 源码分析:在代码中向远端发送消息的调用函数为erlang:send(Pid,Msg),Pid是属于远端结点的接收进程.对该函数做一

聊聊高并发(十二)分析java.util.concurrent.atomic.AtomicStampedReference源码来看如何解决CAS的ABA问题

在聊聊高并发(十一)实现几种自旋锁(五)中使用了java.util.concurrent.atomic.AtomicStampedReference原子变量指向工作队列的队尾,为何使用AtomicStampedReference原子变量而不是使用AtomicReference是因为这个实现中等待队列的同一个节点具备不同的状态,而同一个节点会多次进出工作队列,这就有可能出现出现ABA问题. 熟悉并发编程的同学应该知道CAS操作存在ABA问题.我们先看下CAS操作. CAS(Compare and

HDSF主要节点解说(二)工作原理

HDFS(Hadoop Distributed File System )Hadoop分布式文件系统. 是依据google发表的论文翻版的.论文为GFS(Google File System)Google 文件系统(中文.英文). HDFS有非常多特点: ① 保存多个副本,且提供容错机制,副本丢失或宕机自己主动恢复.默认存3份. ② 执行在便宜的机器上. ③ 适合大数据的处理. 多大?多小?HDFS默认会将文件切割成block,64M为1个block.然后将block按键值对存储在HDFS上,并

erlang与设计模式(二)——工厂、抽象工厂、建造者

Russell-X-Shanso 工厂模式.抽象工厂模式.建造者模式,均为创建类模式,其共有的设计思路主要在于根据情况理清并封装创建流程(创建进程.创建gen_server.组建record或maps等复合数据结构等).解耦.定向扩展等等. (注:由于这三个创建类模型解决的问题近似,面向对象语言中的解决方式也较为近似,因此我们放在一起来讨论:) 3.工厂模式( Factory Method ) (1)意图:定义一个用于创建对象的接口,让子类决定实例化哪一个类,工厂方法使一个类的实例化延迟到其子类

erlang四大behaviour之二-gen_fsm

来源:http://www.cnblogs.com/puputu/articles/1701012.html 今天介绍erlang的一个非常重要的behaviour,就是gen_fsm-有限状态机,有限状态机的作用非常之多,比如文本解析,模式匹配.游戏逻辑等等方面的处理都是它的强项,所以这个behaviour非常之重要 1. 有限状态机 有限状态机可以用下面这个公式来表达 State(S) x Event(E) -> Actions(A), State(S') 表示的就是在S状态时如果有事件E发

让我们聊聊Erlang的Trap机制

在分析erlang:send的bif时候发现了一个BIF_TRAP这一系列宏.参考了Erlang自身的一些描述,这些宏是为了实现一种叫做Trap的机制.Trap机制中将Erlang的代码直接引入了Erts中,可以让C函数直接"使用"这些Erlang的函数. 先让我们思考下为什么Erlang为什么要实现Trap机制?让我先拿最近比较火的Go来说下,Go本身是编译型的和Erlang这种OPCode解释型的性质是不同的.Go的Runtime中很多函数本身也是用C语言实现的,为了胶和Go代码和

让我们聊聊Erlang的垃圾回收

原Blog地址,http://www.linkedin.com/pulse/garbage-collection-erlang-tianpo-gao?trk=prof-post. 本文将简单的描述Erlang的垃圾回收,并不是深入的探讨. 在Erlang运行时环境中,Erlang进程采用复制分代回收的方式.分代垃圾回收将内存对象划分为不同的代.在Erlang运行时环境中,有两个代,年轻代和老年代.在Erlang的运行时环境中,内存回收主要有两种,一种叫做部分垃圾回收,另一种叫做全量垃圾回收. 在