Netty通过心跳保持长链接

Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

Java代码  

  1. public class NettyClient {
  2. public void connect(String remoteServer, int port) throws Exception {
  3. EventLoopGroup workerGroup = new NioEventLoopGroup();
  4. try {
  5. Bootstrap b = new Bootstrap();
  6. b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)
  7. .handler(new ChildChannelHandler());
  8. ChannelFuture f = b.connect();
  9. System.out.println("Netty time Client connected at port " + port);
  10. f.channel().closeFuture().sync();
  11. } finally {
  12. try {
  13. TimeUnit.SECONDS.sleep(5);
  14. try {
  15. System.out.println("重新链接。。。");
  16. connect(remoteServer, port);
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  26. @Override
  27. protected void initChannel(final SocketChannel ch) throws Exception {
  28. // -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头
  29. ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))
  30. .addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());
  31. }
  32. }
  33. public static void main(String[] args) {
  34. try {
  35. new NettyClient().connect("127.0.0.1", 12000);
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

Java代码  

  1. public class SerializationUtil {
  2. private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
  3. private static Objenesis                objenesis    = new ObjenesisStd(true);
  4. private static <T> Schema<T> getSchema(Class<T> clazz) {
  5. @SuppressWarnings("unchecked")
  6. Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
  7. if (schema == null) {
  8. schema = RuntimeSchema.getSchema(clazz);
  9. if (schema != null) {
  10. cachedSchema.put(clazz, schema);
  11. }
  12. }
  13. return schema;
  14. }
  15. /**
  16. * 序列化
  17. *
  18. * @param obj
  19. * @return
  20. */
  21. public static <T> byte[] serializer(T obj) {
  22. @SuppressWarnings("unchecked")
  23. Class<T> clazz = (Class<T>) obj.getClass();
  24. LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
  25. try {
  26. Schema<T> schema = getSchema(clazz);
  27. byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
  28. return result;
  29. } catch (Exception e) {
  30. throw new IllegalStateException(e.getMessage(), e);
  31. } finally {
  32. buffer.clear();
  33. }
  34. }
  35. /**
  36. * 反序列化
  37. *
  38. * @param data
  39. * @param clazz
  40. * @return
  41. */
  42. public static <T> T deserializer(byte[] data, Class<T> clazz) {
  43. try {
  44. T obj = objenesis.newInstance(clazz);
  45. Schema<T> schema = getSchema(clazz);
  46. ProtostuffIOUtil.mergeFrom(data, obj, schema);
  47. return obj;
  48. } catch (Exception e) {
  49. throw new IllegalStateException(e.getMessage(), e);
  50. }
  51. }
  52. }

Java代码  

  1. @SuppressWarnings("rawtypes")
  2. public class RpcEncoder extends MessageToByteEncoder {
  3. private Class<?> genericClass;
  4. public RpcEncoder(Class<?> genericClass) {
  5. this.genericClass = genericClass;
  6. }
  7. @Override
  8. public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
  9. if (genericClass.isInstance(in)) {
  10. System.out.println("发送的请求是:"+in);
  11. byte[] data = SerializationUtil.serializer(in);
  12. out.writeInt(data.length);
  13. out.writeBytes(data);
  14. }
  15. }
  16. }

Java代码  

  1. public class RpcDecoder extends ByteToMessageDecoder {
  2. private Class<?> genericClass;
  3. public RpcDecoder(Class<?> genericClass) {
  4. this.genericClass = genericClass;
  5. }
  6. @Override
  7. public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  8. throws Exception {
  9. if (in.readableBytes() < 4) {
  10. return;
  11. }
  12. in.markReaderIndex();
  13. int dataLength = in.readInt();
  14. if (dataLength < 0) {
  15. ctx.close();
  16. }
  17. if (in.readableBytes() < dataLength) {
  18. in.resetReaderIndex();
  19. }
  20. byte[] data = new byte[dataLength];
  21. in.readBytes(data);
  22. Object obj = SerializationUtil.deserializer(data, genericClass);
  23. System.out.println("接收到的消息是:"+obj);
  24. out.add(obj);
  25. }
  26. }

Java代码  

  1. public class HeartBeatReqHandler extends ChannelDuplexHandler {
  2. /**
  3. * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
  4. *      java.lang.Object)
  5. */
  6. @Override
  7. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  8. if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
  9. IdleStateEvent event = (IdleStateEvent) evt;
  10. if (event.state() == IdleState.READER_IDLE) {
  11. System.out.println("read 空闲");
  12. ctx.disconnect();
  13. } else if (event.state() == IdleState.WRITER_IDLE) {
  14. System.out.println("write 空闲");
  15. ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));
  16. }
  17. }
  18. }
  19. /**
  20. *
  21. * @return
  22. * @author zhangwei<[email protected]>
  23. */
  24. private NettyMessage buildHeartBeat(byte type) {
  25. NettyMessage msg = new NettyMessage();
  26. Header header = new Header();
  27. header.setType(type);
  28. msg.setHeader(header);
  29. return msg;
  30. }
  31. }

