NIO 源码分析(02-2) BIO 源码分析 Socket

目录

  • 一、BIO 最简使用姿势
  • 二、connect 方法
    • 2.1 Socket.connect 方法
    • 2.2 AbstractPlainSocketImpl.connect 方法
    • 2.3 DualStackPlainSocketImpl.socketConnect 方法
  • 三、SocketInputStream
    • 3.1 构造方法
    • 3.2 read 方法
  • 四、SocketInputStream

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

在上一篇文章中详细分析了 ServerSocket 的源码,Socket 和 ServerSocket 一样也只是一个门面模式,真正的实现也是 SocksSocketImpl,所以关于 setImpl、createImpl、new、bind、listen 都是类似的,本文重点关注其 connect 和 IO 流的读取方法。

一、BIO 最简使用姿势

//1. 连接服务器
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST, PORT), 0);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriterout = new PrintWriter(socket.getOutputStream(), true);

//2. 发送请求数据
out.println("客户端发送请求数据...");

//3. 接收服务端数据
String response = in.readLine();
System.out.println("Client: " + response);

ok,代码已经完成!!!下面的源码分析都会基于这个 demo。

二、connect 方法

2.1 Socket.connect 方法

// timeout=0 表示永久阻塞,timeout>0 则指定超时时间
public void connect(SocketAddress endpoint, int timeout) throws IOException {

    InetSocketAddress epoint = (InetSocketAddress) endpoint;
    InetAddress addr = epoint.getAddress ();
    int port = epoint.getPort();

    // 1. 创建底层 socket 套接字
    if (!created)
        createImpl(true);

    // 2. oldImpl 默认为 false,也就是进入第一个 if 条件
    //    checkOldImpl 会判断 impl 中有没有 connect(SocketAddress address, int port) 方法
    //    来设置 oldImpl 的值
    if (!oldImpl)
        impl.connect(epoint, timeout);
    else if (timeout == 0) {
        if (epoint.isUnresolved())
            impl.connect(addr.getHostName(), port);
        else
            impl.connect(addr, port);
    } else
        throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
    connected = true;
    bound = true;
}

总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象,然后委托给 impl 完成连接操作

2.2 AbstractPlainSocketImpl.connect 方法

protected void connect(SocketAddress address, int timeout) throws IOException {
    boolean connected = false;
    try {
        InetSocketAddress addr = (InetSocketAddress) address;
        this.port = addr.getPort();
        this.address = addr.getAddress();

        connectToAddress(this.address, port, timeout);
        connected = true;
    } finally {
        if (!connected) {
            close();
        }
    }
}

private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
    if (address.isAnyLocalAddress()) {
        doConnect(InetAddress.getLocalHost(), port, timeout);
    } else {
        doConnect(address, port, timeout);
    }
}

总结: connect 将连接具体由 doConnect 完成

synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
    synchronized (fdLock) {
        if (!closePending && (socket == null || !socket.isBound())) {
            NetHooks.beforeTcpConnect(fd, address, port);
        }
    }
    try {
        acquireFD();
        try {
            socketConnect(address, port, timeout);
            /* socket may have been closed during poll/select */
            synchronized (fdLock) {
                if (closePending) {
                    throw new SocketException ("Socket closed");
                }
            }
            if (socket != null) {
                socket.setBound();
                socket.setConnected();
            }
        } finally {
            releaseFD();
        }
    } catch (IOException e) {
        close();
        throw e;
    }
}

2.3 DualStackPlainSocketImpl.socketConnect 方法

