ReactiveCocoa2 源码浅析

ReactiveCocoa2 源码浅析

标签(空格分隔): ReactiveCocoa iOS Objective-C



  ? 开车不需要知道离合器是怎么工作的,但如果知道离合器原理,那么车子可以开得更平稳。

  ReactiveCocoa 是一个重型的 FRP 框架,内容十分丰富,它使用了大量内建的 block,这使得其有强大的功能的同时,内部源码也比较复杂。本文研究的版本是2.4.4,小版本间的差别不是太大,无需担心此问题。 这里只探究其核心 RACSignal 源码及其相关部分。本文不会详细解释里面的代码,重点在于讨论那些核心代码是 怎么来 的。文本难免有不正确的地方,请不吝指教,非常感谢。

@protocol RACSubscriber

  信号是一个异步数据流,即一个将要发生的以时间为序的事件序列,它能发射出三种不同的东西:valueerrorcompleted。咱们能异步地捕获这些事件:监听信号,针对其发出的三种东西进行操作。“监听”信息的行为叫做 订阅(subscriber)。我们定义的操作就是观察者,这个被“监听”的信号就是被观察的主体(subject) 。其实,这正是“观察者”设计模式!

  

  RAC 针对这个订阅行为定义了一个协议:RACSubscriber。RACSubscriber 协议是与 RACSignal 打交道的唯一方式。咱们先不探究 RACSignal 的内容,而是先研究下 RACSubscriber 是怎么回事。

  

  先来看下 RACSubscriber 的定义:

// 用于从 RACSignal 中直接接收 values 的对象
@protocol RACSubscriber <NSObject>
@required

/// 发送下一个 value 给 subscribers。value 可以为 nil。
- (void)sendNext:(id)value;

/// 发送 error 给 subscribers。 error 可以为 nil。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendError:(NSError *)error;

/// 发送 completed 给 subscribers。
///
/// 这会终结整个订阅行为,而且接下来也无法再订阅任何信号了。
- (void)sendCompleted;

/// 现在重要的是上面三个,先别管这个,忽略掉。
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end

1、NLSubscriber

  咱们自己来实现这个协议看看(本文自定义的类都以 “NL” 开头,以视区别):

// NLSubscriber.h
@interface NLSubscriber : NSObject <RACSubscriber>
@end

// NLSubscriber.m
@implementation NLSubscriber

- (void)sendNext:(id)value {
  NSLog(@"%s value:%@", sel_getName(_cmd), value);
}

- (void)sendCompleted {
  NSLog(@"%s", sel_getName(_cmd));
}

- (void)sendError:(NSError *)error {
  NSLog(@"%s error:%@", sel_getName(_cmd), error);
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
  // to nothing
}

@end

  现在咱们这个类只关心 sendNext:sendError:sendCompleted。本类的实现只是简单的打印一些数据。那怎么来使用这个订阅者呢?RACSignal 类提供了接口来让实现了 RACSubscriber 协议的订阅者订阅信号:

@interface RACSignal (Subscription)
/*
 *  `subscriber` 订阅 receiver 的变化。由 receiver 决定怎么给 subscriber 发送事件。
 *简单来说,就是由这个被订阅的信号来给订阅者 subscriber 发送 `sendNext:` 等消息。
 */
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;
@end

  用定时器信号来试试看:

  /**
   *  @brief  创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。
   */
   RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
  signalInterval = [signalInterval take:5];
  NLSubscriber *subscriber = [[NLSubscriber alloc] init];

  /**
   *  @brief  用订阅者 subscriber 订阅定时器信号
   */
  [signalInterval subscribe:subscriber];

  下面是输出结果:

2015-08-15 17:45:02.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:02 +0000
2015-08-15 17:45:05.612 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:05 +0000
2015-08-15 17:45:08.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:08 +0000
2015-08-15 17:45:11.613 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:11 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendNext: value:2015-08-15 09:45:14 +0000
2015-08-15 17:45:14.615 RACPraiseDemo[738:59818] sendCompleted

2、改进NLSubscriber

  现在的这个订阅者类 NLSubscriber 除了打印打东西外,啥也干不了,更别说复用了,如果针对所有的信号都写一个订阅者那也太痛苦了,甚至是不太可能的事。

  

  咱们来改进一下,做到如下几点:

  1. 实现 RACSubscriber 协议

  2. 提供与 RACSubscriber对应的可选的可配的接口。

  

  没错,这正是一个适配器!

  第2点的要求可不少,那怎么才能做到这一点呢?还好,OC 中有 block !咱们可以将 RACSubscriber 协议中的三个方法转为三个 block:

- (void)sendNext:(id)value;           ---->    void (^next)(id value);
- (void)sendError:(NSError *)error;   ---->    void (^error)(NSError *error);
- (void)sendCompleted;                ---->    void (^completed)(void);

  改进目标和改进方向都有了,那咱们来看看改进后的的样子:

// 头文件
/**
 *  @brief  基于 block 的订阅者
 */
@interface NLSubscriber : NSObject <RACSubscriber>

/**
 *  @brief  创建实例
 */
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed;

@end

// 实现文件
@interface NLSubscriber ()

@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

@end

@implementation NLSubscriber

#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
  NLSubscriber *subscriber = [[self alloc] init];

  subscriber->_next = [next copy];
  subscriber->_error = [error copy];
  subscriber->_completed = [completed copy];

  return subscriber;
}

#pragma mark RACSubscriber
- (void)sendNext:(id)value {
  @synchronized (self) {
    void (^nextBlock)(id) = [self.next copy];
    if (nextBlock == nil) return;

    nextBlock(value);
  }
}

- (void)sendError:(NSError *)e {
  @synchronized (self) {
    void (^errorBlock)(NSError *) = [self.error copy];

    if (errorBlock == nil) return;
    errorBlock(e);
  }
}

- (void)sendCompleted {
  @synchronized (self) {
    void (^completedBlock)(void) = [self.completed copy];

    if (completedBlock == nil) return;
    completedBlock();
  }
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
  // to nothing
}

@end

  现在来试试看这个改进版,还是上面那个定时器的例子:

/**
   *  @brief  创建一个定时器信号,每三秒发出一个当时日期值。一共发5次。
   */
  RACSignal *signalInterval = [RACSignal interval:3.0 onScheduler:[RACScheduler mainThreadScheduler]];
  signalInterval = [signalInterval take:5];
  NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:^(id x) {
    NSLog(@"next:%@", x);
  } error:nil completed:^{
    NSLog(@"completed");
  }];

  /**
   *  @brief  用订阅者 subscriber 订阅定时器信号
   */
  [signalInterval subscribe:subscriber];

  输出结果如下:

