Rxjava+ReTrofit+okHttp深入浅出-终极封装四(多文件下载之断点续传)

Rxjava+ReTrofit+okHttp深入浅出-终极封装四(多文件下载之断点续传)

背景

断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

效果

实现

下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

1.创建service接口

和以前一样,先写接口

注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)

 /*断点续传下载接口*/
    @Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
    @GET
    Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);

2.复写ResponseBody

和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调

/**
 * 自定义进度的body
 * @author wzg
 */
public class DownloadResponseBody extends ResponseBody {
    private ResponseBody responseBody;
    private DownloadProgressListener progressListener;
    private BufferedSource bufferedSource;

    public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
        this.responseBody = responseBody;
        this.progressListener = progressListener;
    }

    @Override
    public BufferedSource source() {
        if (bufferedSource == null) {
            bufferedSource = Okio.buffer(source(responseBody.source()));
        }
        return bufferedSource;
    }

    private Source source(Source source) {
        return new ForwardingSource(source) {
            long totalBytesRead = 0L;
            @Override
            public long read(Buffer sink, long byteCount) throws IOException {
                long bytesRead = super.read(sink, byteCount);
                // read() returns the number of bytes read, or -1 if this source is exhausted.
                totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                if (null != progressListener) {
                    progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                }
                return bytesRead;
            }
        };
    }
}

3.自定义进度回调接口

/**
 * 成功回调处理
 * Created by WZG on 2016/10/20.
 */
public interface DownloadProgressListener {
    /**
     * 下载进度
     * @param read
     * @param count
     * @param done
     */
    void update(long read, long count, boolean done);
}

4.复写Interceptor

复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody

/**
 * 成功回调处理
 * Created by WZG on 2016/10/20.
 */
public class DownloadInterceptor implements Interceptor {

    private DownloadProgressListener listener;

    public DownloadInterceptor(DownloadProgressListener listener) {
        this.listener = listener;
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        Response originalResponse = chain.proceed(chain.request());

        return originalResponse.newBuilder()
                .body(new DownloadResponseBody(originalResponse.body(), listener))
                .build();
    }
}

5.封装请求downinfo数据

这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据

public class DownInfo {
    /*存储位置*/
    private String savePath;
    /*下载url*/
    private String url;
    /*基础url*/
    private String baseUrl;
    /*文件总长度*/
    private long countLength;
    /*下载长度*/
    private long readLength;
    /*下载唯一的HttpService*/
    private HttpService service;
    /*回调监听*/
    private HttpProgressOnNextListener listener;
    /*超时设置*/
    private  int DEFAULT_TIMEOUT = 6;
    /*下载状态*/
    private DownState state;
    }

6.DownState状态封装

很简单,和大多数封装框架一样

public enum  DownState {
    START,
    DOWN,
    PAUSE,
    STOP,
    ERROR,
    FINISH,
}

7.请求HttpProgressOnNextListener回调封装类

注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听

通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活

/**
 * 下载过程中的回调处理
 * Created by WZG on 2016/10/20.
 */
public abstract class HttpProgressOnNextListener<T> {
    /**
     * 成功后回调方法
     * @param t
     */
    public abstract void onNext(T t);

    /**
     * 开始下载
     */
    public abstract void onStart();

    /**
     * 完成下载
     */
    public abstract void onComplete();

    /**
     * 下载进度
     * @param readLength
     * @param countLength
     */
    public abstract void updateProgress(long readLength, long countLength);

    /**
     * 失败或者错误方法
     * 主动调用,更加灵活
     * @param e
     */
     public  void onError(Throwable e){

     }

    /**
     * 暂停下载
     */
    public void onPuase(){

    }

    /**
     * 停止下载销毁
     */
    public void onStop(){

    }
}

8.封装回调Subscriber

准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中

  • sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果
  • 传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态
  • 通过RxAndroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)
  • update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)

