Scalaz(57)- scalaz-stream: fs2-多线程编程,fs2 concurrency

fs2的多线程编程模式不但提供了无阻碍I/O(java nio)能力,更为并行运算提供了良好的编程工具。在进入并行运算讨论前我们先示范一下fs2 pipe2对象里的一些Stream合并功能。我们先设计两个帮助函数(helper)来跟踪运算及模拟运算环境:

1   def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap {a =>
2     Task.delay { println(prompt + a); a}}         //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
3
4   Stream(1,2,3).through(log(">")).run.unsafeRun   //> >1
5                                                   //| >2
6                                                   //| >3

log是个运算跟踪函数。

 1  implicit val strategy = Strategy.fromFixedDaemonPool(4)
 2   //> strategy  : fs2.Strategy = Strategy
 3  implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
 4   //> scheduler  : fs2.Scheduler = Scheduler([email protected][Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0])
 5   def randomDelay[A](max: FiniteDuration): Pipe[Task, A, A] = _.evalMap { a => {
 6     val delay: Task[Int] = Task.delay {
 7       scala.util.Random.nextInt(max.toMillis.toInt)
 8     }
 9     delay.flatMap { d => Task.now(a).schedule(d.millis) }
10    }
11   }      //> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.Task,A,A]
12   Stream(1,2,3).through(randomDelay(1.second)).through(log("delayed>")).run.unsafeRun
13                                                   //> delayed>1
14                                                   //| delayed>2
15                                                   //| delayed>3

randomDelay是一个模拟任意延迟运算环境的函数。我们也可以在连接randomDelay前后进行跟踪:

1 Stream(1,2,3).through(log("befor delay>"))
2                .through(randomDelay(1.second))
3                .through(log("after delay>")).run.unsafeRun
4                                                   //> befor delay>1
5                                                   //| after delay>1
6                                                   //| befor delay>2
7                                                   //| after delay>2
8                                                   //| befor delay>3
9                                                   //| after delay>3

值得注意的是randomDelay并不会阻碍(block)当前运算。

下面我们来看看pipe2对象里的合并函数interleave:

 1 val sa = Stream(1,2,3).through(randomDelay(1.second)).through(log("A>"))
 2         //> sa  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>)
 3 val sb = Stream(1,2,3).through(randomDelay(1.second)).through(log("B>"))
 4         //> sb  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>)
 5 (sa interleave sb).through(log("AB")).run.unsafeRun
 6                                                   //> A>1
 7                                                   //| B>1
 8                                                   //| AB>1
 9                                                   //| AB>1
10                                                   //| A>2
11                                                   //| B>2
12                                                   //| AB>2
13                                                   //| AB>2
14                                                   //| A>3
15                                                   //| B>3
16                                                   //| AB>3
17                                                   //| AB>3

我们看到合并后的数据发送必须等待sa,sb完成了元素发送之后。这是一种固定顺序的合并操作。merge是一种不定顺序的合并方式,我们看看它的使用示范:

 1 (sa merge sb).through(log("AB>")).run.unsafeRun   //> B>1
 2                                                   //| AB>1
 3                                                   //| B>2
 4                                                   //| AB>2
 5                                                   //| B>3
 6                                                   //| AB>3
 7                                                   //| A>1
 8                                                   //| AB>1
 9                                                   //| A>2
10                                                   //| AB>2
11                                                   //| A>3
12                                                   //| AB>3

我们看到merge不会同时等待sa,sb完成后再发送结果,只要其中一个完成发送就开始发送结果了。换言之merge合并基本上是跟着跑的快的那个,所以结果顺序是不规则不可确定的(nondeterministic)。那么从运算时间上来讲:interleave合并所花费时间就是确定的sa+sb,而merge则选sa,sb之间最快的时间。当然总体运算所需时间是相当的,但在merge时我们可以对发出的元素进行并行运算,能大大缩短运算时间。用merge其中一个问题是我们无法确定当前的元素是从那里发出的,我们可以用either来解决这个问题:

 1 (sa either sb).through(log("AB>")).run.unsafeRun  //> A>1
 2                                                   //| AB>Left(1)
 3                                                   //| B>1
 4                                                   //| AB>Right(1)
 5                                                   //| A>2
 6                                                   //| AB>Left(2)
 7                                                   //| B>2
 8                                                   //| AB>Right(2)
 9                                                   //| B>3
