netty5心跳与业务消息分发实例

  继续基于我们之前的demo(参见netty5自定义私有协议实例),这次我们加上连接校验和心跳机制:

  

  只要校验通过,客户端发送心跳和业务消息是两个不同的事件发送的,彼此互不干扰。针对以上流程,我们需要增加4个handler:客户端请求handler、心跳handler ,服务端校验handler、心跳处理handler。当然,引导类也得添加上面对应的handler。上代码:

  新增客户端首次连接handler:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ControlClientHandler extends ChannelHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(buildControlReq());
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[Client] conrol request error: {}", cause.getMessage());
        ctx.close();
    }

    /**
     * 构造请求消息体
     *
     * @return
     */
    private NettyMessage buildControlReq() {
        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        byte[] data = buildControlData();
        header.setDelimiter(0xABEF0101);
        header.setLength(data.length);
        header.setType((byte) 0);
        header.setReserved((byte) 0);
        nettyMessage.setHeader(header);

        // 设置数据包
        nettyMessage.setData(data);
        return nettyMessage;
    }

    /**
     * 构造控制请求消息体
     *
     * @return
     */
    private byte[] buildControlData() {
        byte[] result = new byte[2];

        result[0] = (byte) 1;

        result[1] = (byte) 16;
        return result;
    }
}

  服务端校验handler:

package com.wlf.netty.nettyserver.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class ControlServerHandler extends ChannelHandlerAdapter {

    // 白名单列表
    private String[] whiteList = new String[]{"127.0.0.1"};

    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 如果是控制数据格式请求消息,说明是客户端首次请求,校验白名单,否则进入下一个处理流程
        if (nettyMessage.getHeader() != null &&
                nettyMessage.getHeader().getType() == (byte) 0) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage controlResponse = null;

            if (nodeCheck.containsKey(nodeIndex)) {
                log.warn("request ip : {} has login.", nodeIndex);
                controlResponse = buildResponse(false);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;

                for (String whiteIp : whiteList) {
                    if (ip.equals(whiteIp)) {
                        isOK = true;
                        break;
                    }
                }

                if (isOK) {
                    nodeCheck.put(nodeIndex, true);
                    // 白名单校验通过,校验是否支持PCM格式
                    byte[] data = nettyMessage.getData();
                    ByteBuf buf = Unpooled.buffer(2);
                    buf.writeBytes(data);
                    byte sample = buf.readByte();

                    if (sample != (byte) 0) {
                        log.error("sample : {} is not 0", sample);
                        controlResponse = buildResponse(false);
                    } else {
                        controlResponse = buildResponse(true);
                    }
                } else {
                    log.error("ip : {} is not in whiteList : {}.", ip, whiteList);
                    controlResponse = buildResponse(false);
                }

            }
            log.info("[server] The control response is : {}, data : {}", controlResponse, controlResponse.getData());
            ctx.writeAndFlush(controlResponse);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[server] control response error: {}", cause.getMessage());
        // 删除缓存
        nodeCheck.remove(ctx.channel().remoteAddress().toString());
        ctx.close();
    }

    /**
     * 构造响应消息体
     *
     * @param isOk
     * @return
     */
    private NettyMessage buildResponse(boolean isOk) {
        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        byte[] data = buildData(isOk);
        header.setDelimiter(0xABEF0101);
        header.setLength(data.length);
        header.setType((byte) 0);
        header.setReserved((byte) 0);
        nettyMessage.setHeader(header);
        nettyMessage.setData(data);
        return nettyMessage;
    }

    /**
     * 构建控制数据格式响应消息体
     *
     * @param isOk
     * @return
     */
    private byte[] buildData(boolean isOk) {
        ByteBuf result = null;
        if (isOk) {
            result = Unpooled.buffer(8);
            // 生成sid
            result.writeInt(buildSid());

            // 心跳发送间隔,5000毫秒秒
            result.writeInt(5000);
        } else {
            result = Unpooled.buffer(1);
            result.writeByte((byte) -1);
        }

        return result.array();
    }

    private int buildSid() {
        int max = 100, min = 1;
        long randomNum = System.currentTimeMillis();
        return (int) (randomNum % (max - min) + min);
    }
}

  心跳客户端handler:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.constant.MessageType;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class HeartBeatClientHandler extends ChannelHandlerAdapter {
    private volatile int interval = 5000;
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 接收控制数据响应消息成功,发送心跳给服务端
        if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
            byte[] data = nettyMessage.getData();
            ByteBuf buf = Unpooled.buffer(8);
            buf.writeBytes(data);
            int sid = buf.readInt();
            interval = buf.readInt();
            log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval);

            // 每interval(默认5000)豪秒发送一次心跳请求到服务端
            heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
                                                               @Override
                                                               public void run() {
                                                                   NettyMessage heartBeat = buildHeartBeat(sid);
                                                                   log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
                                                                   ctx.writeAndFlush(heartBeat);
                                                               }
                                                           },
                    0, interval, TimeUnit.MILLISECONDS);

            // 消息继续向后传
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[Client] heart request error: {}", cause.getMessage());
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.close();
    }

    /**
     * 构造心跳请求消息体
     *
     * @return
     */
    private NettyMessage buildHeartBeat(int sid) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        byte[] data = buildData(sid);
        header.setDelimiter(0xABEF0101);
        header.setLength(data.length);
        header.setType((byte) 3);
        header.setReserved((byte) 0);
        message.setHeader(header);

        // 设置数据包
        message.setData(data);
        return message;
    }

   /**
     * 构建心跳响应消息体
     *
     * @param sid
     * @return
     */
    private byte[] buildData(int sid) {
        ByteBuf result = Unpooled.buffer(4);
        result.writeInt(sid);
        return result.array();
    }

}

  服务端心跳handler:

