ReactiveCocoa的冷信号与热信号 探讨

背景

ReactiveCocoa(简称RAC)是最初由GitHub团队开发的一套基于Cocoa的FRP框架。FRP即Functional Reactive Programming(函数式响应式编程),其优点是用随时间改变的函数表示用户输入,这样就不需要可变状态了。我们之前的文章“RACSignal的Subscription深入分析”里曾经详细讲解过RAC核心概念之一RACSignal的实现原理。在美团客户端中,我们大量使用了这个框架。冷信号与热信号的概念很容易混淆并造成一定的问题。鉴于这个问题具有一定普遍性,我将用一系列文章讲解RAC中冷信号与热信号的相关知识点,希望可以加深大家的理解。本文是系列文章的第一篇。

p.s. 以下代码和示例基于ReactiveCocoa v2.5

什么是冷信号与热信号

冷热信号的概念源于.NET框架Reactive Extensions(RX)中的Hot Observable和Cold Observable,两者的区别是:

  1. Hot Observable是主动的,尽管你并没有订阅事件,但是它会时刻推送,就像鼠标移动;而Cold Observable是被动的,只有当你订阅的时候,它才会发布消息。
  2. Hot Observable可以有多个订阅者,是一对多,集合可以与订阅者共享信息;而Cold Observable只能一对一,当有不同的订阅者,消息是重新完整发送。

这里面的Observables可以理解为RACSignal。为了加深理解,我们来看这样的几组代码:

    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return nil;
    }];
    NSLog(@"Signal was created.");
    [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 recveive: %@", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recveive: %@", x);
        }];
    }];

以上简单地创建了一个信号,并且依次发送@1,@2,@3作为值。下面分别有两个订阅者在不同的时间段进行了订阅,运行的结果如下:

2015-08-11 18:33:21.681 RACDemos[6505:1125196] Signal was created.
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 1
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 2
2015-08-11 18:33:21.793 RACDemos[6505:1125196] Subscriber 1 recveive: 3
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 1
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 2
2015-08-11 18:33:22.683 RACDemos[6505:1125196] Subscriber 2 recveive: 3

我们可以看到,信号在18:33:21.681时被创建,18:33:21.793依次接到1、2、3三个值,而在18:33:22.683再依次接到1、2、3三个值。说明了变量名为signal的这个信号,在两个不同时间段的订阅过程中,分别完整地发送了所有的消息。

我们再对这段代码进行一个小的改动:

    RACMulticastConnection *connection = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
            [subscriber sendNext:@1];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
            [subscriber sendNext:@2];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@3];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }] publish];
    [connection connect];
    RACSignal *signal = connection.signal;

    NSLog(@"Signal was created.");
    [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 recveive: %@", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:2.1 schedule:^{
        [signal subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recveive: %@", x);
        }];
    }];

稍微有些复杂,我们来一一分析:

  • 创建了一个信号,在1秒、2秒、3秒分别发送1、2、3这三个值,4秒发送结束信号。
  • 对这个信号调用publish方法得到一个RACMulticastConnection。
  • 让connection进行连接操作。
  • 获得connection的信号。
  • 分别在1.1秒和2.1秒订阅获得的信号。

抛开RACMulticastConnection是个什么东东,我们先来看下结果:

2015-08-12 11:07:49.943 RACDemos[9418:1186344] Signal was created.
2015-08-12 11:07:52.088 RACDemos[9418:1186344] Subscriber 1 recveive: 2
2015-08-12 11:07:53.044 RACDemos[9418:1186344] Subscriber 1 recveive: 3
2015-08-12 11:07:53.044 RACDemos[9418:1186344] Subscriber 2 recveive: 3

首先告诉大家- [RACSignal publish]- [RACMulticastConnection connect]- [RACMulticastConnection signal]这几个操作生成了一个热信号。
我们再来关注下输出结果的一些细节:

  • 信号在11:07:49.943被创建
  • 11:07:52.088时订阅者1才收到2这个值,说明1这个值没有接收到,时间间隔是2秒多
  • 11:07:53.044时订阅者1和订阅者2同时收到3这个值,时间间隔是3秒多

参考一开始的Hot Observables的论述和两段小程序的输出结果,我们可以确定冷热信号的如下特点:

  1. 热信号是主动的,即使你没有订阅事件,它仍然会时刻推送。如第二个例子,信号在50秒被创建,51秒的时候1这个值就推送出来了,但是当时还没有订阅者。而冷信号是被动的,只有当你订阅的时候,它才会发送消息。如第一个例子。
  2. 热信号可以有多个订阅者,是一对多,信号可以与订阅者共享信息。如第二个例子,订阅者1和订阅者2是共享的,他们都能在同一时间接收到3这个值。而冷信号只能一对一,当有不同的订阅者,消息会从新完整发送。如第一个例子,我们可以观察到两个订阅者没有联系,都是基于各自的订阅时间开始接收消息的。

