Mina Codec Filter对应协议实现编解码处理

原文地址:Mina Filter(Apache Mina user guide Chapter 9 Codec Filter)

本教程试图解释为什么以及如何使用ProtocolCodecFilter.

1.1.1. 为什么使用ProtocolCodecFilter

TCP 保证交付的所有数据包以正确的顺序。但是不能保证服务端发送方的写操作会导致一个接收端读取事件。参见:http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassemblyhttp://en.wikipedia.org/wiki/Nagle%27s_algorithm中的Mina术语:

没有ProtocolCodecFilter调用IoSession.write(Object message) 的发送方可以导致多个messageReceived(IoSessionsession,Object
message)事件接收者;并且多个调用IoSession.write(Object message)会导致单个messageReceived事件。当客户机和服务器运行在同一主机(或一个本地网络),你可能不会遇到这种行为,但你的应用程序应该能够解决这个问题。

大多数网络应用程序需要一种方法来找出当前消息的结束和下一个消息开始。

在IoHandler中你可以实现所有这些逻辑,但添加一个ProtocolCodecFilter会使你的代码更加简洁和更容易维护。

它允许你分离协议逻辑与业务逻辑(IoHandler)。

1.1.2. 怎样使用

你的应用程序只是收到一串字节码,你需要将这些字节转换为消息(更高级别的对象)。

有三种常见的分割技术将字节流转化为消息:

  • 使用固定长度的消息
  • 使用一个固定长度的标题表明消息体的长度
  • 使用分隔符,例如许多基于文本的协议每条消息之后附加一个换行符(或 CR LF par)(http://www.faqs.org/rfcs/rfc977.html)

在本教程中我们将使用第一和第二种方法,因为他们更容易实现。之后我们将会看到使用分隔符。

1.1.3. 协议编解码示例

我们将开发一个(非常无用的)图形字符生成服务器来说明如何实现自己的协议编解码器(ProtocolEncoder,ProtocolDecoder,ProtocolCodecFactory)。协议是很简单的。

这是一个消息请求层:


4 bytes


4 bytes


4 bytes


width


height


numchars

  • width:要求图像的宽度在网络字节顺序(整数)
  • height:要求的高度形象在网络字节顺序(整数)
  • numchars:字符的数量生成网络字节顺序(整数)

服务器响应两张图片的请求的维度,与请求的数量的字符画。

这是一个消息响应层:


4 bytes


variable length body


4 bytes


variable length body


length1


image1


length2


image2

概述我们需要编码和解码的类的请求和响应:

  • · ImageRequest: 一个简单的POJO 代表我们ImageServer请求。
  • · ImageRequestEncoder: 所使用的编码ImageRequest对象为特定于协议的数据(客户端)。
  • · ImageRequestDecoder: 解码特定于协议的数据到ImageRequest对象(服务器使用)。
  • · ImageResponse: 一个简单的POJO代表我们ImageServer回应。
  • · ImageResponseEncoder: 服务器使用的ImageResponseEncoder编码对象。
  • · ImageResponseDecoder: 客户端使用的ImageResponseDecoder解码对象。
  • · ImageCodecFactory: 这个类创建需要的编码器和解码器。

这是ImageRequest类:

public class ImageRequest {

    private int width;

    private int height;

    private int numberOfCharacters;

    public ImageRequest(int width, int height, int numberOfCharacters) {

        this.width = width;

        this.height = height;

        this.numberOfCharacters = numberOfCharacters;

    }

    public int getWidth() {

        return width;

    }

    public int getHeight() {

        return height;

    }

    public int getNumberOfCharacters() {

        return numberOfCharacters;

    }
}

通常编码比解码简单,所以先从ImageRequestEncoder开始:

public class ImageRequestEncoder implements ProtocolEncoder {

    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {

        ImageRequest request = (ImageRequest) message;

        IoBuffer buffer = IoBuffer.allocate(12, false);

        buffer.putInt(request.getWidth());

        buffer.putInt(request.getHeight());

        buffer.putInt(request.getNumberOfCharacters());

        buffer.flip();

        out.write(buffer);

    }

    public void dispose(IoSession session) throws Exception {

        // nothing to dispose

    }
}

备注:

现在让我们看一看解码器。CumulativeProtocolDecoder为我们提供很大帮助实现编写自己的解码器:它会缓冲所有传入的数据,直到你的解码器决定它能做些什么。在这种情况下,消息都有一个固定大小的,所以是最容易等到所有可用数据:

public class ImageRequestDecoder extends CumulativeProtocolDecoder {

    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

        if (in.remaining() >= 12) {

            int width = in.getInt();

            int height = in.getInt();

            int numberOfCharachters = in.getInt();

            ImageRequest request = new ImageRequest(width, height, numberOfCharachters);

            out.write(request);

            return true;

        } else {

            return false;

        }

    }
}

