zmq笔记四: tcp的connect操作

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代码,方便自己日后查阅.

=========================================

本文以REQ/REP为例,分析一下tcp的connect的实现过程.

void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:6666"); //如果是进程间通信,改为"ipc://xxx"

int zmq_connect (void *s_, const char *addr_)
{
    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
    int result = s->connect (addr_);
    return result;
}

socket_base_t:connect(addr_)一个函数就能完成ipc,tcp,udp,inproc等等常用的进程通信的连接工作,本文只针对tcp连接进行分析.

进入connect()函数后,首先对连接类型(tcp)和目标地址(localhost:6666)进行合法性解析,不同的连接类型,目标地址有不同的格式.tcp连接必须要有ip和端口.

int zmq::socket_base_t::connect (const char *addr_)
{
    ENTER_MUTEX ();

    //  Process pending commands, if any.
    int rc = process_commands (0, false); //不忘先处理一下命令队列..
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
        EXIT_MUTEX ();
        return -1;
    }
    ....
    bool is_single_connect = (options.type == ZMQ_DEALER ||
                              options.type == ZMQ_SUB ||
                              options.type == ZMQ_REQ); //这三种类型的socket只能有一个连接
    if (unlikely (is_single_connect)) {
        const endpoints_t::iterator it = endpoints.find (addr_);
        if (it != endpoints.end ()) {
            EXIT_MUTEX ();
            return 0;
        }
    }

    //  Choose the I/O thread to run the session in.
    io_thread_t *io_thread = choose_io_thread (options.affinity); //查找负载最小的I/O线程
    if (!io_thread) {
        errno = EMTHREAD;
        EXIT_MUTEX ();
        return -1;
    }

    //  Create session. 创建session对象,以后发送给它的命令消息就会放进这个I/O线程的邮箱里
    session_base_t *session = session_base_t::create (io_thread, true, this,
        options, paddr);
    errno_assert (session);

    //  PGM does not support subscription forwarding; ask for all data to be
    //  sent to this pipe. (same for NORM, currently?)
    bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp";
    pipe_t *newpipe = NULL;

    if (options.immediate != 1 || subscribe_to_all) {
        //  Create a bi-directional pipe.
        object_t *parents [2] = {this, session};
        pipe_t *new_pipes [2] = {NULL, NULL};

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.sndhwm,
            conflate? -1 : options.rcvhwm};
        bool conflates [2] = {conflate, conflate};
        rc = pipepair (parents, new_pipes, hwms, conflates); //创建一对双向"管道",一个pipe_t对象有两个ypipe_t,分别是作为inpipe/outpipe队列,其中inpipe->read,outpipe->write
        errno_assert (rc == 0);

        //  Attach local end of the pipe to the socket object.
        attach_pipe (new_pipes [0], subscribe_to_all); //第一个pipe_t放进socket对象的pipes集合里
        newpipe = new_pipes [0];

        //  Attach remote end of the pipe to the session object later on.
        session->attach_pipe (new_pipes [1]); //第二个pipe_t放到session的pipes集合里,注意,new_pipes的两个pipe_t在pipepairs()生成时已经互为peer
    }

    //  Save last endpoint URI
    paddr->to_string (last_endpoint);

    add_endpoint (addr_, (own_t *) session, newpipe); //开始进行connect
    EXIT_MUTEX ();
    return 0;
}

首先来看一下attach_pipe()做了什么: (socket和session各有不同的attach_pipe()函数,但功能差不多)

void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
    //  First, register the pipe so that we can terminate it later on.
    pipe_->set_event_sink (this); //只会被设置一次,当这个pipe有消息要处理时,实际上是由这个this对象来处理的.
    pipes.push_back (pipe_);

    //  Let the derived socket type know about new pipe.
    xattach_pipe (pipe_, subscribe_to_all_); //还会加入到socket的fair-queue和load-balance-queue,这个先不在这里分析

    //  If the socket is already being closed, ask any new pipes to terminate
    //  straight away.
    if (is_terminating ()) {
        register_term_acks (1);
        pipe_->terminate (false);
    }
}

再看下add_endpoint (addr_, (own_t *) session, newpipe);

void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{
    //  Activate the session. Make it a child of this socket.
    launch_child (endpoint_); //激活session,把它加入到socket的owned集合
    endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe))); //当前endpoints表示socket包含所有对端的信息,endpoints是以目标地址为key的multimap
}
void zmq::own_t::launch_child (own_t *object_)
{
    //  Specify the owner of the object.
    object_->set_owner (this);

    //  Plug the object into the I/O thread.
    send_plug (object_); //把session对象plug到socket,给socket一个plug类型的命令消息,目标对象是session

    //  Take ownership of the object.
    send_own (this, object_);
}
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
    if (inc_seqnum_)
        destination_->inc_seqnum ();

    command_t cmd;
    cmd.destination = destination_;
    cmd.type = command_t::plug;
    send_command (cmd);
}

