Reactive Extensions(Rx)并发浅析

Reactive Extensions(Rx)并发浅析

iSun

Design & Code

.Net并行编程 - Reactive Extensions(Rx)并发浅析

关于Reactive Extensions(Rx)

关于Reactive Extensions(Rx),先来看一下来自微软的官方描述:

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observablesquery asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

“Reactive Extensions(Rx)是一个类库,它集成了异步、基于可观察(observable)序列的事件驱动编程和LINQ-style的查询操作。使用Rx,开发人员可以用observable对象描述异步数据流,使用LINQ操作符异步查询数据和使用Schedulers控制异步过程中的并发。简而言之,Rx = Observables + LINQ + Schedulers。”

Reactive Extensions(Rx)就一定是多线程?

在以上的描述中,反复出现了一个词“异步”。一般来讲,提到“异步”,首先反应到的就是多线程。那问题来了,使用Reactive Extensions就一定意味着多线程吗?先来看一个示例,代码来了:

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var sub = new Subject<Object>();
 5
 6     sub.Subscribe(o => Console.WriteLine("Received {1} on threadId:{0}",    //为Observable订阅处理器(handler),输出handler thread id
 7         Thread.CurrentThread.ManagedThreadId,
 8         o));
 9     ParameterizedThreadStart notify = obj =>    //委托定义,其内输出被观察对象的thread id
10     {
11         Console.WriteLine("OnNext({1}) on threadId:{0}",
12         Thread.CurrentThread.ManagedThreadId,
13         obj);
14         sub.OnNext(obj);
15     };
16     notify(1);
17     new Thread(notify).Start(2);
18     new Thread(notify).Start(3);
19
20     Console.Read();
21 }

代码中,分别输出了通知者的thread id和callback handler的thread id。这里使用的是Rx默认的线程并发方式。输出结果如下:

无论是在当前线程调用,还是新启线程执行,通知者和处理方法所在线程均为同一个。在该示例中,Rx的线程分配是在free-threaded模式下工作的,free-threaded就意味着我们不强行指其Rx中的subscription, notification执行线程。这是Rx的默认工作模式,而这种模式下subscribing/call OnNext并没有引发新的线程来处理observable序列,线处理方式是单线程(Single Threaded Apartment,STA)。所以,我们可以这样说:单线程是Reactive Extensions(Rx)的默认处理方式。

使用SubscribeOn控制订阅(subscribing)的上下文

IObservable<TSource>的扩展方法SubscribeOn<TSource>(IScheduler)允许我们传入一调度器(Scheduler),控制订阅执行的上下文。

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var source = Observable.Create<int>(
 5     o =>
 6     {
 7         Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 8         o.OnNext(1);
 9         o.OnNext(2);
10         o.OnNext(3);
11         o.OnCompleted();
12         Console.WriteLine("Finished on threadId:{0}",
13         Thread.CurrentThread.ManagedThreadId);
14         return Disposable.Empty;
15     });
16     source
17         //.SubscribeOn(Scheduler.ThreadPool)
18     .Subscribe(
19     o => Console.WriteLine("Received {1} on threadId:{0}",
20     Thread.CurrentThread.ManagedThreadId,
21     o),
22     () => Console.WriteLine("OnCompleted on threadId:{0}",
23     Thread.CurrentThread.ManagedThreadId));
24     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
25
26     Console.Read();
27 }

代码中,使用Observable.Create创建一Observable序列,随后订阅该序列。输出结果为:

当序列被订阅source.Subscribe,代理Observable.Create被调用执行。首先是OnNext(1) handler,依次是OnNext(2) OnNext(3) handler和OnCompleted handler,最后执行到“Subscribed on threadId:10”。整个过程是线性的,阻塞(block)式的。这是符合上面分析的Rx默认单线程的模式的。

我们放开.SubscribeOn(Scheduler.ThreadPool)的注释,指定Rx工作在线程池内完成。执行结果如下:

可以看到,所有的handler都是在一新线程内完成的。这是一个非阻塞的(no=-block)模式。

SubscribeOn方法常用来指定Observable notifications的线程执行模式(哪里执行)。其常用于以下的场景中:

  》 UI线程不允许阻塞

  》 不需在UI线程中更新显示

常用的Scheduler属性:

CurrentThread 在当前进程中尽快的调度工作,同步(synchronous,block)
Immediate 在当前进程中立即调度工作,同步(synchronous,block)
NewThread 在新线程中调度工作(asynchronous,no-block)
TaskPool 在任务工厂中调度工作(asynchronous,no-block)
ThreadPool 在线程池中调度工作(asynchronous,no-block)

