現在愈來愈多Android開發者使用到RxJava,在Android使用RxJava主要有以下好處:
1,輕松切換線程。之前我們切換線程主要使用Handler等手段來做。
2,輕松解決回調的嵌套問題。現在的app業務邏輯愈來愈復雜,多的時候3,4層回調嵌套,使得代碼可保護性變得很差。RxJava鏈式調用使得這些調用變得扁平化。
隨著RxJava的流行,愈來愈多的開源項目開始支持RxJava,像Retrofit、GreenDao等。這些開源項目支持RxJava使得我們解決復雜業務變得非常方便。
但是這些還不夠,有的時候我們自己的封裝的業務也需要支持RxJava,舉個例子:查詢數據、處理本地文件等操作,總而言之就是1些耗時任務。而且還要處理這些操作的成功、失敗、線程切換等操作。
如果還是想之前那樣做,那就太low。
遇到這類問題,在我腦海里顯現的第1種方式就是通過Observable的create操作符。由于在里面我們可以控制數據的發射。就像上1篇文章那樣《RxJava switchIfEmpty操作符實現Android檢查本地緩存邏輯判斷》
以下代碼片斷:
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
List<Article> as = articleDao.queryBuilder()
.where(ArticleDao.Properties.CategoryId.eq(categoryId))
.orderDesc(ArticleDao.Properties.Id)
.offset((pageIndex - 1) * pageSize)
.limit(pageSize).list();
if (as == null || as.isEmpty()) {
subscriber.onNext(null);
}else{
subscriber.onNext(as);
}
}catch (Exception e){
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
這樣確切沒有無問題。但是我們要封裝下, 每一個方法都這樣寫保護性和擴大比較差(例如有天我想換種方式來實現而不是create,如果通過方法封裝1下,修改就變得容易多了)
如何封裝呢?通過分析知道,大部份代碼是相同的,只是我們的業務不1樣。那末通過模板方法解決吧。業務方法通過接口回調的方式傳遞進來,由于我們不知道調用者是甚么業務。
回調接口以下(T表示我們業務數據):
public interface MyCallable<T> {
T call();
}
下面是模板代碼:
protected <R> Observable<R> createObservable(final MyCallable<R> callable) {
return Observable.create(new Observable.OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> subscriber) {
try {
R result = callable.call();
subscriber.onNext(result);
} catch (Exception e) {
subscriber.onError(e);
}
subscriber.onCompleted();
}
});
}
使用就非常簡單了調用createObservable方法,實現MyCallable接口便可,然后就是跟使用RxJava1樣處理邏輯。
看過Greendao源碼的人知道,它也是通過這類方式支持RxJava的(下面看看他是怎樣做的):
/**
* Rx version of {@link AbstractDao#loadAll()} returning an Observable.
*/
@Experimental
public Observable<T> load(final K key) {
return wrap(new Callable<T>() {
@Override
public T call() throws Exception {
return dao.load(key);
}
});
}
終究的實現也是通過dao.load(key)同步方法來實現的,關鍵是wrap方法了:
protected <R> Observable<R> wrap(Callable<R> callable) {
return wrap(RxUtils.fromCallable(callable));
}
//通過這個方法再包裝了1層(就是默許設置履行的線程)
protected <R> Observable<R> wrap(Observable<R> observable) {
if (scheduler != null) {
return observable.subscribeOn(scheduler);
} else {
return observable;
}
}
通過代碼可以看到默許履行的線程是IO線程:
/**
* The returned RxDao is a special DAO that let's you interact with Rx Observables using RX's IO scheduler for
* subscribeOn.
*
* @see #rxPlain()
*/
@Experimental
public RxDao<T, K> rx() {
if (rxDao == null) {
rxDao = new RxDao<>(this, Schedulers.io());
}
return rxDao;
}
所以使用greendao不用指定它在IO履行,由于sdk已幫我們設置了。
然后就是RxUtils.fromCallable(callable)
方法了:
class RxUtils {
/** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */
@Internal
static <T> Observable<T> fromCallable(final Callable<T> callable) {
return Observable.defer(new Func0<Observable<T>>() {
@Override
public Observable<T> call() {
T result;
try {
result = callable.call();
} catch (Exception e) {
return Observable.error(e);
}
return Observable.just(result);
}
});
}
}
上面的注釋說通過Observable.fromCallable也能夠實現這樣的邏輯,也就是說代替Observable.defer()方法。
最后greendao是通過defer操作符來實現rx風格的。
通過分析greendao源碼得知,他是通過defer來做的,我們是通過create操作符來做的。那二者有甚么不同?
我們對defer操作符比較陌生,先看看它的源碼:
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
return create(new OnSubscribeDefer<T>(observableFactory));
}
說白了就是調用了create(OnSubscribe<T> f)
方法:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
其實我們上面的create操作也是調用過來這個方法。只是defer操作符傳遞的OnSubscribe是OnSubscribeDefer,那我們來看看這是甚么鬼?
public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
final Func0<? extends Observable<? extends T>> observableFactory;
public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}
@Override
public void call(final Subscriber<? super T> s) {
Observable<? extends T> o;
try {
o = observableFactory.call();
} catch (Throwable t) {
Exceptions.throwOrReport(t, s);
return;
}
o.unsafeSubscribe(Subscribers.wrap(s));
}
}
OnSubscribeDefer也是繼承自OnSubscribe,那末他的call方法肯定也是在定閱的時候被調用(就是說定閱的時候才創建這個observable,并且每次定閱都會創建1個新的observable)。
為何Greendao沒有使用create那種方式精確控制數據的發射?現在RxJava2.0對create操作符做出了1些限制,不能馬馬虎虎create了,這樣出現1些問題。具體的rxJav2.0的改動可以看看
他的github說明What’s-different-in⑵.0
關于RxJava的1些參考資料:
pitfalls-of-operator-implementations
subscribe vs unsafeSubscribe
What’s-different-in⑵.0