golang http的按序号发送,按序号接收

应用场合:比如http请求,有先后次序,需要实现:先请求(request)先发送,并且读取(response)的时候也是遵循这个规则,这个读写构成一个pair(有请求并有返回)

过来,直接上代码吧:

func (cc *ClientConn) Do(req *http.Request) (resp *http.Response, err error) {
	err = cc.Write(req) //client向http服务器发送请求
	if err != nil {
		return
	}
	return cc.Read(req) // client读取http服务器返回回来数据
}

再仔细看看Write的时候干了什么:

func (cc *ClientConn) Write(req *http.Request) (err error) {

	// Ensure ordered execution of Writes
	// 生成序号ID,保证 ID= i 比 ID>i 先执行
	id := cc.pipe.Next()
	cc.pipe.StartRequest(id) //这个是重点
	defer func() {
		cc.pipe.EndRequest(id) //当前id执行完,去触发下一个id+1的请求执行
		if err != nil {
			cc.pipe.StartResponse(id)  
			cc.pipe.EndResponse(id)
		} else {
			// Remember the pipeline id of this request
			cc.lk.Lock()
			cc.pipereq[req] = id   //结束的时候保存这个,req的id,方便后面read时候继续按序
			cc.lk.Unlock()
		}
	}()

	cc.lk.Lock() //读写锁,防止执行冲突
	//判断read/write的错误信息,和net.Conn是否关闭,后面介绍cc的具体struct结构
	if cc.re != nil { // no point sending if read-side closed or broken
		defer cc.lk.Unlock()
		return cc.re
	}
	if cc.we != nil {
		defer cc.lk.Unlock()
		return cc.we
	}
	if cc.c == nil { // connection closed by user in the meantime
		defer cc.lk.Unlock()
		return errClosed
	}
	c := cc.c
	if req.Close {
		// We write the EOF to the write-side error, because there
		// still might be some pipelined reads
		cc.we = ErrPersistEOF
	}
	cc.lk.Unlock()
        //到这里才是具体执行写请求,所以前面都是保证按序请求的步骤
	err = cc.writeReq(req, c)
	cc.lk.Lock()
	defer cc.lk.Unlock()
	if err != nil {
		cc.we = err
		return err
	}
	cc.nwritten++ //次序++

	return nil
}

再来看看cc的结构:

type ClientConn struct {
	lk              sync.Mutex // 读写锁
	c               net.Conn  // golang连接interface
	r               *bufio.Reader //bufReader
	re, we          error // read/write errors
	lastbody        io.ReadCloser //上一次ioReader
	nread, nwritten int //读和写的个数
	pipereq         map[*http.Request]uint //保存pair request和id

	pipe     textproto.Pipeline
	writeReq func(*http.Request, io.Writer) error //写数据匿名函数
}

type Pipeline struct {
	mu       sync.Mutex
	id       uint
	request  sequencer
	response sequencer
}

type sequencer struct {
	mu   sync.Mutex
	id   uint                
	wait map[uint]chan uint // 就是用这个管道来阻塞没有到次序的操作
}

按序具体是怎么实现的:

//生成id的代码,批发次序
func (p *Pipeline) Next() uint {
	p.mu.Lock()
	id := p.id
	p.id++
	p.mu.Unlock()
	return id
}

//执行StartResponse,实际是执行sequencer的Start方法
func (p *Pipeline) StartResponse(id uint) {
	p.response.Start(id)
}

func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id { //到达当前id咯,可以执行咯,不需要阻塞
		s.mu.Unlock()
		return
	}
	c := make(chan uint) 
	if s.wait == nil {
		s.wait = make(map[uint]chan uint)
	}
	s.wait[id] = c //在map里面记录chan
	s.mu.Unlock()
	<-c //读取阻塞,等待c的写入
}

//当然是前一个id执行结束的时候,后一个id触发阻塞解开的
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		panic("out of sync")
	}
	id++ //这里指向后一个id
	s.id = id
	if s.wait == nil {
		s.wait = make(map[uint]chan uint)
	}
	c, ok := s.wait[id]
	if ok {
		delete(s.wait, id) //删除这个map里面的chan
	}
	s.mu.Unlock()
	if ok {
		c <- 1 //往这个chan里面写数据,把阻塞解开
	}
}

同理,read的过程和write过程相似。可见,golang各种锁和管道保证并发环境下的顺序执行

时间: 2024-11-04 14:40:56

golang http的按序号发送,按序号接收的相关文章

SEQ序号于ACK序号理解总结(一)

Seq:就是我们常说的序号.对于要发送的数据的第一个序号而言这个序号是通过一个算法计算得到一个初始序号(ISN)加1.至于ISN怎么计算而来这里不讨论.这里我们以wireshark序号为准.假设某时序号为1000,简单的理解就是发送方告诉接收端"我发送的数据是从第1000开始的". ACK序号:就是我们常说的确认序号.确认序号是上一次已经成功接收到数据字节序号加1.还可以理解为接收端告诉发送端下一次想接收开始序号.假设某时确认序号为1000,简单的理解就是接收方告诉发送方"我