10                                                   //| AB>Right(3)
11                                                   //| A>3
12                                                   //| AB>Left(3)

我们通过left,right分辨数据源头。如果再增多一个Stream源头,我们还是可以用merge来合并三个Stream:

 1 val sc = Stream.range(1,10).through(randomDelay(1.second)).through(log("C>"))
 2     //> sc  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
 3 ((sa merge sb) merge sc).through(log("ABC>")).run.unsafeRun
 4                                                   //> B>1
 5                                                   //| ABC>1
 6                                                   //| C>1
 7                                                   //| ABC>1
 8                                                   //| A>1
 9                                                   //| ABC>1
10                                                   //| B>2
11                                                   //| ABC>2
12                                                   //| A>2
13                                                   //| ABC>2
14                                                   //| B>3
15                                                   //| ABC>3
16                                                   //| C>2
17                                                   //| ABC>2
18                                                   //| A>3
19                                                   //| ABC>3
20                                                   //| C>3
21                                                   //| ABC>3
22                                                   //| C>4
23                                                   //| ABC>4
24                                                   //| C>5
25                                                   //| ABC>5
26                                                   //| C>6
27                                                   //| ABC>6
28                                                   //| C>7
29                                                   //| ABC>7
30                                                   //| C>8
31                                                   //| ABC>8
32                                                   //| C>9
33                                                   //| ABC>9

如果我们无法确定数据源头数量的话,那么我们可以用以下的类型款式来表示:

Stream[Task,Stream[Task,A]]

这个类型代表的是Stream of Streams。在外部的Stream里包含了不确定数量的Streams。用具体的例子可以解释:外部的Stream代表客端数据连接(connection),内部的Stream代表每个客端读取的数据。把上面的三个Stream用这种类型来表示的话:

1 val streams:Stream[Task,Stream[Task,Int]] = Stream(sa,sb,sc)
2      //> streams  : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>),Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>), S
3 egment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>))))

现在我们不但需要对内部Stream进行运算还需要把结果打平成Stream[Task,A]。在fs2.concurrent包里就有这样一个组件(combinator):

  def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}

输入参数outer和运算结果类型都对得上。maxOpen代表最多并行运算数。我们可以用join运算上面合并sa,sb,sc的例子:

 1 val ms = concurrent.join(3)(streams)              //> ms  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)
 2 ms.through(log("ABC>")).run.unsafeRun             //> C>1
 3                                                   //| ABC>1
 4                                                   //| A>1
 5                                                   //| ABC>1
 6                                                   //| C>2
 7                                                   //| ABC>2
 8                                                   //| B>1
 9                                                   //| ABC>1
10                                                   //| C>3
11                                                   //| ABC>3
12                                                   //| A>2
13                                                   //| ABC>2
14                                                   //| B>2
15                                                   //| ABC>2
16                                                   //| C>4
17                                                   //| ABC>4
18                                                   //| A>3
19                                                   //| ABC>3
20                                                   //| B>3
21                                                   //| ABC>3
22                                                   //| C>5
23                                                   //| ABC>5
24                                                   //| C>6
25                                                   //| ABC>6
26                                                   //| C>7
27                                                   //| ABC>7
28                                                   //| C>8
29                                                   //| ABC>8
30                                                   //| C>9
31                                                   //| ABC>9

结果就是我们预料的。上面提到过maxOpen是最大并行运算数。我们用另一个例子来观察:

 1 val rangedStreams = Stream.range(0,5).map {id =>
 2       Stream.range(1,5).through(randomDelay(1.second)).through(log(((‘A‘+id).toChar).toString +">")) }
 3       //> rangedStreams  : fs2.Stream[Nothing,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(()))).flatMap(<function1>).mapChunks(<function1>)
 4 concurrent.join(3)(rangedStreams).run.unsafeRun   //> B>1
 5                                                   //| A>1
 6                                                   //| C>1
 7                                                   //| B>2
 8                                                   //| C>2
 9                                                   //| A>2
10                                                   //| B>3
11                                                   //| C>3
12                                                   //| C>4
13                                                   //| D>1
14                                                   //| A>3
15                                                   //| A>4
16                                                   //| B>4
17                                                   //| E>1
18                                                   //| E>2
19                                                   //| E>3
20                                                   //| D>2
21                                                   //| D>3
22                                                   //| E>4
23                                                   //| D>4

可以看到一共只有三个运算过程同时存在,如:ABC, ED...

