Java高并发网络编程(五)Netty应用

推送系统

一、系统设计

二、拆包和粘包

粘包、拆包表现形式

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象,此种情况不在本文的讨论范围内。

第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

粘包、拆包发生原因

发生TCP粘包或拆包有很多原因,现列出常见的几点,可能不全面,欢迎补充,

1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包即应用程序写入数据的字节大小大于套接字发送缓冲区的大小。

2、进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度,

待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

5、以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成若干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

 TCP粘包和拆包的解决策略

1. 消息定长。例如100字节。发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。

2. 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议,发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

3. 将消息分为消息头和消息尾。可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。

4. 其它复杂的协议,如RTMP协议等。

来源:https://blog.csdn.net/lijieshare/article/details/84815187

public class XNettyServer {
    public static void main(String[] args) throws Exception {
        // 1、 线程定义
        // accept 处理连接的线程池
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        // read io 处理数据的线程池
        EventLoopGroup readGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptGroup, readGroup);
            // 2、 选择TCP协议,NIO的实现方式
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 3、 职责链定义(请求收到后怎么处理)
                    ChannelPipeline pipeline = ch.pipeline();
                    // TODO 3.1 增加解码器
                    // pipeline.addLast(new XDecoder());
                    // TODO 3.2 打印出内容 handdler
                    pipeline.addLast(new XHandller());
                }
            });
            // 4、 绑定端口
            System.out.println("启动成功,端口 9999");
            b.bind(9999).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            readGroup.shutdownGracefully();
        }
    }
}

也会存在粘包和拆包的问题

自己编写解析器

简单地用长度做处理

// 编解码一定是根据协议~如http
public class XDecoder extends ByteToMessageDecoder {
    static final int PACKET_SIZE = 220; // 每次请求数据大小是220,我们自己定义的协议

    // 用来临时保留没有处理过的请求报文,如只传过来了110个字节,先存着
    ByteBuf tempMsg = Unpooled.buffer();

    // in输入   --- 处理  --- out 输出
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println(Thread.currentThread()+"收到了一次数据包,长度是:" + in.readableBytes());
        // in 请求的数据
        // out 将粘在一起的报文拆分后的结果保留起来

        // 1、 合并报文
        ByteBuf message = null;
        int tmpMsgSize = tempMsg.readableBytes();
        // 如果暂存有上一次余下的请求报文,则合并
        if (tmpMsgSize > 0) {
            message = Unpooled.buffer();
            message.writeBytes(tempMsg);
            message.writeBytes(in);
            System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
        } else {
            message = in;
        }

        // 2、 拆分报文
        // 这个场景下,一个请求固定长度为3,可以根据长度来拆分
        // i+1 i+1 i+1 i+1 i+1
        // 不固定长度,需要应用层协议来约定 如何计算长度
        // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
        // dubbo rpc协议 = header(16) + body(不固定)
        // header最后四个字节来标识body
        // 长度 = 16 + body长度
        // 0xda, 0xbb 魔数

        int size = message.readableBytes();
        int counter = size / PACKET_SIZE;
        for (int i = 0; i < counter; i++) {
            byte[] request = new byte[PACKET_SIZE];
            // 每次从总的消息中读取220个字节的数据
            message.readBytes(request);

            // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
            out.add(Unpooled.copiedBuffer(request));
        }

        // 3、多余的报文存起来
        // 第一个报文: i+  暂存
        // 第二个报文: 1 与第一次
        size = message.readableBytes();
        if (size != 0) {
            System.out.println("多余的数据长度:" + size);
            // 剩下来的数据放到tempMsg暂存 留到下次再进行合并
            tempMsg.clear();
            tempMsg.writeBytes(message.readBytes(size));
        }
    }
}

上面的处理不适用复杂的现实场景,Netty提供了大量的现成的编解码工具,我们一般使用这些工具

三、使用websocket

websocket协议是基于TCP的一种新的网络协议。

它的出现实现了浏览器与服务器双全工(full-duplex)通信:允许服务器主动发送信息给客户端。

半双工:服务器不能主动响应浏览器,只能等待请求后再响应。

多客户端多语言多服务器支持:浏览器、php、Java、ruby、nginx、python、Tomcat、erlang、.net等等

代码示例

public final class WebSocketServer {

    static int PORT = 9000;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childHandler(new WebSocketServerInitializer())
                    .childOption(ChannelOption.SO_REUSEADDR, true);
            for (int i = 0; i < 100; i++) { // 绑定100个端口
                b.bind(++PORT).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if ("true".equals(System.getProperty("netease.debug")))
                            System.out.println("端口绑定完成:" + future.channel().localAddress());
                    }
                });
            }

            // 端口绑定完成,启动消息随机推送(测试)
            TestCenter.startTest();

            System.in.read();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

