在 Android 上使用 RxNetty

在 Android 上使用 RxNetty

Netty是由JBOSS提供的一个Java开源框架,是一个支持TCP/UDP/HTTP等网络协议的通信框架,和Mina类似,广泛应用于RPC框架。RxNetty则是支持RxJava的Netty开源框架,现在我们来看一下在Android上如何使用RxNetty。

添加RxNetty

在 Android Studio 中添加 RxNetty 的依赖:

把RxNetty的tcp包加入到依赖,直接这样编译会有两个问题,第一个问题是jar重复:

com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK THIRD-PARTY
File1: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jar
File2: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\jmh-generator-annprocess-1.11.2.jar

解决办法:

dependencies {
  ...
  compile(‘io.reactivex:rxnetty-tcp:0.5.2-RC1‘) {
    exclude group: ‘org.openjdk.jmh‘
  }
  ...
}

另一个问题是引用的netty包中META-INF/下的部分文件重复。

解决办法:

  packagingOptions {
    ...
    exclude ‘META-INF/INDEX.LIST‘
    exclude ‘META-INF/BenchmarkList‘
    exclude ‘META-INF/io.netty.versions.properties‘
    exclude ‘META-INF/CompilerHints‘
    ...
  }

到这里RxNetty就成功添加到项目模块中了。接下来看看到底如何使用RxNetty。

如何使用

拿TCP协议举例,用过Netty的都清楚创建连接的步骤:

        workerGroup = new NioEventLoopGroup();
        Bootstrap boot = new Bootstrap();
        boot.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
              @Override public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast("decoder", new MessageDecoder());
                p.addLast("encoder", new MessageEncoder());
                p.addLast("handler", new MessageHandler());
              }
            });
        ChannelFuture f =
            boot.connect("localhost", 8888).syncUninterruptibly();
        channel = f.channel();

自定义的协议需要我们自己实现编码解码Handler,还有最后处理数据的MessageHandler

@Sharable
public class MessageHandler extends SimpleChannelInboundHandler<Message> {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, Message msg)
            throws Exception {
            //处理消息
    }
}

在RxNetty中可以不实现MessageHandler,因为通过注册的观察者可以得到最终解码后的协议对象。

下面是RxNetty创建连接的方法:

  Connection<String, String> mConnection;

  public Observable<Boolean> connect(final String url, final int port) {
    return Observable.create(new Observable.OnSubscribe<Boolean>() {
      @Override public void call(final Subscriber<? super Boolean> subscriber) {
        TcpClient.newClient(url, port).<String, String>addChannelHandlerLast("decoder",
            new Func0<ChannelHandler>() {
              @Override public ChannelHandler call() {
                return new StringDecoder();
              }
            }).<String, String>addChannelHandlerLast("encoder", new Func0<ChannelHandler>() {
          @Override public ChannelHandler call() {
            return new StringEncoder();
          }
        }).createConnectionRequest().subscribe(new Observer<Connection<String, String>>() {
          @Override public void onCompleted() {
            subscriber.onCompleted();
          }

          @Override public void onError(Throwable e) {
            subscriber.onError(e);
          }

          @Override public void onNext(Connection<String, String> connection) {
            mConnection = connection;
            subscriber.onNext(true);
          }
        });
      }
    });
  }

上面的TCP客户端创建了一个字符串解码器、一个字符串编码器,然后创建链接,在链接创建成功后把链接对象connection保存到mConnection方便后面发送数据,同时通知订阅者socket连接成功。

在Android中不能在UI线程创建网络链接,就连InetSocketAddress类都不能在UI线程中创建,TcpClient.newClient(url, port)...createConnectionRequest()本身是一个Observable,但是由于方法newClient(url, port)中创建了InetSocketAddress类,Android严苛模式会报异常,所以上面创建链接的TcpClient方法在外层又包裹了一个Observable,让它运行在IO线程等其它非UI线程才可以正常创建socket链接。

用来接收数据、发送数据的方法同样返回一个Observable,代码如下:

  public Observable<String> receive() {
    if (mConnection != null) {
      return mConnection.getInput();
    }
    return null;
  }

  public Observable<Void> send(String s) {
    return mConnection.writeString(Observable.just(s));
  }

测试上面方法的客户端代码:

  public void rxNettyClientTest() {
    connect("localhost", 60000).subscribe(new Observer<Boolean>() {
      @Override public void onCompleted() {

      }

      @Override public void onError(Throwable e) {
        //reconnect
        Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
          @Override public void call(Long aLong) {
            if (mConnection != null) mConnection.closeNow();
            rxNettyClientTest();
          }
        });
        System.out.println("reconnect");
      }

      @Override public void onNext(Boolean aBoolean) {
        //send data
        send("hello world!").subscribe(new Action1<Void>() {
          @Override public void call(Void aVoid) {
            System.out.println("send success!");
          }
        });
        //receive data
        receive().subscribe(new Observer<String>() {
          @Override public void onCompleted() {

          }

          @Override public void onError(Throwable e) {
            //reconnect
            Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
              @Override public void call(Long aLong) {
                if (mConnection != null) mConnection.closeNow();
                rxNettyClientTest();
              }
            });
            System.out.println("reconnect");
          }

          @Override public void onNext(String s) {
            System.out.println("receive:" + s);
          }
        });
      }
    });
  }

上面的代码包涵了读、写数据和重连等主要功能。

