Go并发编程(四)

并发基础

多进程

多线程

基于回调的非阻塞/异步IO

协程

协程

与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不会导致系统资源衰竭,

而线程和进程通常最多也不能超过1万个。这也是协程也叫轻量级线程的原因。多数语言在语法层面并不直接支持协程,而是通过库的方式支持,但用库的方式支持的功能也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。如果在这样的轻量级线程中调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程,    从而无法真正达到轻量级线程本身期望达到的目标。

Go 语言在语言级别支持轻量级线程,叫goroutine。Go 语言标准库提供的所有系统调用操作(当然也包括所有同步 IO 操作),都会出让 CPU 给其他goroutine。这让事情变得非常简单,让轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于CPU的核心数量。

goroutine

在一个函数调用前加上go关键字,这次调用就会在一个新的goroutine中并发执行。当被调用的函数返回时,这个goroutine也自动结束了。需要注意的是,如果这个函数有返回值,那么这个返回值会被丢弃。

  1. package main
  2. import "fmt"
  3. func Add(x, y int) {
  4.     z := x + y
  5.     fmt.Println(z)
  6. }
  7. func main() {
  8.     for i := 0; i < 10; i++ {
  9.         go Add(i, i)
  10.     }
  11. }

Go程序从初始化main package并执行main()函数开始,当main()函数返回时,程序退出,且程序并不等待其他goroutine(非主goroutine)结束。  所以看不到输出结果

要让主函数等待所有goroutine退出后再返回,如何知道goroutine都退出了呢?这就引出了多个goroutine之间通信的问题。下一节我们将主要解决这个问题。

并发通信

在工程上,有两种最常见的并发通信模型:共享数据和消息。

共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存。

下面是共享内存的实现

  1. var count int
  2. func Count(lock *sync.Mutex) {
  3. lock.Lock()
  4. count++
  5. fmt.Println(count)
  6. lock.Unlock()
  7. }
  8. func main(){
  9. lock := &sync.Mutex{}
  10. for i := 0; i < 10; i++ {
  11. go Count(lock)
  12. }
  13. for {
  14. lock.Lock()
  15. c := count
  16. lock.Unlock()
  17. runtime.Gosched()
  18. if c > 10 {
  19. break
  20. }
  21. }
  22. }

Go语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。

Go语言提供的消息通信机制被称为channel,接下来我们将详细介绍channel。现在,让我们用Go语言社区的那句著名的口号来结束这一小节:“不要通过共享内存来通信,而应该通过通信来共享内存。”

channel

channel是Go语言在语言级别提供的goroutine间的通信方式。

  1. //声明一个chan
  2. var ch chan int
  3. var mch map[string]chan bool
  4. //声明并初始化一个int类型的chan
  5. chan1 := make(chan int,1)
  6. //将一个数据写入channel中
  7. chan1 <- 1
  8. getchan1 := <-chan1

从channel中取数据与写入数据

  1. chan1 := make(chan int, 1)
  2. //将1写入channel中
  3. chan1 <- 1
  4. //将一个数据从channel中读取到getchan1中
  5. getchan1 := <-chan1
  6. fmt.Println(getchan1) //输出1

select

通过调用select()函数来监控一系列的文件句柄。一旦其中一个文件句柄发生了IO动作,该select()调用就会被返回

select的用法与switch语言非常类似,由select开始一个新的选择块,每个选择条件由case语句来描述。

与switch语句可以选择任何可使用相等比较的条件相比,select有比较多的限制,其中最大的一条限制就是每个case语句里必须是一个IO操作,大致的结构如下:

  1. select {
  2.     case <-chan1:
  3.     // 如果chan1成功读到数据,则进行该case处理语句
  4.     case chan2 <- 1:
  5.     // 如果成功向chan2写入数据,则进行该case处理语句
  6.     default:
  7.     // 如果上面都没有成功,则进入default处理流程
  8. }
  1. ch := make(chan int, 1)
  2. for {
  3.     select {
  4.         case ch <- 0:
  5.         case ch <- 1:
  6.     }
  7.     i := <-ch
  8.     fmt.Println("Value received:", i)
  9. }

缓冲机制

  1. //创建一个带缓冲的channel
  2. c := make(chan int, 1024)

超时机制

在并发编程的通信过程中,最需要处理的就是超时问题,即向channel写数据时发现channel已满,或者从channel试图读取数据时发现channel为空。

如果不正确处理这些情况,很可能会导致整个goroutine锁死。

因为select的特点是只要其中一个case已经完成,程序就会继续往下执行,而不会考虑其他case的情况。 所以可以使用select来避免goroutine阻塞问题

  1. // 首先,我们实现并执行一个匿名的超时等待函数
  2. timeout := make(chan bool, 1)
  3. go func() {
  4.     time.Sleep(1e9) // 等待1秒钟
  5.     timeout <- true
  6. }()
  7. // 然后我们把timeout这个channel利用起来
  8. select {
  9.     case <-ch:
  10.     // 从ch中读取到数据
  11.     case <-timeout:
  12.     // 一直没有从ch中读取到数据,但从timeout中读取到了数据
  13. }