2015-08-15 19:50:43.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:43 +0000
2015-08-15 19:50:46.358 RACPraiseDemo[870:116551] next:2015-08-15 11:50:46 +0000
2015-08-15 19:50:49.355 RACPraiseDemo[870:116551] next:2015-08-15 11:50:49 +0000
2015-08-15 19:50:52.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:52 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] next:2015-08-15 11:50:55 +0000
2015-08-15 19:50:55.356 RACPraiseDemo[870:116551] completed

  输出结果没什么变化,但是订阅者的行为终于受到咱们的撑控了。再也不用为了一个信号而去实现 RACSubscriber 协议了,只需要拿出 NLSubscriber 这个适配器,再加上咱们想要的自定义的行为即可。如果对信号发出的某个事件不感兴趣,直接传个 nil 可以了,例如上面例子的 error: ,要知道, RACSubscriber 协议中的所有方法都是 @required 的。NLSubscriber 大大方便了我们的工作。

  

  那还以再改进吗?

  

3、RACSignal 类别之 Subscription

  有没有可能把 NLSubscriber 隐藏起来呢?毕竟作为一个信号的消费者,需要了解的越少就越简单,用起来也就越方便。咱们可以通过 OC 中的类别方式,给 RACSignal 加个类别(nl_Subscription),将订阅操作封装到这个信号类中。这样,对于使用这个类的客户而言,甚至不知道订阅者的存在。

  

  nl_Subscription 类别代码如下:

// .h
#import "RACSignal.h"

@interface RACSignal (nl_Subscription)

- (void)nl_subscribeNext:(void (^)(id x))nextBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeCompleted:(void (^)(void))completedBlock;
- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock;
- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;

@end

// .m
#import "RACSignal+nl_Subscription.h"
#import "NLSubscriber.h"

@implementation RACSignal (nl_Subscription)

- (void)nl_subscribeNext:(void (^)(id x))nextBlock {
  [self nl_subscribeNext:nextBlock error:nil completed:nil];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock completed:(void (^)(void))completedBlock {
  [self nl_subscribeNext:nextBlock error:nil completed:completedBlock];
}

- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock {
  [self nl_subscribeNext:nil error:errorBlock completed:nil];
}

- (void)nl_subscribeCompleted:(void (^)(void))completedBlock {
  [self nl_subscribeNext:nil error:nil completed:completedBlock];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock {
  [self nl_subscribeNext:nextBlock error:errorBlock completed:nil];
}

- (void)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
  [self nl_subscribeNext:nil error:errorBlock completed:completedBlock];
}

- (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
  NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
  [self subscribe:subscriber];
}

@end

  在这个类别中,将信号的 next:error:completed 以及这三个事件的组合都以 block 的形式封装起来,从以上代码中可以看出,这些方法最终调用的还是 - (void)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock; 方法,而它则封装了订阅者 NLSubsciber

  

  通过这么个小小的封装,客户使用起来就极其方便了:

  /**
   *  @brief  创建一个自定义的信号。
   *        这个信号在被订阅时,会发送一个当前的日期值;
   *        再过三秒后,再次发送此时的日期值;
   *        最后,再发送完成事件。
   */
  RACSignal *signalInterval = [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    [subscriber sendNext:[NSDate date]];

    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
      [subscriber sendNext:[NSDate date]];
      [subscriber sendCompleted];
    });

    return (id)nil;
  }];

  [signalInterval nl_subscribeNext:^(id x) {
    NSLog(@"next:%@", x);
  } error:^(NSError *error) {
    NSLog(@"error:%@", error);
  } completed:^{
    NSLog(@"completed");
  }];

  输出如下:

2015-08-16 23:29:44.406 RACPraiseDemo[653:32675] next:2015-08-16 15:29:44 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] next:2015-08-16 15:29:47 +0000
2015-08-16 23:29:47.701 RACPraiseDemo[653:32675] completed

  本例并没有采用之前的 “定时器信号”,而是自己创建的信号,当有订阅者到来时,由这个信号来决定在什么时候发送什么事件。这个例子里发送的事件的逻辑请看代码里的注释。

  

  看到这里,是不是很熟悉了?有没有想起 subscribeNext:,好吧,我就是在使用好多好多次它之后才慢慢入门的,谁让 RAC 的大部分教程里面第一个讲的就是它呢!

  

  到了这里,是不是订阅者这部分就完了呢?我相信你也注意到了,这里有几个不对劲的地方:

  

  1. 无法随时中断订阅操作。想想订阅了一个无限次的定时器信号,无法中断订阅操作的话,定时器就是永不停止的发下去。

  

  2. 订阅完成或错误时,没有统一的地方做清理、扫尾等工作。比如现在有一个上传文件的信号,当上传完成或上传错误时,你得断开与文件服务器的网络连接,还得清空内存里的文件数据。

4、Disposable

RACDisposable

  针对上述两个问题,RACDisposable 应运而生。也就是说 Disposable 有两个作用:

  

  1. 中断订阅某信号

  2. 订阅完成后,执行一些收尾任务(清理、回收等等)。

  订阅者与 Disposable 的关系:

  

  1. 当 Disposable 有“清理”过,那么订阅者就不会再接收到这个被“清理”订阅源的任何事件。举例而言,就是订阅者 subscriberX 订阅了信号 signalA 和 signalB 两个信号,其所对应的 Disposable 分别为 disposableA 和 disposableB,也就是说 subscriberX 会同时接收来自 signalA 和 signalB 的信号。当我们手动强制 “清理” disposableA 后,subscriberX 就不会再接收来自 signalA 的任何事件;而来自 signalB 的事件则不受影响。

  2. 当订阅者 subscriberX 有接收来自任何一个信号的 “error” 或 “completed” 事件时,则不会再接收任何事件了。

  

  可以这么说:Disposable 代表发生了订阅行为

  根据 Disposable 的作用和与订阅者的关系,来总结它所需要提供的接口:

  

  1. 包含清理任务的 block ;

  2. 执行清理任务的方法:- (void)dispose ;

  3. 一个用来表明是否已经 “清理” 过的布尔变量:BOOL disposed 。

  咱们为这个 Disposable 也整了一个类,如下:

// .h file
/**
 *  @brief  一个 disposable 封装了用于拆除、清理订阅的任务工作
 */
@interface NLDisposable : NSObject

/**
 *  @brief  这个 disposable 是否已经拆除过
 */
@property (atomic, assign, getter = isDisposed, readonly) BOOL disposed;

+ (instancetype)disposableWithBlock:(void (^)(void))block;

/**
 *  @brief  执行拆除工作。能多次调用这个消息,只有第一次调用有效。
 */
- (void)dispose;
@end

// .m file

@interface NLDisposable () {
  /**
   *  @brief  类型类似于:@property (copy) void (^disposeBlock)(void);
   *       在 disposal 要执行的任务逻辑。
   *       1、如果没有初妈值的话,则初始值默认为 `self`
   *       2、当已经 disposal 过后,值为 NULL。
   */
  void * volatile _disposeBlock;
}
@end

@implementation NLDisposable

#pragma mark Properties

- (BOOL)isDisposed {
  return _disposeBlock == NULL;
}

#pragma mark Lifecycle

- (id)init {
  self = [super init];
  if (self == nil) return nil;

  _disposeBlock = (__bridge void *)self;
  OSMemoryBarrier();

  return self;
}

- (id)initWithBlock:(void (^)(void))block {
  NSCParameterAssert(block != nil);

  self = [super init];
  if (self == nil) return nil;

  _disposeBlock = (void *)CFBridgingRetain([block copy]);
  OSMemoryBarrier();

  return self;
}