当我们的程序需要与外界程序交互时,可能会以下面的几种形式进行:

1、产生副作用的运算是同步运行的。这种情况最容易处理,因为直接可以获取结果

2、产生副作用的运算是异步的:通过调用一次callback函数来提供运算结果

3、产生副作用的运算是异步的,但结果必须通过多次调用callback函数来分批提供

下面我们就一种一种情况来分析:

1、同步运算最容易处理:我们只需要把运算包嵌在Stream.eval里就行了:

1 def destroyUniverse: Unit = println("BOOOOM!!!")  //> destroyUniverse: => Unit
2 val s = Stream.eval_(Task.delay(destroyUniverse)) ++ Stream("...move on")
3     //> s  : fs2.Stream[fs2.Task,String] = append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>))
4 s.runLog.unsafeRun                        //> BOOOOM!!!
5                                           //| res8: Vector[String] = Vector(...move on)

2、第二种情况:fs2里的Async trait有个async是用来登记callback函数的:

trait Async[F[_]] extends Effect[F] { self =>
/**
   Create an `F[A]` from an asynchronous computation, which takes the form
   of a function with which we can register a callback. This can be used
   to translate from a callback-based API to a straightforward monadic
   version.
   */
  def async[A](register: (Either[Throwable,A] => Unit) => F[Unit]): F[A] =
    bind(ref[A]) { ref =>
    bind(register { e => runSet(ref)(e) }) { _ => get(ref) }}
...

我们用一个实际的例子来做示范,假设我们有一个callback函数readBytes:

1 trait Connection {
2   def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit

这个Connection就是一个交互界面(interface)。假设它是这样实现实例化的:

1 val conn = new Connection {
2   def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
3     Thread.sleep(1000)
4     onSuccess(Array(1,2,3,4,5))
5   }
6 }  //> conn  : demo.ws.fs2Concurrent.connection = [email protected]

我们可以用async登记(register)这个callback函数,把它变成纯代码可组合的(monadic)组件Task[Array[Byte]]:

1 val bytes = T.async[Array[Byte]] { (cb: Either[Throwable,Array[Byte]] => Unit) => {
2    Task.delay { conn.readBytes (
3      ready => cb(Right(ready)),
4      fail => cb(Left(fail))
5    ) }
6 }}             //> bytes  : fs2.Task[Array[Byte]] = Task

这样我们才能用Stream.eval来运算bytes:

1 Stream.eval(bytes).map(_.toList).runLog.unsafeRun //> res9: Vector[List[Byte]] = Vector(List(1, 2, 3, 4, 5))

这种只调用一次callback函数的情况也比较容易处理:当我们来不及处理数据时停止读取就是了。如果需要多次调用callback,比如外部程序也是一个Stream API:一旦数据准备好就调用一次callback进行传送。这种情况下可能出现我们的程序来不及处理收到的数据的状况。我们可以用fs2.async包提供的queue来解决这个问题:

 1 import fs2.async
 2   import fs2.util.Async
 3
 4   type Row = List[String]
 5   // defined type alias Row
 6
 7   trait CSVHandle {
 8     def withRows(cb: Either[Throwable,Row] => Unit): Unit
 9   }
10   // defined trait CSVHandle
11
12   def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] =
13     for {
14       q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
15       _ <- Stream.suspend { h.withRows { e => F.unsafeRunAsync(q.enqueue1(e))(_ => ()) }; Stream.emit(()) }
16       row <- q.dequeue through pipe.rethrow
17     } yield row
18   // rows: [F[_]](h: CSVHandle)(implicit F: fs2.util.Async[F])fs2.Stream[F,Row]

enqueue1和dequeue在Queue trait里是这样定义的:

