应用场合:比如http请求,有先后次序,需要实现:先请求(request)先发送,并且读取(response)的时候也是遵循这个规则,这个读写构成一个pair(有请求并有返回)
过来,直接上代码吧:
func (cc *ClientConn) Do(req *http.Request) (resp *http.Response, err error) { err = cc.Write(req) //client向http服务器发送请求 if err != nil { return } return cc.Read(req) // client读取http服务器返回回来数据 }
再仔细看看Write的时候干了什么:
func (cc *ClientConn) Write(req *http.Request) (err error) { // Ensure ordered execution of Writes // 生成序号ID,保证 ID= i 比 ID>i 先执行 id := cc.pipe.Next() cc.pipe.StartRequest(id) //这个是重点 defer func() { cc.pipe.EndRequest(id) //当前id执行完,去触发下一个id+1的请求执行 if err != nil { cc.pipe.StartResponse(id) cc.pipe.EndResponse(id) } else { // Remember the pipeline id of this request cc.lk.Lock() cc.pipereq[req] = id //结束的时候保存这个,req的id,方便后面read时候继续按序 cc.lk.Unlock() } }() cc.lk.Lock() //读写锁,防止执行冲突 //判断read/write的错误信息,和net.Conn是否关闭,后面介绍cc的具体struct结构 if cc.re != nil { // no point sending if read-side closed or broken defer cc.lk.Unlock() return cc.re } if cc.we != nil { defer cc.lk.Unlock() return cc.we } if cc.c == nil { // connection closed by user in the meantime defer cc.lk.Unlock() return errClosed } c := cc.c if req.Close { // We write the EOF to the write-side error, because there // still might be some pipelined reads cc.we = ErrPersistEOF } cc.lk.Unlock() //到这里才是具体执行写请求,所以前面都是保证按序请求的步骤 err = cc.writeReq(req, c) cc.lk.Lock() defer cc.lk.Unlock() if err != nil { cc.we = err return err } cc.nwritten++ //次序++ return nil }
再来看看cc的结构:
type ClientConn struct { lk sync.Mutex // 读写锁 c net.Conn // golang连接interface r *bufio.Reader //bufReader re, we error // read/write errors lastbody io.ReadCloser //上一次ioReader nread, nwritten int //读和写的个数 pipereq map[*http.Request]uint //保存pair request和id pipe textproto.Pipeline writeReq func(*http.Request, io.Writer) error //写数据匿名函数 } type Pipeline struct { mu sync.Mutex id uint request sequencer response sequencer } type sequencer struct { mu sync.Mutex id uint wait map[uint]chan uint // 就是用这个管道来阻塞没有到次序的操作 }
按序具体是怎么实现的:
//生成id的代码,批发次序 func (p *Pipeline) Next() uint { p.mu.Lock() id := p.id p.id++ p.mu.Unlock() return id } //执行StartResponse,实际是执行sequencer的Start方法 func (p *Pipeline) StartResponse(id uint) { p.response.Start(id) } func (s *sequencer) Start(id uint) { s.mu.Lock() if s.id == id { //到达当前id咯,可以执行咯,不需要阻塞 s.mu.Unlock() return } c := make(chan uint) if s.wait == nil { s.wait = make(map[uint]chan uint) } s.wait[id] = c //在map里面记录chan s.mu.Unlock() <-c //读取阻塞,等待c的写入 } //当然是前一个id执行结束的时候,后一个id触发阻塞解开的 func (s *sequencer) End(id uint) { s.mu.Lock() if s.id != id { panic("out of sync") } id++ //这里指向后一个id s.id = id if s.wait == nil { s.wait = make(map[uint]chan uint) } c, ok := s.wait[id] if ok { delete(s.wait, id) //删除这个map里面的chan } s.mu.Unlock() if ok { c <- 1 //往这个chan里面写数据,把阻塞解开 } }
同理,read的过程和write过程相似。可见,golang各种锁和管道保证并发环境下的顺序执行
时间: 2024-11-04 14:40:56