+ (instancetype)disposableWithBlock:(void (^)(void))block {
  return [[self alloc] initWithBlock:block];
}

- (void)dealloc {
  if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;

  CFRelease(_disposeBlock);
  _disposeBlock = NULL;
}

#pragma mark Disposal

- (void)dispose {
  /**
   *  @brief  这里为了让逻辑更清晰,去掉了原子性操作。具体代码请看 RACDisposable
   */
  if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;

  void (^disposeBlock)(void) = CFBridgingRelease((void *)_disposeBlock);
  _disposeBlock = NULL;
  disposeBlock();
}

@end

  

  从这个类提供的接口来看,显然是做不到 “订阅者与 Disposable 的关系” 中的第2条的。因为这条中所描述的是一个订阅者订阅多个信号,且能手动中断订阅其中一个信号的功能,而 NLDisposable 是单个订阅关系所设计的。

RACCompoundDisposable

  那怎么组织这“多个”的关系呢?数组?Good,就是数组。OK,咱们来相像一下这个方案的初步代码。每个订阅者有一个 Disposable 数组,订阅一个一个信号,则加入一个 Disposable;当手动拆除一个订阅关系时,找到与之相关的 Disposable,发送 dispose 消息,将其从数组中移除;当订阅者不能再接收消息时(接收过 errorcompleted 消息),要 dispose 数组中所有元素,接下来再加入元素时,直接给这个要加入的元素发送 dispose 消息;在多线程环境下,每一次加入或移除或其遍历时,都得加锁。。。(好吧,我编不下去了)

  

  我** ,这么复杂,看来直接用数组来维护是不可行的了。有啥其它可行的法子没?还好,GoF 对此有个方案,叫做“组合模式”:

组合模式 允许你将对象组合成树形结构来表现 “整体/部分” 层次结构。组合能让客户以一致的方式处理个别对象以及对象组合。

  使用组合结构,我们能把相同的操作应用在组合和个别对象上。换句话说,在大多数情况下,我们可以 忽略 对象组合和个别对象之间的差别。

  

  本文毕竟不是来讲模式的,关于这个模式更多的信息,请自行 google。

  

  RAC 中这个组合类叫 RACCompoundDisposable, 咱们的叫 NLCompoundDisposable,来看看咱们这个类的代码:

// .h file
#import "NLDisposable.h"

/**
 *  @brief  A disposable of disposables。当它 dispose 时,它会 dispose
 *       它所包含的所有的 disposables。
 *
 *       如果本 compound disposable 已经 dispose 过后,再来调用 -addDisposable:,
 *     那么其参数 disposable 会立马调用 dispose 方法。
 *
 *       本类中的方法说明请查看 RACCompoundDisposable 中的同名方法。
 *       本类与真正的类 RACCompoundDisposable 代码差别较大,但本质是一样的
 */
@interface NLCompoundDisposable : NLDisposable

/**
 *  @brief  创建并返回一个新的 compound disposable。
 */
+ (instancetype)compoundDisposable;

/**
 *  @brief  创建并返回一个新的包含了 disposables 的 compound disposable。
 *
 *  @param disposables disposable 数组
 *
 *  @return 一个新的 compound disposable
 */
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;

/**
 *  @brief  将 disposable 加到本 compound disposable 中。如果本 compound disposable
 *       已经 dispose 的话,那么参数 disposable 会被立即 dispose。
 *
 *          本方法是线程安全的。
 *
 *  @param disposable 要被加入的 disposable。如果它为 nil 的话,那么什么也不会发生。
 */
- (void)addDisposable:(NLDisposable *)disposable;

/**
 *  @brief  从本 compound disposable 移除指定的 disposable(不管这个 disposable 是什么
 *       状态);如果这个 disposable 不在本 compound disposable 中,则什么也不会发生。
 *
 *          本方法是线程安全的。
 *
 *  @param disposable 要被移除的 disposable。可以为 nil。
 */
- (void)removeDisposable:(NLDisposable *)disposable;

@end

// .m file
#import "NLCompoundDisposable.h"
#import <libkern/OSAtomic.h>

@interface NLCompoundDisposable () {
  /**
   *  @brief  同步锁
   */
  OSSpinLock _spinLock;

  /**
   *  @brief  本 compound disposable 所包含的 disposables。
   *
   *          在操作这个数组时,应该使用 _spinLock 进行同步。如果
   *       `_disposed` 为 YES,则这个数组可能为 nil。
   */
  NSMutableArray *_disposables;

  /**
   *  @brief  本 compound disposable 是否已经 dispose 。
   *
   *          在操作这个变量时,应该使用 _spinLock 进行同步。
   */
  BOOL _disposed;
}

@end

@implementation NLCompoundDisposable

#pragma mark Properties
- (BOOL)isDisposed {
  OSSpinLockLock(&_spinLock);
  BOOL disposed = _disposed;
  OSSpinLockUnlock(&_spinLock);

  return disposed;
}

#pragma mark Lifecycle
+ (instancetype)compoundDisposable {
  return [[self alloc] initWithDisposables:nil];
}

+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables {
  return [[self alloc] initWithDisposables:disposables];
}

- (id)initWithDisposables:(NSArray *)otherDisposables {
  self = [self init];
  if (self == nil) return nil;

  if ([otherDisposables count]) {
    _disposables = [NSMutableArray arrayWithArray:otherDisposables];
  }

  return self;
}

- (id)initWithBlock:(void (^)(void))block {
  NLDisposable *disposable = [NLDisposable disposableWithBlock:block];
  return [self initWithDisposables:@[ disposable ]];
}

- (void)dealloc {
  _disposables = nil;
}

#pragma mark Addition and Removal
- (void)addDisposable:(NLDisposable *)disposable {
  NSCParameterAssert(disposable != self);
  if (disposable == nil || disposable.disposed) return;

  BOOL shouldDispose = NO;

  OSSpinLockLock(&_spinLock);
  {
    if (_disposed) {
      shouldDispose = YES;
    } else {
      if (_disposables == nil) {
        _disposables = [NSMutableArray array];
      }
      [_disposables addObject:disposable];
    }
  }
  OSSpinLockUnlock(&_spinLock);

  if (shouldDispose) {
    [disposable dispose];
  }
}

- (void)removeDisposable:(NLDisposable *)disposable {
  if (disposable == nil) return;

  OSSpinLockLock(&_spinLock);
  {
    if (!_disposed) {
      if (_disposables != nil) {
        [_disposables removeObject:disposable];
      }
    }
  }
  OSSpinLockUnlock(&_spinLock);
}

#pragma mark RACDisposable
- (void)dispose {
  NSArray *remainingDisposables = nil;

  OSSpinLockLock(&_spinLock);
  {
        _disposed = YES;
        remainingDisposables = _disposables;
        _disposables = nil;
  }
  OSSpinLockUnlock(&_spinLock);

  if (remainingDisposables == nil) return;

  [remainingDisposables makeObjectsPerformSelector:@selector(dispose)];
}

@end