/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be ‘semantically‘ blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_],A] {
/**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a`  has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

/** Repeatedly call `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.repeatEval(dequeue1)

  /** Dequeue one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
...

我们用enqueue1把一次callback调用存入queue。dequeue的运算结果是Stream[F,Row],所以我们用dequeue运算存在queue里的任务取出数据。

fs2提供了signal,queue,semaphore等数据类型。下面是一些使用示范:async.signal

 1 Stream.eval(async.signalOf[Task,Int](0)).flatMap {s =>
 2     val monitor: Stream[Task,Nothing] =
 3       s.discrete.through(log("s updated>")).drain
 4     val data: Stream[Task,Int] =
 5       Stream.range(10,16).through(randomDelay(1.second))
 6     val writer: Stream[Task,Unit] =
 7       data.evalMap {d => s.set(d)}
 8     monitor merge writer
 9   }.run.unsafeRun                                 //> s updated>0
10                                                   //| s updated>10
11                                                   //| s updated>11
12                                                   //| s updated>12
13                                                   //| s updated>13
14                                                   //| s updated>14
15                                                   //| s updated>15

async.queue使用示范:

 1 Stream.eval(async.boundedQueue[Task,Int](5)).flatMap {q =>
 2     val monitor: Stream[Task,Nothing] =
 3       q.dequeue.through(log("dequeued>")).drain
 4     val data: Stream[Task,Int] =
 5       Stream.range(10,16).through(randomDelay(1.second))
 6     val writer: Stream[Task,Unit] =
 7       data.to(q.enqueue)
 8     monitor mergeHaltBoth  writer
 9
10   }.run.unsafeRun                                 //> dequeued>10
11                                                   //| dequeued>11
12                                                   //| dequeued>12
13                                                   //| dequeued>13
14                                                   //| dequeued>14
15                                                   //| dequeued>15

fs2还在time包里提供了一些定时自动产生数据的函数和类型。我们用一些代码来示范它们的用法:

1 time.awakeEvery[Task](1.second)
2    .through(log("time:"))
3    .take(5).run.unsafeRun                         //> time:1002983266 nanoseconds
4                                                   //| time:2005972864 nanoseconds
5                                                   //| time:3004831159 nanoseconds
6                                                   //| time:4002104307 nanoseconds
7                                                   //| time:5005091850 nanoseconds

awakeEvery产生的是一个无穷数据流,所以我们用take(5)来取前5个元素。我们也可以让它运算5秒钟:

1  val tick = time.awakeEvery[Task](1.second).through(log("time:"))
2     //> tick  : fs2.Stream[fs2.Task,scala.concurrent.duration.FiniteDuration] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
3  tick.run.unsafeRunFor(5.seconds)                 //> time:1005685270 nanoseconds
4                                                   //| time:2004331473 nanoseconds
5                                                   //| time:3005046945 nanoseconds
6                                                   //| time:4002795227 nanoseconds
7                                                   //| time:5002807816 nanoseconds
8                                                   //| java.util.concurrent.TimeoutException

如果我们希望避免TimeoutException,可以用Task.schedule:

 1 val tick = time.awakeEvery[Task](1.second).through(log("time:"))
 2    //> tick  : fs2.Stream[fs2.Task,scala.concurrent.duration.FiniteDuration] = Seg
 3 ment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
 4  tick.interruptWhen(Stream.eval(Task.schedule(true,5.seconds)))
 5       .run.unsafeRun                              //> time:1004963839 nanoseconds
 6                                                   //| time:2005325025 nanoseconds
 7                                                   //| time:3005238921 nanoseconds
 8                                                   //| time:4004240985 nanoseconds
 9                                                   //| time:5001334732 nanoseconds
10                                                   //| time:6003586673 nanoseconds
11                                                   //| time:7004728267 nanoseconds
12                                                   //| time:8004333608 nanoseconds
13                                                   //| time:9003907670 nanoseconds
14                                                   //| time:10002624970 nanoseconds

最直接的方法是用fs2的tim.sleep:

1  (time.sleep[Task](5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
2                                                   //> res14: Vector[Boolean] = Vector(true)
3  tick.interruptWhen(time.sleep[Task](5.seconds) ++ Stream.emit(true))
4     .run.unsafeRun                                //> time:1002078506 nanoseconds
5                                                   //| time:2005144318 nanoseconds
6                                                   //| time:3004049135 nanoseconds
7                                                   //| time:4002963861 nanoseconds
8                                                   //| time:5000088103 nanoseconds
时间: 2024-08-29 20:32:40

Scalaz(57)- scalaz-stream: fs2-多线程编程,fs2 concurrency的相关文章

Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式.直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点.通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式.用Free Monad编写的程序容易理解并具备良好的可维护性.scalaz-stream的流程控制和多线程运算模式可以实现程序的安全

多线程编程1

参考资料: http://blog.csdn.net/JXH_123/article/details/23450031                             秒杀多线程系列 http://www.baidu.com/index.php?tn=utf8kb_oem_dg&addresssearch=1#wd=C%2B%2B%E5%BE%AA%E7%8E%AF%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97&ie=utf-8&tn=utf8kb

你必须掌握的多线程编程

1.多线程编程必备知识 1.1 进程与线程的概念 当我们打开一个应用程序后,操作系统就会为该应用程序分配一个进程ID,例如打开QQ,你将在任务管理器的进程选项卡看到QQ.exe进程,如下图: 进程可以理解为一块包含了某些资源的内存区域,操作系统通过进程这一方式把它的工作划分为不同的单元.一个应用程序可以对应于多个进程. 线程是进程中的独立执行单元,对于操作系统而言,它通过调度线程来使应用程序工作,一个进程中至少包含一个线程,我们把该线程成为主线程.线程与进程之间的关系可以理解为:线程是进程的执行

转载自~浮云比翼:Step by Step:Linux C多线程编程入门(基本API及多线程的同步与互斥)

Step by Step:Linux C多线程编程入门(基本API及多线程的同步与互斥) 介绍:什么是线程,线程的优点是什么 线程在Unix系统下,通常被称为轻量级的进程,线程虽然不是进程,但却可以看作是Unix进程的表亲,同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等.但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage). 一

C语言多线程编程

原文:C语言多线程编程 注:本文内容来源于互联网,感谢作者整理! Windows的多线程编程 c语言 在Windows的多线程编程中,创建线程的函数主要有CreateThread和_beginthread(及_beginthreadex). CreateThread 和 ExitThread    使用API函数CreateThread创建线程时,其中的线程函数原型:  DWORD WINAPI ThreadProc(LPVOID lpParameter);在线程函数返回后,其返回值用作调用Ex

【转】Windows的多线程编程,C/C++

在Windows的多线程编程中,创建线程的函数主要有CreateThread和_beginthread(及_beginthreadex). CreateThread 和 ExitThread    使用API函数CreateThread创建线程时,其中的线程函数原型:  DWORD WINAPI ThreadProc(LPVOID lpParameter);在线程函数返回后,其返回值用作调用ExitThread函数的参数(由系统隐式调用).可以使用GetExitCodeThread函数获得该线程

iOS有三种多线程编程的技术

1.NSThread 2.NSOperationQueue 3.GCD Thread 是这三种范式里面相对轻量级的,但也是使用起来最负责的,你需要自己管理thread的生命周期,线程之间的同步.线程共享同一应用程序的部分内存空间, 它们拥有对数据相同的访问权限.你得协调多个线程对同一数据的访问,一般做法是在访问之前加锁,这会导致一定的性能开销.在 iOS 中我们可以使用多种形式的 thread: Cocoa threads: 使用NSThread 或直接从 NSObject 的类方法 perfo

iOS多线程编程技术之NSThread、Cocoa NSOperation、GCD

简介iOS有三种多线程编程的技术,分别是:(一)NSThread(二)Cocoa NSOperation(三)GCD(全称:Grand Central Dispatch) 这三种编程方式从上到下,抽象度层次是从低到高的,抽象度越高的使用越简单,也是Apple最推荐使用的. 三种方式的优缺点介绍:1)NSThread:优点:NSThread 比其他两个轻量级缺点:需要自己管理线程的生命周期,线程同步.线程同步对数据的加锁会有一定的系统开销 NSThread实现的技术有下面三种:一般使用cocoa

iOS多线程编程Part 2/3 - NSOperation

多线程编程Part 1介绍了NSThread以及NSRunLoop,这篇Blog介绍另一种并发编程技术:NSOPeration. NSOperation & NSOperationQueue 从头文件NSOperation.h来看接口是非常的简洁,NSOperation本身是一个抽象类,定义了一个要执行的工作,NSOperationQueue是一个工作队列,当工作加入到队列后,NSOperationQueue会自动按照优先顺序及工作的从属依赖关系(如果有的话)组织执行. NSOperation是

iOS多线程编程的几种方式

一.线程概述 有些程序是一条直线,起点到终点——如简单的hello world,运行打印完,它的生命周期便结束了,像昙花一现. 有些程序是一个圆,知道循环将它切断——像操作系统,一直运行,直到你关机. 一个运行着的程序就是一个进程或者叫做一个任务,一个进程至少包含一个线程,线程就是程序的执行流. Mac和IOS中的程序启动,创建好一个进程的同时,一个线程便开始运作,这个线程叫做主线程.主线程在程序中的位置和其他线程不同,它是其他线程最终的父线程,且所有的界面的显示操作即AppKit或UIKit的