笔记三简略介绍了一下mailbox和命令消息队列,在这里看一下实际的消息发送过程.实际上继承自object_t的类,object_t类实现了各种send_xxx函数,封装好了发送特定类型的命令消息.最基本的命令消息,必须包括destination和type.消息发送的调用过程如下:

void zmq::object_t::send_command (command_t &cmd_)
{
    ctx->send_command (cmd_.destination->get_tid (), cmd_); //请注意这里的tid
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
    slots [tid_]->send (command_);
}
void zmq::mailbox_t::send (const command_t &cmd_)
{
    sync.lock ();
    cpipe.write (cmd_, false);
    const bool ok = cpipe.flush ();
    sync.unlock ();
    if (!ok)
        signaler.send ();
}
    inline void write (const T &value_, bool incomplete_) //ypipe_t
    {
        //  Place the value to the queue, add new terminator element.
        queue.back () = value_;
        queue.push ();

        //  Move the "flush up to here" poiter.
        if (!incomplete_)
            f = &queue.back ();
    }

session的get_tid()返回的其实就是创建session时选择的I/O thread的tid. 回顾一下笔记一的create_socket, context创建的I/O线程都有一个tid, 并且tid是作为context->slot邮箱管理数组的下标. session的基类object_t的构造函数:

zmq::object_t::object_t (object_t *parent_) : //parent正是I/O thread
    ctx (parent_->ctx),
    tid (parent_->tid)
{
}

综上所述,socket对象调用launch_child(p)时,其实就是把p放进socket的owned集合,把p打包成plug命令消息,并发送到它的I/O线程里处理这个消息.

通过笔记二,三可知道,I/O thread的邮箱有消息处理时,是通过邮箱个fd通知的,而这个fd刚好就是mailbox的signaler的r句柄,也就是说, I/O thread的轮询select会在mailbox->send()的siangler->send()之后激活邮箱消息可读.消息读出来后,经过void zmq::object_t::process_command (command_t &cmd_),这是由cmd.destination.process_command (cmd)调用的,所以处理函数还是根据destination来定义:

void zmq::object_t::process_command (command_t &cmd_)
{
    switch (cmd_.type) {
    ......
    case command_t::plug:
        process_plug ();
        process_seqnum ();
        break;
    ......
}
void zmq::session_base_t::process_plug ()
{
    if (active)
        start_connecting (false);
}
void zmq::session_base_t::start_connecting (bool wait_)
{
    zmq_assert (active);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity); //到这里,上一个plug消息算完成了,新建connecter对象相当于一个新的消息需求.首先寻找一个负载小的I/O线程.
    zmq_assert (io_thread);

    //  Create the connecter object.

    if (addr->protocol == "tcp") {
        if (!options.socks_proxy_address.empty()) {
             ......
        }
        else {
            tcp_connecter_t *connecter = new (std::nothrow)
                tcp_connecter_t (io_thread, this, options, addr, wait_);
            alloc_assert (connecter);
            launch_child (connecter); //这一次launch_child调用的是在session对象里调用,过程和上面一样,命令消息发送到io_thread的邮箱去了
        }
        return;
    }
    ......
}

当I/O thread收到处理消息时,调用的是tcp_connecter_t的函数了:

void zmq::tcp_connecter_t::process_plug ()
{
    if (delayed_start)
        add_reconnect_timer ();
    else
        start_connecting ();
}

当delayed_start为true时,只是加了个timer延迟connect操作,最终还是调用start_connecting ():

void zmq::tcp_connecter_t::timer_event (int id_)
{
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
    if (id_ == connect_timer_id) { //connecter的timer只有两个timer id,
        connect_timer_started = false;

        rm_fd (handle);
        handle_valid = false;

        close ();
        add_reconnect_timer ();
    }
    else if (id_ == reconnect_timer_id) {
        reconnect_timer_started = false;
        start_connecting (); //最终的调用还是这个函数入口
    }
}

在这里必须先说明非阻塞connect()如何完成三次握手的问题:

//////////以下这段文字摘自http://kenby.iteye.com/blog/1183579//////////

步骤1: 设置非阻塞,启动连接

实现非阻塞 connect ,首先把 sockfd 设置成非阻塞的。这样调用

connect 可以立刻返回,根据返回值和 errno 处理三种情况:

(1) 如果返回 0,表示 connect 成功。

(2) 如果返回值小于 0, errno 为 EINPROGRESS,  表示连接