RACScheduler 简介

  本文不打算研究 RACScheduler 源码,但其又是 RAC 中不可或缺的一个组件,在研究 RACSignal 的源码时不可避免地会遇到它,所以对其作下介绍还是有必要的。其实它的源码并不复杂,可自行研究。

  ReactiveCocoa 中 RACSignal 发送的所有事件的传递交给了一个特殊的框架组件——调度器,即 RACScheduler 类簇(类簇模式稍后介绍)。调度器是为了简化 同步/异步/延迟 事件传递 以及 取消预定的任务(scheduded actions) 这两种 RAC 中常见的动作而提出来的。“事件传递” 简单而言就是些 blocks,RACScheduler 所做的就是:调度这些 blocks (schedule blokcs,还是英文的意思准确些)。我们可以通过那些调度方法所返回的 RACDisposable 对象来取消那些 scheduling blocks。

  

  正如前面所说,RACScheduler 是一个类簇。咱们来看看几种具体的调度器:

RACImmediateScheduler

  这是 RAC 内部使用的私有调度器,只支持同步 scheduling。就是简单的马上执行 block。这个调试器的延迟 scheduling 是通过调用 -[NSThread sleepUntilDate:] 来阻塞当前线程来达到目的的。显然,这样一个调度器,没法取消 scheduling,所以它那些方法返回的 disposables 啥也不会做(实际上,它那些 scheduling 方法返回的是nil)。

RACQueueScheduler

  这个调度器使用 GCD 队列来 scheduling blocks。如果你对 GCD 有所了解的话,你会发现这个调度器的功能很简单,它只是在 GCD 队列 dispatching blocks 上的简单封装罢了。

RACSubscriptionScheduler

  这是另一个内部使用的私有调度器。如果当前线程有调度器(调度器可以与线程相关联起来:associated)那它就将 scheduling 转发给这个线程的调度器;否则就转发给默认的 background queue 调试器。

接口

  调试器有下面一些方法:

- (RACDisposable *)schedule:(void (^)(void))block;

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block;

- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block;

- (RACDisposable *)after:(NSDate *)date repeatingEvery:(NSTimeInterval)interval withLeeway:(NSTimeInterval)leeway schedule:(void (^)(void))block;

  scheduling block 如下:

RACDisposable *disposable = [[RACScheduler mainThreadScheduler] afterDelay:5.0 schedule:^{
    // do something
}];

// 如果你想要取消 scheduling block
[disposable dispose]; // block scheduling 被取消了,不会再被执行。

5、Subscriber 和 Disposable

  前面介绍了 Disposable 的来源,现在来研究下怎么使用它。还记得吗,订阅者与信号打交道的唯一方式是 RACSignal 中的一个方法:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber;

自定义信号所对应的类是 RACDynamicSignalRACSignal 采用的是类簇模式。除自定义信号之外还有几种其它的信号,之后会研究到。OC 中的 NSNumber 用的就是类簇模式。类簇是Foundation框架中广泛使用的设计模式。类簇将一些私有的、具体的子类组合在一个公共的、抽象的超类下面,以这种方法来组织类可以简化一个面向对象框架的公开架构,而又不减少功能的丰富性。

  咱们来研究一下自定义信号里的这个方法的实现。这个方法实现的难处在于:“一个订阅者可以订阅多个信号,并可以手动拆除其中任何一个订阅”。针对这个问题,提出了上节讲到的 RACDisposable。也就是说,在每一次订阅时,都会返回一个与这次订阅相关的 Disposable,那怎么做到这一点呢?

  给订阅者添加一个 CompoundDisposable 类型的属性 (毕竟 CompoundDisposable 就是用来针对多个 Disposable 的统一管理而存在的),然后在每一次订阅时,都加一个 Disposable 到这个属性里,行不行?但很可惜,订阅者是一个协议 protocol RACSubscriber,而不是一个具体的类,咱们在使用到它时,都是别人实现了这个协议的类的对象,所以咱们不太可能做到说给这么一个未知的类添加一个属性。

事实上,RAC 中确实有 RACSubscriber 这么一个私有类(它是咱们第一个自定义类 NLSubscriber 的原型),咱们叫它做 class RACSubscriber。嗯,class RACSusbscriber 实现了 protocol RACSubscriber 协议:@interface RACSubscriber : NSObject <RACSubscriber>。有没有想到 class NSObjectprotocol NSObject ?虽然它们形式上确实很像,但千万别混为一谈。RAC 中的其它实现了 protocol RACSubscriber 协议的订阅者类可没有一个继承自 class RACSubscriber 的。

  咱们可以用装饰模式来解决这个问题

装饰模式。在不必改变原类文件和使用继承的情况下,动态地扩展一个对象的功能。

订阅者装饰器 RACPassthroughSubscriber

  在订阅者每一次订阅信号时产生一个 Disposable,并将其与此次订阅关联起来,这是通过装饰器 RACPassthroughSubscriber 来做到的。这个装饰器的功能:

  

  1. 包装真正的订阅者,使自己成为订阅者的替代者。

  

  2. 将真正的订阅者与一个订阅时产生的 Disposable 关联起来。

  

  这正是一个装饰器所应该做的。依之前的,咱们来模仿这个装饰器,新建一个咱们的装饰器:NLPassthroughSubscriber,来看下它的代码:

// .f file
@class RACCompoundDisposable;
@class RACSignal;

/**
 *  @brief  这是一个订阅者的装饰器。在没有 dispose 时,它会把接收到的所有
 *       的事件都转发给真实的订阅者。
 */
@interface NLPassthroughSubscriber : NSObject <RACSubscriber>

/**
 *  @brief  初始化方法
 *
 *  @param subscriber 被包装的真实的订阅者,本装饰器会把接收到的所有的事件都转发给这个订阅者。不能为 nil
 *  @param signal     要发送事件给这个装饰器的信号。
 *  @param disposable 当这个 disposable 接收到 dispose 消息后,将不会再转发事件。不能为 nil
 *
 *  @return 返回一个初始化后的 passthrough subscriber
 */
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable;

@end

// .m file
@interface NLPassthroughSubscriber ()

// 被转发事件的订阅者。
@property (nonatomic, strong, readonly) id<RACSubscriber> innerSubscriber;

// 要发送事件给本装饰器的信号
@property (nonatomic, unsafe_unretained, readonly) RACSignal *signal;

// 代表当前订阅关系的 disposable。当它 dispose 后,将不会再转发任何事件给 `innerSubscriber`。
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

@implementation NLPassthroughSubscriber

#pragma mark Lifecycle

- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
  NSCParameterAssert(subscriber != nil);

  self = [super init];
  if (self == nil) return nil;

  _innerSubscriber = subscriber;
  _signal = signal;
  _disposable = disposable;

  /**
   *  告诉订阅者:发生了订阅行为。并将这次订阅行为相关的 `Disposable` 传给订阅者。
   */
  [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
  return self;
}

#pragma mark RACSubscriber

- (void)sendNext:(id)value {
  /**
   *  如果 disposable 已经 dispose 过,就不再转发事件
   */
  if (self.disposable.disposed) return;

  /**
   *  转发 next 事件
   */
  [self.innerSubscriber sendNext:value];
}