好的,至此我们知道了什么是冷信号与热信号,了解了它们的特点。下一篇文章我们来看看为什么要区分冷信号与热信号。

前一篇文章我们介绍了冷信号与热信号的概念,可能有同学会问了,为什么RAC要搞得如此复杂呢,只用一种信号不就行了么?要解释这个问题,需要绕一些圈子。

前面可能比较难懂,如果不能很好理解,请仔细阅读相关文档。

最前面提到了RAC是一套基于Cocoa的FRP框架,那就来说说FRP吧。FRP的全称是Functional Reactive Programming,中文译作函数式响应式编程,是RP(Reactive Programm,响应式编程)的FP(Functional Programming,函数式编程)实现。说起来很拗口。太多的细节不多讨论,我们着重关注下FRP的FP特征。

FP有个很重要的概念是和我们的主题相关的,那就是纯函数。

纯函数就是返回值只由输入值决定、而且没有可见副作用的函数或者表达式。这和数学中的函数是一样的,比如:

f(x) = 5x + 1

这个函数在调用的过程中除了返回值以外的没有任何对外界的影响,除了入参x以外也不受任何其他外界因素的影响。

那么副作用都有哪些呢?我来列举以下几个情况:

  • 函数的处理过程中,修改了外部的变量,例如全局变量。一个特殊点的例子,就是如果把OC的一个方法看做一个函数,所有的成员变量的赋值都是对外部变量的修改。是的,从FP的角度看OOP是充满副作用的。
  • 函数的处理过程中,触发了一些额外的动作,例如发送了一个全局的Notification,在console里面输出了一行信息,保存了文件,触发了网络,更新了屏幕等。
  • 函数的处理过程中,受到外部变量的影响,例如全局变量,方法里面用到的成员变量。注意block中捕获的外部变量也算副作用。
  • 函数的处理过程中,受到线程锁的影响算副作用。

由此我们可以看出,在目前的iOS编程中,我们是很难摆脱副作用的。甚至可以这么说,我们iOS编程的目的其实就是产生各种副作用。(基于用户触摸的外界因素,最终反馈到网络变化和屏幕变化上。)

接下来我们来分析副作用与冷热信号的关系。既然iOS编程中少不了副作用,那么RAC在实际的使用中也不可避免地要接触副作用。下面通过一个业务场景,来看看冷信号中副作用的坑:


    self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]];

    self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer];
    self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer];

    @weakify(self)
    RACSignal *fetchData = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        @strongify(self)
        NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) {
            [subscriber sendNext:responseObject];
            [subscriber sendCompleted];
        } failure:^(NSURLSessionDataTask *task, NSError *error) {
            [subscriber sendError:error];
        }];
        return [RACDisposable disposableWithBlock:^{
            if (task.state != NSURLSessionTaskStateCompleted) {
                [task cancel];
            }
        }];
    }];

    RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"title"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"title"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"desc"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"desc"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) {
        NSError *error = nil;
        RenderManager *renderManager = [[RenderManager alloc] init];
        NSAttributedString *rendered = [renderManager renderText:value error:&error];
        if (error) {
            return [RACSignal error:error];
        } else {
            return [RACSignal return:rendered];
        }
    }];

    RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]]  startWith:@"Loading..."];
    RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
    RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]];

    [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) {
        UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
        [alertView show];
    }];

不知道大家有没有被这么一大段的代码吓到,我想要表达的是,在真正的工程中,我们的业务逻辑是很复杂的,而一些坑就隐藏在如此看似复杂但是又很合理的代码之下。所以我尽量模拟了一些需求,使得代码看起来更丰富。下面我们还是来仔细看下这段代码的逻辑吧:

  1. 创建了一个AFHTTPSessionManager用来做网络接口的数据获取。
  2. 创建了一个名为fetchData的信号来通过网络获取信息。
  3. 创建一个名为title的信号从获取的data中取得title字段,如果没有该字段则反馈一个错误。
  4. 创建一个名为desc的信号从获取的data中取得desc字段,如果没有该字段则反馈一个错误。
  5. 针对desc这个信号做一个渲染,得到一个名为renderedDesc的新信号,该信号会在渲染失败的时候反馈一个错误。
  6. title信号所有的错误转换为字符串@"Error"并且在没有获取值之前以字符串@"Loading..."占位,之后与self.someLableltext属性绑定。
  7. desc信号所有的错误转换为字符串@"Error"并且在没有获取值之前以字符串@"Loading..."占位,之后与self.originTextViewtext属性绑定。
  8. renderedDesc信号所有的错误转换为属性字符串@"Error"并且在没有获取值之前以属性字符串@"Loading..."占位,之后与self.renderedTextViewtext属性绑定。
  9. 订阅titledescrenderedDesc这三个信号的任何错误,并且弹出UIAlertView

