netty实现客户端服务端心跳重连

                        使用netty实现客户端服务端心跳重连

前言:

公司的加密机调度系统一直使用的是http请求调度的方式去调度,但是会出现网络故障导致某个客户端或者服务端断线的情况,导致很多请求信息以及回执信息丢失的情况,接着我们抛弃了http的方式,改为Tcp的方式去建立客户端和服务器之间的连接,并且要去实现断线重连的功能,经过讨论后决定使用java中成熟的nio框架
– netty去解决这一系列的问题。

1.      
netty简单介绍:

在百度中对netty的解释是:

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

Netty框架并不只是封装了多路复用的IO模型,也包括提供了传统的阻塞式/非阻塞式 同步IO的模型封,Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。其并发高、传输快、封装好的特性受到了许多大公司的青睐,在这里我们就不过多的分析netty的原理和特性了,之后我会写一篇文章专门写一下从io到nio,再到netty的整个过程。重点讲一下netty的魅力所在,今天我们已代码实现为主,讲解一下在springboot架构中,用netty实现服务端和客户端之间的通信以及断线重连等机制。

2.       服务端代码:

首先,引入netty的pom依赖

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha2</version>
</dependency>

然后我们在配置文件中写入服务端的ip和端口号,用于连接

在springboot的application启动类中写入服务端的启动start方法,用于在启动项目时自动启动服务端

 1 @SpringBootApplication
 2 public class Application implements CommandLineRunner {
 3
 4     @Value("${netty.server.port}")
 5     private  int port;
 6
 7     @Value("${netty.server.host}")
 8     private  String host;
 9
10     @Autowired
11     NettyServer server;
12
13     public static void main(String[] args) {
14         SpringApplication.run(Application.class, args);
15     }
16
17
18     @Override
19     public void run(String... strings) throws Exception {
20         this.startServer();
21
22     }
23
24     //启动service
25     public void startServer(){       //这个类实现一个IP套接字地址(IP地址+端口号)  
26         InetSocketAddress address = new InetSocketAddress(host, port);
27         ChannelFuture future = server.start(address);
28
29         Runtime.getRuntime().addShutdownHook(new Thread(){
30             @Override
31             public void run() {
32                 server.destroy();
33             }
34         });
35
36         future.channel().closeFuture().syncUninterruptibly();
37     }
38     }
39
40
41 }

ChannelFuture: 

  Future最早出现于JDK的java.util.concurrent.Future,它用于表示异步操作的结果.由于Netty的Future都是与异步I/O操作相关的,因此命名为ChannelFuture,代表它与Channel操作相关.由于Netty中的所有I / O操作都是异步的,因此Netty为了解决调用者如何获取异步操作结果的问题而专门设计了ChannelFuture接口. 
  因此,Channel与ChannelFuture可以说形影不离的.

然后我们要去重点看server.start()

public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workGroup = new NioEventLoopGroup();
    private Channel channel;

    /**
     * 开启及服务线程
     */
    public ChannelFuture start(InetSocketAddress address) {
        //服务端引导类
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup)//通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池"
                .channel(NioServerSocketChannel.class)//指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel
                .childHandler(new NettyServerInitializer())//设置ServerSocketChannel的处理器
                .option(ChannelOption.SO_BACKLOG, 100)// 设置tcp协议的请求等待队列
                .childOption(ChannelOption.SO_KEEPALIVE, true);//配置子通道也就是SocketChannel的选项
        ChannelFuture future = bootstrap.bind(address).syncUninterruptibly();
        logger.info("准备接收——————");

        channel = future.channel();
        return future;
    }

    public void destroy() {
        if(channel != null) {
            channel.close();
        }

        channelGroup.close();
        workGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

}

在这里的设置中,.childHandler(new NettyServerInitializer()) 用于设置了服务器管道 NioServerSocketChannel 的处理器handler,

这个handler是我们自定义封装的一些对channel的public class NettyServerInitializer extends ChannelInitializer<Channel>{

    
@Componentpublic class TcpMsgHandler extends ChannelInboundHandlerAdapter {
@Override
    protected void initChannel(Channel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();
        //处理日志
        //pipeline.addLast(new LoggingHandler(LogLevel.INFO));

        //处理心跳
        pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));     //消息编码
        pipeline.addLast(new MessageEncoder());        //粘包长度控制
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));     //消息解码
        pipeline.addLast(new MessageDecoder());     //自定义hander
        pipeline.addLast(new TcpMsgHandler());
    }
}