开启端口的复用ChannelOption.SO_REUSEADDR,这是底层的TCP的参数,和我们代码无关

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        //  职责链, 数据处理流程
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec()); // 转为http请求
        pipeline.addLast(new HttpObjectAggregator(65536)); // 最大数据量
        pipeline.addLast(new WebSocketServerHandler()); // websocket握手,处理后续消息
        pipeline.addLast(new NewConnectHandler());
    }
}
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final String WEBSOCKET_PATH = "/websocket";

    private WebSocketServerHandshaker handshaker;

    public static final LongAdder counter = new LongAdder();

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        counter.add(1);
        if (msg instanceof FullHttpRequest) {
            // 处理websocket握手
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // 处理websocket后续的消息
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // Handle a bad request. //如果http解码失败 则返回http异常 并且判断消息头有没有包含Upgrade字段(协议升级)
        if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
            return;
        }

        // 构造握手响应返回
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, true, 5 * 1024 * 1024);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            // 版本不支持
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
            ctx.fireChannelRead(req.retain()); // 继续传播
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Check for closing frame 关闭
        if (frame instanceof CloseWebSocketFrame) {
            Object userId = ctx.channel().attr(AttributeKey.valueOf("userId")).get();
            TestCenter.removeConnection(userId);
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        if (frame instanceof PingWebSocketFrame) { // ping/pong作为心跳
            System.out.println("ping: " + frame);
            ctx.write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        if (frame instanceof TextWebSocketFrame) {
            // Echo the frame
            // TODO 处理具体的数据请求(... 云课堂聊天室,推送给其他的用户)
            //发送到客户端websocket
            ctx.channel().write(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()
                    + ", 欢迎使用Netty WebSocket服务, 现在时刻:"
                    + new java.util.Date().toString()));

            return;
        }
        // 不处理二进制消息
        if (frame instanceof BinaryWebSocketFrame) {
            // Echo the frame
            ctx.write(frame.retain());
        }
    }

    private static void sendHttpResponse(
            ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
        // Generate an error page if response getStatus code is not OK (200).
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
            HttpUtil.setContentLength(res, res.content().readableBytes());
        }

        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private static String getWebSocketLocation(FullHttpRequest req) {
        String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;
        return "ws://" + location;
    }
}
// 新连接建立了
public class NewConnectHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
        // 解析请求,判断token,拿到用户ID。
        Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();
        // String token = parameters.get("token").get(0);  不是所有人都能连接,比如需要登录之后,发放一个推送的token
        String userId = parameters.get("userId").get(0);
        ctx.channel().attr(AttributeKey.valueOf("userId")).getAndSet(userId); // channel中保存userId
        TestCenter.saveConnection(userId, ctx.channel()); // 保存连接

        // 结束
    }
}

保存到TestCenter

// 正常情况是,后台系统通过接口请求,把数据丢到对应的MQ队列,再由推送服务器读取
public class TestCenter {
    // 此处假设一个用户一台设备,否则用户的通道应该是多个。
    // TODO 还应该有一个定时任务,用于检测失效的连接(类似缓存中的LRU算法,长时间不使用,就拿出来检测一下是否断开了);
    static ConcurrentHashMap<String, Channel> userInfos = new ConcurrentHashMap<String, Channel>();

    // 保存信息
    public static void saveConnection(String userId, Channel channel) {
        userInfos.put(userId, channel);
    }

    // 退出的时候移除掉
    public static void removeConnection(Object userId) {
        if (userId != null) {
            userInfos.remove(userId.toString());
        }
    }

    final static byte[] JUST_TEST = new byte[1024];