这些代码体现了RAC的一些优势,例如良好的错误处理和各种链式处理。很不错,对不对?但是很遗憾的告诉大家,这段代码其实有很严重的错误。

如果你去尝试运行这段代码,并且打开Charles查看,你会惊奇的发现,这个网络请求发送了6次。没错,是6次请求。我们也可以想象到类似的代码存在其他副作用的问题,重新刷新了6次屏幕,写入6次文件,发了6个全局通知。

下面来分析,为什么是6次网络请求呢?首先根据上面的知识,可以推断出名为fetchData信号是一个冷信号。那么这个信号在订阅的时候就会执行里面的过程。那这个信号是在什么时候被订阅了呢?仔细回看了代码,我们发现并没有订阅这个信号,只是调用这个信号的flattenMap产生了两个新的信号。

这里有一个很重要的概念,就是任何的信号转换即是对原有的信号进行订阅从而产生新的信号。由此我们可以写出flattenMap的伪代码如下:


- (instancetype)flattenMap_:(RACStream * (^)(id value))block {
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
       return [self subscribeNext:^(id x) {
           RACSignal *signal = (RACSignal *)block(x);
           [signal subscribeNext:^(id x) {
               [subscriber sendNext:x];
           } error:^(NSError *error) {
               [subscriber sendError:error];
           } completed:^{
               [subscriber sendCompleted];
           }];
       } error:^(NSError *error) {
           [subscriber sendError:error];
       } completed:^{
           [subscriber sendCompleted];
       }];
    }];
}

除了没有高度复用和缺少一些disposable的处理以外,上述代码大致可以比较直观地说明flattenMap的机制。观察会发现其实是在调用这个方法的时候,生成了一个新的信号,并在这个新信号的执行过程中对self进行的了订阅。还需要注意一个细节,就是这个返回信号在未来订阅的时候,才会间接的订阅self。后续的startWithcatchTo等都可以这样理解。

回到我们的问题,那就是说,在fetchDataflattenMap之后,它就会因为名为titledesc信号的订阅而订阅。而后续对desc也会进行flattenMap,得到了renderedDesc,因此未来renderedDesc被订阅的时候,fetchData也会被间接订阅。这就解释了,为什么后续我们用RAC宏进行绑定的时候,fetchData会订阅3次。由于fetchData是冷信号,所以3次订阅意味着它的过程被执行了3次,也就是有3次网络请求。

另外的3次订阅来自RACSignal类的merge方法。根据上述的描述,我们也可以猜测merge方法也一定是创建了一个新的信号,在这个信号被订阅的时候,把它包含的所有信号订阅。所以我们又得到了额外的3次网络请求。

由此可以看到,不熟悉冷热信号对业务造成的影响。我们可以想象对用户流量的影响,对服务器负载的影响,对统计的影响,如果这是一个点赞的接口,会不会造成多次点赞?后果不堪设想啊。而这些都可以通过将fetchData转换为热信号来解决。

接下来也许你会问,如果我的整个计算过程中都没有副作用,是否就不会有这个问题?答案是肯定的。试想下刚才那段代码如果没有网络请求,换成一些标准化的计算会怎样。虽然可以肯定它不会出现bug,但是不要忽视其中的运算也会执行多次。纯函数还有一个概念就是引用透明。在纯函数式语言(例如Haskell)中对此可以进行一定的优化,也就是说纯函数的调用在相同参数下的返回值第二次不需要计算,所以在纯函数式语言里面的FRP并没有冷信号的担忧。然而Objective-C语言中并没有这种纯函数优化,因此有大规模运算的冷信号对性能是有一定影响的。

从上文内容可以看出,如果我们想更好地掌握RAC这个框架,区分冷信号与热信号是十分重要的。接下来的系列第三篇文章,我会揭示冷信号与热信号的本质,帮助大家正确的理解冷信号与热信号。

第一篇文章中我们介绍了冷信号与热信号的概念,前一篇文章我们也讨论了为什么要区分冷信号与热信号,下面我会先为大家揭晓热信号的本质,再给出冷信号转换成热信号的方法。

揭示热信号的本质

ReactiveCocoa中,究竟什么才是热信号呢?冷信号是比较常见的,map一下就会得到一个冷信号。但在RAC中,好像并没有“hot signal”这个单独的说法。原来在RAC的世界中,所有的热信号都属于一个类——RACSubject。接下来我们来看看究竟它为什么这么“神奇”。