- (void)sendError:(NSError *)error {
  if (self.disposable.disposed) return;

  [self.innerSubscriber sendError:error];
}

- (void)sendCompleted {
  if (self.disposable.disposed) return;

  [self.innerSubscriber sendCompleted];
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
  if (disposable != self.disposable) {
    [self.disposable addDisposable:disposable];
  }
}

@end

自定义信号 RACDynamicSignal 的订阅方法 subscribe

  咱们来看看 RACDynamicSignal 是怎么来使用 RACPassthroughSubscriber 的,这里就不自己写代码了,直接上它的代码:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

  /**
   *  本次订阅相关 disposable。本方法的返回值,起 拆除 本次订阅的作用。
   */
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

  /**
   * 订阅者装饰器。
   */
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

  /**
   *  _didSubscriber 是在 `+ createSignal` 方法中进入的 block 参数。
   *  + (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe;
   *  这个 block 以 subscriber 为参数,返回一个 disposable,即 innerDisposable,而这个 innerDisposable
   *  的作用是在 subscriber 不再订阅本 signal 时,起回收资源的作用。
   */
    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }

    return disposable;
}

  可以看到,订阅者装饰器直接伪装成真正的订阅器,传给 didSubscribe 这个 block 使用。在这个 block 中,会有一些事件发送给订阅者装饰器,而这个订阅者装饰器则根据 disposable 的状态来来决定是否转发给真正的订阅者。disposable 作为返回值,返回给外部,也就是说能够从外部来取消这个订阅了。

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        RACDisposable *innerDisposable = self.didSubscribe(subscriber);
        [disposable addDisposable:innerDisposable];
    }];

    [disposable addDisposable:schedulingDisposable];

  从这几行代码中,我们可以看到,didSubscribe 这个 block 是处于 subscriptionScheduler 这个 scheduler 的调度中。RACSubscriptionScheduler 的调度是取决于当前所在的线程的,即 didSubscribe 可能会在不同的调度器中被执行。

  假设当前 -(RACDisposable *)subscribe:(id<RACSubscriber>)subscriber 这个方法是在异步环境下调用的,那么在 disposable 返回后,在schedule block 还没有来得及调用,此时 disposable 中包含 schedulingDisposable。如果我们此时给 disposable 发送 dispose 消息,那么 schedulingDisposable 也会被 dispose,schedule block 就不会执行了;如果是在 schedule block 执行中或执行后给 disposable 发送 dispose 消息,那么 innerDisposableschedulingDisposable 都会被 dispose。这些行为正是咱们所预期的。

6、再次改进NLSubscriber

1、didSubscribeWithDisposable

@protocol RACSubscriber <NSObject>
@required
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end

  这个 RACSubscriber 协议中声明的一个方法,在最开始的时候被我们特意给忽略,现在是时候回过头来看看它了。对于一个订阅者来说,nexterrorcompleted 三种事件分别对应协议里的三种方法,那么这个方法存在的意义是什么呢?

  从 RACSubscriber 协议中,可以看到,当一个订阅者有收到过 errorcompleted 事件后,这个订阅者就不能再接收任何事件了,换句话说,此时这个订阅者会解除所有的订阅关系,且无法再次订阅。既然要解除所有订阅,首先我得知道我订阅过哪些信号是不?而代表一个订阅行为的就是 disposable ,告诉它就传一个给它好了。所以这个方法就是告诉订阅者:你发生了订阅行为。

  那为啥要 RACCompoundDisposable 类型作为参数呢?因为有些订阅者会针对其附加一些操作,而只有这个类型的 disposable 才能动态加入一些操作。接下来我们就会看到的。

2、NLSubscriber 结合 RACDisposable

  这一次改进 NLSubscriber 的目的是让其可以终结自己的订阅能力的功能。同时实现 didSubscribeWithDisposable 方法。千言万语不如实际代码,让我们来一探究竟:

#import "NLSubscriber.h"
#import <ReactiveCocoa.h>

@interface NLSubscriber ()

@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);

/**
 *  @brief  代表订阅者本身总体订阅行为的 disposable。
 *       当有接收到 error 或 completed 事件时,应该 dispose 这个 disposable。
 */
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

@end

@implementation NLSubscriber

#pragma mark Lifecycle
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
  NLSubscriber *subscriber = [[self alloc] init];

  subscriber->_next = [next copy];
  subscriber->_error = [error copy];
  subscriber->_completed = [completed copy];

  return subscriber;
}

- (instancetype)init {
  self = [super init];
  if (self == nil) return nil;

  @unsafeify(self);

  /**
   *  当 _disposable 被发送 dispose 消息时,将 next、error 和 completed 这三个
   *  block 设置为 nil,从而间实现订阅者无法再接收任何事件的功能。
   */
  RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
    @strongify(self);

    @synchronized (self) {
      self.next = nil;
      self.error = nil;
      self.completed = nil;
    }
  }];

  _disposable = [RACCompoundDisposable compoundDisposable];
  [_disposable addDisposable:selfDisposable];

  return self;
}

- (void)dealloc {
  [self.disposable dispose];
}

#pragma mark RACSubscriber
- (void)sendNext:(id)value {
  @synchronized (self) {
    void (^nextBlock)(id) = [self.next copy];
    if (nextBlock == nil) return;

    nextBlock(value);
  }
}

- (void)sendError:(NSError *)e {
  @synchronized (self) {
    void (^errorBlock)(NSError *) = [self.error copy];
    [self.disposable dispose];

    if (errorBlock == nil) return;
    errorBlock(e);
  }
}

- (void)sendCompleted {
  @synchronized (self) {
    void (^completedBlock)(void) = [self.completed copy];
    [self.disposable dispose];

    if (completedBlock == nil) return;
    completedBlock();
  }
}

- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
  if (otherDisposable.disposed) return;

  /**
   *  将 otherDisposable 添加到 selfDisposable 中。这样当 selfDisposable 被 dispose 时,
   *  otherDisposable 也能被 dispose。
   *
   *  这样,当订阅者接收到 error 或 completed 事件时,就能解除这个订阅者自身的所有订阅行为了。
   */
  RACCompoundDisposable *selfDisposable = self.disposable;
  [selfDisposable addDisposable:otherDisposable];

  @unsafeify(otherDisposable);
  /**
   *  如果这个订阅行为被解除,就将 otherDisposable 从 selfDisposable 中移除。
   *  (我们给 otherDisposable 增加了行为,这也就是参数需要是 RACCompoundDisposable
   *  类型的原因了。当然,其它的订阅者怎么用这个参数就跟其实际的业务相关了。)
   */
  [otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
    @strongify(otherDisposable);
    [selfDisposable removeDisposable:otherDisposable];
  }]];
}

@end

3、改进类别 nl_Subscription

  还记得么?nl_Subscription 类别中的订阅方法一旦订阅,就无法停止了,这显然有很大的问题。解决这个问题很简单,直接将 disposable 返回即可:

  

// .h file
@interface RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeError:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock;

@end

// .m file
@implementation RACSignal (nl_Subscription)
...
- (RACDisposable *)nl_subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
  NLSubscriber *subscriber = [NLSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
  return [self subscribe:subscriber];
}