备注:

  • 每次一个完整信息解码,你应该把它写入ProtocolDecoderOutput;这些消息将沿着你IoHandler过滤器链,最终抵达messageReceived方法。
  • 你不需要负责释放IoBuffer。
  • 当没有足够的数据解码信息,返回false。

响应也是一个非常简单的POJO:

public class ImageResponse {

    private BufferedImage image1;

    private BufferedImage image2;

    public ImageResponse(BufferedImage image1, BufferedImage image2) {

        this.image1 = image1;

        this.image2 = image2;

    }

    public BufferedImage getImage1() {

        return image1;

    }

    public BufferedImage getImage2() {

        return image2;

    }
}

编码的响应也简单:

public class ImageResponseEncoder extends ProtocolEncoderAdapter {

    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {

        ImageResponse imageResponse = (ImageResponse) message;

        byte[] bytes1 = getBytes(imageResponse.getImage1());

        byte[] bytes2 = getBytes(imageResponse.getImage2());

        int capacity = bytes1.length + bytes2.length + 8;

        IoBuffer buffer = IoBuffer.allocate(capacity, false);

        buffer.setAutoExpand(true);

        buffer.putInt(bytes1.length);

        buffer.put(bytes1);

        buffer.putInt(bytes2.length);

        buffer.put(bytes2);

        buffer.flip();

        out.write(buffer);

    }

    private byte[] getBytes(BufferedImage image) throws IOException {

        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        ImageIO.write(image, "PNG", baos);

        return baos.toByteArray();

    }
}

备注:

  • 当它不可能事先计算IoBuffer的长度时,你可以通过使用一个auto-expanding(动态扩展的)缓冲对象调用buffer.setAutoExpand(true);

现在让我们看一看解码的响应:

public class ImageResponseDecoder extends CumulativeProtocolDecoder {

    private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";

    public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;

    private static class DecoderState {

        BufferedImage image1;

    }

    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);

        if (decoderState == null) {

            decoderState = new DecoderState();

            session.setAttribute(DECODER_STATE_KEY, decoderState);

        }

        if (decoderState.image1 == null) {

            // try to read first image

            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {

                decoderState.image1 = readImage(in);

            } else {

                // not enough data available to read first image

                return false;

            }

        }

        if (decoderState.image1 != null) {

            // try to read second image

            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {

                BufferedImage image2 = readImage(in);

                ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);

                out.write(imageResponse);

                decoderState.image1 = null;

                return true;

            } else {

                // not enough data available to read second image

                return false;

            }

        }

        return false;

    }

    private BufferedImage readImage(IoBuffer in) throws IOException {

        int length = in.getInt();

        byte[] bytes = new byte[length];

        in.get(bytes);

        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

        return ImageIO.read(bais);

    }
}

备注:

我们解码过程的状态存储在会话属性。它也可以存储这个状态在解码器对象本身,但这个有几个缺点:

  • 每个IoSession需要自己的解码器实例
  • Mina确保不会有多个线程同时执行同一IoSession的decode()函数,但它并不能保证它永远是相同的线程。假设第一个数据是由线程1决定它不能解码,一到下一块数据,这可能是由另一个线程处理。为了避免可见性问题,你必须正确地同步访问这个解码器状态(存储在一个ConcurrentHashMap
    IoSession属性,所以它们对其他线程自动可见)。
  • 邮件列表上的讨论已经导致这一结论:选择存储状态IoSession或解码器实例本身更佳。以确保没有两个线程将运行相同的解码方法IoSession,Mina需要做某种形式的同步=
    >这种同步也会确保你不可能上面描述的可见性问题。(感谢亚当Fisk指出这点,)参见:http://www.nabble.com/Tutorial-on-ProtocolCodecFilter,-state-and-threads-t3965413.html

IoBuffer.prefixedDataAvailable()很方便当你的协议使用长度前缀;它支持一个前缀1、2或4字节。

不要忘记重启解码器状态当你解码一个响应(删除会话属性是另一种方式)。

如果响应将包含一个图像,我们不需要存储解码器状态:

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {

    if (in.prefixedDataAvailable(4)) {

        int length = in.getInt();

        byte[] bytes = new byte[length];

        in.get(bytes);

        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

        BufferedImage image = ImageIO.read(bais);

        out.write(image);

        return true;

    } else {

        return false;

    }
}

现在让我们将代码合并在一起:

public class ImageCodecFactory implements ProtocolCodecFactory {

    private ProtocolEncoder encoder;

    private ProtocolDecoder decoder;

