迭代执行:
如果你正在倒弄一些数字,并且手头上的问题可以拆分出同样性质的部分,那么 dispatch_apply 会很有用。
如果你的代码看起来是这样的:
for (size_t y = 0; y < height; ++y) { for (size_t x = 0; x < width; ++x) { // Do something with x and y here } }
对它做个小改动,让它运行的更快:
dispatch_apply(height, dispatch_get_global_queue(0, 0), ^(size_t y) { for (size_t x = 0; x < width; x += 2) { // Do something with x and y here } });
代码运行良好的程度取决于你在循环内部做的操作。
block 中运行的工作必须是非常重要的,否则这个头部信息就显得过于繁重了。除非代码受到计算带宽的约束,每个工作单元为了很好适应缓存大小而读写的内存都是临界的。这会对性能会带来显著的影响。受到临界区约束的代码可能不会很好地运行。使用 dispatch_apply 可能会对性能提升有所帮助,但是性能优化本身就是个很复杂的主题。维基百科上有一篇关于 Memory-bound function 的文章,内存访问速度在 L2,L3 和主存上变化很显著。当你的数据访问模式与缓存大小不匹配时,10倍性能下降的情况并不少见。
组:
很多时候,你发现需要将异步的 block 组合起来去完成一个给定的任务。这些任务中甚至有些是并行的。现在,如果你想要在这些任务都执行完成后运行一些代码,"groups" 可以完成这项任务。看这里的例子:
dispatch_group_t group = dispatch_group_create(); dispatch_queue_t queue = dispatch_get_global_queue(0, 0); dispatch_group_async(group, queue, ^(){ // Do something that takes a while [self doSomeFoo]; dispatch_group_async(group, dispatch_get_main_queue(), ^(){ self.foo = 42; }); }); dispatch_group_async(group, queue, ^(){ // Do something else that takes a while [self doSomeBar]; dispatch_group_async(group, dispatch_get_main_queue(), ^(){ self.bar = 1; }); }); // This block will run once everything above is done: dispatch_group_notify(group, dispatch_get_main_queue(), ^(){ NSLog(@"foo: %d", self.foo); NSLog(@"bar: %d", self.bar); });
需要注意的重要事情是,所有的这些都是非阻塞的。我们从未让当前的线程一直等待直到别的任务做完。恰恰相反,我们只是简单的将多个 block 放入队列。由于代码不会阻塞,所以就不会产生死锁。同时需要注意的是,在这个小并且简单的例子中,我们是怎么在不同的队列间进切换的。
对现有API使用 dispatch_group_t:
一旦你将 groups 作为你的工具箱中的一部分,你可能会怀疑为什么大多数的异步API不把 dispatch_group_t 作为一个可选参数。这没有什么无法接受的理由,仅仅是因为自己添加这个功能太简单了,但是你还是要小心以确保自己使用 groups 的代码是成对出现的。
举例来说,我们可以给 Core Data 的 -performBlock: API 函数添加上 groups,就像这样:
- (void)withGroup:(dispatch_group_t)group performBlock:(dispatch_block_t)block{ if (group == NULL) { [self performBlock:block]; } else { dispatch_group_enter(group); [self performBlock:^(){ block(); dispatch_group_leave(group); }]; } }
当 Core Data 上的一系列操作(很可能和其他的代码组合起来)完成以后,我们可以使用 dispatch_group_notify 来运行一个 block 。
很明显,我们可以给 NSURLConnection 做同样的事情:
+ (void)withGroup:(dispatch_group_t)group sendAsynchronousRequest:(NSURLRequest *)request queue:(NSOperationQueue *)queue completionHandler:(void (^)(NSURLResponse*, NSData*, NSError*))handler{ if (group == NULL) { [self sendAsynchronousRequest:request queue:queue completionHandler:handler]; } else { dispatch_group_enter(group); [self sendAsynchronousRequest:request queue:queue completionHandler:^(NSURLResponse *response, NSData *data, NSError *error){ handler(response, data, error); dispatch_group_leave(group); }]; } }
为了能正常工作,你需要确保:
dispatch_group_enter() 必须要在 dispatch_group_leave()之前运行。
dispatch_group_enter() 和 dispatch_group_leave() 一直是成对出现的(就算有错误产生时)。
事件源:
GCD 有一个较少人知道的特性:事件源 dispatch_source_t。
跟 GCD 一样,它也是很底层的东西。当你需要用到它时,它会变得极其有用。它的一些使用是秘传招数,我们将会接触到一部分的使用。但是大部分事件源在 iOS 平台不是很有用,因为在 iOS 平台有诸多限制,你无法启动进程(因此就没有必要监视进程),也不能在你的 app bundle 之外写数据(因此也就没有必要去监视文件)等等。
GCD 事件源是以极其资源高效的方式实现的。
监视进程:
如果一些进程正在运行而你想知道他们什么时候存在,GCD 能够做到这些。你也可以使用 GCD 来检测进程什么时候分叉,也就是产生子进程或者传送给了进程的一个信号(比如 SIGTERM)
NSRunningApplication *mail = [NSRunningApplication runningApplicationsWithBundleIdentifier:@"com.apple.mail"]; if (mail == nil) { return; } pid_t const pid = mail.processIdentifier; self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, pid, DISPATCH_PROC_EXIT, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(self.source, ^(){ NSLog(@"Mail quit."); }); dispatch_resume(self.source);
当 Mail.app 退出的时候,这个程序会打印出 Mail quit.。
注意:在所有的事件源被传递到你的事件处理器之前,必须调用 dispatch_resume()。
监视文件:
这种可能性是无穷的。你能直接监视一个文件的改变,并且当改变发生时事件源的事件处理将会被调用。
你也可以使用它来监视文件夹,比如创建一个 watch folder:
NSURL *directoryURL; // assume this is set to a directory int const fd = open([[directoryURL path] fileSystemRepresentation], O_EVTONLY); if (fd < 0) { char buffer[80]; strerror_r(errno, buffer, sizeof(buffer)); NSLog(@"Unable to open \"%@\": %s (%d)", [directoryURL path], buffer, errno); return; } dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, fd, DISPATCH_VNODE_WRITE | DISPATCH_VNODE_DELETE, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(source, ^(){ unsigned long const data = dispatch_source_get_data(source); if (data & DISPATCH_VNODE_WRITE) { NSLog(@"The directory changed."); } if (data & DISPATCH_VNODE_DELETE) { NSLog(@"The directory has been deleted."); } }); dispatch_source_set_cancel_handler(source, ^(){ close(fd); }); self.source = source; dispatch_resume(self.source);
你应该总是添加 DISPATCH_VNODE_DELETE 去检测文件或者文件夹是否已经被删除——然后就停止监听。
定时器:
大多数情况下,对于定时事件你会选择 NSTimer。定时器的GCD版本是底层的,它会给你更多控制权——但要小心使用。
需要特别重点指出的是,为了让 iOS 节省电量,需要为 GCD 的定时器接口指定一个低的余地值(leeway value)。如果你不必要的指定了一个低余地值,将会浪费更多的电量。
这里我们设定了一个5秒的定时器,并允许有十分之一秒的余地值:
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, DISPATCH_TARGET_QUEUE_DEFAULT); dispatch_source_set_event_handler(source, ^(){ NSLog(@"Time flies."); }); dispatch_time_t start dispatch_source_set_timer(source, DISPATCH_TIME_NOW, 5ull * NSEC_PER_SEC, 100ull * NSEC_PER_MSEC); self.source = source; dispatch_resume(self.source);
取消:
所有的事件源都允许你添加一个 cancel handler 。这对清理你为事件源创建的任何资源都是很有帮助的,比如关闭文件描述符。GCD 保证在 cancel handle 调用前,所有的事件处理都已经完成调用。
参考上面的监视文件例子中对 dispatch_source_set_cancel_handler() 的使用。
输入输出:
写出能够在繁重的 I/O 处理情况下运行良好的代码是一件非常棘手的事情。GCD 有一些能够帮上忙的地方。不会涉及太多的细节,我们只简单的分析下问题是什么,GCD 是怎么处理的。
习惯上,当你从一个网络套接字中读取数据时,你要么做一个阻塞的读操作,也就是让你个线程一直等待直到数据变得可用,或者是做反复的轮询。这两种方法都是很浪费资源并且无法度量。然而,kqueue 通过当数据变得可用时传递一个事件解决了轮询的问题,GCD 也采用了同样的方法,但是更加优雅。当向套接字写数据时,同样的问题也存在,这时你要么做阻塞的写操作,要么等待套接字直到能够接收数据。
在处理 I/O 时,还有一个问题就是数据是以数据块的形式到达的。当从网络中读取数据时,依据 MTU(]最大传输单元),数据块典型的大小是在1.5K字节左右。这使得数据块内可以是任何内容。一旦数据到达,你通常只是对跨多个数据块的内容感兴趣。而且通常你会在一个大的缓冲区里将数据组合起来然后再进行处理。假设(人为例子)你收到了这样8个数据块:
0: HTTP/1.1 200 OK\r\nDate: Mon, 23 May 2005 22:38 1: :34 GMT\r\nServer: Apache/1.3.3.7 (Unix) (Red-H 2: at/Linux)\r\nLast-Modified: Wed, 08 Jan 2003 23 3: :11:55 GMT\r\nEtag: "3f80f-1b6-3e1cb03b"\r\nCon 4: tent-Type: text/html; charset=UTF-8\r\nContent- 5: Length: 131\r\nConnection: close\r\n\r\n<html>\r 6: \n<head>\r\n <title>An Example Page</title>\r\n 7: </head>\r\n<body>\r\n Hello World, this is a ve
如果你是在寻找 HTTP 的头部,将所有数据块组合成一个大的缓冲区并且从中查找 \r\n\r\n 是非常简单的。但是这样做,你会大量地复制这些数据。大量 旧的 C 语言 API 存在的另一个问题就是,缓冲区没有所有权的概念,所以函数不得不将数据再次拷贝到自己的缓冲区中——又一次的拷贝。拷贝数据操作看起来是无关紧要的,但是当你正在做大量的 I/O
操作的时候,你会在 profiling tool(Instruments) 中看到这些拷贝操作大量出现。即使你仅仅每个内存区域拷贝一次,你还是使用了两倍的存储带宽并且占用了两倍的内存缓存。
GCD和缓冲区:
最直接了当的方法是使用数据缓冲区。GCD 有一个 dispatch_data_t 类型,在某种程度上和 Objective-C 的 NSData 类型很相似。但是它能做别的事情,而且更通用。
注意,dispatch_data_t 可以被 retained 和 releaseed ,并且 dispatch_data_t 拥有它持有的对象。
这看起来无关紧要,但是我们必须记住 GCD 只是纯 C 的 API,并且不能使用Objective-C。通常的做法是创建一个缓冲区,这个缓冲区要么是基于栈的,要么是 malloc 操作分配的内存区域 —— 这些都没有所有权。
dispatch_data_t 的一个相当独特的属性是它可以基于零碎的内存区域。这解决了我们刚提到的组合内存的问题。当你要将两个数据对象连接起来时:
dispatch_data_t a; // Assume this hold some valid data dispatch_data_t b; // Assume this hold some valid data dispatch_data_t c = dispatch_data_create_concat(a, b);
数据对象 c 并不会将 a 和 b 拷贝到一个单独的,更大的内存区域里去。相反,它只是简单地 retain 了 a 和 b。你可以使用 dispatch_data_apply 来遍历对象 c 持有的内存区域:
dispatch_data_apply(c, ^bool(dispatch_data_t region, size_t offset, const void *buffer, size_t size) { fprintf(stderr, "region with offset %zu, size %zu\n", offset, size); return true; });
类似的,你可以使用 dispatch_data_create_subrange 来创建一个不做任何拷贝操作的子区域。
读和写:
在 GCD 的核心里,调度 I/O(Dispatch I/O) 与所谓的通道有关。调度 I/O 通道提供了一种与从文件描述符中读写不同的方式。创建这样一个通道最基本的方式就是调用:
dispatch_io_t dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd, dispatch_queue_t queue, void (^cleanup_handler)(int error));
这将返回一个持有文件描述符的创建好的通道。在你通过它创建了通道之后,你不准以任何方式修改这个文件描述符。
有两种从根本上不同类型的通道:流和随机存取。如果你打开了硬盘上的一个文件,你可以使用它来创建一个随机存取的通道(因为这样的文件描述符是可寻址的)。如果你打开了一个套接字,你可以创建一个流通道。
如果你想要为一个文件创建一个通道,你最好使用需要一个路径参数的 dispatch_io_create_with_path ,并且让 GCD 来打开这个文件。这是有益的,因为GCD会延迟打开这个文件以限制相同时间内同时打开的文件数量。
类似通常的 read(2),write(2) 和 close(2) 的操作,GCD 提供了 dispatch_io_read,dispatch_io_write 和 dispatch_io_close。无论何时数据读完或者写完,读写操作调用一个回调 block 来结束。这些都是以非阻塞,异步 I/O 的形式高效实现的。
在这你得不到所有的细节,但是这里会提供一个创建TCP服务端的例子:
首先我们创建一个监听套接字,并且设置一个接受连接的事件源:
_isolation = dispatch_queue_create([[self description] UTF8String], 0); _nativeSocket = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); struct sockaddr_in sin = {}; sin.sin_len = sizeof(sin); sin.sin_family = AF_INET6; sin.sin_port = htons(port); sin.sin_addr.s_addr= INADDR_ANY; int err = bind(result.nativeSocket, (struct sockaddr *) &sin, sizeof(sin)); NSCAssert(0 <= err, @""); _eventSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, _nativeSocket, 0, _isolation); dispatch_source_set_event_handler(result.eventSource, ^{ acceptConnection(_nativeSocket); });
当接受了连接,我们创建一个I/O通道:
typedef union socketAddress { struct sockaddr sa; struct sockaddr_in sin; struct sockaddr_in6 sin6; } socketAddressUnion; socketAddressUnion rsa; // remote socket address socklen_t len = sizeof(rsa); int native = accept(nativeSocket, &rsa.sa, &len); if (native == -1) { // Error. Ignore. return nil; } _remoteAddress = rsa; _isolation = dispatch_queue_create([[self description] UTF8String], 0); _channel = dispatch_io_create(DISPATCH_IO_STREAM, native, _isolation, ^(int error) { NSLog(@"An error occured while listening on socket: %d", error); }); //dispatch_io_set_high_water(_channel, 8 * 1024); dispatch_io_set_low_water(_channel, 1); dispatch_io_set_interval(_channel, NSEC_PER_MSEC * 10, DISPATCH_IO_STRICT_INTERVAL); socketAddressUnion lsa; // remote socket address socklen_t len = sizeof(rsa); getsockname(native, &lsa.sa, &len); _localAddress = lsa;
如果我们想要设置 SO_KEEPALIVE(如果使用了HTTP的keep-alive),我们需要在调用 dispatch_io_create 前这么做。创建好 I/O 通道后,我们可以设置读取处理程序:
dispatch_io_read(_channel, 0, SIZE_MAX, _isolation, ^(bool done, dispatch_data_t data, int error){ if (data != NULL) { if (_data == NULL) { _data = data; } else { _data = dispatch_data_create_concat(_data, data); } [self processData]; } });
如果所有你想做的只是读取或者写入一个文件,GCD 提供了两个方便的封装: dispatch_read 和 dispatch_write 。你需要传递给 dispatch_read 一个文件路径和一个在所有数据块读取后调用的 block。类似的,dispatch_write 需要一个文件路径和一个被写入的 dispatch_data_t 对象。
基准测试:
在 GCD 的一个不起眼的角落,你会发现一个适合优化代码的灵巧小工具:
uint64_t dispatch_benchmark(size_t count, void (^block)(void));
dispatch_benchmark函数是(Grand Central Dispatch) 的一部分,但是这个方法并没有被公开声明,所以必须要自己声明。把这个声明放到你的代码中,你就能够测量给定的代码执行的平均的纳秒数。例子如下:
// 声明GCD中dispatch_benchmark函数原型 extern uint64_t dispatch_benchmark(size_t count, void (^block)(void)); void test1(void); void test2(void); int main(int argc, const char * argv[]) { @autoreleasepool { test1(); test2(); } return 0; } void test1(void) { // 执行次数 size_t count = 1000; // 调用dispatch_benchmark函数,传入执行次数和待测试的代码块 NSUInteger length = 1000000; uint64_t time = dispatch_benchmark(count, ^{ @autoreleasepool { NSMutableArray *mutableArray = [NSMutableArray array]; for (NSUInteger i = 0; i < length; i++) { [mutableArray addObject:@(i)]; } } }); NSLog(@"[[NSMutableArray array] addObject:] Avg. Runtime: %llu ns", time); } void test2(void) { // 执行次数 size_t count = 1000; NSUInteger length = 1000000; // 调用dispatch_benchmark函数,传入执行次数和待测试的代码块 uint64_t time = dispatch_benchmark(count, ^{ @autoreleasepool { NSMutableArray *mutableArray = [NSMutableArray arrayWithCapacity:length]; for (NSUInteger i = 0; i < length; i++) { [mutableArray addObject:@(i)]; } } }); NSLog(@"[[NSMutableArray arrayWithCapacity:] addObject:] Avg. Runtime: %llu ns", time); }
在我的机器上输出了:
[[NSMutableArray array] addObject:] Avg. Runtime: 31488580 ns
[[NSMutableArray arrayWithCapacity:] addObject:] Avg. Runtime: 31989987 ns
也就是说两种方法添加1000个对象到 NSMutableArray 分别消耗了31488580纳秒和31989987纳秒。
正如 dispatch_benchmark 的帮助页面指出的,测量性能并非如看起来那样不重要。尤其是当比较并发代码和非并发代码时,你需要注意特定硬件上运行的特定计算带宽和内存带宽。不同的机器会很不一样。如果代码的性能与访问临界区有关,那么我们上面提到的锁竞争问题就会有所影响。
不要把它放到发布代码中,事实上,这是无意义的,它是私有API。它只是在调试和性能分析上起作用。
原子操作:
头文件 libkern/OSAtomic.h 里有许多强大的函数,专门用来底层多线程编程。尽管它是内核头文件的一部分,它也能够在内核之外来帮助编程。
这些函数都是很底层的,并且你需要知道一些额外的事情。就算你已经这样做了,你还可能会发现一两件你不能做,或者不易做的事情。当你正在为编写高性能代码或者正在实现无锁的和无等待的算法工作时,这些函数会吸引你。
这些函数在 atomic(3) 的帮助页里全部有概述——运行 man 3 atomic 命令以得到完整的文档。你会发现里面讨论到了内存屏障。
计数器:
OSAtomicIncrement 和 OSAtomicDecrement 有一个很长的函数列表允许你以原子操作的方式去增加和减少一个整数值 —— 不必使用锁(或者队列)同时也是线程安全的。如果你需要让一个全局的计数器值增加,而这个计数器为了统计目的而由多个线程操作,使用原子操作是很有帮助的。如果你要做的仅仅是增加一个全局计数器,那么无屏障版本的 OSAtomicIncrement 是很合适的,并且当没有锁竞争时,调用它们的代价很小。
类似的,OSAtomicOr ,OSAtomicAnd,OSAtomicXor 的函数能用来进行逻辑运算,而 OSAtomicTest 可以用来设置和清除位。
比较和交换:
OSAtomicCompareAndSwap 能用来做无锁的惰性初始化,如下:
void * sharedBuffer(void){ static void * buffer; if (buffer == NULL) { void * newBuffer = calloc(1, 1024); if (!OSAtomicCompareAndSwapPtrBarrier(NULL, newBuffer, &buffer)) { free(newBuffer); } } return buffer; }
如果没有 buffer,我们会创建一个,然后原子地将其写到 buffer 中如果 buffer 为NULL。在极少的情况下,其他人在当前线程同时设置了 buffer ,我们简单地将其释放掉。因为比较和交换方法是原子的,所以它是一个线程安全的方式去惰性初始化值。NULL的检测和设置 buffer 都是以原子方式完成的。
明显的,使用 dispatch_once() 我们也可以完成类似的事情。
原子队列:
OSAtomicEnqueue() 和 OSAtomicDequeue() 可以让你以线程安全,无锁的方式实现一个LIFO队列(常见的就是栈)。对有潜在精确要求的代码来说,这会是强大的代码。
还有 OSAtomicFifoEnqueue() 和 OSAtomicFifoDequeue() 函数是为了操作FIFO队列,但这些只有在头文件中才有文档 —— 阅读他们的时候要小心。
自旋锁:
最后,OSAtomic.h 头文件定义了使用自旋锁的函数:OSSpinLock。同样的,维基百科有深入的有关自旋锁的信息。使用命令 man 3 spinlock 查看帮助页的 spinlock(3) 。当没有锁竞争时使用自旋锁代价很小。
在合适的情况下,使用自旋锁对性能优化是很有帮助的。一如既往:先测量,然后优化。不要做乐观的优化。
下面是 OSSpinLock 的一个例子:
@interface MyTableViewCell : UITableViewCell @property (readonly, nonatomic, copy) NSDictionary *amountAttributes; @end @implementation MyTableViewCell{ NSDictionary *_amountAttributes; } - (NSDictionary *)amountAttributes{ if (_amountAttributes == nil) { static __weak NSDictionary *cachedAttributes = nil; static OSSpinLock lock = OS_SPINLOCK_INIT; OSSpinLockLock(&lock); _amountAttributes = cachedAttributes; if (_amountAttributes == nil) { NSMutableDictionary *attributes = [[self subtitleAttributes] mutableCopy]; attributes[NSFontAttributeName] = [UIFont fontWithName:@"ComicSans" size:36]; attributes[NSParagraphStyleAttributeName] = [NSParagraphStyle defaultParagraphStyle]; _amountAttributes = [attributes copy]; cachedAttributes = _amountAttributes; } OSSpinLockUnlock(&lock); } return _amountAttributes; }
就上面的例子而言,或许用不着这么麻烦,但它演示了一种理念。我们使用了ARC的 __weak 来确保一旦 MyTableViewCell 所有的实例都不存在, amountAttributes 会调用 dealloc 。因此在所有的实例中,我们可以持有字典的一个单独实例。
这段代码运行良好的原因是我们不太可能访问到方法最里面的部分。这是很深奥的——除非你真正需要,不然不要在你的 App 中使用它。