RACSignal - Operations

  本节主要研究这些操作(Operations) —— flattenMap:map:filter: ….

  终于看到你想看的东西了?好吧,我承认,上节的东西很无趣,可能压根不是你想看的东西。但如果没弄清上面的内容的话,直接研究 Operations 可是会比较吃力的哟~

  你以为咱们现在开始研究 Operations?哈哈,你又得失望了~ 咱得先看看这两个类:RACEmptySignalRACReturnSignal

1、两个 RACSignal 的特殊子类 RACEmptySignal 和 RACReturnSignal

1、RACEmptySignal

  RACEmptySignal+[RACSignal empty] 的内部实现,一个私有 RACSignal 子类。它就是一个会立即 completed 的信号。让我们来看看它的 - subscribe: 方法:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    /**
     *  只要一订阅,就给 subscriber 发送 completed 事件。
     */
    return [RACScheduler.subscriptionScheduler schedule:^{
        [subscriber sendCompleted];
    }];
}

  这样一个订阅者一订阅就会 completed 信号有什么用呢?稍后揭晓。

2、RACReturnSignal

  RACReturnSignal+[RACSignal return:] 的内部实现,也是一个私有 RACSignal 子类。它会同步发送出一个值(即 next)给订阅者,然后再发送 completed 事件。 它比 RACEmptySignal 多了一点点东西,它是。直接看其实现:

@interface RACReturnSignal ()
// 在本信号被订阅时会发送的值。
@property (nonatomic, strong, readonly) id value;
@end

@implementation RACReturnSignal

#pragma mark Lifecycle

+ (RACSignal *)return:(id)value {
    RACReturnSignal *signal = [[self alloc] init];
    signal->_value = value;
    return signal;
}

#pragma mark Subscription

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    return [RACScheduler.subscriptionScheduler schedule:^{
        [subscriber sendNext:self.value];
        [subscriber sendCompleted];
    }];
}

@end

  纯吐槽:为啥要叫 ReturnSignal 呢?不如直接 OneValueSignal 好了。O(∩_∩)O~~ 不过说真的,RAC 的命名真心不咋地。

  那么发送一个 next 后又 completed 的信号又有啥用呢?等下会知道地。

2、concat: 练手

  -[RACSignal concat:] 是源码较简单,且使用频率也较多的。那咱们就来拿它来练练手好了。

- (RACSignal *)concat:(RACSignal *)signal {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            RACDisposable *concattedDisposable = [signal subscribe:subscriber];
            serialDisposable.disposable = concattedDisposable;
        }];

        serialDisposable.disposable = sourceDisposable;
        return serialDisposable;
    }] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

  RACSerialDisposableRACDisposable 的子类,它包含一个 Disposable,能够在运行时设置这个 Disposable。当设置新的 newDisposable时,老的 oldDisposable 会被 dispose。当 RACSerialDisposabledispose 时,其所包含的 Disposable 会被 dispose

  基本上,对一个 RACSignal 的操作的返回值是一个新的 RACSignal 值时,其内部都是调用了 +[RACSignal createSignal:] 这个方法。这个创建信号返回的实际是自定义信号:RACDynamicSignal,针对它前文有所介绍。

  这里有一个小技巧。因为很多信号的操作是针对该信号本身 self 所发送的值作的操作。那也就是说会订阅 self,那咱们先找到这一句再说:self subscribe:self subscribeNext:...。嗯,找到了这几行:

RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
    [subscriber sendNext:x];
} error:^(NSError *error) {
    [subscriber sendError:error];
} completed:^{
    RACDisposable *concattedDisposable = [signal subscribe:subscriber];
    serialDisposable.disposable = concattedDisposable;
}];

  在订阅了 self 后,将 nexterror 事件发送给订阅者 subscriber。当 self 发送了 completed 事件事,再让 subscriber 订阅参数 signal。也就是当源信号完成后订阅 signal。怎么样,很简单吧。

3、zipWith:

  再来一个练手的玩意。-[RACSignal zipWith:]-[RACSignal concat:] 稍微复杂点。它是将 self 和 参数 signal 两个信号发送的值合并起来发送给订阅者。

- (RACSignal *)zipWith:(RACSignal *)signal {
    NSCParameterAssert(signal != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];

        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];

        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };

        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;

                RACTuple *tuple = [RACTuple tupleWithObjects:selfValues[0], otherValues[0], nil];
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];

                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };

        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}

  同样的,重点在 [self subscriberNext:][signal subscribeNext:] 处。这里的实现是订阅 selfsignal 信号,然后将它们发送出的值收集起来,当两个都发出了值时,分别拿出两个信号最早发出的值,合并为一个 RACTuple,再发送给订阅者 subscriber。这个也很简单吧,只是代码稍多点而已。

4、bind:

1、说明

  信号的很多 operations 的实现调用来调用去最后都是调用了这个 -[RACSignal bind:] 方法,比如 flattenMap:map:filter 等等。那咱们就来看看这个方法是哪路神仙?

  这是在 RACStream 中声明的抽象方法。来看看它的声明:

typedef RACStream * (^RACStreamBindBlock)(id value, BOOL *stop);
- (instancetype)bind:(RACStreamBindBlock (^)(void))block;

  RACStreamBindBlock 是一个 block。它从一个 RACStream 中接收一个值,并且返回一个与该流相同类型的实例。如果将 stop 设为 YES,则会在返回一个实例后终结此次 bind。如果返回 nil 则会立即终结。

  

  bind: 方法是将流中每一个值都放到 RACStreamBindBlock 中跑一下。来看看其参数:block。然而这有什么卵用呢?好吧,我太笨,从它的说明来看,我真的不能理解它有什么用。

2、源码解读

  既然从方法说明了解不到,那直接来看其源码了。

- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
    NSCParameterAssert(block != NULL);
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACStreamBindBlock bindingBlock = block();

        NSMutableArray *signals = [NSMutableArray arrayWithObject:self];

        RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

        // 三.
        void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
            BOOL removeDisposable = NO;

            @synchronized (signals) {
                [signals removeObject:signal];

                if (signals.count == 0) {
                    [subscriber sendCompleted];
                    [compoundDisposable dispose];
                } else {
                    removeDisposable = YES;
                }
            }

            if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
        };

        // 二.
        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
            @synchronized (signals) {
                [signals addObject:signal];
            }

            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
            [compoundDisposable addDisposable:selfDisposable];

            RACDisposable *disposable = [signal subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(signal, selfDisposable);
                }
            }];

            selfDisposable.disposable = disposable;
        };

        @autoreleasepool {
            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
            [compoundDisposable addDisposable:selfDisposable];

           //  一.
            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // Manually check disposal to handle synchronous errors.
                if (compoundDisposable.disposed) return;

                BOOL stop = NO;
                id signal = bindingBlock(x, &stop);

                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(self, selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(self, selfDisposable);
                }
            }];

            selfDisposable.disposable = bindingDisposable;
        }

        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -bind:", self.name];
}

  我们一步一步来看。先从第 一 步开始,其步骤如下:

  1. 订阅 self

  2. 针对 self 发出的每一个值 x,经过 bindingBlock,获取一个信号:signal

    1. 如果 signal 不为 nil,就转到第二步:addSignal

    2. 如果 signal 为 nil,或 stopYES,则转到第三步:completedSignal

  3. 如果 self 发出 error 事件,则中断订阅;如果 self 发出 completed 事件则转到第三步:completedSignal

  第二步:addSignal:signal

  1. 先将 signal 添加到 signals

  2. 订阅 signal

    1. 将 signalnext 事件转发给订阅者 subscriber

    2. 如果 signal 发送 error 事件则中断订阅

    3. 如果 signal 发送 complete 事件,则转到第三步

  第三步:completeSignal:signal:disposable

  1. 将 signalsignals 中移除

  2. 如果 signals 中没有了 signal,那么订阅就完成了

  好了,来总结一下这个 -bind:

  1. 订阅原信号 self 的 values。

  2. 将 self 发出的任何一个值,都对其使用 bindingBlock 进行转换。

  3. 如果 bindingBlock 返回一个信号,则订阅它,将从它那接收到的每个值都传递给订阅者 subscriber

  4. 如果 bindingBlock 要求结束绑定,则 complete self 信号。

  5. 如果 所有 的信号全都 complete,则给 subscriber 发送 completed 事件.

  6. 如果任何一个信号发出 error,将其发送给 subscriber

  那从中可以玩出什么花样呢?

3、示例

  咱们先用用它,再看看能怎么玩吧。

  

示例1:结合 RACReturnSignal

  RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

  RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
    return ^(id value, BOOL *stop) {
      NSLog(@"inner value: %@", value);
      return [RACSignal return:value];
    };
  }];

  [bindSignal subscribeNext:^(id x) {
    NSLog(@"outer value: %@", x);
  }];

  输出如下:

2015-08-27 17:16:17.933 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:17 +0000
2015-08-27 17:16:17.934 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:17 +0000
2015-08-27 17:16:18.931 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:18 +0000
2015-08-27 17:16:18.931 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:18 +0000
2015-08-27 17:16:19.931 RACPraiseDemo[3063:168556] inner value: 2015-08-27 09:16:19 +0000
2015-08-27 17:16:19.931 RACPraiseDemo[3063:168556] outer value: 2015-08-27 09:16:19 +0000

  这个示例就是在 bind: 中简单的返回值。那咱们将这个值变化一下如何?

示例2:结合 RACReturnSignal、转换 value

  RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

  RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
    return ^(NSDate *value, BOOL *stop) {
      NSLog(@"inner value: %@", value);

      NSTimeInterval nowTime = [value timeIntervalSince1970];
      return [RACSignal return:@(nowTime)];
    };
  }];

  [bindSignal subscribeNext:^(id x) {
    NSLog(@"outer value: %@", x);
  }];

  输出如下:

2015-08-27 17:34:04.938 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:04 +0000
2015-08-27 17:34:04.939 RACPraiseDemo[3153:176383] outer value: 1440668044.936496
2015-08-27 17:34:05.939 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:05 +0000
2015-08-27 17:34:05.939 RACPraiseDemo[3153:176383] outer value: 1440668045.939163
2015-08-27 17:34:06.941 RACPraiseDemo[3153:176383] inner value: 2015-08-27 09:34:06 +0000
2015-08-27 17:34:06.941 RACPraiseDemo[3153:176383] outer value: 1440668046.941275

  哇哇,这就是个 map: 有木有? 现在,有感受到 RACReturnSignal 的魅力?RACReturnSignal-bind: 结合能转换 value。

示例3:结合 RACEmptySignal

  现在来换个玩法试试看,这回换 RACEmptySignal 来玩玩。