    public static void startTest() {
        // 发一个tony吧
        System.arraycopy("tony".getBytes(), 0, JUST_TEST, 0, 4);
        final String sendmsg = System.getProperty("netease.server.test.sendmsg", "false");
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
            try {
                // 压力测试,在用户中随机抽取1/10进行发送
                if (userInfos.isEmpty()) {
                    return;
                }
                int size = userInfos.size();
                ConcurrentHashMap.KeySetView<String, Channel> keySetView = userInfos.keySet();
                String[] keys = keySetView.toArray(new String[]{});
                System.out.println(WebSocketServerHandler.counter.sum() + " : 当前用户数量" + keys.length);
                if (Boolean.valueOf(sendmsg)) { // 是否开启发送
                    for (int i = 0; i < (size > 10 ? size / 10 : size); i++) {
                        // 提交任务给它执行
                        String key = keys[new Random().nextInt(size)];
                        Channel channel = userInfos.get(key);
                        if (channel == null) {
                            continue;
                        }
                        if (!channel.isActive()) {
                            userInfos.remove(key);
                            continue;
                        }
                        channel.eventLoop().execute(() -> {
                            channel.writeAndFlush(new TextWebSocketFrame(new String(JUST_TEST))); // 推送1024字节
                        });

                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }, 1000L, 2000L, TimeUnit.MILLISECONDS);
    }
}

浏览器测试

<!-- saved from url=(0022)http://127.0.0.1:8080/ -->
<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"><title>Web Socket Test</title></head>
<body>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
  window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
  // 随机数
  var random = Math.floor(Math.random()*(10000 - 10 +1) + 10)
  socket = new WebSocket("ws://127.0.0.1:9001/websocket?userId=" + random);
  socket.onmessage = function(event) {
    var ta = document.getElementById(‘responseText‘);
    ta.value = ta.value + ‘\n‘ + event.data
  };
  socket.onopen = function(event) {
    var ta = document.getElementById(‘responseText‘);
    ta.value = "Web Socket opened!";
  };
  socket.onclose = function(event) {
    var ta = document.getElementById(‘responseText‘);
    ta.value = ta.value + "Web Socket closed";
  };
} else {
  alert("Your browser does not support Web Socket.");
}

function send(message) {
  if (!window.WebSocket) { return; }
  if (socket.readyState == WebSocket.OPEN) {
    socket.send(message);
  } else {
    alert("The socket is not open.");
  }
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Hello, World!"><input type="button" value="Send Web Socket Data" onclick="send(this.form.message.value)">
<h3>Output</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>

</body></html>

浏览器扛不住巨量的请求,使用Java客户端进行测试

public final class WebSocketClient {

    public static void main(String[] args) throws Exception {
        final String host = System.getProperty("netease.pushserver.host", "127.0.0.1");
        final String maxSize = System.getProperty("netease.client.port.maxSize", "100");
        final String maxConnections = System.getProperty("netease.client.port.maxConnections", "60000");
        int port = 9001;

        EventLoopGroup group = new NioEventLoopGroup();
        try {

            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_REUSEADDR, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new HttpClientCodec());
                    p.addLast(new HttpObjectAggregator(8192));
                    p.addLast(WebSocketClientCompressionHandler.INSTANCE);
                    p.addLast("webSocketClientHandler", new WebSocketClientHandler());
                }
            });
            // tcp 建立连接
            for (int i = 0; i < 100; i++) { // 服务端有100个端口,发起对100个端口反复的连接
                for (int j = 0; j < 60000; j++) { // 每个端口6万次连接
                    b.connect(host, port).sync().get();
                }
                port++;
            }
            System.in.read();
        } finally

        {
            group.shutdownGracefully();
        }
    }
}
// handler 处理多个事件~ 包括tcp连接建立之后的事件
// open websocket
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketClientHandshaker handshaker;
    private ChannelPromise handshakeFuture;

    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    static AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        if (handshaker == null) {
            InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
            URI uri = null;
            try {
                uri = new URI("ws://" + address.getHostString() + ":" + address.getPort() + "/websocket?userId=" + counter.incrementAndGet());
            } catch (Exception e) {
                e.printStackTrace();
            }
            handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        }
        handshaker.handshake(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if ("true".equals(System.getProperty("netease.debug"))) System.out.println("WebSocket Client disconnected!");
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!handshaker.isHandshakeComplete()) {
            try {
                handshaker.finishHandshake(ch, (FullHttpResponse) msg);
                if ("true".equals(System.getProperty("netease.debug")))
                    System.out.println("WebSocket Client connected!");
                handshakeFuture.setSuccess();
            } catch (WebSocketHandshakeException e) {
                if ("true".equals(System.getProperty("netease.debug")))
                    System.out.println("WebSocket Client failed to connect");
                handshakeFuture.setFailure(e);
            }
            return;
        }

        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ‘)‘);
        }

        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received message: " + textFrame.text());
        } else if (frame instanceof PongWebSocketFrame) {
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received pong");
        } else if (frame instanceof CloseWebSocketFrame) {
            if ("true".equals(System.getProperty("netease.debug")))
                System.out.println("WebSocket Client received closing");
            ch.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (!handshakeFuture.isDone()) {
            handshakeFuture.setFailure(cause);
        }
        ctx.close();
    }
}

网络四元组:客户端IP,服务端IP,客户端端口,服务端端口,这四元组规定了一个连接

客户端端口数量有限,服务器只有一个端口的情况下,同一个客户端只能对它发送6万多个连接,

服务器开启多个接口,服务器每开启一个端口,客户端可增加6万多连接

上面的测试环境,为了增加连接容纳量,服务端和客户端都开启了端口复用

打包上传服务器 服务端6G4核,客户端6G2核

