简易RPC框架-心跳与重连机制

*:first-child {
margin-top: 0 !important; }
body > *:last-child {
margin-bottom: 0 !important; }

a {
color: #4183C4; }
a.absent {
color: #cc0000; }
a.anchor {
display: block;
padding-left: 30px;
margin-left: -30px;
cursor: pointer;
position: absolute;
top: 0;
left: 0;
bottom: 0; }

h1, h2, h3, h4, h5, h6 {
margin: 20px 0 10px;
padding: 0;
font-weight: bold;
-webkit-font-smoothing: antialiased;
cursor: text;
position: relative; }

h1:hover a.anchor, h2:hover a.anchor, h3:hover a.anchor, h4:hover a.anchor, h5:hover a.anchor, h6:hover a.anchor {
background: url() no-repeat 10px center;
text-decoration: none; }

h1 tt, h1 code {
font-size: inherit; }

h2 tt, h2 code {
font-size: inherit; }

h3 tt, h3 code {
font-size: inherit; }

h4 tt, h4 code {
font-size: inherit; }

h5 tt, h5 code {
font-size: inherit; }

h6 tt, h6 code {
font-size: inherit; }

h1 {
font-size: 28px;
color: black; }

h2 {
font-size: 24px;
border-bottom: 1px solid #cccccc;
color: black; }

h3 {
font-size: 18px; }

h4 {
font-size: 16px; }

h5 {
font-size: 14px; }

h6 {
color: #777777;
font-size: 14px; }

p, blockquote, ul, ol, dl, li, table, pre {
margin: 15px 0; }

hr {
background: transparent url() repeat-x 0 0;
border: 0 none;
color: #cccccc;
height: 4px;
padding: 0;
}

body > h2:first-child {
margin-top: 0;
padding-top: 0; }
body > h1:first-child {
margin-top: 0;
padding-top: 0; }
body > h1:first-child + h2 {
margin-top: 0;
padding-top: 0; }
body > h3:first-child, body > h4:first-child, body > h5:first-child, body > h6:first-child {
margin-top: 0;
padding-top: 0; }

a:first-child h1, a:first-child h2, a:first-child h3, a:first-child h4, a:first-child h5, a:first-child h6 {
margin-top: 0;
padding-top: 0; }

h1 p, h2 p, h3 p, h4 p, h5 p, h6 p {
margin-top: 0; }

li p.first {
display: inline-block; }
li {
margin: 0; }
ul, ol {
padding-left: 30px; }

ul :first-child, ol :first-child {
margin-top: 0; }

dl {
padding: 0; }
dl dt {
font-size: 14px;
font-weight: bold;
font-style: italic;
padding: 0;
margin: 15px 0 5px; }
dl dt:first-child {
padding: 0; }
dl dt > :first-child {
margin-top: 0; }
dl dt > :last-child {
margin-bottom: 0; }
dl dd {
margin: 0 0 15px;
padding: 0 15px; }
dl dd > :first-child {
margin-top: 0; }
dl dd > :last-child {
margin-bottom: 0; }

blockquote {
border-left: 4px solid #dddddd;
padding: 0 15px;
color: #777777; }
blockquote > :first-child {
margin-top: 0; }
blockquote > :last-child {
margin-bottom: 0; }

img {
max-width: 100%; }

span.frame {
display: block;
overflow: hidden; }
span.frame > span {
border: 1px solid #dddddd;
display: block;
float: left;
overflow: hidden;
margin: 13px 0 0;
padding: 7px;
width: auto; }
span.frame span img {
display: block;
float: left; }
span.frame span span {
clear: both;
color: #333333;
display: block;
padding: 5px 0 0; }
span.align-center {
display: block;
overflow: hidden;
clear: both; }
span.align-center > span {
display: block;
overflow: hidden;
margin: 13px auto 0;
text-align: center; }
span.align-center span img {
margin: 0 auto;
text-align: center; }
span.align-right {
display: block;
overflow: hidden;
clear: both; }
span.align-right > span {
display: block;
overflow: hidden;
margin: 13px 0 0;
text-align: right; }
span.align-right span img {
margin: 0;
text-align: right; }
span.float-left {
display: block;
margin-right: 13px;
overflow: hidden;
float: left; }
span.float-left span {
margin: 13px 0 0; }
span.float-right {
display: block;
margin-left: 13px;
overflow: hidden;
float: right; }
span.float-right > span {
display: block;
overflow: hidden;
margin: 13px auto 0;
text-align: right; }

code, tt {
margin: 0 2px;
padding: 0 5px;
white-space: nowrap;
border: 1px solid #eaeaea;
background-color: #f8f8f8;
border-radius: 3px; }

pre code {
margin: 0;
padding: 0;
white-space: pre;
border: none;
background: transparent; }

.highlight pre {
background-color: #f8f8f8;
border: 1px solid #cccccc;
font-size: 13px;
line-height: 19px;
overflow: auto;
padding: 6px 10px;
border-radius: 3px; }

pre {
background-color: #f8f8f8;
border: 1px solid #cccccc;
font-size: 13px;
line-height: 19px;
overflow: auto;
padding: 6px 10px;
border-radius: 3px; }
pre code, pre tt {
background-color: transparent;
border: none; }

sup {
font-size: 0.83em;
vertical-align: super;
line-height: 0;
}

kbd {
display: inline-block;
padding: 3px 5px;
font-size: 11px;
line-height: 10px;
color: #555;
vertical-align: middle;
background-color: #fcfcfc;
border: solid 1px #ccc;
border-bottom-color: #bbb;
border-radius: 3px;
box-shadow: inset 0 -1px 0 #bbb
}

* {
-webkit-print-color-adjust: exact;
}
@media screen and (min-width: 914px) {

}
@media print {
table, pre {
page-break-inside: avoid;
}
pre {
word-wrap: break-word;
}
}
-->
code[class*="language-"],
pre[class*="language-"] {
background: #f5f2f0;
}

/* Inline code */
:not(pre) > code[class*="language-"] {
padding: .1em;
border-radius: .3em;
white-space: normal;
}

.token.comment,
.token.prolog,
.token.doctype,
.token.cdata {
color: slategray;
}

.token.punctuation {
color: #999;
}

.namespace {
opacity: .7;
}

.token.property,
.token.tag,
.token.boolean,
.token.number,
.token.constant,
.token.symbol,
.token.deleted {
color: #905;
}

.token.selector,
.token.attr-name,
.token.string,
.token.char,
.token.builtin,
.token.inserted {
color: #690;
}

.token.operator,
.token.entity,
.token.url,
.language-css .token.string,
.style .token.string {
color: #a67f59;
background: hsla(0, 0%, 100%, .5);
}

.token.atrule,
.token.attr-value,
.token.keyword {
color: #07a;
}

.token.function {
color: #DD4A68;
}

.token.regex,
.token.important,
.token.variable {
color: #e90;
}

.token.important,
.token.bold {
font-weight: bold;
}
.token.italic {
font-style: italic;
}

.token.entity {
cursor: help;
}
-->

心跳

就是告诉其它人自己还活着。在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,就需要客户端与服务端之间有一种通知机制告知对方的存活状态。

如何实现

客户端发送心跳消息

在状态空闲的时候定时给服务端发送消息类型为PING消息。

服务端接收心跳消息

捕获通道空闲状态事件,如果接收客户端PING消息,则发送PONG消息给服务端。如果在一定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。

客户端管理可用连接

由于服务端会因为长时间接收不到服务端的PING消息而关闭通道,这就导致缓存在客户端的连接的可用性发生变化。需要将不可用的从可用列表中转移出去,并对不可用连接进行处理,比如直接丢弃或者是重新连接。

预备知识

ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter作用是类似的,ChannelPipeline可以理解成handle的容器,里面可以被注册众多处理不同业务功能的事件处理器,比如:

  • 编码
  • 解码
  • 心跳
  • 权限
  • 加密
  • 解密
  • 业务代码执行
  • ......

具体实现

空闲状态处理器

可以利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类

  • 读超时,一定时间内没有从通道内读取到任何数据
  • 写超时,一定时间内没有从通道内写入任何数据
  • 读写超时,一定时间内没有从通道内读取或者是写入任何数据

客户端加入空闲状态处理器

客户端捕获读写超时,如果事件触发就给服务端发送PING消息。

服务端加入空闲状态处理器

服务端只需要捕获读超时即可,当读超时触发后就关闭通道。

为什么在空闲状态才发送心跳消息

在正常客户端与服务端有交互的情况下,说明双方都在正常工作不需要额外的心跳来告知对方的存活。只有双方在一定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提升效率的做法。

抽象心跳处理器

创建AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:

  • 如果是PING消息就发送PONG消息给客户端
  • 如果收到的是PONG消息,则直接打印消息说明客户端已经成功接收到服务端返回的PONG消息
  • 如果是其它类型的消息,则通知下一个处理器处理消息

public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {

        if(!(msg instanceof RpcMessage)){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        RpcMessage message=(RpcMessage)msg;

        if(null==message||null==message.getMessageHeader()){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
            logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
        }
        else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
            this.sendPong(channelHandlerContext);
        }
        else {
            channelHandlerContext.fireChannelRead(msg);
        }

    }

空闲状态事件,可以根据不同的状态做不同的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。


public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    this.handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    this.handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    this.handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

客户端心跳处理器

继承抽象心跳处理器,并重写事件发送PING消息。


public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        this.sendPing(ctx);
    }
}

服务端心跳处理器

继承抽象心跳处理器,并重写事件关闭通道。


public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
        ctx.close();
    }

}