在RAC2.5文档的框架概述中,有着这样一段描述:

A subject, represented by the RACSubject class, is a signal that can be manually controlled.

Subjects can be thought of as the "mutable" variant of a signal, much like NSMutableArray is for NSArray. They are extremely useful for bridging non-RAC code into the world of signals.

For example, instead of handling application logic in block callbacks, the blocks can simply send events to a shared subject instead. The subject can then be returned as a RACSignal, hiding the implementation detail of the callbacks.

Some subjects offer additional behaviors as well. In particular, RACReplaySubject can be used to buffer events for future subscribers, like when a network request finishes before anything is ready to handle the result.

从这段描述中,我们可以发现Subject具备如下三个特点:

  1. Subject是“可变”的。
  2. Subject是非RAC到RAC的一个桥梁。
  3. Subject可以附加行为,例如RACReplaySubject具备为未来订阅者缓冲事件的能力。

从第三个特点来看,Subject具备为未来订阅者缓冲事件的能力,那也就说明它是自身是有状态的。根据上文的介绍,Subject是符合热信号的特点的。为了验证它,我们再来做个简单实验:

    RACSubject *subject = [RACSubject subject];
    RACSubject *replaySubject = [RACReplaySubject subject];

    [[RACScheduler mainThreadScheduler] afterDelay:0.1 schedule:^{
        // Subscriber 1
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 1 get a next value: %@ from replay subject", x);
        }];

        // Subscriber 2
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 get a next value: %@ from replay subject", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1 schedule:^{
        [subject sendNext:@"send package 1"];
        [replaySubject sendNext:@"send package 1"];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:1.1 schedule:^{
        // Subscriber 3
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 3 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 3 get a next value: %@ from replay subject", x);
        }];

        // Subscriber 4
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 4 get a next value: %@ from subject", x);
        }];
        [replaySubject subscribeNext:^(id x) {
            NSLog(@"Subscriber 4 get a next value: %@ from replay subject", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [subject sendNext:@"send package 2"];
        [replaySubject sendNext:@"send package 2"];
    }];

按照时间线来解读一下上述代码:

  1. 0s时创建subjectreplaySubject这两个subject。
  2. 0.1s时Subscriber 1分别订阅了subjectreplaySubject
  3. 0.1s时Subscriber 2也分别订阅了subjectreplaySubject
  4. 1s时分别向subjectreplaySubject发送了"send package 1"这个字符串作为
  5. 1.1s时Subscriber 3分别订阅了subjectreplaySubject
  6. 1.1s时Subscriber 4也分别订阅了subjectreplaySubject
  7. 2s时再分别向subjectreplaySubject发送了"send package 2"这个字符串作为

接下来看一下输出的结果:

2015-09-28 13:35:22.855 RACDemos[13646:1269269] Start
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from subject
2015-09-28 13:35:23.856 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 1 from replay subject
2015-09-28 13:35:23.857 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 1 from replay subject
2015-09-28 13:35:24.059 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 1 from replay subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from subject
2015-09-28 13:35:25.039 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 1 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 2 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 3 get a next value: send package 2 from replay subject
2015-09-28 13:35:25.040 RACDemos[13646:1269269] Subscriber 4 get a next value: send package 2 from replay subject

结合结果可以分析出如下内容:

  1. 22.855s时,测试启动,subjectreplaySubject创建完毕。
  2. 23.856s时,距离启动大约1s后,Subscriber 1Subscriber 2同时subject接收到了"send package 1"这个值。
  3. 23.857s时,也是距离启动大约1s后,Subscriber 1Subscriber 2同时replaySubject接收到了"send package 1"这个值。
  4. 24.059s时,距离启动大约1.2s后,Subscriber 3Subscriber 4同时replaySubject接收到了"send package 1"这个值。注意Subscriber 3Subscriber 4并没有从subject接收"send package 1"这个值。
  5. 25.039s时,距离启动大约2.1s后,Subscriber 1Subscriber 2Subscriber 3Subscriber 4同时subject接收到了"send package 2"这个值。
  6. 25.040s时,距离启动大约2.1s后,Subscriber 1Subscriber 2Subscriber 3Subscriber 4同时replaySubject接收到了"send package 2"这个值。

只关注subject,根据时间线,我们可以得到下图:

经过观察不难发现,4个订阅者实际上是共享subject的,一旦这个subject发送了值,当前的订阅者就会同时接收到。由于Subscriber 3Subscriber 4的订阅时间稍晚,所以错过了第一次值的发送。这与冷信号是截然不同的反应。冷信号的图类似下图:

对比上面两张图,是不是可以发现,subject类似“直播”,错过了就不再处理。而signal类似“点播”,每次订阅都会从头开始。所以我们有理由认定subject天然就是热信号。

下面再来看看replaySubject,根据时间线,我们能得到另一张图:

将图3与图1对比会发现,Subscriber 3Subscriber 4在订阅后马上接收到了“历史值”。对于Subscriber 3Subscriber 4来说,它们只关心“历史的值”而不关心“历史的时间线”,因为实际上12是间隔1s发送的,但是它们接收到的显然不是。举个生动的例子,就好像科幻电影里面主人公穿越时间线后会先把所有的回忆快速闪过再来到现实一样。(见《X战警:逆转未来》、《蝴蝶效应》)所以我们也有理由认定replaySubject天然也是热信号。

看到这里,我们终于揭开了热信号的面纱,结论就是:

  1. RACSubject及其子类是热信号
  2. RACSignal排除RACSubject类以外的是冷信号

如何将一个冷信号转化成热信号——广播

冷信号与热信号的本质区别在于是否保持状态,冷信号的多次订阅是不保持状态的,而热信号的多次订阅可以保持状态。所以一种将冷信号转换为热信号的方法就是,将冷信号订阅,订阅到的每一个时间通过RACSbuject发送出去,其他订阅者只订阅这个RACSubject

观察下面的代码:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];

        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [coldSignal subscribe:subject];
    }];

    [subject subscribeNext:^(id x) {
        NSLog(@"Subscriber 1 recieve value:%@.", x);
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [subject subscribeNext:^(id x) {
            NSLog(@"Subscriber 2 recieve value:%@.", x);
        }];

执行顺序是这样的:

  1. 创建一个冷信号:coldSignal。该信号声明了“订阅后1.5秒发送‘A’,3秒发送‘B‘,5秒发送完成事件”。
  2. 创建一个RACSubject:subject
  3. 在2秒后使用这个subject订阅coldSignal
  4. 立即订阅这个subject
  5. 4秒后订阅这个subject

如果所料不错的话,通过订阅这个subject并不会引起coldSignal重复执行block的内容。我们来看下结果:

2015-09-28 19:36:45.703 RACDemos[14110:1556061] Subject created.
2015-09-28 19:36:47.705 RACDemos[14110:1556061] Cold signal be subscribed.
2015-09-28 19:36:49.331 RACDemos[14110:1556061] Subscriber 1 recieve value:A.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 1 recieve value:B.
2015-09-28 19:36:50.999 RACDemos[14110:1556061] Subscriber 2 recieve value:B.

参考时间线,会得到下图:

不难发现其中的几个重点:

  1. subject是从一开始就创建好的,等到2s后便开始订阅coldSignal
  2. Subscriber 1subject创建后就开始订阅的,但是第一个接收时间与subject接收coldSignal第一个值的时间是一样的。
  3. Subscriber 2subject创建4s后开始订阅的,所以只能接收到第二个值。

通过观察可以确定,subject就是coldSignal转化的热信号。所以使用RACSubject来将冷信号转化为热信号是可行的。

当然,使用这种RACSubject来订阅冷信号得到热信号的方式仍有一些小的瑕疵。例如subject的订阅者提前终止了订阅,而subject并不能终止对coldSignal的订阅。(RACDisposable是一个比较大的话题,我计划在其他的文章中详细阐述它,也希望感兴趣的同学自己来理解。)所以在RAC库中对于冷信号转化成热信号有如下标准的封装:

- (RACMulticastConnection *)publish;
- (RACMulticastConnection *)multicast:(RACSubject *)subject;
- (RACSignal *)replay;
- (RACSignal *)replayLast;
- (RACSignal *)replayLazily;

这5个方法中,最为重要的就是- (RACMulticastConnection *)multicast:(RACSubject *)subject;这个方法了,其他几个方法也是间接调用它的。我们来看看它的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}

/// implementation RACMulticastConnection

- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    if (self == nil) return nil;

    _sourceSignal = source;
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject;

    return self;
}

