.Net并行编程 - Reactive Extensions(Rx)并发浅析

关于Reactive Extensions(Rx)

关于Reactive Extensions(Rx),先来看一下来自微软的官方描述:

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observablesquery asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

“Reactive Extensions(Rx)是一个类库,它集成了异步、基于可观察(observable)序列的事件驱动编程和LINQ-style的查询操作。使用Rx,开发人员可以用observable对象描述异步数据流,使用LINQ操作符异步查询数据和使用Schedulers控制异步过程中的并发。简而言之,Rx = Observables + LINQ + Schedulers。”

Reactive Extensions(Rx)就一定是多线程?

在以上的描述中,反复出现了一个词“异步”。一般来讲,提到“异步”,首先反应到的就是多线程。那问题来了,使用Reactive Extensions就一定意味着多线程吗?先来看一个示例,代码来了:

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var sub = new Subject<Object>();
 5
 6     sub.Subscribe(o => Console.WriteLine("Received {1} on threadId:{0}",    //为Observable订阅处理器(handler),输出handler thread id
 7         Thread.CurrentThread.ManagedThreadId,
 8         o));
 9     ParameterizedThreadStart notify = obj =>    //委托定义,其内输出被观察对象的thread id
10     {
11         Console.WriteLine("OnNext({1}) on threadId:{0}",
12         Thread.CurrentThread.ManagedThreadId,
13         obj);
14         sub.OnNext(obj);
15     };
16     notify(1);
17     new Thread(notify).Start(2);
18     new Thread(notify).Start(3);
19
20     Console.Read();
21 }

代码中,分别输出了通知者的thread id和callback handler的thread id。这里使用的是Rx默认的线程并发方式。输出结果如下:

无论是在当前线程调用,还是新启线程执行,通知者和处理方法所在线程均为同一个。在该示例中,Rx的线程分配是在free-threaded模式下工作的,free-threaded就意味着我们不强行指其Rx中的subscription, notification执行线程。这是Rx的默认工作模式,而这种模式下subscribing/call OnNext并没有引发新的线程来处理observable序列,线处理方式是单线程(Single Threaded Apartment,STA)。所以,我们可以这样说:单线程是Reactive Extensions(Rx)的默认处理方式。

使用SubscribeOn控制订阅(subscribing)的上下文

IObservable<TSource>的扩展方法SubscribeOn<TSource>(IScheduler)允许我们传入一调度器(Scheduler),控制订阅执行的上下文。

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var source = Observable.Create<int>(
 5     o =>
 6     {
 7         Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 8         o.OnNext(1);
 9         o.OnNext(2);
10         o.OnNext(3);
11         o.OnCompleted();
12         Console.WriteLine("Finished on threadId:{0}",
13         Thread.CurrentThread.ManagedThreadId);
14         return Disposable.Empty;
15     });
16     source
17         //.SubscribeOn(Scheduler.ThreadPool)
18     .Subscribe(
19     o => Console.WriteLine("Received {1} on threadId:{0}",
20     Thread.CurrentThread.ManagedThreadId,
21     o),
22     () => Console.WriteLine("OnCompleted on threadId:{0}",
23     Thread.CurrentThread.ManagedThreadId));
24     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
25
26     Console.Read();
27 }

代码中,使用Observable.Create创建一Observable序列,随后订阅该序列。输出结果为:

当序列被订阅source.Subscribe,代理Observable.Create被调用执行。首先是OnNext(1) handler,依次是OnNext(2) OnNext(3) handler和OnCompleted handler,最后执行到“Subscribed on threadId:10”。整个过程是线性的,阻塞(block)式的。这是符合上面分析的Rx默认单线程的模式的。

我们放开.SubscribeOn(Scheduler.ThreadPool)的注释,指定Rx工作在线程池内完成。执行结果如下:

可以看到,所有的handler都是在一新线程内完成的。这是一个非阻塞的(no=-block)模式。

SubscribeOn方法常用来指定Observable notifications的线程执行模式(哪里执行)。其常用于以下的场景中:

  》 UI线程不允许阻塞

  》 不需在UI线程中更新显示

常用的Scheduler属性:

CurrentThread 在当前进程中尽快的调度工作,同步(synchronous,block)
Immediate 在当前进程中立即调度工作,同步(synchronous,block)
NewThread 在新线程中调度工作(asynchronous,no-block)
TaskPool 在任务工厂中调度工作(asynchronous,no-block)
ThreadPool 在线程池中调度工作(asynchronous,no-block)

示例代码 RxConcurrencySample 下载