RACSignal *signalInterval = [[RACSignal interval:1.0 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

  __block NSUInteger count = 0;
  RACSignal *bindSignal = [signalInterval bind:^RACStreamBindBlock{
    return ^(NSDate *value, BOOL *stop) {
      NSLog(@"inner value: %@", value);

      ++count;
      if (count % 2 == 0) {
        return [RACSignal empty];
      }

      return [RACSignal return:value];
    };
  }];

  [bindSignal subscribeNext:^(id x) {
    NSLog(@"outer value: %@", x);
  }];

  输出如下:

2015-08-27 17:53:45.345 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:45 +0000
2015-08-27 17:53:45.346 RACPraiseDemo[3363:188270] outer value: 2015-08-27 09:53:45 +0000
2015-08-27 17:53:46.345 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:46 +0000
2015-08-27 17:53:47.342 RACPraiseDemo[3363:188270] inner value: 2015-08-27 09:53:47 +0000
2015-08-27 17:53:47.342 RACPraiseDemo[3363:188270] outer value: 2015-08-27 09:53:47 +0000

  这一次,“outer value” 比 “inner value” 少了一个,这就是 filter: 呀!RACEmptySignalbind: 结合能过滤 value。

示例4:改进 bind:

  经过这几个示例,我们可以发现,直接使用 bind: 是比较麻烦的。而一般情况下,咱们还真用不到 stop,那咱们就改进一下呗:

- (instancetype)flattenMap:(RACStream * (^)(id value))block {
    Class class = self.class;

    return [[self bind:^{
        return ^(id value, BOOL *stop) {
            /**
             *  如果 block 返回 nil,得用 RACEmptySignal 代替,
             *  不然会结束 `bind:`
             */
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

            return stream;
        };
    }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

  哈哈,这个就是 - flattenMap:了。不必过多解释了吧~

5、-map:

  嗯,这其实就是 -flattenMap:RACReturnSignal 的结合:

- (instancetype)map:(id (^)(id value))block {
    NSCParameterAssert(block != nil);

    Class class = self.class;

    return [[self flattenMap:^(id value) {
        return [class return:block(value)];
    }] setNameWithFormat:@"[%@] -map:", self.name];
}

6、-flatten

  信号可以发送类型的值,当然也包括 RACSignal 类型。例如,RACCommandexecutionSignals 这个信号,它发出的值就是 RACSignal 类型的。对于这种发出的值是 RACSignal 类型的 RACSignal,叫做 signal of signals。这有点类似于 disposable of disposables。

  既然这个信号发出的就是 RACSignal,那在 -flattenMap:中,我们直接将 value 返回就好了。来看看示例:

/**
   *  有效三次的间隔为1秒定时器信号,此时 signalInterval 是一个 signal of NSDates
   */
  RACSignal *signalInterval = [[RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler]] take:3];

  /**
   *  将定时器信号里的值修改成 RACSignal 类型
   *  此时,signalInterval 变成了一个 signal of signals
   */
  signalInterval = [signalInterval map:^id(NSDate *date) {
    return [RACSignal return:date];
  }];

  /**
   *  既然 signalInterval 里的值都是信号,那直接将这些信号返回即可
   */
  RACSignal *signal = [signalInterval flattenMap:^RACStream *(RACSignal *returnSignal) {
    return returnSignal;
  }];

  /**
   *  由于 signalInterval 里的值都是包含了一个 NSDate 值的 RACReturnSignal,
   *  经过 `-flattenMap:` 过后,signal 就变成了 signal of NSDates。
   */
  [signal subscribeNext:^(id x) {
    NSLog(@"value: %@", x);
  }];

  输出如下:

2015-08-27 21:16:29.517 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:29 +0000
2015-08-27 21:16:30.516 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:30 +0000
2015-08-27 21:16:31.516 RACPraiseDemo[549:11996] value: 2015-08-27 13:16:31 +0000

7、小结

  RACSignal 的 operations 实在太多,全部在这里列出来不现实,也没有这个必要。我相信,经过前面的解析,你现在再去看其它 的一个 operation 源码,也应该不是太大的难事。

RAC() 宏展开

  RAC 的最大的魅力之一就是绑定:RAC(self, ...) = signal; 这应该是大家经常写的一条语句。有没有想过它是怎么工作的呢?咱们来看点代码:

@property (nonatomic, strong) NSString *text;

//--------
RACSignal *signal = [[RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler]] take:3];
  signal = [signal map:^id(id value) {
    return [value description];
  }];

RAC(self, text) = signal;

  重点在 RAC(self, text) = signal; 这一行。先来看看将这个宏展开是什么样子(RAC 对宏的运用很是牛B,有兴趣请看这篇文章):

[[RACSubscriptingAssignmentTrampoline alloc] initWithTarget:(self) nilValue:nil][@"text"] = signal;

  看得更清楚一点:

RACSubscriptingAssignmentTrampoline *assignment = [[RACSubscriptingAssignmentTrampoline alloc] initWithTarget:(self) nilValue:nil];
assignment[@"text"] = signal;

  跳到 RACSubscriptingAssignmentTrampoline 类的声明,可以看到:

@interface RACSubscriptingAssignmentTrampoline : NSObject
- (id)initWithTarget:(id)target nilValue:(id)nilValue;

// 这是可以使用下标 `[]` 语法的关键
- (void)setObject:(RACSignal *)signal forKeyedSubscript:(NSString *)keyPath;
@end

  这个类使用了 clang 的特性,可以使用 []语法([] 的相关文章)。也就是说 assignment[@"text"] = signal;,实际上是这样子的:

[assignment setObject:signal forKeyedSubscript:@"text"];

  再看 - (void)setObject:(RACSignal *)signal forKeyedSubscript:(NSString *)keyPath; 这个方法的实现,我们发现,它其实调用的是 signal 的方法:- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue,再像上面的方法一样来分析这个方法,我们找到了关键点:

- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
    ...
    RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
        ...
        [object setValue:x ?: nilValue forKeyPath:keyPath];
    }...
    ...
}

  哦,原来它就是订阅了 signal,并将 signal 发出的每一值都设置给 objectkeyPath 属性而已。很简单嘛~

结束

  本文研究了 RAC 中的一些基本组件,并没有对一些高级内容进行深入研究,所以才叫“浅析”。但这些也是对高级内容深入研究的基础,既然有“渔”,何惧无“鱼”呢?

  其实颇想继续分享,但心有余而力不足。

  还可研究的主题:

  1. Subjects 它也是 RACSignal 一些操作的基础,值得研究。难度系数:2 (最高为5)

  2. RACMulticastConnection 常用,值得研究。难度系数:3

  3. Foundation、UIKit、KVO (给各系统类加的 rac_ 扩展),有研究价值。研究过后,你会对 runtime 会有很深入的了解,还会接触到一些 OC 中少用的知识(如 NSProxy 等),能开拓视野。难度系数:5

 

  难度系数是本人 YY 出来的,别较真,仅当参考。

  顺便打个广告:RAC 交流群:430033580

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-18 00:22:56

ReactiveCocoa2 源码浅析的相关文章

Volley框架源码浅析(一)

尊重原创http://blog.csdn.net/yuanzeyao/article/details/25837897 从今天开始,我打算为大家呈现关于Volley框架的源码分析的文章,Volley框架是Google在2013年发布的,主要用于实现频繁而且粒度比较细小的Http请求,在此之前Android中进行Http请求通常是使用HttpUrlConnection和HttpClient进行,但是使用起来非常麻烦,而且效率比较地下,我想谷歌正式基于此种原因发布了Volley框架,其实出了Voll

PM2源码浅析

PM2工作原理 最近在玩一个游戏,<地平线:黎明时分>,最终Boss是一名叫黑底斯的人,所谓为人,也许不对,黑底斯是一段强大的毁灭进程,破坏了盖娅主进程,从而引发的整个大陆机械兽劣化故事. 为什么要讲这么一段呢,是希望大家可以更好地理解pm2的原理,要理解pm2就要理解god和santan的关系,god和santan的关系就相当于盖娅和黑底斯在pm2中的01世界中,每一行代码每一个字节都安静的工作god就是Daemon进程 守护进程,重启进程,守护node程序世界的安宁,santan就是进程的

Android源码浅析(一)——VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置

Android源码浅析(一)--VMware Workstation Pro和Ubuntu Kylin 16.04 LTS安装配置 最近地方工作,就是接触源码的东西了,所以好东西还是要分享,系列开了这么多,完结 的也没几个,主要还是自己覆盖的太广了,却又不精通,嘿嘿,工作需要,所以写下了本篇博客 一.VMware 12 我选择的虚拟机试VMware,挺好用的感觉,下载VMware就不说了,善用搜索键嘛,这里我提供一个我现在在用的 下载地址:链接:http://pan.baidu.com/s/1k

【Spark Core】任务执行机制和Task源码浅析2

引言 上一小节<任务执行机制和Task源码浅析1>介绍了Executor的注册过程. 这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍. 1. Executor的launchTasks函数 DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数. 下面是Coars

Volley框架源码浅析(二)

尊重原创 http://write.blog.csdn.net/postedit/25921795 在前面的一片文章Volley框架浅析(一)中我们知道在RequestQueue这个类中,有两个队列:本地队列和网络队列 /** The cache triage queue. */ private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<Request<

Android手势源码浅析-----手势绘制(GestureOverlayView)

Android手势源码浅析-----手势绘制(GestureOverlayView)

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

转:Spring FactoryBean源码浅析

http://blog.csdn.net/java2000_wl/article/details/7410714 在Spring BeanFactory容器中管理两种bean 1.标准Java Bean 2,另一种是工厂Bean,   即实现了FactoryBean接口的bean  它不是一个简单的Bean 而是一个生产或修饰对象生成的工厂Bean 在向Spring容器获得bean时  对于标准的java Bean  返回的是类自身的实例 而FactoryBean 其返回的对象不一定是自身类的一

Spring FactoryBean源码浅析

在Spring BeanFactory容器中管理两种bean 1.标准Java Bean 2,另一种是工厂Bean,   即实现了FactoryBean接口的bean  它不是一个简单的Bean 而是一个生产或修饰对象生成的工厂Bean 在向Spring容器获得bean时  对于标准的java Bean  返回的是类自身的实例 而FactoryBean 其返回的对象不一定是自身类的一个实例,返回的是该工厂Bean的getObject方法所返回的对象 一个简单的例子 [java] view pla