#pragma mark Connecting

- (RACDisposable *)connect {
    BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);

    if (shouldConnect) {
        self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
    }

    return self.serialDisposable;
}

- (RACSignal *)autoconnect {
    __block volatile int32_t subscriberCount = 0;

    return [[RACSignal
        createSignal:^(id<RACSubscriber> subscriber) {
            OSAtomicIncrement32Barrier(&subscriberCount);

            RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
            RACDisposable *connectionDisposable = [self connect];

            return [RACDisposable disposableWithBlock:^{
                [subscriptionDisposable dispose];

                if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
                    [connectionDisposable dispose];
                }
            }];
        }]
        setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}

虽然代码比较短但不是很好懂,大概来说明一下:

  1. RACSignal类的实例调用- (RACMulticastConnection *)multicast:(RACSubject *)subject时,以selfsubject作为构造参数创建一个RACMulticastConnection实例。
  2. RACMulticastConnection构造的时候,保存sourcesubject作为成员变量,创建一个RACSerialDisposable对象,用于取消订阅。
  3. RACMulticastConnection类的实例调用- (RACDisposable *)connect这个方法的时候,判断是否是第一次。如果是的话_signal这个成员变量来订阅sourceSignal之后返回self.serialDisposable;否则直接返回self.serialDisposable。这里面订阅sourceSignal是重点。
  4. RACMulticastConnectionsignal只读属性,就是一个热信号,订阅这个热信号就避免了各种副作用的问题。它会在- (RACDisposable *)connect第一次调用后,根据sourceSignal的订阅结果来传递事件。
  5. 想要确保第一次订阅就能成功订阅sourceSignal,可以使用- (RACSignal *)autoconnect这个方法,它保证了第一个订阅者触发sourceSignal的订阅,也保证了当返回的信号所有订阅者都关闭连接后sourceSignal被正确关闭连接。