ChannelPipeline :
Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 来对 I/O 事件进行具体的拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同业务逻辑的定制,能够实现对修改封闭和对扩展到支持。

我们看到我们添加了idleStateHandler用来处理心跳,那么心跳究竟是什么呢,我们先来介绍一下心跳  

心跳机制

  • 心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制
  • 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性

在我们的服务端中,不会主动发心跳给客户端,只会对对应的心跳消息,进行回应,告诉那些给我发心跳的客户端说:我还活着!

  • 服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
  • 设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
TcpMsgHandler.java


@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {

    private final static Logger logger = LoggerFactory.getLogger("");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {		}
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        TcpMsg msg = (TcpMsg) obj;
        try {
              //处理心跳
              ...
              ctx.writeAndFlush(msg);
            }
        }catch(Exception ex){
            logger.info(ex.getMessage());
        }
    }
}
在这里,我们的channelRead比较简单,只是将客户端发来的心跳直接发回去了,实现了响应客户端心跳请求的目的,除了心跳,我们还可以去定义不同的消息类别,比如说是加密请求,还是处理数据的请求,入库的请求等等,我们可以自己从channel中获取到客户端发过来的信息做处理,记得要即使响应,比如,心跳中,我们将msg又返回给了channel:
ctx.writeAndFlush(msg);

在handler中,decoder用于解码的作用,将客户端发来的ByteBuf流的形式,转为我们需要的格式,可以转为我们要的对象,或者是一个string字符串

MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder {
    private Logger logger = LoggerFactory.getLogger("");

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int len = in.readableBytes();
            byte[] bytes = new byte[len];       //将ByteBuf转为byte数组
            in.readBytes(bytes);
            try {
                TcpMsg msg = TcpMsg.ByteToObj(bytes);
                out.add(msg);
            } catch (Exception ex) {
                logger.error("MessageDecoder",ex);
            }
    }

}
encoder负责在我们发送数据的时候,将我们的对象、或者是字符串转为byte数组,然后输出
public class MessageEncoder extends MessageToByteEncoder<TcpMsg>{
	private Logger logger = LoggerFactory.getLogger("");
	@Override
	protected void encode(ChannelHandlerContext ctx, TcpMsg msg, ByteBuf out) throws Exception {
		try{
			if (msg.getType() != 0){
				//logger.info("send: " + msg.getType() + ":" + msg.getGuid() + ":" + msg.getBody());
			}
			byte[] src = msg.ToBytes();
			out.writeBytes(src);

		}catch (Exception e){
			logger.error("MessageEncoder",e);
		}
	}
}

3.       客户端代码:

在application配置文件中加入服务端的主机名和端口号

netty.server.host = 127.0.0.1
netty.server.port = 9090

启动类Application

@SpringBootApplication
public class Application{
  @Autowired
	private NettyClient client;

	@Value("${netty.server.port}")
	private int port;

	@Value("${netty.server.host}")
	private String host;

	public static void main(String[] args) throws Exception {
		SpringApplication.run(NettyClientApplication.class, args);
	}

	@Bean
	public NettyClient nettyClient() {
		return new NettyClient();
	}

	@Override
	public void run(String... arg0) throws Exception {
		client.start(host, port);
	}

}

NettyClient.java: 客户端启动类

@Component
public class NettyClient {

    //日志输出
    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
    //主要连接地址
    private static String nettyHost = "";
    //备用连接地址
    private static String nettyHostRe = "";
    private static Integer nettyPort = 0;