Java代码  

  1. public class NettyServer {
  2. public void bind(int port) throws Exception {
  3. EventLoopGroup bossGroup = new NioEventLoopGroup();
  4. EventLoopGroup workerGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap b = new ServerBootstrap();
  7. b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
  8. .childHandler(new ChildChannelHandler());
  9. ChannelFuture f = b.bind(port).sync();
  10. System.out.println("Netty time Server started at port " + port);
  11. f.channel().closeFuture().sync();
  12. } finally {
  13. bossGroup.shutdownGracefully();
  14. workerGroup.shutdownGracefully();
  15. }
  16. }
  17. public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
  18. @Override
  19. protected void initChannel(final SocketChannel ch) throws Exception {
  20. ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))
  21. .addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());
  22. }
  23. }
  24. public static void main(String[] args) {
  25. try {
  26. new NettyServer().bind(12000);
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

Java代码  

  1. public enum MessageType {
  2. LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);
  3. private byte type;
  4. /**
  5. * @param type
  6. */
  7. private MessageType(byte type) {
  8. this.type = type;
  9. }
  10. public byte getType() {
  11. return type;
  12. }
  13. public void setType(byte type) {
  14. this.type = type;
  15. }
  16. public static MessageType getMessageType(byte type) {
  17. for (MessageType b : MessageType.values()) {
  18. if (b.getType() == type) {
  19. return b;
  20. }
  21. }
  22. return null;
  23. }
  24. }

Java代码  

  1. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {
  2. /**
  3. * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext,
  4. *      java.lang.Object)
  5. */
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
  8. if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {
  9. NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());
  10. ctx.writeAndFlush(heartBeat);
  11. } else {
  12. ctx.fireChannelRead(msg);
  13. }
  14. }
  15. /**
  16. * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
  17. *      java.lang.Object)
  18. */
  19. @Override
  20. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  21. if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
  22. IdleStateEvent event = (IdleStateEvent) evt;
  23. if (event.state() == IdleState.READER_IDLE) {
  24. System.out.println("read 空闲 关闭链接");
  25. ctx.disconnect();
  26. }
  27. }
  28. }
  29. /**
  30. *
  31. * @return
  32. * @author zhangwei<[email protected]>
  33. */
  34. private NettyMessage buildHeartBeat(byte type) {
  35. NettyMessage msg = new NettyMessage();
  36. Header header = new Header();
  37. header.setType(type);
  38. msg.setHeader(header);
  39. return msg;
  40. }
  41. }

Java代码  

  1. public class NettyMessage implements Serializable{
  2. /**  */
  3. private static final long serialVersionUID = 1L;
  4. private Header header;
  5. private Object body;
  6. public Header getHeader() {
  7. return header;
  8. }
  9. public void setHeader(Header header) {
  10. this.header = header;
  11. }
  12. public Object getBody() {
  13. return body;
  14. }
  15. public void setBody(Object body) {
  16. this.body = body;
  17. }
  18. /**
  19. * @see java.lang.Object#toString()
  20. */
  21. @Override
  22. public String toString() {
  23. return "NettyMessage [header=" + header + ", body=" + body + "]";
  24. }
  25. }

Java代码  

  1. public class Header implements Serializable{
  2. /**  */
  3. private static final long serialVersionUID = 1L;
  4. private int crcCode=0xabef0101;
  5. private int length;
  6. private long sessionId;
  7. private byte type;
  8. private byte priority;
  9. private Map<String,Object> attachment=new HashMap<>();
  10. public int getCrcCode() {
  11. return crcCode;
  12. }
  13. public void setCrcCode(int crcCode) {
  14. this.crcCode = crcCode;
  15. }
  16. public int getLength() {
  17. return length;
  18. }
  19. public void setLength(int length) {
  20. this.length = length;
  21. }
  22. public long getSessionId() {
  23. return sessionId;
  24. }
  25. public void setSessionId(long sessionId) {
  26. this.sessionId = sessionId;
  27. }
  28. public byte getType() {
  29. return type;
  30. }
  31. public void setType(byte type) {
  32. this.type = type;
  33. }
  34. public byte getPriority() {
  35. return priority;
  36. }
  37. public void setPriority(byte priority) {
  38. this.priority = priority;
  39. }
  40. public Map<String, Object> getAttachment() {
  41. return attachment;
  42. }
  43. public void setAttachment(Map<String, Object> attachment) {
  44. this.attachment = attachment;
  45. }
  46. /**
  47. * @see java.lang.Object#toString()
  48. */
  49. @Override
  50. public String toString() {
  51. return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type
  52. + ", priority=" + priority + ", attachment=" + attachment + "]";
  53. }
  54. }