由于RAC是一个线程安全的框架,所以好奇的同学可以了解下“OSAtomic*”这一系列的原子操作。抛开这些应该不难理解上述代码。

了解源码之后,这个方法的正确使用就清楚了,应该像这样:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];

        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
    RACSignal *hotSignal = multicastConnection.signal;

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [multicastConnection connect];
    }];

    [hotSignal subscribeNext:^(id x) {
        NSLog(@"Subscribe 1 recieve value:%@.", x);
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 2 recieve value:%@.", x);
        }];
    }];

或者这样:

    RACSignal *coldSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        NSLog(@"Cold signal be subscribed.");
        [[RACScheduler mainThreadScheduler] afterDelay:1.5 schedule:^{
            [subscriber sendNext:@"A"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:3 schedule:^{
            [subscriber sendNext:@"B"];
        }];

        [[RACScheduler mainThreadScheduler] afterDelay:5 schedule:^{
            [subscriber sendCompleted];
        }];

        return nil;
    }];

    RACSubject *subject = [RACSubject subject];
    NSLog(@"Subject created.");

    RACMulticastConnection *multicastConnection = [coldSignal multicast:subject];
    RACSignal *hotSignal = multicastConnection.autoconnect;

    [[RACScheduler mainThreadScheduler] afterDelay:2 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 1 recieve value:%@.", x);
        }];
    }];

    [[RACScheduler mainThreadScheduler] afterDelay:4 schedule:^{
        [hotSignal subscribeNext:^(id x) {
            NSLog(@"Subscribe 2 recieve value:%@.", x);
        }];
    }];

以上的两种写法和之前用Subject来传递的例子都可以得到相同的结果。

下面再来看看其他几个方法的实现:

/// implementation RACSignal (Operations)
- (RACMulticastConnection *)publish {
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACSignal *)replay {
    RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLast {
    RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];

    RACMulticastConnection *connection = [self multicast:subject];
    [connection connect];

    return connection.signal;
}

- (RACSignal *)replayLazily {
    RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
    return [[RACSignal
        defer:^{
            [connection connect];
            return connection.signal;
        }]
        setNameWithFormat:@"[%@] -replayLazily", self.name];
}