    public ImageCodecFactory(boolean client) {

        if (client) {

            encoder = new ImageRequestEncoder();

            decoder = new ImageResponseDecoder();

        } else {

            encoder = new ImageResponseEncoder();

            decoder = new ImageRequestDecoder();

        }

    }

    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {

        return encoder;

    }

    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {

        return decoder;

    }
}

备注:

  • 每一个新会话,Mina会请求ImageCodecFactory编码器和译码器。
  • 因为我们的编码器和译码器存储会话状态,它是安全的,让所有会话共享一个实例。

这是服务器如何使用ProtocolCodecFactory:

public class ImageServer {

    public static final int PORT = 33789;

    public static void main(String[] args) throws IOException {

        ImageServerIoHandler handler = new ImageServerIoHandler();

        NioSocketAcceptor acceptor = new NioSocketAcceptor();

        acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));

        acceptor.setLocalAddress(new InetSocketAddress(PORT));

        acceptor.setHandler(handler);

        acceptor.bind();

        System.out.println("server is listenig at port " + PORT);

    }
}

客户机使用的是相同的:

public class ImageClient extends IoHandlerAdapter {

    public static final int CONNECT_TIMEOUT = 3000;

    private String host;

    private int port;

    private SocketConnector connector;

    private IoSession session;

    private ImageListener imageListener;

    public ImageClient(String host, int port, ImageListener imageListener) {

        this.host = host;

        this.port = port;

        this.imageListener = imageListener;

        connector = new NioSocketConnector();

        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));

        connector.setHandler(this);

    }

    public void messageReceived(IoSession session, Object message) throws Exception {

        ImageResponse response = (ImageResponse) message;

        imageListener.onImages(response.getImage1(), response.getImage2());

    }

    ...

出于完整性的考虑,我将添加服务器端IoHandler的代码:

public class ImageServerIoHandler extends IoHandlerAdapter {

    private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";

    public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void sessionOpened(IoSession session) throws Exception {

        session.setAttribute(INDEX_KEY, 0);

    }

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {

        IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);

        sessionLogger.warn(cause.getMessage(), cause);

    }

    public void messageReceived(IoSession session, Object message) throws Exception {

        ImageRequest request = (ImageRequest) message;

        String text1 = generateString(session, request.getNumberOfCharacters());

        String text2 = generateString(session, request.getNumberOfCharacters());

        BufferedImage image1 = createImage(request, text1);

        BufferedImage image2 = createImage(request, text2);

        ImageResponse response = new ImageResponse(image1, image2);

        session.write(response);

    }

    private BufferedImage createImage(ImageRequest request, String text) {

        BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);

        Graphics graphics = image.createGraphics();

        graphics.setColor(Color.YELLOW);

        graphics.fillRect(0, 0, image.getWidth(), image.getHeight());

        Font serif = new Font("serif", Font.PLAIN, 30);

        graphics.setFont(serif);

        graphics.setColor(Color.BLUE);

        graphics.drawString(text, 10, 50);

        return image;

    }

    private String generateString(IoSession session, int length) {

        Integer index = (Integer) session.getAttribute(INDEX_KEY);

        StringBuffer buffer = new StringBuffer(length);

        while (buffer.length() < length) {

            buffer.append(characters.charAt(index));

            index++;

            if (index >= characters.length()) {

                index = 0;

            }

        }

        session.setAttribute(INDEX_KEY, index);

        return buffer.toString();

    }
}

1.1.4. 总结

这里有有更多讲解编码和解码。但我希望这个教程已经让你开始。在不久的将来,我将试着加一些关于DemuxingProtocolCodecFactory,然后我们也会看一看如何使用分隔符,而不是长度的前缀。

时间: 2024-10-25 20:51:52

Mina Codec Filter对应协议实现编解码处理的相关文章

Thrift源码分析(二)-- 协议和编解码

协议和编解码是一个网络应用程序的核心问题之一,客户端和服务器通过约定的协议来传输消息(数据),通过特定的格式来编解码字节流,并转化成业务消息,提供给上层框架调用. Thrift的协议比较简单,它把协议和编解码整合在了一起.抽象类TProtocol定义了协议和编解码的顶层接口.个人感觉采用抽象类而不是接口的方式来定义顶层接口并不好,TProtocol关联了一个TTransport传输对象,而不是提供一个类似getTransport()的接口,导致抽象类的扩展性比接口差. TProtocol主要做了

mina编解码demo

Mina自定义编解码 Mina初了自己定义的字符编码外,用户还可以根据自己的协议自定义编解码.由于mina是基于IoFilter,也就是通过IoFilter拦截和过滤IO中的各种信息.因此对于编码而言,也就是我们把信息通过mina进行传递前,必须要根据相关的传输协议对我们传输的信息进行编码,编码后会把编码的信息通过mina进行传递.可以把编码理解为mina进行数据传输前IoFilter的最后一项处理.对于解码来说,是mina得到数据后的首次处理.即IoFilter后的首次数据处理. 自定义编码在