客户端的结果是:

Java代码  

  1. etty time Client connected at port 12000
  2. write 空闲
  3. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
  4. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
  5. write 空闲
  6. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
  7. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
  8. write 空闲
  9. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
  10. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
  11. write 空闲
  12. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
  13. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
  14. write 空闲
  15. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]
  16. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]
时间: 2024-10-14 21:15:01

Netty通过心跳保持长链接的相关文章

Netty之心跳检测技术(四)

Netty之心跳检测技术(四) 一.简介 "心跳"听起来感觉很牛X的样子,其实只是一种检测端到端连接状态的技术.举个简单的"栗子",现有A.B两端已经互相连接,但是他们之间很长时间没有数据交互,那么A与B如何判断这个连接是否可用呢?我们通常的做法就是,让任何一方,例如我们让A端,定时的发送(例如每5秒钟)一句问候"Are you ok?",如果B都到来自A的问候,回了一句"GUN",A收到了来自B的信息,也不在乎B到底给我回了

纯Socket(BIO)长链接编程的常见的坑和填坑套路

本文章纯属个人经验总结,伪代码也是写文章的时候顺便白板编码的,可能有逻辑问题,请帮忙指正,谢谢. Internet(全球互联网)是无数台机器基于TCP/IP协议族相互通信产生的.TCP/IP协议族分了四层实现,链路层.网络层.传输层.应用层. 与我们应用开发者接触最多的应该是应用层了,例如web应用普遍使用HTTP协议,HTTP协议帮助我们开发者做了非常多的事情,通过HTTP足以完成大部分的通信工作了,但是有时候会有一些特殊的场景出现,使得HTTP协议并不能得心应手的完成工作,这个时候就需要我们

Netty 实现心跳机制.md

netty 心跳机制示例,使用netty4,IdleStateHandler 实现. 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服

浅析 Netty 实现心跳机制与断线重连

基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制

python+uwsgi导致redis无法长链接引起性能下降问题记录

今天在部署python代码到预生产环境时,web站老是出现redis链接未初始化,无法连接到服务的提示,比对了一下开发环境与测试环境代码,完全一致,然后就是查看各种日志,排查了半天也没有查明是什么原因引起的. 没有办法的情况下,直接登录服务器,从uwsgi与nginx中卸载掉这个web服务,然后暴力的在命令操作符下输入python main.py,执行访问发现又正常了......狂吐血400CC...然后是各种怀疑uwsgi和nginx,查看配置与其他服务正常,排除完后只能回归到检查代码. 静下

获取长链接的域名

示例:长链接:https://www.baidu.com?a=1&b=2 域名:www.baidu.com static String getDomainUrl(String url) { String domainUrl = ""; Pattern p = Pattern.compile("(?<=//|)((\\w)+\\.)+\\w+"); Matcher m = p.matcher(url); if (m.find()) { domainUrl

微信公众号开发-长链接转短链接

主要使用场景: 开发者用于生成二维码的原链接(商品.支付二维码等)太长导致扫码速度和成功率下降,将原长链接通过此接口转成短链接再生成二维码将大大提升扫码速度和成功率. http请求方式: POST https://api.weixin.qq.com/cgi-bin/shorturl?access_token=ACCESS_TOKEN 参数说明 参数 是否必须 说明 access_token 是 调用接口凭证 action 是 此处填long2short,代表长链接转短链接 long_url 是

php 微信接口API之长链接转短链接代码示例

[php] view plain copy <?php header("Content-Type: text/html; charset=utf-8"); $longurl; if(isset($_POST['longurl'])){ $longurl = $_POST['longurl']; }else{ die("没有post值进来"); } //echo $longurl; $id = "你的AppID"; $secret = &qu

将一条长链接转成短链接

package com.wanhua.weixin.util; import java.util.HashMap;import java.util.Map; import org.json.JSONObject; import play.cache.Cache;import play.i18n.Messages; import com.alibaba.fastjson.JSON;import com.wanhua.weixin.model.AccessToken; /** * 请求地址url的工