netty5心跳与阻塞性业务消息分发实例

  继续之前的例子(netty5心跳与业务消息分发实例),我们在NettyClientHandler把业务消息改为阻塞性的:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;

import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * 客户端处理类
 */
@Slf4j
public class NettyClientHandler extends ChannelHandlerAdapter {

    private static final String AUDIO_PATH = "D:\\input\\寒号鸟.wav";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        NettyMessage nettyMessage = (NettyMessage) msg;

        // 接收控制数据响应消息成功,每5秒发送pcm数据
        if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
//            ctx.writeAndFlush(buildClientRequest());
//        }
            // 音频文件总时长,单位:秒
            int audioTotal = 122;
            try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {

                // 读结束标志
                boolean readFinish = false;

                // 文件总字节数
                long audioLength = raf.length();

                // 每次发送字节数
                long eachLength = audioLength * 5 / audioTotal;

                // 音频数据
                byte[] audioData = null;
                byte[] bytes = new byte[1024];

                long cuccrentLength = 0L;

                // 读取音频文件
                while (true) {
                    // 休眠5秒
                    TimeUnit.SECONDS.sleep(5);

                    // 获取当前时间
                    long startTime = System.currentTimeMillis();
                    while (cuccrentLength <= eachLength) {

                        // 获取5秒内的音频字节流
                        int len = raf.read(bytes);
                        if (len == -1) {
                            readFinish = true;
                            break;
                        }

                        bytes = Arrays.copyOf(bytes, len);
                        audioData = ArrayUtils.addAll(audioData, bytes);
                        cuccrentLength += len;
                    }

                    // 发送5秒的数据包
                    NettyMessage nettyClientApi = buildNettyClientRequest(audioData, startTime);
                    log.info("[client] send client msg : {}", nettyClientApi);
                    ctx.writeAndFlush(nettyClientApi);

                    // 读完了
                    if (readFinish) {
                        log.info("The audio data send finish...");
                        break;
                    }

                    // 重置
                    cuccrentLength = 0L;
                }
            }
        }
    }

   /**
     * long转字节
     *
     * @param values
     * @return
     */
    private byte[] longToBytes(long values) {
        byte[] buffer = new byte[8];
        for (int i = 0; i < 8; i++) {
            int offset = 64 - (i + 1) * 8;
            buffer[i] = (byte) ((values >> offset) & 0xff);
        }
        return buffer;
    }

    /**
     * 将两个数组合并起来
     *
     * @param array1
     * @param array2
     * @return
     */
    private byte[] addAll(byte[] array1, byte... array2) {
        byte[] joinedArray = new byte[array1.length + array2.length];
        System.arraycopy(array1, 0, joinedArray, 0, array1.length);
        System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
        return joinedArray;
    }

    /**
     * 在处理过程中引发异常时被调用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("[Client] netty client request error: {}", cause.getMessage());
        ctx.close();
    }

    /**
     * 创建请求消息体
     *
     * @param audioData
     * @param time
     * @return
     */
    private NettyMessage buildNettyClientRequest(byte[] audioData, long time) {
        NettyMessage nettyMessage = new NettyMessage();
        Header header = new Header();
        byte[] data = buildPcmData(audioData, time);
        header.setDelimiter(0xABEF0101);
        header.setLength(data.length);
        header.setType((byte) 1);
        header.setReserved((byte) 0);
        nettyMessage.setHeader(header);

        // 设置数据包
        nettyMessage.setData(data);
        return nettyMessage;
    }

    /**
     * 构造PCM请求消息体
     *
     * @return
     */
    private byte[] buildPcmData(byte[] audioData, long time) {
        byte[] timeByte = longToBytes(time);

        return ArrayUtils.addAll(timeByte, audioData);
    }

}

  重启客户端,会发现输出变成这样:

23:35:34.339 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 56, interval : 5000
23:35:48.216 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[[email protected]}
23:35:53.259 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=323592, type=1, reserved=0}, data=[[email protected]}
23:35:58.319 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=485384, type=1, reserved=0}, data=[[email protected]}
23:36:03.361 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=647176, type=1, reserved=0}, data=[[email protected]}
23:36:08.433 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=808968, type=1, reserved=0}, data=[[email protected]}
23:36:13.496 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=970760, type=1, reserved=0}, data=[[email protected]}
23:36:18.607 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1132552, type=1, reserved=0}, data=[[email protected]}
23:36:23.694 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1294344, type=1, reserved=0}, data=[[email protected]}
23:36:28.855 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1456136, type=1, reserved=0}, data=[[email protected]}
23:36:33.974 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1617928, type=1, reserved=0}, data=[[email protected]}
23:36:39.134 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1779720, type=1, reserved=0}, data=[[email protected]}
23:36:44.272 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1941512, type=1, reserved=0}, data=[[email protected]}
23:36:49.410 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2103304, type=1, reserved=0}, data=[[email protected]}
23:36:54.650 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2265096, type=1, reserved=0}, data=[[email protected]}
23:36:59.816 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2426888, type=1, reserved=0}, data=[[email protected]}
23:37:05.009 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2588680, type=1, reserved=0}, data=[[email protected]}
23:37:10.200 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2750472, type=1, reserved=0}, data=[[email protected]}
23:37:15.416 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2912264, type=1, reserved=0}, data=[[email protected]}
23:37:20.633 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3074056, type=1, reserved=0}, data=[[email protected]}
23:37:25.886 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3235848, type=1, reserved=0}, data=[[email protected]}

  心跳根本没进来,因为业务消息占用了事件循环的IO,还轮不到心跳消息的发送,除非当前的业务消息发送完了。反之亦然,如果是先发送心跳,那业务消息就别指望有机会发送了,因为心跳根本就停不下来。