void socketConnect(InetAddress address, int port, int timeout)
    throws IOException {
    int nativefd = checkAndReturnNativeFD();

    if (address == null)
        throw new NullPointerException("inet address argument is null.");

    int connectResult;
    if (timeout <= 0) {
        connectResult = connect0(nativefd, address, port);
    } else {
        configureBlocking(nativefd, false);
        try {
            connectResult = connect0(nativefd, address, port);
            if (connectResult == WOULDBLOCK) {
                waitForConnect(nativefd, timeout);
            }
        } finally {
            configureBlocking(nativefd, true);
        }
    }

    if (localport == 0)
        localport = localPort0(nativefd);
}
补充1:connect0 在 JVM 中的实现
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
  (JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
    SOCKETADDRESS sa;
    int rv;
    int sa_len = sizeof(sa);

    if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
                                 &sa_len, JNI_TRUE) != 0) {
      return -1;
    }

    rv = connect(fd, (struct sockaddr *)&sa, sa_len);
    if (rv == SOCKET_ERROR) {
        int err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
            return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
        } else if (err == WSAEADDRNOTAVAIL) {
            JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
                "connect: Address is invalid on local machine, or port is not valid on remote machine");
        } else {
            NET_ThrowNew(env, err, "connect");
        }
        return -1;  // return value not important.
    }
    return rv;
}

总结: rv = connect(fd, (struct sockaddr *)&sa, sa_len) 建立远程连接。

补充2:waitForConnect 在 JVM 中的实现

和 ServerSocket.waitForNewConnection 一样,也是通过 Winsock 库的 select 函数来实现超时的功能。

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
  (JNIEnv *env, jclass clazz, jint fd, jint timeout) {
    int rv, retry;
    int optlen = sizeof(rv);
    fd_set wr, ex;
    struct timeval t;

    FD_ZERO(&wr);
    FD_ZERO(&ex);
    FD_SET(fd, &wr);
    FD_SET(fd, &ex);
    t.tv_sec = timeout / 1000;
    t.tv_usec = (timeout % 1000) * 1000;

    rv = select(fd+1, 0, &wr, &ex, &t);
    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                        "connect timed out");
        shutdown( fd, SD_BOTH );
        return;
    }
    if (!FD_ISSET(fd, &ex)) {
        return;         /* connection established */
    }

    for (retry=0; retry<3; retry++) {
        NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
                       (char*)&rv, &optlen);
        if (rv) {
            break;
        }
        Sleep(0);
    }

    if (rv == 0) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Unable to establish connection");
    } else {
        NET_ThrowNew(env, rv, "connect");
    }
}

总结: rv = select(fd+1, 0, &wr, &ex, &t) 轮询会阻塞程序。

三、SocketInputStream

3.1 构造方法

SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
    super(impl.getFileDescriptor());
    this.impl = impl;
    socket = impl.getSocket();
}

总结: SocketInputStream 内部实现上也是对 impl 的封装。SocketInputStream.read 其实也是调用底层 socket 的 read 方法。

3.2 read 方法

int read(byte b[], int off, int length, int timeout) throws IOException {
    int n;

    // EOF already encountered
    if (eof) {
        return -1;
    }

    // connection reset
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }

    // bounds check
    if (length <= 0 || off < 0 || off + length > b.length) {
        if (length == 0) {
            return 0;
        }
        throw new ArrayIndexOutOfBoundsException();
    }

    boolean gotReset = false;

    // acquire file descriptor and do the read
    FileDescriptor fd = impl.acquireFD();
    try {
        n = socketRead(fd, b, off, length, timeout);
        if (n > 0) {
            return n;
        }
    } catch (ConnectionResetException rstExc) {
        gotReset = true;
    } finally {
        impl.releaseFD();
    }

    /*
     * We receive a "connection reset" but there may be bytes still
     * buffered on the socket
     */
    if (gotReset) {
        impl.setConnectionResetPending();
        impl.acquireFD();
        try {
            n = socketRead(fd, b, off, length, timeout);
            if (n > 0) {
                return n;
            }
        } catch (ConnectionResetException rstExc) {
        } finally {
            impl.releaseFD();
        }
    }

    /*
     * If we get here we are at EOF, the socket has been closed,
     * or the connection has been reset.
     */
    if (impl.isClosedOrPending()) {
        throw new SocketException("Socket closed");
    }
    if (impl.isConnectionResetPending()) {
        impl.setConnectionReset();
    }
    if (impl.isConnectionReset()) {
        throw new SocketException("Connection reset");
    }
    eof = true;
    return -1;
}