这样使用select机制可以避免永久等待的问题

channel的传递

  1. type PipeData struct {
  2.     value int
  3.     handler func(int) int
  4.     next chan int
  5. }
  1. func handle(queue chan *PipeData) {
  2.     for data := range queue {
  3.     data.next <- data.handler(data.value)
  4.     }
  5. }

单向channel

单向channel只能用于发送或者接收数据。

channel本身必然是同时支持读写的,否则根本没法用。假如一个channel真的只能读,那么肯定只会是空的,因为你没机会往里面写数据。同理,如果一个channel只允许写,即使写进去了,也没有丝毫意义,因为没有机会读取里面的数据。所谓的单向channel概念,其实只是对channel的一种使用限制。

我们在将一个channel变量传递到一个函数时,可以通过将其指定为单向channel变量,从

而限制该函数中可以对此channel的操作,比如只能往这个channel写,或者只能从这个channel读。

  1. var ch1 chan int // ch1是一个正常的channel,不是单向的
  2. var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
  3. var ch3 <-chan int // ch3是单向channel,只用于读取int数据
  1. //单项channel初始化,ch4被转换为一个单项读channel和一个单向写channel
  2. ch4 := make(chan int)
  3. ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
  4. ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel

关闭channel

  1. close(ch)

如何判断一个channel是否已经被关闭

  1. x, ok := <-ch //返回值是false则表示ch已经被关闭。

多核并行化

  1. type Vector []float64
  2. // 分配给每个CPU的计算任务
  3. func (v Vector) DoSome(i, n int, u Vector, c chan int) {
  4.     for ; i < n; i++ {
  5.     v[i] += u.Op(v[i])
  6. }
  7. c <- 1 // 发信号告诉任务管理者我已经计算完成了
  8. }
  9. const NCPU = 16 // 假设总共有16核
  10. func (v Vector) DoAll(u Vector) {
  11. c := make(chan int, NCPU) // 用于接收每个CPU的任务完成信号
  12. for i := 0; i < NCPU; i++ {
  13. go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
  14. }
  15. // 等待所有CPU的任务完成
  16. for i := 0; i < NCPU; i++ {
  17. <-c // 获取到一个数据,表示一个CPU计算完成了
  18. }
  19. // 到这里表示所有计算已经结束
  20. }

在Go语言升级到默认支持多CPU的某个版本之前,我们可以先通过设置环境变量

GOMAXPROCS的值来控制使用多少个CPU核心。具体操作方法是通过直接设置环境变量

GOMAXPROCS的值,或者在代码中启动goroutine之前先调用以下这个语句以设置使用16个CPU

核心:

runtime.GOMAXPROCS(16)

到底应该设置多少个CPU核心呢,其实runtime包中还提供了另外一个函数NumCPU()来获

取核心数。可以看到,Go语言其实已经感知到所有的环境信息,下一版本中完全可以利用这些

信息将goroutine调度到所有CPU核心上,从而最大化地利用服务器的多核计算能力。抛弃

GOMAXPROCS只是个时间问题。

出让时间片

  1. runtime.Gosched()

同步

同步锁

Go语言包中的sync包提供了两种锁类型:sync.Mutex和sync.RWMutex