建立已经启动但是尚未完成。这是期望的结果,不是真正的错误。

(3) 如果返回值小于0,errno 不是 EINPROGRESS,则连接出错了。

步骤2:判断可读和可写

然后把 sockfd 加入 select 的读写监听集合,通过 select 判断 sockfd

是否可写,处理三种情况:

(1) 如果连接建立好了,对方没有数据到达,那么 sockfd 是可写的

(2) 如果在 select 之前,连接就建立好了,而且对方的数据已到达,

那么 sockfd 是可读和可写的。

(3) 如果连接发生错误,sockfd 也是可读和可写的。

判断 connect 是否成功,就得区别 (2) 和 (3),这两种情况下 sockfd 都是

可读和可写的,区分的方法是,调用 getsockopt 检查是否出错。

步骤3:使用 getsockopt 函数检查错误

getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len)

在 sockfd 都是可读和可写的情况下,我们使用 getsockopt 来检查连接

是否出错。但这里有一个可移植性的问题。

如果发生错误,getsockopt 源自 Berkeley 的实现将在变量 error 中

返回错误,getsockopt 本身返回0;然而 Solaris 却让 getsockopt 返回 -1,

并把错误保存在 errno 变量中。所以在判断是否有错误的时候,要处理

这两种情况。

//////////以上这段文字摘自http://kenby.iteye.com/blog/1183579//////////

现在来看下start_connecting()到底做了什么:

void zmq::tcp_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
    const int rc = open ();

    //  Connect may succeed in synchronous manner.
    if (rc == 0) { //条件1
        handle = add_fd (s);
        handle_valid = true;
        out_event ();
    }

    //  Connection establishment may be delayed. Poll for its completion.
    else
    if (rc == -1 && errno == EINPROGRESS) { //条件2
        handle = add_fd (s);
        handle_valid = true;
        set_pollout (handle);
        socket->event_connect_delayed (endpoint, zmq_errno());

        //  add userspace connect timeout
        add_connect_timer ();
    }

    //  Handle any other error condition by eventual reconnect.
    else { //条件3
        if (s != retired_fd)
            close ();
        add_reconnect_timer ();
    }
}

open()函数的主要工作是创建新套接字句柄s,并设置为noblock,然后调用 ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); 由于是非阻塞的,所以connect()调用立即返回-1,并且设置errno错误代码为EINPROGRESS表示连接操作还在进行中,而同时三次握手还是在进行中的,握手是否完成可以在poller的select()调用里知道结果.

#ifdef ZMQ_HAVE_WINDOWS
    const int last_error = WSAGetLastError();
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
        errno = EINPROGRESS;
    else
        errno = wsa_error_to_errno (last_error);
#else
    if (errno == EINTR)
        errno = EINPROGRESS;
#endif

假如client发起连接时,对端还没启动listen.那么进入start_connecting()的条件2,把s加入到connecter的poller里error( add_fd(s)只是加入到error集合,见笔记二)和write的fd集合,s还没完成三次握手.如果options.connect_timeout >0的话,再给它加一个connect_timer_id的timer. 然后等待I/O线程poller轮询select(). 由于对端还没有listen,套接字s会发生错误,导致触发tcp_connecter_t:in_event()(如果是连接成功,则触发tcp_connecter_t:out_event()). 然而对于tcp_connecter_t来说,in_event()调用的还是out_event(), 所以s的可写或出错都是会调用同一个函数.