package com.wlf.netty.nettyserver.handler;

import com.wlf.netty.nettyapi.constant.MessageType;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class HeartBeatServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 接收到心跳请求,打印心跳消息,否则进入下一处理流程
        if (nettyMessage.getHeader() != null &&
                nettyMessage.getHeader().getType() == (byte) 3) {
            log.info("[server] Receive client heart beat message : ----> {}", nettyMessage);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[server] heart response error: {}", cause.getMessage());

        ctx.close();
    }

}

  客户端发送业务消息NettyClientHandler修改,发送触发从channelAcitve事件改为channelRead事件:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 接收控制数据响应消息成功,每5秒发送pcm数据
        if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
            ctx.writeAndFlush(buildClientRequest());
        }

//    @Override
//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(buildClientRequest());
//    }

  客户端引导类NettyClient修改,新增handler:

    public void connect(int port, String host) throws Exception {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyMessageDecoder());
                            channel.pipeline().addLast(new NettyMessageEncoder());
                            channel.pipeline().addLast(new ControlClientHandler());
                            channel.pipeline().addLast(new HeartBeatClientHandler());
                            channel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

  服务端引导类修改:

    public void bind(int port) throws Exception {
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new NettyMessageDecoder());
                            channel.pipeline().addLast(new NettyMessageEncoder());
                            channel.pipeline().addLast(new ControlServerHandler());
                            channel.pipeline().addLast(new HeartBeatServerHandler());
                            channel.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            // 绑定端口
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

  我们试着跑起来看看:

  客户端输出:

22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.ControlServerHandler - [server] The control response is : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=0, reserved=0}, data=[[email protected]}, data : [0, 0, 0, 18, 0, 0, 19, -120]
22:55:33.741 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}

Process finished with exit code -1

  服务端输出:

22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 18, interval : 5000
22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}

  我们看到心跳机制是正常的(这里还没有实现心跳超时服务端断连),可是消息并未向后传播,没有进入NettyClientHandler,事件循环被心跳消息IO阻塞了,除非发送完所有心跳消息,否则永远不会进入下个handler。怎么办?我们不在channelRead事件中执行定时发送心跳,换个事件channelReadComplete发。我们设置一个标志,如果在channelRead中读取到服务端响应,说明可以发送心跳了,同时进入下个handler,发送的事情就交给channelReadComplete了:

@Slf4j
public class HeartBeatClientHandler extends ChannelHandlerAdapter {

    // 新增发送标志
    private volatile boolean startSendHeartNow = false;
    private volatile int interval = 5000;
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 新增读取服务端消息后的处理
        if (ctx.channel().isRegistered() && ctx.channel().isActive() && startSendHeartNow) {
            heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
                                                               @Override
                                                               public void run() {
                                                                   NettyMessage heartBeat = buildHeartBeat(interval);
                                                                   log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
                                                                   ctx.writeAndFlush(heartBeat);
                                                               }
                                                           },
                    0, interval, TimeUnit.MILLISECONDS);
            startSendHeartNow = false;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 接收控制数据响应消息成功,发送心跳给服务端
        if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
            byte[] data = nettyMessage.getData();
            ByteBuf buf = Unpooled.buffer(8);
            buf.writeBytes(data);
            int sid = buf.readInt();
            interval = buf.readInt();
            log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval);

            // 开始发送心跳
            startSendHeartNow = true;

            // 每interval(默认5000)豪秒发送一次心跳请求到服务端