    public boolean start(String host1,String host2,int port){
        //主要连接地址
        nettyHost = host1;
        //备用连接地址
        nettyHostRe = host2;
        nettyPort = port;
        //EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        //NioEventLoop
        //客户端引导类
        Bootstrap bootstrap = new Bootstrap();
        //多线程处理
        bootstrap.group(nioEventLoopGroup);
        //指定通道类型为NioServerSocketChannel,一种异步模式
        bootstrap.channel(NioSocketChannel.class);
        //指定请求地址
        bootstrap.remoteAddress(new InetSocketAddress(nettyHost,port));
        bootstrap.option(ChannelOption.TCP_NODELAY,true);
        final ConnectionWatchdog watchDog = new ConnectionWatchdog(bootstrap, new HashedWheelTimer(), nettyHost,nettyHostRe, port) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[]{
                    new MessageEncoder(),
                    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4),
                    new MessageDecoder(),
                    this,
                    // 每隔5s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE
                    new IdleStateHandler(0, 1, 0, TimeUnit.SECONDS),
                    // 实现userEventTriggered方法,并在state是WRITER_IDLE的时候发送一个心跳包到sever端,告诉server端我还活着
                    new ClientHeartBeatHandler(),
                };
            }
        };
        final ChannelFuture future;
        try {
            synchronized (bootstrap) {
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(watchDog.handlers());
                    }
                });
                future = bootstrap.connect().sync();// 链接服务器.调用sync()方法会同步阻塞
                //服务端连接ip:
                logger.info("目前服务端连接ip为" + nettyHost);
            }

            if (!future.isSuccess()) {
                logger.info("---- 连接服务器失败,2秒后重试 ---------port=" + port);
                future.channel().eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        start(nettyHost,nettyHostRe,nettyPort);
                    }

                }, 2L, TimeUnit.SECONDS);
            }

        } catch (Exception e) {
            logger.info("exception happends e {}", e);
            return false;
        }
        return true;
    }

}

  

ConnectionWatchdog.java  :重连检测狗,当发现当前的链路不稳定关闭之后,进行重连

@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask,ChannelHandlerHolder{

    //日志输出
    private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
    //客户端引导类
    private Bootstrap bootstrap;
    private Timer timer;
    private final String host;
    //备用服务端ip
    private final String host2;
    //使用ip
    private String useHost;
    private final int port;

    private volatile boolean reconnect = true;
    private int attempts;
    //刷新时间
    private volatile long refreshTime = 0L;
    //心跳连接标识
    private volatile boolean heartBeatCheck = false;
    //通道
    private volatile Channel channel;
    //失败次数
    private static int failCount;

    public ConnectionWatchdog(Bootstrap boot, Timer timer, String host,String host2, int port) {
        this.bootstrap = boot;
        this.timer = timer;
        this.host = host;
        this.host2 = host2;
        this.port = port;
    }

    public boolean isReconnect() {
        return reconnect;
    }

    public void setReconnect(boolean reconnect) {
        this.reconnect = reconnect;
    }
       //连接成功时调用的方法
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        channel = ctx.channel();
        attempts = 0;
        reconnect =false;
        refreshTime = new Date().getTime();
        if (!heartBeatCheck) {
            heartBeatCheck = true;
            channel.eventLoop().scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    long time = new Date().getTime() - refreshTime;
                    logger.info(String.valueOf(time));
                    if (time > 5 * 1000L) {
                        channel.close();
                        logger.info("心跳检查失败");
                    } else {
                        logger.info("心跳检查Successs");
                    }
                }
            }, 5L, 5L, TimeUnit.SECONDS);
        }
        logger.info("Connects with {}.", channel);
        ctx.fireChannelActive();
    }

    /**
     * 因为链路断掉之后,会触发channelInActive方法,进行重连 2秒重连一次
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        reconnect = true;
        logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts);
        if (reconnect) {
            /*if (attempts < 12) {
                attempts++;
            } else {
                reconnect = false;
            }*/
            long timeout = 2;
            logger.info("再过 {} 秒客户端将进行重连",timeout);
            timer.newTimeout(this, timeout, TimeUnit.SECONDS);
        }
    }

    /*
    * run启动方法
    * */
    public void run(Timeout timeout) throws Exception {
        //Future表示异步操作的结果
        final ChannelFuture future;
        if(failCount > 2){
            //使用备用ip
            if(host.equals(useHost)){
                useHost = host2;
            }else{
                useHost = host;
            }
        }else {
            if(StrUtil.IsNullOrEmpty(useHost)) {
                //首次重连
                useHost = host;
            }
        }
        synchronized (bootstrap) {

            future = bootstrap.connect(useHost, port);
        }
        //使用future监听结果,执行异步操作结束后的回调.
        future.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(final ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();
                logger.warn("连接通过 {}, {}.", useHost + ":" + port, succeed ? "成功" : "失败");
                if (!succeed) {
                    logger.info("重连失败");
                    failCount ++;
                    f.channel().pipeline().fireChannelInactive();
                }else{
                    failCount = 0;
                    logger.info("重连成功");
                }
            }
        });

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof TcpMsg) {
            TcpMsg heartMsg = (TcpMsg) msg;
            if (heartMsg.getType()>=0) {
                refreshTime = new Date().getTime();
            }
            logger.warn("得到服务器响应,响应内容为"+ ((TcpMsg) msg).getBody());
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        Channel channel = ctx.channel();
        logger.info("客户端:"+channel.remoteAddress()+"网络异常");
        cause.printStackTrace();
        if(channel.isActive())ctx.close();
    }

} 