private int socketRead(FileDescriptor fd, byte b[], int off, int len,
        int timeout) throws IOException {
    return socketRead0(fd, b, off, len, timeout);
}
补充2:socketRead0 在 JVM 中的实现
// src/windows/native/java/net/SocketInputStream.c
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
        jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
    char *bufP;
    char BUF[MAX_BUFFER_LEN];
    jint fd, newfd;
    jint nread;

    if (IS_NULL(fdObj)) {
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
        return -1;
    }
    fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
    if (fd == -1) {
        NET_ThrowSocketException(env, "Socket closed");
        return -1;
    }

    /*
     * If the caller buffer is large than our stack buffer then we allocate
     * from the heap (up to a limit). If memory is exhausted we always use
     * the stack buffer.
     */
    if (len <= MAX_BUFFER_LEN) {
        bufP = BUF;
    } else {
        if (len > MAX_HEAP_BUFFER_LEN) {
            len = MAX_HEAP_BUFFER_LEN;
        }
        bufP = (char *)malloc((size_t)len);
        if (bufP == NULL) {
            /* allocation failed so use stack buffer */
            bufP = BUF;
            len = MAX_BUFFER_LEN;
        }
    }

    if (timeout) {
        if (timeout <= 5000 || !isRcvTimeoutSupported) {
            int ret = NET_Timeout (fd, timeout);

            if (ret <= 0) {
                if (ret == 0) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                                    "Read timed out");
                } else if (ret == JVM_IO_ERR) {
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
                } else if (ret == JVM_IO_INTR) {
                    JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                                    "Operation interrupted");
                }
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }

            /*check if the socket has been closed while we were in timeout*/
            newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
            if (newfd == -1) {
                NET_ThrowSocketException(env, "Socket Closed");
                if (bufP != BUF) {
                    free(bufP);
                }
                return -1;
            }
        }
    }

    // 最关键的代码,recv 从 socketfd 中读取数据
    nread = recv(fd, bufP, len, 0);

    if (nread > 0) {
        (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
    } else {
        if (nread < 0) {
            // Check if the socket has been closed since we last checked.
            // This could be a reason for recv failing.
            if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
                NET_ThrowSocketException(env, "Socket closed");
            } else {
                switch (WSAGetLastError()) {
                    case WSAEINTR:
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                            "socket closed");
                        break;
                    case WSAECONNRESET:
                    case WSAESHUTDOWN:
                        /*
                         * Connection has been reset - Windows sometimes reports
                         * the reset as a shutdown error.
                         */
                        JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
                        break;
                    case WSAETIMEDOUT :
                        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
                        break;
                    default:
                        NET_ThrowCurrent(env, "recv failed");
                }
            }
        }
    }
    if (bufP != BUF) {
        free(bufP);
    }
    return nread;
}

总结: socketRead0 实现很长,其实我们只用关注核心的实现 nread = recv(fd, bufP, len, 0); 即可,毕竟我们不是专门做 c++。

四、SocketInputStream

和 SocketInputStream 类似,就不继续分析了。



每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/11144334.html

时间: 2024-11-13 07:59:27

NIO 源码分析(02-2) BIO 源码分析 Socket的相关文章

JDK1.8源码分析02之阅读源码顺序

序言:阅读JDK源码应该从何开始,有计划,有步骤的深入学习呢? 下面就分享一篇比较好的学习源码顺序的文章,给了我们再阅读源码时,一个指导性的标志,而不会迷失方向. 很多java开发的小伙伴都会阅读jdk源码,然而确不知道应该从哪读起.有些零零散散的学习,知识与知识之间没有相互联系起来,不成知识体系.以下为小编整理的通常所需阅读的源码范围. 标题为包名,后面序号为优先级1-4,优先级递减 1.java.lang 1) Object 1 2) String 1 3) AbstractStringBu