原文地址:https://www.cnblogs.com/wuxun1997/p/11749928.html

时间: 2024-10-30 05:09:50

netty5心跳与阻塞性业务消息分发实例的相关文章

netty5心跳与业务消息分发实例

继续基于我们之前的demo(参见netty5自定义私有协议实例),这次我们加上连接校验和心跳机制: 只要校验通过,客户端发送心跳和业务消息是两个不同的事件发送的,彼此互不干扰.针对以上流程,我们需要增加4个handler:客户端请求handler.心跳handler ,服务端校验handler.心跳处理handler.当然,引导类也得添加上面对应的handler.上代码: 新增客户端首次连接handler: package com.wlf.netty.nettyclient.handler; i

.net 业务消息队列

开源QQ群: .net 开源基础服务  238543768 开源地址: http://git.oschina.net/chejiangyi/Dyd.BusinessMQ ## 业务消息队列 ##业务消息队列是应用于业务的解耦和分离,应具备分布式,高可靠性,高性能,高实时性,高稳定性,高扩展性等特性. ## 优点: ##- 大量的业务消息堆积能力- 无单点故障及故障监控,异常提醒- 生产者端负载均衡,故障转移,故障自动恢复,并行消息插入.- 消费者端负载均衡,故障保持,故障自动恢复,并行消息消费.

Android中的观察者模式:消息分发器(MessageDispatcher)

这个功能是在公司项目需求的时候写出来,本来是基础命令字模式的,但是个人喜欢对象,所有后来在一个小项目中使用时,改成了基于对象模式. 首先,是一个接口,我们称之为监听器: [html] view plaincopyprint? /** * * @author poet * */ public interface MessageObserver<T> { void onMessage(T t); } 这里使用的是泛型,泛型<T>除了作为实际监听的对象类型,也作为监听器管理的key,届时

muduo Dispatcher消息分发器 通过多态和模板进行向上类型转换

所谓消息分发(muduo 中,就是接收到buffer之后,额,或者说是 protobuf),在简单的程序设计里面的话,估计就是 type-switch 了,但是这样的话,肯定就不好扩展维护啦. 最后的方法就是,可以根据 type-name 自动去调用相应的方法. typedef boost::function<void (Message*)> ProtobufMessageCallback; 这个算是一个映射咯.muduo 中采用的是 map<Descriptor*,ProtobufMe

Cocos2d-x 3.0 屏幕触摸及消息分发机制

***************************************转载请注明出处:http://blog.csdn.net/lttree******************************************** 题外话: 唉. 开学了!    好烦. 这就已经大三了, 两年前的这时候,我还是懵懂的大一小学弟, 两年后.就要奔上社会就业了. 光阴似箭.日月如梭呀~ 正文: 好久没做cocos2d-x了,这次练习一下.屏幕触摸及消息分发机制. 这里,我用的是cocos2d-

delphi VCL研究之消息分发机制(转)

原文来源,http://blog.csdn.net/sushengmiyan/article/details/8635550 1.VCL 概貌 先看一下VCL类图的主要分支,如图4.1所示.在图中可以看到,TObject是VCL的祖先类,这也是Object Pascal语言所规定的.但实际上,TObject以及TObject声明所在的system.pas整个单元,包括在“编译器魔法”话题中提到的_ClassCreate等函数,都是编译器内置支持的.因此,无法修改.删除system.pas中的任何

RabbitMQ消息队列(六):使用主题进行消息分发[转]

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity(严重级别)的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity(严重级别)设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为

RabbitMQ消息分发轮询和Message Acknowledgment

一.消息分发 RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理. 启动3个消费者 生产者依次生成3条消息 可见3条消息分别被3个消费者获取,所以RabbitMQ是采用轮询机制将消息队列Queue中的消息依次发给不同的消费者 二.消息确认(Message Ac

HTTP 请求消息头部实例:

HTTP 请求消息头部实例: Host:rss.sina.com.cn        //客户端指定自己想访问的WEB服务器的域名/IP 地址和端口号User-Agent:Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.8.1.14) Gecko/20080404 Firefox/2.0.0.14              //头域的内容包含发出请求的用户信息. Accept:text/xml,application/xml,applic