客户端ChannelPipeline中加入心跳处理器

比如5秒内未写入或者读取通道数据就触发超时事件。


.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));

服务端ChannelPipeline中加入心跳处理器

比如10秒未接收到通道消息就触发读超时事件。


 .addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))

客户端消息示例

正常情况下心跳消息显示如下图所示,消息的内容可以根据自己的情况自行定义。

客户端下线消息示例

停止客户端程序,然后服务端读超时事件触发,并关闭通道。

客户端可用连接管理

由于上述的服务端心跳处理器,在触发读超时后会关闭通信管道,这导致客户端缓存的连接状态会出现不可用的情况,为了让客户端一直只能取到可用连接就需要对从缓存中获取到的连接做状态判断,如果可用直接返回,如果不可用则将连接从可用列表中删除然后取下一个可用连接。

修改获取连接方法

通过channel的isActive属性可以判断连接是否可用,如果不可以做删除并重新获取的操作。


public RpcClientInvoker getInvoker() {
        // ...
        int index = loadbalanceService.index(size);
        RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
        if(invoker.getChannel().isActive()) {
            return invoker;
        }
        else {
            RpcClientInvokerCache.removeHandler(invoker);
            logger.info("invoker is not active,so remove it and get next one");
            return this.getInvoker();
        }
    }