示例代码 RxConcurrencySample 下载

参考资料:

      The Reactive Extensions(Rx)...

      Reactive Extensions(Rx)介绍

分类: 并行编程

标签: C#并行编程Reactive ExtensionsRxObservableScheduler多线程

时间: 2024-10-20 09:35:47

Reactive Extensions(Rx)并发浅析的相关文章

.Net并行编程 - Reactive Extensions(Rx)并发浅析

关于Reactive Extensions(Rx) 关于Reactive Extensions(Rx),先来看一下来自微软的官方描述: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represen

001.Reactive Extensions

基本概念 The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Reactive Extensions represents all these data sequences as observable sequences. An applicat

Reactive Extensions

Rx提供了一种新的组织和协调异步事件的方式,极大的简化了代码的编写.Rx最显著的特性是使用可观察集合(Observable Collection)来达到集成异步(composing asynchronous)和基于事件(event-based)的编程的效果. 我的理解是它是一种编程模式的抽象,http://latentflip.com/imperative-vs-declarative/,协同Linq的思想可以很好的工作,使代码更为整洁. Obervable Collection是Rx的核心,你

成为专业程序员路上用到的各种优秀资料、神器及框架

最近想着怎么把自己的知识体系进行整理起来,使用思维导图进行描述,对自己以后的发展也有一个更深的认识,更快的提升自己:看到了下面这篇文章,感觉非常实用,从语言到框架都非常全面,自己也可以继续补充,也是对自己知识体系的一个补充吧. 前言 成为一名专业程序员的道路上,需要坚持练习.学习与积累,技术方面既要有一定的广度,更要有自己的深度. 笔者作为一位tool mad,将工作以来用到的各种优秀资料.神器及框架整理在此,毕竟好记性不如烂键盘,此项目可以作为自己的不时之需. 本人喜欢折腾,记录的东西也比较杂

【真正福利】成为专业程序员路上用到的各种优秀资料、神器及框架

转载,原地址:http://www.cnblogs.com/jasondan/p/6380597.html 据说看到好文章不推荐的人,服务器容易宕机!本文版权归翟士丹(Stan Zhai)和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利. 好东西不是随便收集下,发篇博文,骗些点赞的!积累了5年多的东西,是时候放出来跟大家见见面了. 或许有的园友在14年的时候收藏过我的一篇"工欲善其事.必先利其器"的博文,时隔3年,已经

成为专业程序员路上有用的各种优秀资料、神器及框架

目录 资料篇 技术站点 必看书籍 大牛博客 GitHub篇 工具篇 平台工具 常用工具 第三方服务 爬虫相关(好玩的工具) 安全相关 Web服务器性能/压力测试工具/负载均衡器 大数据处理/数据分析/分布式工具 Web前端 语言篇 Scala Java Python Swift .NET C & C++ 其他 游戏开发相关 日志聚合,分布式日志收集 RTP,实时传输协议与音视频 资料篇 技术站点 在线学习:Coursera.edX.Udacity.MIT公开课.MOOC学院.慕课网 Hacker

[转载]程序员路上用到的各种优秀资料、神器及框架

好东西不是随便收集下,发篇博文,骗些点赞的!积累了5年多的东西,是时候放出来跟大家见见面了. 或许有的园友在14年的时候收藏过我的一篇"工欲善其事.必先利其器"的博文,时隔3年,已经做了N多更新,那篇文章也已被我删除,迁移至GitHub,重新进行管理. 这篇文章,大家可以推荐.收藏,让更多的人在园内看到,让福利普照. 因为这篇文章,我以后不会更新. 但项目依旧会更新,所以,更好的做法是,请到GitHub上Star:be-a-professional-programmer 前言 成为一名

[转载] 构建微服务:使用API Gateway

原文: http://mp.weixin.qq.com/s?__biz=MzA5OTAyNzQ2OA==&mid=206889381&idx=1&sn=478ccb35294c58d25d2df2d9ced65cf7&scene=1&key=c76941211a49ab586d79043cb87ac0dfeede574a20b2208ce76058b151624e4273182de582a786668ea347c6f317b389&ascene=0&

Microsoft Orleans 之 入门指南

Microsoft Orleans 在.net用简单方法构建高并发.分布式的大型应用程序框架. 原文:http://dotnet.github.io/orleans/ 在线文档:http://dotnet.github.io/orleans/What's-new-in-Orleans 源码地址:https://github.com/dotnet/orleans 简介:Orleans 框架可以构建大规模.高并发.分布式应用程序,而不需要学习专业分布式以及并发知识框架.它是由微软研究和设计应用于云计