这里我们定义了一个变量: refreshTime,当我们从channel中read到了服务端发来的心跳响应消息的话,就刷新refreshTime为当前时间

当连接成功时,会触发channelActive 方法,在这里我们开启了一个定时任务去判断refreshTime和当前时间的时间差,超过5秒说明断线了,要进行重连,我这里由于配置了两个服务器,所有在我的逻辑中,尝试连接2次以上连不上就去连另一个服务器去了

下面的handler用于发送心跳消息,实现userEventTriggered方法,并在state是WRITER_IDLE的时候发送一个心跳包到sever端,告诉server端我还活着

@Component
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {

	private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatHandler.class);

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		clientname = ReadFileUtil.readFile("C:/CrawlProgram/wrapper_nettyClient/name.txt");
		if (evt instanceof IdleStateEvent) {
			IdleState state = ((IdleStateEvent) evt).state();
			if (state == IdleState.WRITER_IDLE) {
				//用于心跳的客户端类型为0
				int type = 0;
				//客户端机器名
				String body = clientname;
				TcpMsg msg = new TcpMsg(type,body);
				try {
					ctx.writeAndFlush(msg).sync();
					logger.info("发送消息成功,消息类型为:"+type+",请求id为" + msg.getGuid() + ",客户端机器号为:" + msg.getBody());
				} catch (Exception ex) {
					ex.printStackTrace();
					logger.info("发送失败");
				}
			}
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}

}

  然后就是和服务端一样的decoder、encoder过程,不同的是,我们在decoder的时候使用了线程池去将任务放入队列中去,防止请求慢的时候丢失任务请求

MessageDecoder.java

public class MessageDecoder extends ByteToMessageDecoder {

    private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
    @Autowired
    private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;

    //线程池常量
    public static VisiableThreadPoolTaskExecutor executor;

    private TcpMsg tcpMsg;
    List<Object> out;

    // 用@PostConstruct方法引导绑定
    @PostConstruct
    public void init() {
        executor = visiableThreadPoolTaskExecutor;
        encryptService =  encrypt;
        orderService = order;
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        this.context = ctx;
        this.out = out;
        int len = in.readableBytes();
        if (len > 0) {
            logger.info("得到返回数据,长度为" + len);
            byte[] bytes = new byte[len];
            in.readBytes(bytes);
            TcpMsg msg = TcpMsg.ByteToObj(bytes);
            this.tcpMsg = msg;
            logger.info("start asyncServiceExecutor");
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    executeTask();
                }
            });
            logger.info("end executeAsync");
        }

    }

}

  这里,我们使用了netty来实现了服务端、客户端通信、心跳检测的功能。体会到了netty的传输效率高、封装好的特性,用起来简单、实用。我们不仅可以做断线重连、还可以做很多业务请求,可以配置多台客户端去做不同的事情,来达到服务器调度的目的。

  归根结底,netty还是一个框架的东西,我们还是没有过多的去看透nio的本质、我们要做的不仅仅是会用netty,而且还要了解nio、了解netty的实现原理,它的底层是如何封装的,希望大家多去研究,我们一起去搞懂它

Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 来对 I/O 事件进行具体的拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同业务逻辑的定制,能够实现对修改封闭和对扩展到支持。