这几个方法的实现都相当简单,只是为了简化而封装,具体说明一下:

  1. - (RACMulticastConnection *)publish就是帮忙创建了RACSubject
  2. - (RACSignal *)replay就是用RACReplaySubject来作为subject,并立即执行connect操作,返回connection.signal。其作用是上面提到的replay功能,即后来的订阅者可以收到历史值。
  3. - (RACSignal *)replayLast就是用Capacity为1的RACReplaySubject来替换- (RACSignal *)replay的`subject。其作用是使后来订阅者只收到最后的历史值。
  4. - (RACSignal *)replayLazily- (RACSignal *)replay的区别就是replayLazily会在第一次订阅的时候才订阅sourceSignal

所以,其实本质仍然是

使用一个Subject来订阅原始信号,并让其他订阅者订阅这个Subject,这个Subject就是热信号。

现在再回过来看下之前系列文章第二篇中那个业务场景的例子,其实修改的方法很简单,就是在网络获取的fetchData这个信号后面,增加一个replayLazily变换,就不会出现网络请求重发6次的问题了。

修改后的代码如下,大家可以试试:


    self.sessionManager = [[AFHTTPSessionManager alloc] initWithBaseURL:[NSURL URLWithString:@"http://api.xxxx.com"]];

    self.sessionManager.requestSerializer = [AFJSONRequestSerializer serializer];
    self.sessionManager.responseSerializer = [AFJSONResponseSerializer serializer];

    @weakify(self)
    RACSignal *fetchData = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        @strongify(self)
        NSURLSessionDataTask *task = [self.sessionManager GET:@"fetchData" parameters:@{@"someParameter": @"someValue"} success:^(NSURLSessionDataTask *task, id responseObject) {
            [subscriber sendNext:responseObject];
            [subscriber sendCompleted];
        } failure:^(NSURLSessionDataTask *task, NSError *error) {
            [subscriber sendError:error];
        }];
        return [RACDisposable disposableWithBlock:^{
            if (task.state != NSURLSessionTaskStateCompleted) {
                [task cancel];
            }
        }];
    }] replayLazily];  // modify here!!

    RACSignal *title = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"title"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"title"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *desc = [fetchData flattenMap:^RACSignal *(NSDictionary *value) {
        if ([value[@"desc"] isKindOfClass:[NSString class]]) {
            return [RACSignal return:value[@"desc"]];
        } else {
            return [RACSignal error:[NSError errorWithDomain:@"some error" code:400 userInfo:@{@"originData": value}]];
        }
    }];

    RACSignal *renderedDesc = [desc flattenMap:^RACStream *(NSString *value) {
        NSError *error = nil;
        RenderManager *renderManager = [[RenderManager alloc] init];
        NSAttributedString *rendered = [renderManager renderText:value error:&error];
        if (error) {
            return [RACSignal error:error];
        } else {
            return [RACSignal return:rendered];
        }
    }];

    RAC(self.someLablel, text) = [[title catchTo:[RACSignal return:@"Error"]]  startWith:@"Loading..."];
    RAC(self.originTextView, text) = [[desc catchTo:[RACSignal return:@"Error"]] startWith:@"Loading..."];
    RAC(self.renderedTextView, attributedText) = [[renderedDesc catchTo:[RACSignal return:[[NSAttributedString alloc] initWithString:@"Error"]]] startWith:[[NSAttributedString alloc] initWithString:@"Loading..."]];

    [[RACSignal merge:@[title, desc, renderedDesc]] subscribeError:^(NSError *error) {
        UIAlertView *alertView = [[UIAlertView alloc] initWithTitle:@"Error" message:error.domain delegate:nil cancelButtonTitle:@"OK" otherButtonTitles:nil];
        [alertView show];
    }];

当然,细心的同学会发现这样修改,仍然有许多计算上的浪费,例如将fetchData转换为title的block会执行多次,将fetchData转换为desc的block也会执行多次。但是由于这些block都是无副作用的,计算量并不大,可以忽略不计。如果计算量大的,也需要对中间的信号进行热信号的转换。不过请不要忽略冷热信号的转换本身也是有计算代价的。

好的,写到这里,我们终于揭开RAC中冷信号与热信号的全部面纱,也知道如何使用了。希望这个系列文章可以让大家更好地了解RAC,避免使用RAC遇到的误区。谢谢大家。

参考链接:

1.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-1.html

2.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-2.html

3.http://tech.meituan.com/talk-about-reactivecocoas-cold-signal-and-hot-signal-part-3.html

时间: 2024-10-08 11:20:26

ReactiveCocoa的冷信号与热信号 探讨的相关文章

RACSignal 冷信号和热信号底层实现分析

前言 由于最近在写关于RACSignal底层实现分析的文章,当然也逃不了关于冷热信号操作的分析.这篇文章打算分析分析如何从冷信号转成热信号的底层实现. 目录 1.关于冷信号和热信号的概念 2.RACSignal热信号 3.RACSignal冷信号 4.冷信号是如何转换成热信号的 一. 关于冷信号和热信号的概念 冷热信号的概念是源自于源于.NET框架Reactive Extensions(RX)中的Hot Observable和Cold Observable, Hot Observable是主动的

UNIX环境编程学习笔记(24)——信号处理进阶学习之信号集和进程信号屏蔽字

lienhua342014-11-03 1 信号传递过程 信号源为目标进程产生了一个信号,然后由内核来决定是否要将该信号传递给目标进程.从信号产生到传递给目标进程的流程图如图 1 所示, 图 1: 信号产生.传递到处理的流程图 进程可以阻塞信号的传递.当信号源为目标进程产生了一个信号之后,内核会执行依次执行下面操作, 1. 如果目标进程设置了忽略该信号,则内核直接将该信号丢弃. 2. 如果目标进程没有阻塞该信号,则内核将该信号传递给目标进程,由目标进程执行相对应操作. 3. 如果目标进程设置阻塞

单端信号与差分信号(转)

单端信号早期的数字总线大部分使用单端信号做信号传输,如TTL/CMOS信号都是单端信号.所谓单端信号,是指用一根信号线的高低电平的变化来进行0.1信息的传输,这个电平的高低变化是相对于其公共的参考地平面的.单端信号由于结构简单,可以用简单的晶体管电路实现,而且集成度高.功耗低,因此在数字电路中得到最广泛的应用.下图是个单端信号的传输模型. 当信号传输速率更高时,为了减小信号的跳变时间和功耗,信号的幅度一般都会相应减小.比如以前大量使用的5V的TTL信号现在使用越来越少,更多使用的是3.3V/2.

Linux信号实践(3) --信号内核表示

信号在内核中的表示 执行信号的处理动作称为信号递达(Delivery),信号从产生到递达之间的状态,称为信号未决(Pending).进程可以选择阻塞(Block)某个信号.被阻塞的信号产生时将保持在未决状态,直到进程解除对此信号的阻塞,才执行递达的动作. 注意,阻塞和忽略是不同,只要信号被阻塞就不会递达,而忽略是在递达之后可选的一种处理动作.信号在内核中的表示可以看作是这样的: 图-信号的发送过程 解释说明: 1)PCB进程控制块(task_struct)中函数有信号屏蔽状态字(block)和信

信号的上升沿与周期(高速信号与高频信号)

在硬件设计中经常需要对频率比较高的信号进行特殊照顾,比如DDR3内存的频率经常能达到1GHz以上,PCB布线的时候通常要考虑到信号完整性的问题,做阻抗匹配和严格的拓扑结构,但实际分析信号完整性的时候,我们的研究对象是信号的上升沿时间,在数字信号中上升沿和信号频率没有必然联系,所以归根结底我们对高频信号的特殊照顾,都是从其上升沿时间的角度出发的,也就是说上升沿时间短的高速信号是我们在硬件设计中需要特殊照顾的对象,而不是高频信号.对一个周期为1KHz的低频信号,如果它的上升沿时间很短,这个信号就是高

2信号处理之:信号产生原因,进程处理信号行为,信号集处理函数,PCB的信号集,sigprocmask()和sigpending(),信号捕捉设定,sigaction,C标准库信号处理函数,可重入函数,

 1信号产生原因 2.进程处理信号行为 manpage里信号3中处理方式: SIG_IGN SIG_DFL                                            默认Term动作 a signal handling function 进程处理信号 A默认处理动作 term   中断 core    core(调试的时候产生) gcc –g file.c ulimit –c 1024 gdb a.out core ign      忽略 stop     停止

Linux环境编程之信号(一):信号基本概述

引言 假如在后台运行一个可执行程序./a.out,如果想终止该程序,通常会按下Ctrl-C,从而产生一个中断,其实这个过程的实现就是通过信号完成的.信号是软件中断,它提供了一种处理异步事件的方法. (一) 每个信号都有一个名字,这些名字都以三个字符SIG开头.例如SIGALARM是闹钟信号,当由alarm函数设置的计时器超时后产生此信号.Linux除支持31种不同信号外,还支持应用程序额外定义信号.信号定义在<bits/signum.h>中,也可以通过命令kill -l查看. (二)信号的产生

APUE学习笔记——10.11~10.13 信号集、信号屏蔽字、未决信号

如有转载,请注明出处:Windeal专栏 首先简述下几个概念的关系: 我们通过信号集建立信号屏蔽字,使得信号发生阻塞,被阻塞的信号即未决信号. 信号集: 信号集:其实就是一系列的信号.用sigset_t set表示. 数据类型:sigset_t 类似于整型(位数可能超过整型,因而不能用整型表示). 我们一般在sigprocmask()等函数中使用信号集,用于创建一系列进程要阻塞的信号,告诉内核不允许这些信号发生. 几个关于信号集的函数: #include <signal.h> int sige

APUE学习笔记——10.可靠信号与不可靠信号

首先说明:现在大部分Unix系系统如Linux都已经实现可靠信号. 1~31信号与SIGRTMIN-SIGRTMAX之间并不是可靠信号与不可靠信号的区别,在大多数系统下他们都是可靠信号. 只不过: 1~31信号                              --不支持排队,为普通信号. SIGRTMIN-SIGRTMAX信号 --支持排队,实时信号 不可靠信号 什么是不可靠信号: 不可靠的意思是信号可能丢失或者被错误处理. 在早起系统中,信号存在两大缺陷,导致了信号不可靠. 缺陷一: