- 0-任务
- 1-同步方式
- 2-异步方式
- 3-异步包装方式
- 4-异步拆分方式
- 5-异步映射方式
- 6-异步Lambda方式
- 7-目标RxJava方式
0-任务
我们有个 Web API,获取指定标签的所有新闻列表,每条新闻包含时间和内容等。
我们的任务就是下载新闻列表,选择最新的新闻,然后保存在本地。
假设第三方提供的jar里面提供了Api和ApiImpl,不可再更改:
假设getNewsList耗时1.5秒,save耗时0.5秒
1.接口:
public interface Api {
//同步方式
List<News> getNewsList(String tag);//获取新闻列表
Uri save(News news);//保存新闻到本地
//异步方式
void getNewsList(String tag, IGetNewsList callback);//获取新闻列表
void save(News news, ISave callback);//保存新闻到本地
interface IGetNewsList {
void onSuccess(List<News> newsList);
void onFailure(Exception e);
}
interface ISave {
void onSuccess(Uri uri);
void onFailure(Exception e);
}
}
2.接口实现类:
public class ApiImpl implements Api {
private Random random = new Random();
//线程池:核心有两个线程,最大线程数量可无限,存活时间60s
private ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(2, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
@Override
public List<News> getNewsList(String tag) {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<News> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add(new News(random.nextInt(100), "这是" + tag + "标签的新闻" + i));
}
return list;
}
@Override
public Uri save(News news) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Uri("地址" + news.hashCode());
}
@Override
public void getNewsList(String tag, IGetNewsList callback) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
callback.onSuccess(getNewsList(tag));
} catch (Exception e) {
callback.onFailure(e);
}
}
});
}
@Override
public void save(News news, ISave callback) {
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
callback.onSuccess(save(news));
} catch (Exception e) {
callback.onFailure(e);
}
}
});
}
}
3.一些实体类:
public class News implements Comparable<News> {
private int time;//发布时间
private String content;//内容
public News(int time, String content) {
this.time = time;
this.content = content;
}
public int getTime() {
return time;
}
public void setTime(int time) {
this.time = time;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public int compareTo(News another) {
return Integer.compare(time, another.time);
}
@Override
public String toString() {
return "News{" +
"time=" + time +
", content=‘" + content + ‘\‘‘ +
‘}‘;
}
}
public class Uri {
String path;
public Uri(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
@Override
public String toString() {
return "Uri{" +
"path=‘" + path + ‘\‘‘ +
‘}‘;
}
}
1-同步方式
//1.同步方式
public Uri getLatestUri(String tag) {
Api api = new ApiImpl();//原始接口
List<News> newsList = api.getNewsList(tag);
LogUtil.print("获取新闻列表:" + newsList.toString());
News latestNews = getLatestNews(newsList);
LogUtil.print("获取最新的新闻:" + latestNews.toString());
Uri uri = api.save(latestNews);
LogUtil.print("保存到本地:" + uri.toString());
return uri;
}
private void testSync() {
LogUtil.print("START-同步方式");
Uri uri = client.getLatestUri("编程");
LogUtil.print("END-同步方式:" + uri.toString());
}
这里的getNewsList和save都是耗时操作,会阻塞主线程,显然同步方式不适用!
2-异步方式
//2.异步方式
public void getLatestUriAsync(String tag, UpdateNewsCallback callback) {
Api api = new ApiImpl();//原始接口
api.getNewsList(tag, new Api.IGetNewsList() {
@Override
public void onSuccess(List<News> newsList) {
LogUtil.print("获取新闻列表:" + newsList.toString());
News latestNews = getLatestNews(newsList);
LogUtil.print("获取最新的新闻:" + latestNews.toString());
api.save(latestNews, new Api.ISave() {
@Override
public void onSuccess(Uri uri) {
LogUtil.print("保存到本地:" + uri.toString());
callback.onSuccess(uri);
}
@Override
public void onFailure(Exception e) {
callback.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
callback.onFailure(e);
}
});
}
private void testAysnc() {
LogUtil.print("START-异步方式");
client.getLatestUriAsync("干货", new UpdateNewsCallback() {
@Override
public void onSuccess(Uri uri) {
LogUtil.print("END-异步方式:" + uri.toString());
}
@Override
public void onFailure(Exception e) {
}
});
}
这里我没有将最后得到的Uri切换回主线程,这涉及到线程间通讯的问题,详情可查看我的另一篇博文:自定义消息传递机制
由日志可知,主线程阻塞的问题解决了,获取新闻列表是在thread-1执行的,保存到本地是在thread-2执行的,但是,新的问题又来了:回调嵌套!
这还好只有两层回调,那要是来个四五层回调的嵌套,那代码,想想也是醉了,相信没有人想看这样的代码,那么,怎么办呢,且听下文分解!
3-异步包装方式
首先引入两个辅助类:
1. AsyncWork:异步任务的模板类
2. CallBack:泛型回调类
3. ApiWrapper:接口的包装类
public abstract class AsyncWork<T> {
public abstract void start(Callback<T> callback);
}
public interface Callback<T> {
void onResult(T result);
void onError(Exception e);
}
public class ApiWrapper {
private Api api=new ApiImpl();
//包装异步操作
public AsyncWork<List<News>> getNewsList(String tag) {
return new AsyncWork<List<News>>() {
@Override
public void start(Callback<List<News>> callback) {
api.getNewsList(tag, new Api.IGetNewsList() {
@Override
public void onSuccess(List<News> newsList) {
callback.onResult(newsList);
}
@Override
public void onFailure(Exception e) {
callback.onError(e);
}
});
}
};
}
public AsyncWork<Uri> save(News news) {
return new AsyncWork<Uri>() {
@Override
public void start(Callback<Uri> callback) {
api.save(news, new Api.ISave() {
@Override
public void onSuccess(Uri uri) {
callback.onResult(uri);
}
@Override
public void onFailure(Exception e) {
callback.onError(e);
}
});
}
};
}
}
好了,辅助类已经搞定了,下面正式开始!
//3.异步包装方式
public AsyncWork<Uri> getLatestUriWrapper(String tag) {
ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
//先根据命令,得到异步操作,然后执行异步操作
return new AsyncWork<Uri>() {
@Override
public void start(Callback<Uri> callback) {
apiWrapper.getNewsList(tag)
.start(new Callback<List<News>>() {
@Override
public void onResult(List<News> newsList) {
LogUtil.print("获取新闻列表:" + newsList.toString());
News latestNews = getLatestNews(newsList);
LogUtil.print("获取最新的新闻:" + latestNews.toString());
apiWrapper.save(latestNews)
.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("保存到本地:" + result.toString());
callback.onResult(result);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
private void testWrapper() {
LogUtil.print("START-异步包装方式");
AsyncWork<Uri> asyncWork = client.getLatestUriWrapper("Java");
asyncWork.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("END-异步包装方式:" + result.toString());
}
@Override
public void onError(Exception e) {
}
});
}
这里相比第2种异步方式,无非就是多了:
1. 组装:将所有的异步操作都使用异步模板类包装了一下
2. 卸装:AsyncWork.start即可执行异步任务,得到包装的数据
可是,这似乎并没有什么卵用啊,回调还是有两层嵌套,而且还多绕了几个弯,还不如以前好懂了呢,这不是吃饱了撑着么?别急,接下来见分晓!
4-异步拆分方式
//4.异步包装方式-拆分
public AsyncWork<Uri> getLatestUriSplit(String tag) {
ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
AsyncWork<List<News>> newsListWork = apiWrapper.getNewsList(tag);
AsyncWork<News> latestNewsWork = new AsyncWork<News>() {
@Override
public void start(Callback<News> callback) {
newsListWork.start(new Callback<List<News>>() {
@Override
public void onResult(List<News> result) {
LogUtil.print("获取新闻列表:" + result.toString());
News latestNews=getLatestNews(result);
LogUtil.print("获取最新的新闻:" + latestNews.toString());
callback.onResult(latestNews);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
AsyncWork<Uri> uriWork = new AsyncWork<Uri>() {
@Override
public void start(Callback<Uri> callback) {
latestNewsWork.start(new Callback<News>() {
@Override
public void onResult(News cutest) {
apiWrapper.save(cutest)
.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("保存到本地:" + result.toString());
callback.onResult(result);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
return uriWork;
}
private void testSplit() {
LogUtil.print("START-异步拆分方式");
AsyncWork<Uri> asyncWork = client.getLatestUriSplit("Java");
asyncWork.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("END-异步拆分方式:" + result.toString());
}
@Override
public void onError(Exception e) {
}
});
}
这里,我们将三步操作,分成了三个异步任务,开始有了一点链式调用的影子了,不过,在每个任务里面,还有着大量的业务无关的模板代码,显然还存在着很大的优化空间,是时候干掉这些模板代码了!
5-异步映射方式
首先需要拓展一些我们的辅助类:AsyncWork
给它添加了两个方法:map和flatMap
通过这两个方法,我们可以将AsyncWork < T > 转换成 AsyncWork< R >,实现任务的转换!
具体怎么用,等下看了测试类你就知道了!
public abstract class AsyncWork<T> {
public abstract void start(Callback<T> callback);
/**
* map
* 目标:AsyncWork<T> --> AsyncWork<R>
* 要求:call中实现(T -> R)
*/
public <R> AsyncWork<R> map(Func<T, R> func) {
final AsyncWork<T> source = this;
return new AsyncWork<R>() {
@Override
public void start(Callback<R> callback) {
source.start(new Callback<T>() {
@Override
public void onResult(T result) {
R mapped = func.call(result);
callback.onResult(mapped);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
/**
* flatMap
* 目标:AsyncWork<T> -> AsyncWork<R>
* 要求:call中实现(T -> AsyncWork<R>)
*/
public <R> AsyncWork<R> flatMap(Func<T, AsyncWork<R>> func) {
final AsyncWork<T> source = this;
return new AsyncWork<R>() {
@Override
public void start(Callback<R> callback) {
source.start(new Callback<T>() {
@Override
public void onResult(T result) {
AsyncWork<R> mapped = func.call(result);
mapped.start(new Callback<R>() {
@Override
public void onResult(R result) {
callback.onResult(result);
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
@Override
public void onError(Exception e) {
callback.onError(e);
}
});
}
};
}
}
辅助类拓展好了,下面开始分析!参考上面的拆分方式,转换流程应该是:
1. 得到AsyncWork< List< News>>:新闻列表的任务包装类
2. 转换成AsyncWork< News>:最新新闻的任务包装类
3. 转换成AsyncWork< Uri>:最新新闻的URI的任务包装类
//5.异步包装方式-映射
public AsyncWork<Uri> getLatestUriMap(String tag) {
ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
AsyncWork<List<News>> newsListWork = apiWrapper.getNewsList(tag);
//1.使用map
//将AsyncWork<List<News>> 转 AsyncWork<News>
//因为有接口:List<News> -> News
AsyncWork<News> latestNewsWork = newsListWork.map(new Func<List<News>, News>() {
@Override
public News call(List<News> newsList) {
LogUtil.print("获取新闻列表:" + newsList.toString());
News latestNews = getLatestNews(newsList);
LogUtil.print("获取最新的新闻:" + latestNews.toString());
return latestNews;
}
});
// //2.这里不能使用map了!!!
// //因为没有接口:News -> Uri
// //只有接口:News -> AsyncWork<Uri>
// AsyncWork<Uri> uriWork = latestNewsWork.map(new Func<News, Uri>() {
// @Override
// public Uri call(News news) {
// return apiWrapper.save(news);
// }
// });
//3.使用flatMap
//将AsyncWork<News> 转 AsyncWork<Uri>
//只为有接口:News -> AsyncWork<Uri>
AsyncWork<Uri> uriWork = latestNewsWork.flatMap(new Func<News, AsyncWork<Uri>>() {
@Override
public AsyncWork<Uri> call(News result) {
LogUtil.print("保存到本地:" + result.toString());
return apiWrapper.save(result);
}
});
return uriWork;
}
private void testMap() {
LogUtil.print("START-异步映射方式");
AsyncWork<Uri> asyncWork = client.getLatestUriMap("Java");
asyncWork.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("END-异步映射方式:" + result.toString());
}
@Override
public void onError(Exception e) {
}
});
}
现在我们再看,每个异步任务里面都没有嵌套的回调了,终于摆脱回调地狱了!
相信看了上面的代码之后,大家也都了解了map和flatMap的作用了:
想将AsyncWork< T>转成AsyncWork< R>:
- 如果已有方法:T -> R,则使用map
- 如果已有方法:T -> AsynWork,则使用flatMap
但是,现在的代码显然还是没有最初的同步方式简洁,所以,是时候玩Lambda表达式!
6-异步Lambda方式
Lambda表达式说明:
- Lambda表达式可以认为是匿名方法,左边是形参,右边是方法体,一般用于接口回调的实现
- 特别注意:这里接口中不要有相同入参、相同出参的方法,哪怕方法名不同也不行,因为Lambda表达式是在编译时自动根据入参和出参来寻找方法的
//6.异步包装方式-Lambda
public AsyncWork<Uri> getLatestUriLambda(String tag) {
ApiWrapper apiWrapper = new ApiWrapper();//包装后的接口
AsyncWork<List<News>> catsListAsyncWork = apiWrapper.getNewsList(tag);
AsyncWork<News> cutestCatAsyncWork = catsListAsyncWork.map(cats -> getLatestNews(cats));
AsyncWork<Uri> storedUriAsyncWork = cutestCatAsyncWork.flatMap(cat -> apiWrapper.save(cat));
return storedUriAsyncWork;
}
private void testLambda() {
LogUtil.print("START-异步Lambda方式");
AsyncWork<Uri> asyncWork = client.getLatestUriLambda("Java");
asyncWork.start(new Callback<Uri>() {
@Override
public void onResult(Uri result) {
LogUtil.print("END-异步Lambda方式:" + result.toString());
}
@Override
public void onError(Exception e) {
}
});
}
这里我就没打印中间步骤了,因为不忍心破坏Lambda的简洁的方法体,而又不知道log打印插到别的什么地方好,大家觉得在哪里打印中间步骤的数据好呢?
现在的异步方式,是不是和最开始的同步方式,看起来很相似了啊,这就是RxJava的好处,像写同步的代码一样去写异步的代码!好了,自定义的伪RxJava这里就完结了!
下面我们来看看真正的RxJava是怎么玩这个例子的吧!
7-目标:RxJava方式
同样的,首先定义一个辅助类:
1. ApiRx,将Api的接口转成Rx方式的接口
public class ApiRx {
private Api api=new ApiImpl();
public Observable<List<News>> queryCats(final String query) {
return Observable.create(new Observable.OnSubscribe<List<News>>() {
@Override
public void call(final Subscriber<? super List<News>> subscriber) {
api.getNewsList(query, new Api.IGetNewsList() {
@Override
public void onSuccess(List<News> aNewses) {
subscriber.onNext(aNewses);
}
@Override
public void onFailure(Exception e) {
subscriber.onError(e);
}
});
}
});
}
public Observable<Uri> store(final News aNews) {
return Observable.create(new Observable.OnSubscribe<Uri>() {
@Override
public void call(final Subscriber<? super Uri> subscriber) {
api.save(aNews, new Api.ISave() {
@Override
public void onSuccess(Uri uri) {
subscriber.onNext(uri);
}
@Override
public void onFailure(Exception e) {
subscriber.onError(e);
}
});
}
});
}
}
辅助类搞定,下面正式开始!
//7.目的地-RxJava方式
public Observable<Uri> getLatestUriRxJava(String tag) {
ApiRx apiRx=new ApiRx();//Rxjava的接口
Observable<List<News>> catsListObservable = apiRx.queryCats(tag);
Observable<News> cutestCatObservable = catsListObservable.map(new Func1<List<News>, News>() {
@Override
public News call(List<News> cats) {
return getLatestNews(cats);
}
});
Observable<Uri> storedUriObservable = cutestCatObservable.flatMap(new Func1<News, Observable<? extends Uri>>() {
@Override
public Observable<? extends Uri> call(News cat) {
return apiRx.store(cat);
}
});
return storedUriObservable;
}
private void testRxJava() {
Observer<Uri> observer = new Observer<Uri>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(Uri uri) {
LogUtil.print("END-RxJava方式:" + uri.toString());
}
};
LogUtil.print("START-RxJava方式");
Observable<Uri> uriObservable = client.getLatestUriRxJava("RxJava");
uriObservable.subscribe(observer);
}