然后是创建服务端的代码:

  public void rxNettyServerTest() {
    TcpServer<String, String> server;
    server = TcpServer.newServer(60000).<String, String>addChannelHandlerLast("string-decoder",
        new Func0<ChannelHandler>() {
          @Override public ChannelHandler call() {
            return new StringDecoder();
          }
        }).<String, String>addChannelHandlerLast("string-encoder", new Func0<ChannelHandler>() {
      @Override public ChannelHandler call() {
        return new StringEncoder();
      }
    }).start(new ConnectionHandler<String, String>() {
      @Override public Observable<Void> handle(Connection<String, String> newConnection) {
        return newConnection.writeStringAndFlushOnEach(
            newConnection.getInput().map(new Func1<String, String>() {
              @Override public String call(String s) {
                System.out.println("receive:" + s);
                return "echo=> " + s;
              }
            }));
      }
    });
    server.awaitShutdown();
  }

服务端代码比较简单,直接echo客户端发来的数据。

关于线程,在Android中处理网络需要subscribeOn(Schedulers.io()),如果需要在UI线程展示则observeOn(AndroidSchedulers.mainThread())

最后,在Android上使用RxNetty大多数是因为没有合适的socket客户端框架,RxNetty也支持Http协议,Android上的Http协议的可选框架比较多,所以就不在这里介绍了,想要了解的可以到这里RxNetty

时间: 2024-08-09 22:39:24

在 Android 上使用 RxNetty的相关文章

Unity3D之AssetBundle学习:Android上运行笔记

路径统一 在Android上加载StreamingAssets文件夹下的AssetBundle文件,首先需要对加载地址进行处理,注意PC.Android和IOS的地址不一致需要针对不同的平台不同的处理,通用代码如下: 1 //统一不同平台下 StreamingAssets 路径 2 public static readonly string STREAMING_ASSETS_PATH = 3 #if UNITY_ANDROID 4 "jar:file:///" + Applicatio

SharePanel – Android上简单的一键分享,可分享到微信QQ和新浪微博

SharePanel – Android上简单的一键分享,可分享到微信QQ和新浪微博 SharePanel Android上简单的一键分享可分享到微信QQ和新浪微博 简介 效果图 代码块 简介 最近在写一个小程序长微博工具,效果就是编辑长微博,然后一键分享到微信.QQ和新浪微博. 一开始是想直接用Intent.createChooser(target, title)来做,后来一想,这样做不是很好啊,会有许多乱七八糟的应用弹出来,我想优先分享到微信.QQ和微博,于是找了点资料,将一键分享这个部分做

ffmpeg在android上输出滑屏问题处理

ffmpeg部分机器上有花屏的问题 原代码如下: while(av_read_frame(formatCtx, &packet)>=0 && !_stop && NULL!=window && bInit) { // Is this a packet from the video stream? if(packet.stream_index==videoStream) { // Decode video frame avcodec_decode

Android上方便地开发的C程序

如果你基于没有一个专门的开发板练手,那你的Android手机也可以开发大多数C应用程序,安装好后编译C的编译器.本文只写一个Hello World的运行过程.优点是:不需要eclipse,不需要Android源码,不需要Android.mk,不需要NDK.一个C程序员就可以很好的利用Android了,需要一个编译器和一个adb要把程序放到Android系统中. 1.安装adb sudo apt-get install android-tools-adb 2.安装交叉工具链 sudo apt-ge

Android 上的 制表符(tab) —— 一个神奇的字符 (cocos2dx crash)

今天测试发现了游戏的一个问题,系统邮件,如果发了tab,在android上一打开邮件内容就会crash.而且他们很确定是tab的问题. 凭我多个月的经验(确实没多年...)来看,从来没听说在android上会因为一个tab崩溃,而且如果有这个问题,肯定会有很多人遇到,估计早就吵翻天了,搜索了一下,什么可用信息都没有. 于是写个测试工程测试了一下,分别在mac下和windows下,用文本编辑工具编辑了4个txt文档,utf有bom和无bom,内容是" tab abcd ",发现都能正常显

Android上解析Json格式数据

package com.practice.json; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import android.app.Activity; import android.os.Bundle; import android.util.Log; public class JsonDemo extends Activity { /*http://www.hui

在Android上运用Anko和Kotlin开发数据库:SQLite从来不是一件轻松的事(KAD25)

作者:Antonio Leiva 时间:Mar 30, 2017 原文链接:https://antonioleiva.com/databases-anko-kotlin/ 事实告诉我们:在Android中编写数据库是相当无聊的. 使用SQLite时,所需的所有模板在当今世界上都不是一件最令人愉快的事情. 所幸的是,在最新一次Google I / O会议上,它们宣布的其中一项事项(称其为:Room),就是为简化这项工作,开发出足够的库. 然而,运用Anko,我们仍可以继续像使用低级别框架一样工作,

Android上使用OpenGLES2.0显示YUV数据

在Android上用OpenGLES来显示YUV图像,之所以这样做,是因为: 1.Android本身也不能直接显示YUV图像,YUV转成RGB还是必要的: 2.YUV手动转RGB会占用大量的CPU资源,如果以这样的形式播放视频,手机会很热,所以我们尽量让GPU来做这件事: 3.OpenGLES是Android集成到自身框架里的第三方库,它有很多的可取之处. 博主的C/C++不是很好,所以整个过程是在Java层实现的,大家见笑,我主要参考(但不限于)以下文章,十分感谢这些朋友的分享: 1. htt

Android上的MVP:如何组织显示层的内容

MVP(Model View Presenter)模式是著名的MVC(Model View Controller)模式的一个演化版本,目前它在Android应用开发中越来越重要了,大家也都在讨论关于MVP的理论,只是结构化的资料非常少.这就是我写这篇博客的原因,我想鼓励大家多参与讨论,然后把MVP模式运用在项目开发中. 什么是MVP? MVP模式可以分离显示层和逻辑层,所以功能接口如何工作与功能的展示可以实现分离,MVP模式理想化地可以实现同一份逻辑代码搭配不同的显示界面.首先要澄清就是MVP不