NIO 源码分析(03) 从 BIO 到 NIO

目录 一.NIO 三大组件 Channels.Buffers.Selectors 1.1 Channel 和 Buffer 1.2 Selector 1.3 Linux IO 和 NIO 编程的区别 二.BIO 和 NIO 的区别 2.1 BIO 面向流,NIO 面向缓冲区. Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html) 一.NIO 三大组件 Channels.Buffers.Selectors 1.1 Channel

Eureka 系列(02)客户端源码分析

Eureka 系列(02)客户端源码分析 [TOC] 在上一篇 Eureka 系列(01)最简使用姿态 中对 Eureka 的简单用法做了一个讲解,本节分析一下 EurekaClient 的实现 DiscoveryClient.本文的源码是基于 Eureka-1.9.8. 1)服务注册(发送注册请求到注册中心) 2)服务发现(本质就是获取调用服务名所对应的服务提供者实例信息,包括IP.port等) 3)服务续约(本质就是发送当前应用的心跳请求到注册中心) 4)服务下线(本质就是发送取消注册的HT

Apache Spark源码走读之7 -- Standalone部署方式分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有1到

传奇源码分析-客户端(游戏逻辑处理源分析四)

现在假设玩家开始操作游戏:传奇的客户端源代码工程WindHorn一.CWHApp派生CWHWindow和CWHDXGraphicWindow.二.CWHDefProcess派生出CloginProcess.CcharacterProcess.CgameProcess客户端WinMain调用CWHDXGraphicWindow g_xMainWnd;创建一个窗口.客户端CWHDXGraphicWindow在自己的Create函数中调用了CWHWindow的Create来创建窗口,然后再调用自己的C

传奇源码分析-客户端(游戏逻辑处理源分析五 服务器端响应)

器执行流程:(玩家走动) GameSrv服务器ProcessUserHuman线程处理玩家消息:遍历UserInfoList列表,依次调用每个UserInfo的Operate来处理命令队列中的所有操作; pUserInfo->Operate()调用m_pxPlayerObject->Operate()调用.判断玩家if (!m_fIsDead),如果已死,则发送_MSG_FAIL消息.我们在前面看到过,该消息是被优先处理的.否则则调用WalkTo,并发送_MSG_GOOD消息给客户端.Walk

传奇源码分析-客户端(游戏逻辑处理源分析三)

6. 接收怪物,商人,其它玩家的消息:ProcessUserHuman:(其它玩家-服务器处理)CPlayerObject->SearchViewRange();CPlayerObject->Operate();遍历UserInfoList列表,依次调用每个UserInfo的Operate来处理命令队列中的所有操作; pUserInfo->Operate()调用m_pxPlayerObject->Operate()调用.根据分发消息(RM_TURN)向客户端发送SM_TURN消息.

传奇源码分析-客户端(游戏逻辑处理源分析二)

5.接受登录成功后,接收GameSrv服务器发送的消息:接收GameGate发送的消息:CClientSocket::OnSocketMessage的FD_READ事件中,PacketQ.PushQ((BYTE*)pszPacket);把接收到的消息,压入PacketQ队列中.处理PacketQ队列数据是由CGameProcess::Load()时调用OnTimer在CGameProcess::OnTimer中处理的, 处理过程为:OnMessageReceive; ProcessPacket(

Java集合源码学习笔记(二)ArrayList分析

Java集合源码学习笔记(二)ArrayList分析 >>关于ArrayList ArrayList直接继承AbstractList,实现了List. RandomAccess.Cloneable.Serializable接口,为什么叫"ArrayList",因为ArrayList内部是用一个数组存储元素值,相当于一个可变大小的数组,也就是动态数组. (1)继承和实现继承了AbstractList,实现了List:ArrayList是一个数组队列,提供了相关的添加.删除.修