//            heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
//                                                               @Override
//                                                               public void run() {
//                                                                   NettyMessage heartBeat = buildHeartBeat(sid);
//                                                                   log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
//                                                                   ctx.writeAndFlush(heartBeat);
//                                                               }
//                                                           },
//                    0, interval, TimeUnit.MILLISECONDS);

            // 消息继续向后传
            ctx.fireChannelRead(msg);
        }
    }

}

  再跑一次:

  服务端输出:

23:51:36.516 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.ControlServerHandler - [server] The control response is : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=0, reserved=0}, data=[[email protected]}, data : [0, 0, 0, 76, 0, 0, 19, -120]
23:51:36.532 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[[email protected]}
23:51:36.532 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - data length: 8
23:51:36.532 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1572105096532
23:51:36.532 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
23:51:41.551 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
23:51:46.552 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}
23:51:51.553 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[[email protected]}

  现在客户端同时发送了心跳和业务消息了。

原文地址:https://www.cnblogs.com/wuxun1997/p/11746337.html

时间: 2024-11-11 16:51:01

netty5心跳与业务消息分发实例的相关文章

netty5心跳与阻塞性业务消息分发实例

继续之前的例子(netty5心跳与业务消息分发实例),我们在NettyClientHandler把业务消息改为阻塞性的: package com.wlf.netty.nettyclient.handler; import com.wlf.netty.nettyapi.javabean.Header; import com.wlf.netty.nettyapi.javabean.NettyMessage; import io.netty.channel.ChannelHandlerAdapter;

.net 业务消息队列

开源QQ群: .net 开源基础服务  238543768 开源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ ## 业务消息队列 ##业务消息队列是应用于业务的解耦和分离,应具备分布式,高可靠性,高性能,高实时性,高稳定性,高扩展性等特性. ## 优点: ##- 大量的业务消息堆积能力- 无单点故障及故障监控,异常提醒- 生产者端负载均衡,故障转移,故障自动恢复,并行消息插入.- 消费者端负载均衡,故障保持,故障自动恢复,并行消息消费.

Android中的观察者模式:消息分发器(MessageDispatcher)

这个功能是在公司项目需求的时候写出来,本来是基础命令字模式的,但是个人喜欢对象,所有后来在一个小项目中使用时,改成了基于对象模式. 首先,是一个接口,我们称之为监听器: [html] view plaincopyprint? /** * * @author poet * */ public interface MessageObserver<T> { void onMessage(T t); } 这里使用的是泛型,泛型<T>除了作为实际监听的对象类型,也作为监听器管理的key,届时

delphi VCL研究之消息分发机制(转)

原文来源,http://blog.csdn.net/sushengmiyan/article/details/8635550 1.VCL 概貌 先看一下VCL类图的主要分支,如图4.1所示.在图中可以看到,TObject是VCL的祖先类,这也是Object Pascal语言所规定的.但实际上,TObject以及TObject声明所在的system.pas整个单元,包括在“编译器魔法”话题中提到的_ClassCreate等函数,都是编译器内置支持的.因此,无法修改.删除system.pas中的任何

muduo Dispatcher消息分发器 通过多态和模板进行向上类型转换

所谓消息分发(muduo 中,就是接收到buffer之后,额,或者说是 protobuf),在简单的程序设计里面的话,估计就是 type-switch 了,但是这样的话,肯定就不好扩展维护啦. 最后的方法就是,可以根据 type-name 自动去调用相应的方法. typedef boost::function<void (Message*)> ProtobufMessageCallback; 这个算是一个映射咯.muduo 中采用的是 map<Descriptor*,ProtobufMe

Cocos2d-x 3.0 屏幕触摸及消息分发机制

***************************************转载请注明出处:http://blog.csdn.net/lttree******************************************** 题外话: 唉. 开学了!    好烦. 这就已经大三了, 两年前的这时候,我还是懵懂的大一小学弟, 两年后.就要奔上社会就业了. 光阴似箭.日月如梭呀~ 正文: 好久没做cocos2d-x了,这次练习一下.屏幕触摸及消息分发机制. 这里,我用的是cocos2d-

HTTP 请求消息头部实例:

HTTP 请求消息头部实例: Host:rss.sina.com.cn        //客户端指定自己想访问的WEB服务器的域名/IP 地址和端口号User-Agent:Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.8.1.14) Gecko/20080404 Firefox/2.0.0.14              //头域的内容包含发出请求的用户信息. Accept:text/xml,application/xml,applic

RabbitMQ消息队列(六):使用主题进行消息分发[转]

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity(严重级别)的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity(严重级别)设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为

RabbitMQ消息分发轮询和Message Acknowledgment

一.消息分发 RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理. 启动3个消费者 生产者依次生成3条消息 可见3条消息分别被3个消费者获取,所以RabbitMQ是采用轮询机制将消息队列Queue中的消息依次发给不同的消费者 二.消息确认(Message Ac