void zmq::tcp_connecter_t::out_event ()
{
    if (connect_timer_started) { //如果存在connect timer就去掉
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

    rm_fd (handle); //从poller里去掉s
    handle_valid = false;

    const fd_t fd = connect (); //查看s的状态,判断三次握手是否成功,返回适当的fd值
    //  Handle the error condition by attempt to reconnect.
    if (fd == retired_fd) { //从这次结果看来s三次握手失败了
        close ();//关闭套接字s
        add_reconnect_timer (); //并加一个reconnect timer
        return;
    }
    //到达这里说明s三次握手成功了,连接完成
    tune_tcp_socket (fd);
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
    tune_tcp_maxrt (fd, options.tcp_maxrt);

    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd (fd);

    //  Create the engine object for this connection.
    stream_engine_t *engine = new (std::nothrow)
        stream_engine_t (fd, options, endpoint);
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (session, engine);

    //  Shut the connecter down.
    terminate ();

    socket->event_connected (endpoint, (int) fd);
}

  

三次握手是否成功是由tcp_connecter_t::connect ()判断并返回fd,如果成功了,就执行后面的代码; 如果失败就加一个重连的timer,这个timer的处理函数上文已经给出了,最终还是调用start_connecting (),不断循环,直到连接成功为止.

时间: 2024-10-08 11:48:58

zmq笔记四: tcp的connect操作的相关文章

《大型网站技术架构》读书笔记四:瞬时响应之网站的高性能架构

一.网站性能测试 (1)性能测试指标:①响应时间:②并发数:③吞吐量:④性能计数器: (2)性能测试方法:①性能测试:②负载测试:③压力测试:④稳定性测试: (3)性能优化策略: ①性能分析:检查请求处理各个环节的日志,分析哪个环节响应时间不合理,检查监控数据分析影响性能的因素: ②性能优化:Web前端优化,应用服务器优化,存储服务器优化: 二.Web前端性能优化 (1)浏览器访问优化: ①减少http请求:因为http是无状态的,每次请求的开销都比较昂贵(需要建立通信链路.进行数据传输,而服务

Linux学习笔记四:Linux的文件搜索命令

1.文件搜索命令  which 语法:which [命令名称] 范例:$which ls  列出ls命令所在目录 [[email protected] ~]$ which ls alias ls='ls --color=auto' /bin/ls 另外一个命令:whereis [名称名称],也可以列出命令所在目录. [[email protected] ~]$ whereis ls ls: /bin/ls /usr/share/man/man1/ls.1.gz /usr/share/man/ma

老男孩培训视频听课笔记四(在51cto上听的)

实际操作:     1.创建一个目录 mkdir 语法:mkdir [-mp] [目录名称]            一般与配合cd tree pwd等命令来实现,整个操作     2.建议一个文件 利用touch命令来完成 语法:touch [path]/filename            批量创建文件:        for f in `seq 1000`;do touch $f.txt;done         创建文件的命令很多:vi echo> > cat等命令      3.文件

iOS9-by-Tutorials-学习笔记四:APP-瘦身

iOS9-by-Tutorials-学习笔记四:APP-瘦身 本文版权归作者所有,如需转载请联系孟祥月 CSDN博客:http://blog.csdn.net/mengxiangyue 独立博客:http://mengxiangyue.com 这篇文章在书中的标题是App Thinning,这里我给翻译成了App 瘦身. 本文然然进行了一些语法的修改,很开心她为我修改这些东西.她说我转折只会用但是,被她这么一说想想还真是只是会用但是,嘿嘿. iPhone经过这几年的发展,已经发生了很大的变化,例

QT开发(二十四)——QT文件操作

QT开发(二十四)--QT文件操作 一.QT文件操作简介 QT中的IO操作通过统一的接口简化了文件与外部设备的操作方式,QT中文件被当作一种特殊的外部设备,文件操作与外部设备操作相同. 1.IO操作的主要函数接口 打开设备:bool open(OpenMode mode) 读取数据:QByteArray read(qint64 maxSize) 写入数据:qint64 write(const QByteArray & byteArray) 关闭设备:void close() IO操作的本质是连续

初探swift语言的学习笔记四(类对象,函数)

作者:fengsh998 原文地址:http://blog.csdn.net/fengsh998/article/details/29606137 转载请注明出处 假设认为文章对你有所帮助,请通过留言或关注微信公众帐号fengsh998来支持我,谢谢! swift扩展了非常多功能和属性,有些也比較奇P.仅仅有慢慢学习,通过经验慢慢总结了. 以下将初步学习一下类的写法. 码工,最大爱好就是看码,而不是文字,太枯燥. // // computer.swift // swiftDemo // // C

Linux System Programming 学习笔记(四) 高级I/O

1. Scatter/Gather I/O a single system call  to  read or write data between single data stream and multiple buffers This type of I/O is so named because the data is scattered into or gathered from the given vector of buffers Scatter/Gather I/O 相比于 C标准

Boost Thread学习笔记四

barrierbarrier类的接口定义如下: 1 class barrier : private boost::noncopyable   // Exposition only 2 { 3 public: 4   // construct/copy/destruct 5   barrier(size_t n); 6   ~barrier(); 7  8   // waiting 9   bool wait();10 }; barrier类为我们提供了这样一种控制线程同步的机制:前n - 1次调

Go语言学习笔记(四) [array、slices、map]

日期:2014年7月22日 一.array[数组] 1.定义:array 由 [n]<type> 定义,n 标示 array 的长度,而 <type> 标示希望存储的内容的类型. 例如: var arr[10] int arr[0] = 1 arr[1] = 2 数组值类型的:将一个数组赋值给 另一个数组,会复制所有的元素.另外,当向函数内传递一个数组的时候,它将获得一个数组的副本,而不是数组的指针. 2.数组的复合声明.a :=[3]int{1,2,3}或简写为a:=[...]i