Mutex是最简单的一种锁类型,同时也比较暴力,当一个goroutine获得了Mutex后,其他goroutine就只能乖乖等到这个goroutine释放该Mutex。RWMutex相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个goroutine可同时获取读锁(调用RLock()方法;而写锁(调用Lock()方法)会阻止任何其他goroutine(无论读和写)进来,整个锁相当于由该goroutine独占。从RWMutex的实现看,RWMutex类型其实组合了Mutex:

对于这两种锁类型,任何一个Lock()或RLock()均需要保证对应有Unlock()或RUnlock()

调用与之对应,否则可能导致等待该锁的所有goroutine处于饥饿状态,甚至可能导致死锁。锁的

典型使用模式如下:

  1. var l sync.Mutex
  2. func foo() {
  3. l.Lock()
  4. defer l.Unlock()
  5. //...
  6. }

这里我们再一次见证了Go语言defer关键字带来的优雅。

全局唯一性操作

对于从全局的角度只需要运行一次的代码,比如全局初始化操作,Go语言提供了一个Once类型来保证全局的唯一性操作,具体代码如下:

  1. var a string
  2. var once sync.Once
  3. func setup() {
  4.     a = "hello, world"
  5. }
  6. func doprint() {
  7.     once.Do(setup)
  8.     print(a)
  9. }
  10. func twoprint() {
  11.     go doprint()
  12.     go doprint()
  13. }

once的Do()方法可以保证在全局范围内只调用指定的函数一次(这里指

setup()函数),而且所有其他goroutine在调用到此语句时,将会先被阻塞,直至全局唯一的

once.Do()调用结束后才继续。

来自为知笔记(Wiz)

时间: 2024-10-01 03:30:10

Go并发编程(四)的相关文章

Java 并发编程(四):如何保证对象的线程安全性

本篇来谈谈 Java 并发编程:如何保证对象的线程安全性. 01.前言 先让我吐一句肺腑之言吧,不说出来会憋出内伤的.<Java 并发编程实战>这本书太特么枯燥了,尽管它被奉为并发编程当中的经典之作,但我还是忍不住.因为第四章"对象的组合"我整整啃了两周的时间,才啃出来点肉丝. 读者朋友们见谅啊.要怪只能怪我自己的学习能力有限,真读不了这种生硬无趣的技术书.但是为了学习,为了进步,为了将来(口号喊得有点大了),只能硬着头皮上. 请随我来,我尽量写得有趣点. 02.线程安全类

Java并发编程(四):并发容器(转)

解决并发情况下的容器线程安全问题的.给多线程环境准备一个线程安全的容器对象. 线程安全的容器对象: Vector, Hashtable.线程安全容器对象,都是使用 synchronized 方法实现的. concurrent 包中的同步容器,大多数是使用系统底层技术实现的线程安全.类似 native. Java8 中使用 CAS. 1.Map/Set 1.1 ConcurrentHashMap/ConcurrentHashSet 底层哈希实现的同步 Map(Set).效率高,线程安全.使用系统底

漫谈并发编程(四):终结任务

使用状态变量来终结任务 有时我们可以使用一个状态变量(如布尔值)来终结任务的执行,这种方式非常平和,且提供给你机会在任务终止前做一些操作.如: public class StateStopTask implements Runnable{ private static volatile boolean isCancled = false; public void run() { while(true) { if(isCancled == true) { System.out.println("S

并发编程(四):ThreadLocal从源码分析总结到内存泄漏

一.目录 1.ThreadLocal是什么?有什么用? 2.ThreadLocal源码简要总结? 3.ThreadLocal为什么会导致内存泄漏? 二.ThreadLocal是什么?有什么用? 引入话题:在并发条件下,如何正确获得共享数据?举例:假设有多个用户需要获取用户信息,一个线程对应一个用户.在mybatis中,session用于操作数据库,那么设置.获取操作分别是session.set().session.get(),如何保证每个线程都能正确操作达到想要的结果? /** * 回顾sync

并发编程实践四:实现正确和高效的锁

你是否觉得锁是一种很神奇的东西,在并发编程中,你只需要将你的代码加上锁,就能保证代码是线程安全的(当然现实和感觉有很大差别,代码的线程安全是非常复杂的),那么,这些都是怎么做到的呢?当存在大量线程同时竞争锁时,竞争失败的锁会怎么做呢?锁又是怎么保证这一切高效的执行的呢?这篇文章将为你回答这些问题,首先我将介绍怎样实现一个正确的锁,然后介绍高效的锁应该具备的条件,最后将介绍两种常用的队列锁算法:CLH锁和MCS锁. 文中将用到一些原子变量的特性,你可以将原子变量看作加强版的volatile变量,具

python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点

并发编程重点: 1 2 3 4 5 6 7 并发编程:线程.进程.队列.IO多路模型 操作系统工作原理介绍.线程.进程演化史.特点.区别.互斥锁.信号. 事件.join.GIL.进程间通信.管道.队列. 生产者消息者模型.异步模型.IO多路复用模型.select\poll\epoll 高性 能IO模型源码实例解析.高并发FTP server开发 1.请写一个包含10个线程的程序,主线程必须等待每一个子线程执行完成之后才结束执行,每一个子线程执行的时候都需要打印当前线程名.当前活跃线程数量: 1

【专家坐堂】四种并发编程模型简介

本文来自网易云社区 概述 并发往往和并行一起被提及,但是我们应该明确的是"并发"不等同于"并行" ?       并发 :同一时间 对待 多件事情 (逻辑层面) ?       并行 :同一时间 做(执行) 多件事情 (物理层面) 并发可以构造出一种问题解决方法,该方法能够被用于并行化,从而让原本只能串行处理的事务并行化,更好地发挥出当前多核CPU,分布式集群的能力. 但是,并发编程和人们正常的思维方式是不一样的,因此才有了各种编程模型的抽象来帮助我们更方便,更不容

python 学习_第四模块 并发编程(多线程)

python 学习_第四模块 并发编程(多线程) 1  开启线程方式 from threading import Thread import time def say(name): time.sleep(2) print("%s hello"%name) if __name__ =="__main__": t = Thread(target=say,args=("alex",)) t.start() print("主线程")

~~并发编程(四):进程方法~~

进击のpython 并发编程--进程方法 开启了进程之后,就要学习一下对应的方法 本小节对进程的一些方法进行简单的理解: 1.Process的join方法 2.Process的terminate与is_alive Process的join方法 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态) ? timeout是可选的超时时间 首先,系统在运行的过程中可能会出现这样的情况: 1.主进程和子进程彼此独立,在都完成运行之后,由系统进行统一回收