原文地址:https://www.cnblogs.com/GodHeng/p/9480726.html

时间: 2024-09-30 10:16:33

netty实现客户端服务端心跳重连的相关文章

一起学Netty(十四)之 Netty生产级的心跳和重连机制

sigh,写这篇博客的时候老脸还是红了一下,心里还是有些唏嘘的,应该算是剽窃吧,每个人的代码功力的确是有差距的,好在文章的标题是"一起学",而不是开涛大神的"跟我学"系列的文章,我们还是多花点时间学习吧,感叹无用~ 最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca

Netty实现服务端客户端长连接通讯及心跳检测

通过netty实现服务端与客户端的长连接通讯,及心跳检测.        基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key.每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可.心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断.         环境JDK1.8 和netty5      

Netty(二):简单的客户端服务端demo

使用Java IO实现客户端服务端 服务端监听端口 客户端连接服务端 客户端循环写数据到服务端 import java.io.IOException; import java.net.Socket; /** * @author ko */ public class Client { private static final String HOST = "127.0.0.1"; private static final int PORT = 8000; private static fi

初探和实现websocket心跳重连

心跳重连缘由 在使用websocket过程中,可能会出现网络断开的情况,比如信号不好,或者网络临时性关闭,这时候websocket的连接已经断开, 而浏览器不会执行websocket 的 onclose方法,我们无法知道是否断开连接,也就无法进行重连操作. 如果当前发送websocket数据到后端,一旦请求超时,onclose便会执行,这时候便可进行绑定好的重连操作. 因此websocket心跳重连就应运而生. 如何实现 在websocket实例化的时候,我们会绑定一些事件: var ws =

MVC验证10-到底用哪种方式实现客户端服务端双重异步验证

原文:MVC验证10-到底用哪种方式实现客户端服务端双重异步验证 本篇将通过一个案例来体验使用MVC的Ajax.BeginForm或jQuery来实现异步提交,并在客户端和服务端双双获得验证.希望能梳理.归纳出一个MVC异步验证的通用解决思路.本篇主要涉及: 1.通过Ajax.BeginForm()方式,返回部分视图显示验证信息.2.通过jQuery+Html.BeginForm()方式,返回部分视图显示验证信息.3.通过jquery,返回json字符串,json字符串中包含部分视图及验证信息.

原理剖析-Netty之服务端启动工作原理分析(下)

一.大致介绍 1.由于篇幅过长难以发布,所以本章节接着上一节来的,上一章节为[原理剖析(第 010 篇)Netty之服务端启动工作原理分析(上)]: 2.那么本章节就继续分析Netty的服务端启动,分析Netty的源码版本为:netty-netty-4.1.22.Final: 二.三.四章节请看上一章节 四.源码分析Netty服务端启动 上一章节,我们主要分析了一下线程管理组对象是如何被实例化的,并且还了解到了每个线程管理组都有一个子线程数组来处理任务: 那么接下来我们就直接从4.6开始分析了:

Netty的服务端的建立

用Netty建立服务端, 首先我们应该新建立一个新的类, 作为服务端, 在其中写一个run方法, 作为启动: 1 public void run(){ 2 // 处理 Nio的Accept 3 EventLoopGroup boss = new NioEventLoopGroup(); 4 // 处理 Nio的Read和Write事件 5 EventLoopGroup worker = new NioEventLoopGroup(); 6 try { 7 // Netty中服务端启动类 8 Se

通过netty实现服务端与客户端的长连接通讯,及心跳检测。

基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key.每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可.心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断. 环境JDK1.8 和netty5 以下是具体的代码实现和介绍: 1公共的Share部分(主要包含消息协议类型的定义) 设计消息类型:

hadoop运行原理之Job运行(四) JobTracker端心跳机制分析

接着上篇来说,TaskTracker端的transmitHeartBeat()方法通过RPC调用JobTracker端的heartbeat()方法来接收心跳并返回心跳应答.还是先看看这张图,对它的大概流程有个了解. 下面来一段一段的分析该方法. 1 public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 2 boolean restarted, 3 boolean initialContact, 4 bo