SEQ序号与ACK序号理解总结(一)

Seq:就是我们常说的序号.对于要发送的数据的第一个序号而言这个序号是通过一个算法计算得到一个初始序号(ISN)加1.至于ISN怎么计算而来这里不讨论.这里我们以wireshark序号为准.假设某时序号为1000,简单的理解就是发送方告诉接收端"我发送的数据是从第1000开始的". ACK序号:就是我们常说的确认序号.确认序号是上一次已经成功接收到数据字节序号加1.还可以理解为接收端告诉发送端下一次想接收开始序号.假设某时确认序号为1000,简单的理解就是接收方告诉发送方"我

TCP传输中序号与确认序号的交互

本实验通过SSH远程登录服务器,然后使用Wireshark抓包分析.开头的三次握手已经省略.关于序号的交互过程,需要记住一点:TCP首部中的确认序号表示已成功收到字节,但还不包含确认序号所指的字节,希望下一次能收到确认序号所指的字节. 当在远程登录软件上键入命令时,客户端便开始了数据的发送,TCP头如下: 初始化序列号ISN = 1,这个序列号是客户端对发送数据的一个标记,以1作为起始值.根据SSH包长度计算下一次将会发送的起始序号为65.确认序号为1表示我希望下次收到起始序号为1的TCP包.

Java基础知识强化之网络编程笔记06:TCP之TCP协议发送数据 和 接收数据

1. TCP协议发送数据 和 接收数据 TCP协议接收数据:• 创建接收端的Socket对象• 监听客户端连接.返回一个对应的Socket对象• 获取输入流,读取数据显示在控制台• 释放资源 TCP协议发送数据: • 创建发送端的Socket对象• 这一步如果成功,就说明连接已经建立成功了.• 获取输出流,写数据• 释放资源 2. 代码实现: (1)发送端: 1 package cn.itcast_06; 2 3 import java.io.IOException; 4 import java

Java基础知识强化之网络编程笔记03:UDP之UDP协议发送数据 和 接收数据

1. UDP协议发送数据 和 接收数据 UDP协议发送数据: • 创建发送端的Socket对象 • 创建数据,并把数据打包 • 调用Socket对象的发送方法,发送数据包 • 释放资源  UDP协议接收数据:       • 创建接收端的Socket对象      • 创建数据包,接收数据(接收容器)      • 调用Socket对象的接收方法,接收数据包      • 解析数据包,并显示在控制台      • 释放资源 2. 代码实现 (1)首先我们先写发送端的程序,如下: 1 packag

STM32F10x_硬件I2C主从通信(轮询发送,中断接收)

Ⅰ.写在前面 关注我分享文章的朋友应该知道我在前面讲述过(软件.硬件)I2C主机控制从机EEPROM的例子.在I2C通信主机控制程序是比较常见的一种,可以说在实际项目中,很多应用都会使用到I2C通信.但在实际项目中作为I2C从机的应用相对要少的多,本文主要讲述关于[STM32F10x_硬件I2C主从通信]中STM32作为从机的例子. 在学习本问内容之前,如果对I2C协议还不太了解的朋友请先去了解一下I2C协议,或看我之前关于I2C通信的文章(我微信公众号和博客都有). 关于STM32硬件I2C作

跨应用(跨进程)发送广播和接收广播

跨应用发送和接收广播,与同应用下的情况差不多,只需要添加一个权限,以及配置一下receiver的android:process属性即可 发送广播的应用中: Java代码   Intent intent = new Intent("info.zhegui.receiver.interprocess"); sendBroadcast(intent); 注意要在manifest.xml添加接收广播的权限,这个权限是receiver自定义的 Java代码   <uses-permissi

TCP 滑动窗口(发送窗口和接收窗口)

TCP的滑动窗口主要有两个作用,一是提供TCP的可靠性,二是提供TCP的流控特性.同时滑动窗口机制还体现了TCP面向字节流的设计思路. TCP的Window是一个16bit位字段,它代表的是窗口的字节容量,也就是TCP的标准窗口最大为2^16-1=65535个字节. 另外在TCP的选项字段中还包含了一个TCP窗口扩大因子,option-kind为3,option-length为3个字节,option-data取值范围0-14.窗口扩大因子用来扩大TCP窗口,可把原来16bit的窗口,扩大为31b

ACE基本的UDP通信(二)连续发送和连续接收 1.0

这里对UDP进行了封装,封装出了一个UDPSender.一个UDPReceiver. UDPSender和UDPReceiver被封装在一个DLL中. 双方由于不需要建立连接的过程,所以一个就连续发送,一个连续接收. 由于接收不过来就会丢包,所以发送速度是接收速度的1/2来缓解接收压力. (其中一次测试,发送10001个,接收到了9860个,之所以丢包就是因为sender和receiver每次动作sleep的时间一样短,改成sender sleep(2time),receiver sleep(1