/**
 * 用于在Http请求开始时,自动显示一个ProgressDialog
 * 在Http请求结束是,关闭ProgressDialog
 * 调用者自己对请求数据进行处理
 * Created by WZG on 2016/7/16.
 */
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
    //弱引用结果回调
    private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
    /*下载数据*/
    private DownInfo downInfo;

    public ProgressDownSubscriber(DownInfo downInfo) {
        this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
        this.downInfo=downInfo;
    }

    /**
     * 订阅开始时调用
     * 显示ProgressDialog
     */
    @Override
    public void onStart() {
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onStart();
        }
        downInfo.setState(DownState.START);
    }

    /**
     * 完成,隐藏ProgressDialog
     */
    @Override
    public void onCompleted() {
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onComplete();
        }
        downInfo.setState(DownState.FINISH);
    }

    /**
     * 对错误进行统一处理
     * 隐藏ProgressDialog
     *
     * @param e
     */
    @Override
    public void onError(Throwable e) {
        /*停止下载*/
        HttpDownManager.getInstance().stopDown(downInfo);
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onError(e);
        }
        downInfo.setState(DownState.ERROR);
    }

    /**
     * 将onNext方法中的返回结果交给Activity或Fragment自己处理
     *
     * @param t 创建Subscriber时的泛型类型
     */
    @Override
    public void onNext(T t) {
        if (mSubscriberOnNextListener.get() != null) {
            mSubscriberOnNextListener.get().onNext(t);
        }
    }

    @Override
    public void update(long read, long count, boolean done) {
        if(downInfo.getCountLength()>count){
            read=downInfo.getCountLength()-count+read;
        }else{
            downInfo.setCountLength(count);
        }
        downInfo.setReadLength(read);
        if (mSubscriberOnNextListener.get() != null) {
            /*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
            rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                      /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
                    if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
                    downInfo.setState(DownState.DOWN);
                    mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
                }
            });
        }
    }

}

9.下载管理类封装HttpDownManager

  • 单利获取
 /**
     * 获取单例
     * @return
     */
    public static HttpDownManager getInstance() {
        if (INSTANCE == null) {
            synchronized (HttpDownManager.class) {
                if (INSTANCE == null) {
                    INSTANCE = new HttpDownManager();
                }
            }
        }
        return INSTANCE;
    }
  • 因为单利所以需要记录正在下载的数据和回到sub
 /*回调sub队列*/
    private HashMap<String,ProgressDownSubscriber> subMap;
    /*单利对象*/
    private volatile static HttpDownManager INSTANCE;

    private HttpDownManager(){
        downInfos=new HashSet<>();
        subMap=new HashMap<>();
    }
  • 开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)
 /**
     * 开始下载
     */
    public void startDown(DownInfo info){
        /*正在下载不处理*/
        if(info==null||subMap.get(info.getUrl())!=null){
            return;
        }
        /*添加回调处理类*/
        ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
        /*记录回调sub*/
        subMap.put(info.getUrl(),subscriber);
        /*获取service,多次请求公用一个sercie*/
        HttpService httpService;
        if(downInfos.contains(info)){
            httpService=info.getService();
        }else{
            DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
            OkHttpClient.Builder builder = new OkHttpClient.Builder();
            //手动创建一个OkHttpClient并设置超时时间
            builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
            builder.addInterceptor(interceptor);

            Retrofit retrofit = new Retrofit.Builder()
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .baseUrl(info.getBaseUrl())
                    .build();
            httpService= retrofit.create(HttpService.class);
            info.setService(httpService);
        }
        /*得到rx对象-上一次下載的位置開始下載*/
        httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
                /*指定线程*/
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                   /*失败后的retry配置*/
                .retryWhen(new RetryWhenNetworkException())
                /*读取下载写入文件*/
                .map(new Func1<ResponseBody, DownInfo>() {
                    @Override
                    public DownInfo call(ResponseBody responseBody) {
                        try {
                            writeCache(responseBody,new File(info.getSavePath()),info);
                        } catch (IOException e) {
                            /*失败抛出异常*/
                            throw new HttpTimeException(e.getMessage());
                        }
                        return info;
                    }
                })
                /*回调线程*/
                .observeOn(AndroidSchedulers.mainThread())
                /*数据回调*/
                .subscribe(subscriber);

    }
  • 写入文件

    注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度


    /**
     * 写入文件
     * @param file
     * @param info
     * @throws IOException
     */
    public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
        if (!file.getParentFile().exists())
            file.getParentFile().mkdirs();
        long allLength;
        if (info.getCountLength()==0){
            allLength=responseBody.contentLength();
        }else{
            allLength=info.getCountLength();
        }
            FileChannel channelOut = null;
            RandomAccessFile randomAccessFile = null;
            randomAccessFile = new RandomAccessFile(file, "rwd");
            channelOut = randomAccessFile.getChannel();
            MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
                    info.getReadLength(),allLength-info.getReadLength());
            byte[] buffer = new byte[1024*8];
            int len;
            int record = 0;
            while ((len = responseBody.byteStream().read(buffer)) != -1) {
                mappedBuffer.put(buffer, 0, len);
                record += len;
            }
            responseBody.byteStream().close();
                if (channelOut != null) {
                    channelOut.close();
                }
                if (randomAccessFile != null) {
                    randomAccessFile.close();
                }
    }
  • 停止下载

    调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)

 /**
     * 停止下载
     */
    public void stopDown(DownInfo info){
        if(info==null)return;
        info.setState(DownState.STOP);
        info.getListener().onStop();
        if(subMap.containsKey(info.getUrl())) {
            ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
            subscriber.unsubscribe();
            subMap.remove(info.getUrl());
        }
        /*同步数据库*/
    }
  • 暂停下载

    原理和停止下载原理一样

 /**
     * 暂停下载
     * @param info
     */
    public void pause(DownInfo info){
        if(info==null)return;
        info.setState(DownState.PAUSE);
        info.getListener().onPuase();
        if(subMap.containsKey(info.getUrl())){
            ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
            subscriber.unsubscribe();
            subMap.remove(info.getUrl());
        }
        /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
    }
  • 暂停全部和停止全部下载任务
 /**
     * 停止全部下载
     */
    public void stopAllDown(){
        for (DownInfo downInfo : downInfos) {
            stopDown(downInfo);
        }
        subMap.clear();
        downInfos.clear();
    }

    /**
     * 暂停全部下载
     */
    public void pauseAll(){
        for (DownInfo downInfo : downInfos) {
            pause(downInfo);
        }
        subMap.clear();
        downInfos.clear();
    }
  • 整合代码HttpDownManager

    同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)

    传送门-HttpDownManager

补充

有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

只需要替换DbUtil的方法即可

传送门-DbUtil

总结

到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

    1.Retrofit+Rxjava+okhttp基本使用方法
    2.统一处理请求数据格式
    3.统一的ProgressDialog和回调Subscriber处理
    4.取消http请求
    5.预处理http请求
    6.返回数据的统一判断
    7.失败后的retry封装处理
    8.RxLifecycle管理生命周期,防止泄露
    9.文件上传和文件下载(支持多文件断点续传)

如有帮助欢迎start和follow

传送门-下载封装源码

传送门-全部封装源码

Retrofit+Rxjava+okhttp 懒人方式使用一

Rxjava+ReTrofit+okHttp深入浅出-终极封装二(网络请求)

Rxjava+ReTrofit+okHttp深入浅出-终极封装三(文件上传)

时间: 2024-09-29 10:18:04

Rxjava+ReTrofit+okHttp深入浅出-终极封装四(多文件下载之断点续传)的相关文章

Rxjava+ReTrofit+okHttp深入浅出-终极封装

Rxjava+ReTrofit+okHttp深入浅出-终极封装 背景: 学习Rxjava和retrofit已经很长时间了,功能确实很强大,但是使用起来还是有点复杂,代码的重复性太高,所以决定把基于retrofit和rxjava的处理统一封装起来,实现的功能: 1.Retrofit+Rxjava+okhttp基本使用方法 2.统一处理请求数据格式 3.统一的ProgressDialog和回调Subscriber处理 4.取消http请求 5.预处理http请求 5.返回数据的统一判断 效果: 封装

手动缓存Retrofit+OkHttp响应体,不再局限于Get请求缓存

转载请标明出处: http://blog.csdn.net/iamzgx/article/details/51764848 概括 这篇博客是接着上一篇博客学会Retrofit+OkHttp+RxAndroid三剑客的使用,让自己紧跟Android潮流的步伐,没看过的,建议看完上一篇再来看这篇.在上一篇博客中仅仅是简单的讲解了OkHttp的缓存问题,主要是通过http协议里面的control-cache控制缓存,而且是仅仅只能是Get请求才能缓存,如果Post请求OkHttp会让response返