(中级篇 NettyNIO编解码开发)第十章-Http协议开发应用

HTTPC超文本传输协议〉协议是建立在TCP传输协议之上的应用层协议,它的发展是万维网协会和Internet工作小组IET'F合作的结果.HTTP是一个属于应用层的面向对象的协议,由于其简捷.快速的方式,适用于分布式超媒体信息系统.它于1990年提出,经过多年的使用和发展,得到了不断地完善和扩展.由于HTTP协议是目前Web开发的主流协议,基于HTTP的应用非常广泛,因此,掌握HTTP的开发非常重要,本章将重点介绍如何基于Netty的HTTP协议技进行HTTP服务端和客户端开发.由于Netty的

各种音视频编解码学习详解

各种音视频编解码学习详解 媒体业务是网络的主要业务之间.尤其移动互联网业务的兴起,在运营商和应用开发商中,媒体业务份量极重,其中媒体的编解码服务涉及需求分析.应用开发.释放license收费等等.最近因为项目的关系,需要理清媒体的codec,比较搞的是,在豆丁网上看运营商的规范 标准,同一运营商同样的业务在不同文档中不同的要求,而且有些要求就我看来应当是历史的延续,也就是现在已经很少采用了.所以豆丁上看不出所以然,从 wiki上查.中文的wiki信息量有限,很短,而wiki的英文内容内多,删减版

ffmpeg强化一:编解码过程,基本用法

1  术语: 什么是影片?其实就是一组(很多张)图片,时间间隔很小的连续展示出来,人们就觉得画面中的人物在动,这就是影片.那电影的实质就是N多张图片的集合.那 每张图片和帧又有什么关系呢?事实上,如果一部影片里面的图片,我们原封不动的全部存起来,空间会很大很大很大,但是如果通过一定的算法(这里不讲相关算 法),把每一张图片压缩(编码_encode)一下,变成 帧.再把帧连起来变成流,再把不同的流放到某个容器里面,这就是我们平常看见的电影文件了,文件 碟中谍4.H264.ACC.mkv,他为什么要

FFMPEG视音频编解码零基础学习方法-b

感谢大神分享,虽然现在还看不懂,留着大家一起看啦 PS:有不少人不清楚“FFmpeg”应该怎么读.它读作“ef ef em peg” 0. 背景知识 本章主要介绍一下FFMPEG都用在了哪里(在这里仅列几个我所知的,其实远比这个多).说白了就是为了说明:FFMPEG是非常重要的. 使用FFMPEG作为内核视频播放器: Mplayer,ffplay,射手播放器,暴风影音,KMPlayer,QQ影音... 使用FFMPEG作为内核的Directshow Filter: ffdshow,lav fil

java编解码技术,netty nio

对于java提供的对象输入输出流ObjectInputStream与ObjectOutputStream,可以直接把java对象作为可存储的字节数组写入文件,也可以传输到网络上去.对与java开放人员来说,默认的jdk序列化机制可以避免操作底层的字节数组,从而提升开发效率. 1.为什么需要序列化 网络传输与对象序列化 2.java编解码技术指的什么 netty nio是基于网络传输,当进行远程跨进程服务调用时,需要把被传输的对象编码为字节数组或者bytebuffer对象.而当远程服务读取到byt

视频编解码

所谓视频编码方式就是指通过特定的压缩技术,将某个视频格式的文件转换成另一种视频格式文件的方式.视频流传输中最为重要的编解码标准有国际电联的H.261.H.263.H.264,运动静止图像专家组的M-JPEG和国际标准化组织运动图像专家组的MPEG系列标准,此外在互联网上被广泛应用的还有Real-Networks的RealVideo.微软公司的WMV以及Apple公司的QuickTime等. 中文名 视频编码 外文名 Video Encoding 分    类 H.26x系列,MPEG系列,AVS

FFmpeg音视频编解码实践总结

PS:由于目前开发RTSP服务器 传输模块时用到了h264文件,所以攻了一段时间去实现h264的视频编解码,借用FFmpeg SDK实现了任意文件格式之间的转换,并实现了流媒体实时播放,目前音视频同步需要稍加完善,视频编码代码已成功移植到Visual Stdio平台,如有需要的留下邮箱 以下文档来自FFmpeg工程组(http://www.ffmpeg.com.cn/index.php开发事例) 实现转码一个普通视频文件为视频mpeg4,音频mp3的功能的程序 本程序源引自FFmpeg工程组,实