参考资料:

      The Reactive Extensions(Rx)...

      Reactive Extensions(Rx)介绍

时间: 2024-10-17 17:35:47

.Net并行编程 - Reactive Extensions(Rx)并发浅析的相关文章

Reactive Extensions(Rx)并发浅析

Reactive Extensions(Rx)并发浅析 iSun Design & Code .Net并行编程 - Reactive Extensions(Rx)并发浅析 关于Reactive Extensions(Rx) 关于Reactive Extensions(Rx),先来看一下来自微软的官方描述: The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs us

001.Reactive Extensions

基本概念 The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Reactive Extensions represents all these data sequences as observable sequences. An applicat

C#并行编程-并发集合

原文:C#并行编程-并发集合 菜鸟学习并行编程,参考<C#并行编程高级教程.PDF>,如有错误,欢迎指正. 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制来保证这是一个线程安全的集合. System.Collenctions

C# 并行编程 之 并发集合 (.Net Framework 4.0)(转)

转载地址:http://blog.csdn.net/wangzhiyu1980/article/details/45497907 此文为个人学习<C#并行编程高级教程>的笔记,总结并调试了一些文章中的代码示例. 在以后开发过程中可以加以运用. 对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问.经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作. .NetFramework 4.0 中提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度.

C# 并行编程 之 并发集合 (.Net Framework 4.0)

此文为个人学习<C#并行编程高级教程>的笔记,总结并调试了一些文章中的代码示例. 在以后开发过程中可以加以运用. 对于并行任务,与其相关紧密的就是对一些共享资源,数据结构的并行访问.经常要做的就是对一些队列进行加锁-解锁,然后执行类似插入,删除等等互斥操作. .NetFramework 4.0 中提供了一些封装好的支持并行操作数据容器,可以减少并行编程的复杂程度. 基本信息 .NetFramework中并行集合的名字空间: System.Collections.Concurrent 并行容器:

Unity基于响应式编程(Reactive programming)入门

系列目录 [Unity3D基础]让物体动起来①--基于UGUI的鼠标点击移动 [Unity3D基础]让物体动起来②--UGUI鼠标点击逐帧移动 时光煮雨 Unity3D让物体动起来③—UGUI DoTween&Unity Native2D实现 时光煮雨 Unity3D实现2D人物动画① UGUI&Native2D序列帧动画 时光煮雨 Unity3D实现2D人物动画② Unity2D 动画系统&资源效率 背景 前有慕容小匹夫的一篇<解构C#游戏框架uFrame兼谈游戏架构设计&

多核并行编程技术(一)

首先需要先理解几个概念:串行:最基本的程序执行方式,串行程序的整个运行时,只有一个调用栈和一个运行时上下文,单进程/单线程程序可以认为是串行程序.并发:多线程出现后比较常见的程序执行方式,多线程程序运行时,会有多个运行时上下文和对应的多个调用栈.逻辑上多个线程同时发生,物理上是有操作系统调度,CPU某一时刻依然只执行一个线程的任务,但是某个执行中的线程随时可能被OS调度走,而随后运行的线程操作的数据可能跟刚刚被调度走的线程造成冲突,所以有共享数据同步问题. 多进程如果有共享数据,也符合并发程序的

C#并行编程 z

目录 C#并行编程-相关概念 C#并行编程-Parallel C#并行编程-Task C#并行编程-并发集合 C#并行编程-线程同步原语 C#并行编程-PLINQ:声明式数据并行 背景 基于任务的程序设计.命令式数据并行和任务并行都要求能够支持并发更新的数组.列表和集合. 在.NET Framework 4 以前,为了让共享的数组.列表和集合能够被多个线程更新,需要添加复杂的代码来同步这些更新操作. 如您需要编写一个并行循环,这个循环以无序的方式向一个共享集合中添加元素,那么必须加入一个同步机制

Net并行编程高级教程--Parallel

Net并行编程高级教程--Parallel 一直觉得自己对并发了解不够深入,特别是看了<代码整洁之道>觉得自己有必要好好学学并发编程,因为性能也是衡量代码整洁的一大标准.而且在<失控>这本书中也多次提到并发,不管是计算机还是生物都并发处理着各种事物.人真是奇怪,当你关注一个事情的时候,你会发现周围的事物中就常出现那个事情.所以好奇心驱使下学习并发.便有了此文. 一.理解硬件线程和软件线程 多核处理器带有一个以上的物理内核--物理内核是真正的独立处理单元,多个物理内核使得多条指令能够