Android探索之基于okHttp打造自己的网络请求&lt;Retrofit+Okhttp&gt;(五)

前言: 通过上面的学习,我们不难发现单纯使用okHttp来作为网络库还是多多少收有那么一点点不太方便,而且还需自己来管理接口,对于接口的使用的是哪种请求方式也不能一目了然,出于这个目的接下来学习一下Retrofit+Okhttp的搭配使用. Retrofit介绍: Retrofit和okHttp师出同门,也是Square的开源库,它是一个类型安全的网络请求库,Retrofit简化了网络请求流程,基于OkHtttp做了封装,解耦的更彻底:比方说通过注解来配置请求参数,通过工厂来生成CallAdap

RxJava + Retrofit 的实际应用场景

关于 RxJava Retrofit 很多篇文章都有详细的说明,在这里我想分享一个具体的使用案例,在我的开源项目 就看天气 里的实际应用.也希望跟大家探讨如何优雅的使用. 准备 项目中用到的依赖: compile 'io.reactivex:rxjava:1.1.0' compile 'io.reactivex:rxandroid:1.1.0' compile 'com.google.code.gson:gson:2.4' compile 'com.squareup.retrofit2:retr

RxJava + Retrofit完成网络请求

1.前言 本文基于RxJava.Retrofit的使用,若是对RxJava或Retrofit还不了解的简友可以先了解RxJava.Retrofit的用法再来看这篇文章. 在这片文章之前分别单独介绍过Rxjava以及Retrofit的使用: Android Retrofit 2.0 的使用 Android RxJava的使用(一)基本用法 2.使用 在了解了RxJava和Retrofit分别的用法后,RxJava.Retrofit的搭配使用也就不再话下了. 先看看使用Retrofit完成一次网络请

Retrofit+OKHttp 教你怎么持久化管理Cookie

绪论 最近小编有点忙啊,项目比较紧,所以一直在忙活项目,继之前的自定义组件之后就没再写博客了,如果你没看到之前的自定义组件你可以看一下: Android自定义下拉刷新动画–仿百度外卖下拉刷新 Android自定义组合控件-教你如何自定义下拉刷新和左滑删除 效果还行,源码也已经传到我的Github上了. 那么今天小编来给大家分享点什么呢?对,就是它:Retrofit,话说Retrofit最近真的很火啊,Retrofit+OKHttp现在似乎已经成为了Android网络请求框架的主流框架了吧,小编之

OkHttp深入学习(四)——0kio

上一节<OkHttp深入学习(三)--Cache>我们对okhttp中的Cache缓存机制进行了学习,学习了上一节的内容,如果叫我们自己去设计一个缓存机制,那么我们一定会有了自己的思路,想想还有点小激动.这一节我们继续来看看okhttp这个教科书中还有什么值得我们继续挖掘的东西.果不其然,我们发现了okio这个好东西,该类主要负责对java中io的封装,使得java中的io流读写更加方便,甚至还能提高读写效率.okio项目开源地址请戳这里.在正式学习之前,我们先来了解一下它是如何使用的,随后我

【知识整理】这可能是最好的RxJava 2.x 入门教程(四)

这可能是最好的RxJava 2.x入门教程系列专栏 文章链接: 这可能是最好的RxJava 2.x 入门教程(一) 这可能是最好的RxJava 2.x 入门教程(二) 这可能是最好的RxJava 2.x 入门教程(三) 这可能是最好的RxJava 2.x 入门教程(四) GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples 为了满足大家的饥渴难耐,GitHub将同步更新代码,主要包含基本的代码封装,RxJava 2.x所有操作符

深入浅出Docker(四):Docker的集成测试部署之道

1. 背景 敏捷开发已经流行了很长时间,如今有越来越多的企业开始践行敏捷开发所提倡的以人为中心.迭代.循序渐进的开发理念.在这样的场景下引入Docker技术,首要目的就是使用Docker提供的虚拟化方式,给开发团队建立一套可以复用的开发环境,让开发环境可以通过Image的形式分享给项目的所有开发成员,以简化开发环境的搭建.但是,在没有Docker技术之前就已经有类如Vagrant的开发环境分发技术,软件开发者一样可以创建类似需求的环境配置流程.所以在开发环境方面,Docker技术的优势并不能很好