共享内存和通信两种并发模式原理探究
并发理解
- 人类发明计算机编程的本质目的是为了什么呢?毫无疑问是为了解决人类社会中的各种负责业务场景问题。ok,有了这个出发点,那么想象一下,比如你既可以一心一意只做一件事,你也可以同时做多件事,比如,你计划今天上午计划就是看足球比赛,ok,你今天的工作就是串行的,单进程的,你只需要完成一件事。但是不巧呢,你妈妈说让你帮她切肉,你妈妈上午要出门有点事,同时不巧呢,你老婆说她上午也要出门,让你帮着打扫家里卫生,这时你今天就要同时做三件事,看比赛,切肉,打扫卫生。这时你的场景应该就是看比赛,看一会暂停去,打扫一下卫生,或者去切肉,切累了去看会继续去看比赛,就这样,上午把事情都做完了。
- 程序在运行一个进程事,会有自己的调用栈和堆,还有一个完成的上下文,而cpu在并发调度多个进程时就是会有多个调用栈和堆,多个上下文,每个进程只有在获取cpu调用的时间段时,才能回复这个进程的上下文继续执行,否则就会保存进程的山下文。如果不理解就可以把上边的你当作一个单核的cpu,比赛,切肉,和打扫卫生三件事,哪件事获取了你的时间段,它才会继续下去,否则就会以一种状态保存下来的形式暂停下来。
- 如果你的计算机是双核的,那就相当于你叫了一个哥们来帮你一起做这三件事,他切肉,你打扫卫生,事情会更快的结束,计算机也是这样,由原来的多个进程等待一个cpu的时间段,变成多个进程等待两个cpu的时间段,只要进程获取任一个cpu的时间段就可以继续执行
并发优势
- 由上文我们就可以理解并发的优势
- 更能客观题表现实际现实问题的模型
- 更能充分利用cpu的核心优势
- 更高效地解决问题
并发关键词
- 进程(或者线程/协程)
- cpu时间段
- 如果多个进程之间在业务逻辑上有联系,那么就会涉及到的关键词就是:关键资源使用权的分配
并发案例
- 现在有一个银行,银行里有一个账户,账号对应一张卡,通过这张卡可以对其账户进行存款,取款,查询操作。这个业务模型是单进程的
//账户先抽象出一层接口
type Account interface {
Withdraw(uint)
Deposit(uint)
Balance() int
}
// 银行类和方法
type Bank struct {
account Account //银行里有一个账户
}
func NewBank(account Account) *Bank {
return &Bank{account: account}
}
func (bank *Bank) Withdraw(amount uint, actor_name string) {
fmt.Println("[-]", amount, actor_name)
bank.account.Withdraw(amount)
}
func (bank *Bank) Deposit(amount uint, actor_name string) {
fmt.Println("[+]", amount, actor_name)
bank.account.Deposit(amount)
}
func (bank *Bank) Balance() int {
return bank.account.Balance()
}
//实现Account接口
ype SimpleAccount struct{
balance int
}
func NewSimpleAccount(balance int) *SimpleAccount {
return &SimpleAccount{balance: balance}
}
func (acc *SimpleAccount) Deposit(amount uint) {
acc.setBalance(acc.balance + int(amount))
}
func (acc *SimpleAccount) Withdraw(amount uint) {
if acc.balance >= int(amount) {
acc.setBalance(acc.balance - int(amount))
} else {
panic("杰克穷死")
}
}
func (acc *SimpleAccount) Balance() int {
return acc.balance
}
func (acc *SimpleAccount) setBalance(balance int) {
acc.add_some_latency() //关键:增加一个延时函数,方便演示
acc.balance = balance
}
func (acc *SimpleAccount) add_some_latency() {
<-time.After(time.Duration(rand.Intn(100)) * time.Millisecond)
}
//主函数调用
func main() {
balance := 80
b := NewBank(NewSimpleAccount(balance))
fmt.Println("初始化余额", b.Balance())
b.Withdraw(30, "马伊琍")
fmt.Println("-----------------")
fmt.Println("剩余余额", b.Balance())
}
//结果
初始化余额 80
[-] 30 马伊琍
-----------------
剩余余额 50
- 上边是一个账户对应一张卡,但是现实中,一个账户会有多个附属卡,这些卡都可以对账户进行操作,当多个卡同时对一个账户进行操作时就有可能出现问题:当一张卡A要对账户取钱,这时A先从账户中查询出钱80,然后减去要取走的钱30,最有将减后的结果写入账户,不巧的是同时有一张卡B也要取钱,在A还没有将减后的结果50写入账户之前,也从账户中查询处理总账的钱80,此时钱还没有改变,然后B也做同样的操作,即用总账减去取走的钱10,然后将结果70写入账户,如果A此时已经完成了修改写入操作,此时结果是50,但B还是将70写入了账户,这时账户的最终结果是70,而业务是实际正确结果应该是40,所以就有问题,代码如下
func main() {
balance := 80
b := NewBank(NewSimpleAccount(balance))
fmt.Println("初始化余额", b.Balance())
done := make(chan bool)
go func() { b.Withdraw(30, "马伊琍"); done <- true }()
go func() { b.Withdraw(10, "姚笛"); done <- true }()
//等待 goroutine 执行完成
<-done
<-done
fmt.Println("-----------------")
fmt.Println("剩余余额", b.Balance())
}
//结果
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
-----------------
剩余余额 70
- 所以上边并发问题就要另想办法解决。
并发模型
- 什么是并发模型?
- 个人理解并发模型其实就是前辈再解决并发问题后的经验积累的基础上对不同问题模型的不同处理方式的一种总结。
- 我们直到并发的本质是解决如果更合理地去分配cpu的时间段,那么实际业务中,不同的场景cpu的时间段的分配算法也是有所差异。所以并发模型的本质就是合理分配cpu时间段和关键内存资源的不同算法。
- 并发模型分类(并发模型较多,本文只分析以下两种)
- 共享内存模式
- 通信模式
共享内存模式
- 共享内存模式理解
- 这里举一个不太雅但很形象的例子:比如现在多个人去一个洗手间上大号,不巧呢洗手间只有一个坑位。这时呢没办法,只能是一个人进去,关上门,告诉其他人坑位使用中,其他人只能等待,等这个人问题解决完了,打开门,走人,下一个人进去,关门解决问题。
- 上边多个人可以理解为多个进程,坑位可以理解为关键资源,上边问题之所以可以有序解决还有一个关键东西就是那个门。门一旦锁住,后边的人只能等待。
- 借助以上例子我们就可以理解共享内存模式本质就是:当多个进程需要使用关键资源时,那么将关键资源加上排他锁,哪个进程开始使用资源,就将资源锁住,不允许其他进程使用这块资源,如果有其他进程想使用这块进程,那它就只能等待前一个进程将自己的问题处理完,将资源的使用权释放,下一个进程才能使用,同时下一个进程开始使用时也会先给资源加锁。
- 用共享内存模式解决上边问题
- 要解决上边问题,那么核心逻辑就是要实现每个卡在操作账户时要先锁定账户这个核心资源,其他进程来操作这个账户时要等待上一个进程释放账户的使用权后,才能去获取账户的使用权去操作。所以应该给账户扩展加锁,解锁的功能,所以我们在上边简单账户的基础上做扩展。
type LockingAccount struct {
lock sync.Mutex //关于锁的扩展
account *SimpleAccount //继承
}
//封装一下 SimpleAccount
func NewLockingAccount(balance int) *LockingAccount {
return &LockingAccount{account: NewSimpleAccount(balance)}
}
func (acc *LockingAccount) Deposit(amount uint) {
acc.lock.Lock()
defer acc.lock.Unlock()
acc.account.Deposit(amount)
}
func (acc *LockingAccount) Withdraw(amount uint) {
acc.lock.Lock()
defer acc.lock.Unlock()
acc.account.Withdraw(amount)
}
func (acc *LockingAccount) Balance() int {
acc.lock.Lock()
defer acc.lock.Unlock()
return acc.account.Balance()
}
//主函数调用
func main() {
balance := 80
b := NewBank(NewLockingAccount(balance))
fmt.Println("初始化余额", b.Balance())
done := make(chan bool)
go func() { b.Withdraw(30, "马伊琍"); done <- true }()
go func() { b.Withdraw(10, "姚笛"); done <- true }()
//等待 goroutine 执行完成
<-done
<-done
fmt.Println("-----------------")
fmt.Println("剩余余额", b.Balance())
}
//结果
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
-----------------
剩余余额 40
//实际流程
________________
_马伊琍_|__姚笛__
加锁 ><
得到余额 80 |
取钱 -30 |
当前余额 50 |
... |
设置余额 50 |
解除锁 <>
|
当前余额 50
|
加锁 ><
得到余额 | 50
取钱 | -10
当前余额 | 40
| ...
设置余额 | 40
解除锁 <>
________________
剩余余额 40
- 如果对mysql比较数的人,应该会想到,innodb的事务是基于行锁实现的,行锁其实就是一个排他锁,其并发模型本质就是共享内存模式。
通信模式
- 举例理解
- 假如现在有三个人分别拿着同一个账户的不同卡ABC,卡A和卡B要去银行对账户进行取钱操作,卡c要去银行进行查询账户操作,一般的银行我们知道是有不同的业务窗口,不同的窗口,假如现在只有三个窗口即取钱窗口,存钱窗口,还有查询窗口。同时我们也知道银行都有一个叫号机,我们去银行办业务,要先去叫号机面前告诉叫号机我们要办理的业务类型,叫号机会给你一个排队号,当你拿到排队号之后,你就只能去等待。
- 上边三个人拿着三张卡对应的就是多进程,人和叫号机,叫号机和业务窗口之间的通信都是通过小纸条,即消息来实现。而这里的消息对应就是go中的channel,叫号机可以暂且理解成go中的关键字select。
- 实际go的程序执行逻辑和上边现实中银行的业务模型还是有点差异,其实际执行流程是这样的:卡A和卡B要去办理同一种业务取钱,那么他们要先建立自己的消息(channel),消息里包含了自己要办理业务的类型(取钱)和相关数据(30,10),(然后将自己的消息交给select【可以理解成叫号机】,其实就是select中的case),然后卡A卡B分别阻塞,去等待结果;当select一旦检测到有人给自己注册了消息,然后就会马上取出一个卡A的消息(假如是卡A的消息先被检测到),同时通知相应取钱业务窗口去处理卡A的取钱操作,卡B则继续阻塞等待,到取钱业务窗口完成卡A的取钱操作后,select则会去取下一条发送过来的消息(channel)。同时在select取出卡A的消息后,卡A会马上得到通知,然后不再阻塞继续执行,完成后续操作后销毁释放系统资源。
- 再来说一下卡C的执行流程,卡C是要查询账户总额,那么它就要先建立并注册自己的channel(消息),但是卡C的业务有一个特殊点就是,它要获取一个反馈结果,即其注册给select的消息还要考虑到业务窗口完成后将查询结果以消息的方式反馈回来,所以卡C的消息结构就需要channel里边再嵌套一个channel,外层的channel为了实现让叫号机(select)可以发通知给业务窗口,里边的channel则是为了让业务窗口处理完业务后将结果放进去,从而让卡C可以再后续操作中拿到业务窗口反馈的消息结果。所以从本质上来讲,卡C的业务是有两个阻塞点的。
- 在上边的过程中要注意select是相继执行的,即只有一个case执行完成后,它才会去遍历取下一个消息。
- 利用通信模式处理上边业务
package main
import (
"fmt"
"time"
"math/rand"
)
// 主函数调用
func main() {
balance:=80
b:=NewBank(NewConcurrentAccount(uint8(balance)))
fmt.Println("初始化余额", b.Balance())
donechan:=make(chan bool)
go func() {
b.Withdraw(uint8(30),"daughter")
donechan<-true
}()
go func() {
b.Withdraw(uint8(10),"son")
donechan<-true
}()
<-donechan
<-donechan
fmt.Println("________________")
fmt.Println("剩余钱",b.Balance())
}
type Account interface {
Deposit(uint82 uint8) //存钱
Withdraw(uint82 uint8) //取钱
Balance()uint8 //查看钱
}
type Bank struct {
account Account
}
func NewBank(account Account)*Bank {
return &Bank{account:account}
}
func (bank *Bank)Deposit(amount uint8,name string) {
fmt.Println("[+]",amount,name)
bank.account.Deposit(amount)
}
func (bank *Bank)Withdraw(amount uint8,name string) {
fmt.Println("[-]",amount,name)
bank.account.Withdraw(amount)
}
func (bank *Bank)Balance()uint8 {
return bank.account.Balance()
}
type SimepleAccount struct {
balance uint8
}
func NewSimepleAccount(balance uint8)*SimepleAccount {
return &SimepleAccount{balance:balance}
}
func (account *SimepleAccount)setBalance(balance uint8) {
account.add_some_latency()
account.balance=balance
}
func (account *SimepleAccount) add_some_latency() {
<-time.After(time.Duration(rand.Intn(100)) * time.Millisecond)
}
func (account *SimepleAccount)Deposit(amount uint8) {
account.setBalance(account.balance+amount)
}
func (account *SimepleAccount)Withdraw(amount uint8) {
account.setBalance(account.balance-amount)
}
func (account *SimepleAccount)Balance() uint8 {
return account.balance
}
//扩展SimpleAccount,增加通信功能
type ConcurrentAccount struct {
account *SimepleAccount
deposit chan uint8
withdraw chan uint8
balance chan chan uint8
}
func NewConcurrentAccount(amount uint8)*ConcurrentAccount {
acc:=&ConcurrentAccount{
account:NewSimepleAccount(amount),
deposit:make(chan uint8),
withdraw:make(chan uint8),
balance:make(chan chan uint8),
}
acc.listen()
return acc
}
func (account *ConcurrentAccount)Balance() uint8 {
ch:=make(chan uint8)
account.balance<-ch //第一层阻塞
return <-ch //第二层阻塞
}
func (account *ConcurrentAccount)Withdraw(amount uint8) {
account.withdraw<- amount
}
func (account *ConcurrentAccount)Deposit(amount uint8) {
account.deposit<- amount
}
func (account *ConcurrentAccount)listen() {
go func() {
for {
select { //叫号机,每一个case都是注册的消息
case amt:=<- account.deposit:
account.account.Deposit(amt)
case amt:=<-account.withdraw:
account.account.Withdraw(amt)
case ch:=<-account.balance:
ch<-account.account.Balance()
}
}
}()
}
- 要从本质理解透彻channel原理,则需要去理解unix的管道机制(简单理解,后续会去专门系统分析其原理),因为channel就是借鉴unix的管道机制实现的。
- 管道是由内核管理的一个缓冲区,相当于我们放入内存中的一个纸条。一个进程在管道的尾部写入数据,另一个进程从管道的头部读出数据。管道包括无名管道和有名管道两种,前者只能用于父进程和子进程间的通信,后者可用于运行于同一系统中的任意两个进程间的通信。下面用一个示意图来表示:
- 管道通信的特点
- 管道通讯是单向的,先进先出,即为队列结构,有固定的读端和写端。
- 数据被进程从管道读出后,在管道中该数据就不存在了。
- 当进程去读取空管道的时候,进程会阻塞。
- 当进程往满管道写数据时,进程会阻塞。
(ps:以上是本人对并发的粗浅理解,欢迎高手指点批评)
github源码:[email protected]:Frankltf/concurrency_go.git
参考:http://ju.outofmemory.cn/entry/73895
https://www.cnblogs.com/biyeymyhjob/archive/2012/11/03/2751593.html
https://blog.csdn.net/u010853261/article/details/53464053
原文地址:https://www.cnblogs.com/frankltf/p/9061407.html
时间: 2024-10-13 02:42:29