运行服务端程序

运行客户端程序

客户端的端口是操作系统分配好的,也可以自己指定分配区间

报错

文件描述符

open files太小了,调参数

允许100万个文件描述符

重新登陆,生效

原文地址:https://www.cnblogs.com/aidata/p/11528795.html

时间: 2024-11-03 21:39:45

Java高并发网络编程(五)Netty应用的相关文章

Java高并发网络编程(四)Netty

在网络应用开发的过程中,直接使用JDK提供的NIO的API,比较繁琐,而且想要进行性能提升,还需要结合多线程技术. 由于网络编程本身的复杂性,以及JDK API开发的使用难度较高,所以在开源社区中,涌现出来了很多对JDK NIO进行封装.增强的网络编程框架,比如Netty.Mina等. 一.Netty简介 https://netty.io/ 官网 Netty是一个高性能.高可扩展性的异步事件驱动的网络应用程序框架,它极大简化了TCP和UDP客户端和服务器开发等网络编程. Netty重要的四个内容

Java高并发网络编程(一)

一.OSI网络七层模型 因特网是一个极为复杂的网络,分层有助于我们对网络的理解 .分层也是一种标准,为了使不同厂商的计算机能够互相通信,以便在更大范围内建立计算机网络,有必要建立一个国际范围的网络体系结构标准. ISO组织制定了OSI网络七层模型 应用层 表示层 会话层 传输层 网络层 链路层 物理层 而因特网只用到了五层 应用层 传输层 网络层 链路层 物理层 低三层: 屏蔽底层网络的复杂性 物理层:使原始的数据比特流能在物理介质上传输. 数据链路层:通过校验.确认和反馈重发等手段,形成稳定的

Java高并发网络编程(三)NIO

从Java 1.4开始,Java提供了新的非阻塞IO操作API,用意是替代Java IO和Java Networking相关的API. NIO中有三个核心组件: Buffer缓冲区 Channel通道 Selector选择器 一.Buffer缓冲区 缓冲区本质上是一个可以写入数据的内存块(类似数组),然后可以再次读取.此内存块包含在NIO Buffer对象中,该对象提供了一组方法,可以更轻松地使用内存块. 相比较直接对数组的操作,BufferAPI更容易操作和管理. 使用Buffer进行数据写入

Linux下高并发网络编程

1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时, 最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统 为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄). 可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [[email protected] ~]$ ulimit -n 1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty 转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO以后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在以后推出. 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎

Java高并发编程(一)

1.原子量级操作(读.++操作.写分为最小的操作量单位,在多线程中进行原子量级编程保证程序可见性(有序性人为规定)) 由于某些问题在多线程条件下:产生了竞争的问题,(例如:在多线程中一个简单的计数器增加)如果在程序中不采用同步的机制,那么在程序的运行结果中,多个线程在访问此资源时候,产生Racing.解决这个问题,采用某种方式阻止其他线程在该线程使用该变量的时候使用该变量 采用原子级操作:1.采用加锁的机制(最好的操作)2.Java.concurrent.atomic包包含一些原子量操作:Ato

Java学习之网络编程实例

转自:http://www.cnblogs.com/springcsc/archive/2009/12/03/1616413.html 多谢分享 网络编程 网络编程对于很多的初学者来说,都是很向往的一种编程技能,但是很多的初学者却因为很长一段时间无法进入网络编程的大门而放弃了对于该部分技术的学习. 在学习网络编程以前,很多初学者可能觉得网络编程是比较复杂的系统工程,需要了解很多和网络相关的基础知识,其实这些都不是很必需的.首先来问一个问题:你 会打手机吗?很多人可能说肯定会啊,不就是按按电话号码

黑马程序员————java中的网络编程

------<a href="http://www.itheima.com" target="blank">Java培训.Android培训.iOS培训..Net培训</a>.期待与您交流! ------- java中的网络编程 一.网络编程概述:基于互联网的编程 就是用来实现网络互连的不同计算机上运行的程序间可以进行数据交换. 二.网络模型:OSI和TCP/IP 1.OSI(Open System Interconnection开放系统互连

Java高并发是不是你的菜??

自从JAVA5.0增加了最初由DougLea编写的高质量的.广泛使用的.并发实用程序util.concurrent并变成了JSR-166的新包之后,在Java内置所提供的类库中,就提供了越来越多的并发编程的实用工具类.学习并掌握这些技术对于专注于Java并发编程的开发人员来讲是基本的公里,随着Java版本的不断更新与改进,开发人员可以通过Java新版本所带来的新特性,无需从头重新编写并发程序工具类. 我们该学习Java并发嘛? 我们该如何学习Java并发? CPU这么多核了,我们如何更好的利用?