后台启动任务处理不可用连接

启动一个每隔5秒执行一次任务的线程,定时取出不可用连接,然后重连,并将不可用连接删除。

这里我处理的重连是直接丢弃原有不可用连接,然后重新创建新连接。


    private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);

    static {
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
                    if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
                        for (RpcClientInvoker invoker : notConnectedHandlers) {
                            RpcClientInvokerManager.getInstance(referenceConfig).connect();
                        }
                        RpcClientInvokerCache.clearNotConnectedHandler();
                    }
                }
            }
        }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);

    }

本文源码

https://github.com/jiangmin168168/jim-framework

文中代码是依赖上述项目的,如果有不明白的可下载源码

引用

本文中的图取自于网格

时间: 2024-10-18 07:52:53

简易RPC框架-心跳与重连机制的相关文章

自行实现一个简易RPC框架

10分钟写一个RPC框架 1.RpcFramework package com.alibaba.study.rpc.framework; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; imp

一起学Netty(十四)之 Netty生产级的心跳和重连机制

sigh,写这篇博客的时候老脸还是红了一下,心里还是有些唏嘘的,应该算是剽窃吧,每个人的代码功力的确是有差距的,好在文章的标题是"一起学",而不是开涛大神的"跟我学"系列的文章,我们还是多花点时间学习吧,感叹无用~ 最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca

简易RPC框架-私有协议栈

*:first-child { margin-top: 0 !important; } body > *:last-child { margin-bottom: 0 !important; } a { color: #4183C4; } a.absent { color: #cc0000; } a.anchor { display: block; padding-left: 30px; margin-left: -30px; cursor: pointer; position: absolute

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

一个简单RPC框架是如何炼成的(VI)——引入服务注册机制

开局篇我们说了,RPC框架的四个核心内容 RPC数据的传输. RPC消息 协议 RPC服务注册 RPC消息处理 接下来处理RPC服务的注册机制.所谓注册机制,就是Server需要声明支持哪些rpc方法,然后当客户端发送调用某个声明的rpc方法之后,服务端能自动找到执行该请求的具体方法.以实际的例子为例,这是现在server端处理RPC请求的代码 def procRequest(self): # 循环读取并处理收到的客户端请求 while True: req = self.conn.recv()

3 weekend110的hadoop中的RPC框架实现机制 + hadoop中的RPC应用实例demo

hadoop中的RPC框架实现机制 RPC是Remotr Process Call, 进程间的远程过程调用,不是在一个jvm里. 即,Controller拿不到Service的实例对象. hadoop中的RPC应用实例demo 在windows是调用端,在linux里是服务端. 在这里,需要LoginServiceinterface.java 停止 出错误了,很明显. 这是个很好的思考题?

RPC框架简易实现

RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务进

RPC框架研究(一)Java回调机制

报名了阿里中间件性能大赛,我来说是一个全新的挑战,一切从空白学起,比赛的过程也是学习的过程 是的,想让自己学好,给自己报一个比赛吧~ 就像当初学围棋,也是报了围棋比赛,为了不至于输的太惨,一个星期里学了好多东西 第一天 Java回调机制 晴 首先还是来看看赛事介绍 比赛总共分为两道题:"RPC"与"MOM",两题都需完成.我们会对"RPC"成绩(qps)进行排名,排名前100位的队伍有资格进入MOM的比赛环节,若未进入前100名直接淘汰,最终名次

一个简单RPC框架是怎样炼成的(VI)——引入服务注冊机制

开局篇我们说了.RPC框架的四个核心内容 RPC数据的传输. RPC消息 协议 RPC服务注冊 RPC消息处理 接下来处理RPC服务的注冊机制.所谓注冊机制,就是Server须要声明支持哪些rpc方法.然后当client发送调用某个声明的rpc方法之后,服务端能自己主动找到运行该请求的详细方法.以实际的样例为例.这是如今server端处理RPC请求的代码 def procRequest(self): # 循环读取并处理收到的client请求 while True: req = self.conn