【Go】优雅的读取http请求或响应的数据

原文链接:https://blog.thinkeridea.com/201901/go/you_ya_de_du_qu_http_qing_qiu_huo_xiang_ying_de_shu_ju.html

http.Request.Bodyhttp.Response.Body 中读取数据方法或许很多,标准库中大多数使用 ioutil.ReadAll 方法一次读取所有数据,如果是 json 格式的数据还可以使用 json.NewDecoderio.Reader 创建一个解析器,假使使用 pprof 来分析程序总是会发现 bytes.makeSlice 分配了大量内存,且总是排行第一,今天就这个问题来说一下如何高效优雅的读取 http 中的数据。

背景介绍

我们有许多 api 服务,全部采用 json 数据格式,请求体就是整个 json 字符串,当一个请求到服务端会经过一些业务处理,然后再请求后面更多的服务,所有的服务之间都用 http 协议来通信(啊, 为啥不用 RPC,因为所有的服务都会对第三方开放,http + json 更好对接),大多数请求数据大小在 1K~4K,响应的数据在 1K~8K,早期所有的服务都使用 ioutil.ReadAll 来读取数据,随着流量增加使用 pprof 来分析发现 bytes.makeSlice 总是排在第一,并且占用了整个程序 1/10 的内存分配,我决定针对这个问题进行优化,下面是整个优化过程的记录。

pprof 分析

这里使用 https://github.com/thinkeridea/go-extend/blob/master/exnet/exhttp/expprof/pprof.go 中的 API 来实现生产环境的 /debug/pprof 监测接口,没有使用标准库的 net/http/pprof 包因为会自动注册路由,且长期开放 API,这个包可以设定 API 是否开放,并在规定时间后自动关闭接口,避免存在工具嗅探。

服务部署上线稳定后(大约过了一天半),通过 curl 下载 allocs 数据,然后使用下面的命令查看分析。

$ go tool pprof allocs
File: xxx
Type: alloc_space
Time: Jan 25, 2019 at 3:02pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 604.62GB, 44.50% of 1358.61GB total
Dropped 776 nodes (cum <= 6.79GB)
Showing top 10 nodes out of 155
      flat  flat%   sum%        cum   cum%
  111.40GB  8.20%  8.20%   111.40GB  8.20%  bytes.makeSlice
  107.72GB  7.93% 16.13%   107.72GB  7.93%  github.com/sirupsen/logrus.(*Entry).WithFields
   65.94GB  4.85% 20.98%    65.94GB  4.85%  strings.Replace
   54.10GB  3.98% 24.96%    56.03GB  4.12%  github.com/json-iterator/go.(*frozenConfig).Marshal
   47.54GB  3.50% 28.46%    47.54GB  3.50%  net/url.unescape
   47.11GB  3.47% 31.93%    48.16GB  3.55%  github.com/json-iterator/go.(*Iterator).readStringSlowPath
   46.63GB  3.43% 35.36%   103.04GB  7.58%  handlers.(*AdserviceHandler).returnAd
   42.43GB  3.12% 38.49%    84.62GB  6.23%  models.LogItemsToBytes
   42.22GB  3.11% 41.59%    42.22GB  3.11%  strings.Join
   39.52GB  2.91% 44.50%    87.06GB  6.41%  net/url.parseQuery

从结果中可以看出采集期间一共分配了 1358.61GB top 10 占用了 44.50% 其中 bytes.makeSlice 占了接近 1/10,那么看看都是谁在调用 bytes.makeSlice 吧。

(pprof) web bytes.makeSlice

从上图可以看出调用 bytes.makeSlice 的最终方法是 ioutil.ReadAll, (受篇幅影响就没有截取 ioutil.ReadAll 上面的方法了),而 90% 都是 ioutil.ReadAll 读取 http 数据调用,找到地方先别急想优化方案,先看看为啥 ioutil.ReadAll 会导致这么多内存分配。

func readAll(r io.Reader, capacity int64) (b []byte, err error) {
    var buf bytes.Buffer
    // If the buffer overflows, we will get bytes.ErrTooLarge.
    // Return that as an error. Any other panic remains.
    defer func() {
        e := recover()
        if e == nil {
            return
        }
        if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
            err = panicErr
        } else {
            panic(e)
        }
    }()
    if int64(int(capacity)) == capacity {
        buf.Grow(int(capacity))
    }
    _, err = buf.ReadFrom(r)
    return buf.Bytes(), err
}

func ReadAll(r io.Reader) ([]byte, error) {
    return readAll(r, bytes.MinRead)
}

以上是标准库 ioutil.ReadAll 的代码,每次会创建一个 var buf bytes.Buffer 并且初始化 buf.Grow(int(capacity)) 的大小为 bytes.MinRead, 这个值呢就是 512,按这个 buffer 的大小读取一次数据需要分配 2~16 次内存,天啊简直不能忍,我自己创建一个 buffer 好不好。

看一下火焰图??吧,其中红框标记的就是 ioutil.ReadAll 的部分,颜色比较鲜艳。

优化读取方法

自己创建足够大的 buffer 减少因为容量不够导致的多次扩容问题。

buffer := bytes.NewBuffer(make([]byte, 4096))
_, err := io.Copy(buffer, request.Body)
if err !=nil{
    return nil, err
}

恩恩这样应该差不多了,为啥是初始化 4096 的大小,这是个均值,即使比 4096 大基本也就多分配一次内存即可,而且大多数数据都是比 4096 小的。

但是这样真的就算好了吗,当然不能这样,这个 buffer 个每请求都要创建一次,是不是应该考虑一下复用呢,使用 sync.Pool 建立一个缓冲池效果就更好了。

以下是优化读取请求的简化代码:

package adapter

import (
    "bytes"
    "io"
    "net/http"
    "sync"

    "github.com/json-iterator/go"
    "github.com/sirupsen/logrus"
    "github.com/thinkeridea/go-extend/exbytes"
)

type Adapter struct {
    pool sync.Pool
}

func New() *Adapter {
    return &Adapter{
        pool: sync.Pool{
            New: func() interface{} {
                return bytes.NewBuffer(make([]byte, 4096))
            },
        },
    }
}

func (api *Adapter) GetRequest(r *http.Request) (*Request, error) {
    buffer := api.pool.Get().(*bytes.Buffer)
    buffer.Reset()
    defer func() {
        if buffer != nil {
            api.pool.Put(buffer)
            buffer = nil
        }
    }()

    _, err := io.Copy(buffer, r.Body)
    if err != nil {
        return nil, err
    }

    request := &Request{}
    if err = jsoniter.Unmarshal(buffer.Bytes(), request); err != nil {
        logrus.WithFields(logrus.Fields{
            "json": exbytes.ToString(buffer.Bytes()),
        }).Errorf("jsoniter.UnmarshalJSON fail. error:%v", err)
        return nil, err
    }
    api.pool.Put(buffer)
    buffer = nil

    // ....

    return request, nil
}

使用 sync.Pool 的方式是不是有点怪,主要是 deferapi.pool.Put(buffer);buffer = nil 这里解释一下,为了提高 buufer 的复用率会在不使用时尽快把 buffer 放回到缓冲池中,defer 之所以会判断 buffer != nil 主要是在业务逻辑出现错误时,但是 buffer 还没有放回缓冲池时把 buffer 放回到缓冲池,因为在每个错误处理之后都写 api.pool.Put(buffer) 不是一个好的方法,而且容易忘记,但是如果在确定不再使用时 api.pool.Put(buffer);buffer = nil 就可以尽早把 buffer 放回到缓冲池中,提高复用率,减少新建 buffer

这样就好了吗,别急,之前说服务里面还会构建请求,看看构建请求如何优化吧。

package adapter

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "sync"

    "github.com/json-iterator/go"
    "github.com/sirupsen/logrus"
    "github.com/thinkeridea/go-extend/exbytes"
)

type Adapter struct {
    pool sync.Pool
}

func New() *Adapter {
    return &Adapter{
        pool: sync.Pool{
            New: func() interface{} {
                return bytes.NewBuffer(make([]byte, 4096))
            },
        },
    }
}

func (api *Adapter) Request(r *Request) (*Response, error) {
    var err error
    buffer := api.pool.Get().(*bytes.Buffer)
    buffer.Reset()
    defer func() {
        if buffer != nil {
            api.pool.Put(buffer)
            buffer = nil
        }
    }()

    e := jsoniter.NewEncoder(buffer)
    err = e.Encode(r)
    if err != nil {
        logrus.WithFields(logrus.Fields{
            "request": r,
        }).Errorf("jsoniter.Marshal failure: %v", err)
        return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
    }

    data := buffer.Bytes()
    req, err := http.NewRequest("POST", "http://xxx.com", buffer)
    if err != nil {
        logrus.WithFields(logrus.Fields{
            "data": exbytes.ToString(data),
        }).Errorf("http.NewRequest failed: %v", err)
        return nil, fmt.Errorf("http.NewRequest failed: %v", err)
    }

    req.Header.Set("User-Agent", "xxx")

    httpResponse, err := http.DefaultClient.Do(req)
    if httpResponse != nil {
        defer func() {
            io.Copy(ioutil.Discard, httpResponse.Body)
            httpResponse.Body.Close()
        }()
    }

    if err != nil {
        logrus.WithFields(logrus.Fields{
            "url": "http://xxx.com",
        }).Errorf("query service failed %v", err)
        return nil, fmt.Errorf("query service failed %v", err)
    }

    if httpResponse.StatusCode != 200 {
        logrus.WithFields(logrus.Fields{
            "url":         "http://xxx.com",
            "status":      httpResponse.Status,
            "status_code": httpResponse.StatusCode,
        }).Errorf("invalid http status code")
        return nil, fmt.Errorf("invalid http status code")
    }

    buffer.Reset()
    _, err = io.Copy(buffer, httpResponse.Body)
    if err != nil {
        return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
    }

    respData := buffer.Bytes()
    logrus.WithFields(logrus.Fields{
        "response_json": exbytes.ToString(respData),
    }).Debug("response json")

    res := &Response{}
    err = jsoniter.Unmarshal(respData, res)
    if err != nil {
        logrus.WithFields(logrus.Fields{
            "data": exbytes.ToString(respData),
            "url":  "http://xxx.com",
        }).Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
        return nil, fmt.Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
    }

    api.pool.Put(buffer)
    buffer = nil

    // ...
    return res, nil
}

这个示例和之前差不多,只是不仅用来读取 http.Response.Body 还用来创建一个 jsoniter.NewEncoder 用来把请求压缩成 json 字符串,并且作为 http.NewRequestbody 参数, 如果直接用 jsoniter.Marshal 同样会创建很多次内存,jsoniter 也使用 buffer 做为缓冲区,并且默认大小为 512, 代码如下:

func (cfg Config) Froze() API {
    api := &frozenConfig{
        sortMapKeys:                   cfg.SortMapKeys,
        indentionStep:                 cfg.IndentionStep,
        objectFieldMustBeSimpleString: cfg.ObjectFieldMustBeSimpleString,
        onlyTaggedField:               cfg.OnlyTaggedField,
        disallowUnknownFields:         cfg.DisallowUnknownFields,
    }
    api.streamPool = &sync.Pool{
        New: func() interface{} {
            return NewStream(api, nil, 512)
        },
    }
    // .....
    return api
}

而且序列化之后会进行一次数据拷贝:

func (cfg *frozenConfig) Marshal(v interface{}) ([]byte, error) {
    stream := cfg.BorrowStream(nil)
    defer cfg.ReturnStream(stream)
    stream.WriteVal(v)
    if stream.Error != nil {
        return nil, stream.Error
    }
    result := stream.Buffer()
    copied := make([]byte, len(result))
    copy(copied, result)
    return copied, nil
}

既然要用 buffer 那就一起吧^_^,这样可以减少多次内存分配,下读取 http.Response.Body 之前一定要记得 buffer.Reset(), 这样基本就已经完成了 http.Request.Bodyhttp.Response.Body 的数据读取优化了,具体效果等上线跑一段时间稳定之后来查看吧。

效果分析

上线跑了一天,来看看效果吧

$ go tool pprof allocs2
File: connect_server
Type: alloc_space
Time: Jan 26, 2019 at 10:27am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 295.40GB, 40.62% of 727.32GB total
Dropped 738 nodes (cum <= 3.64GB)
Showing top 10 nodes out of 174
      flat  flat%   sum%        cum   cum%
   73.52GB 10.11% 10.11%    73.52GB 10.11%  git.tvblack.com/tvblack/connect_server/vendor/github.com/sirupsen/logrus.(*Entry).WithFields
   31.70GB  4.36% 14.47%    31.70GB  4.36%  net/url.unescape
   27.49GB  3.78% 18.25%    54.87GB  7.54%  git.tvblack.com/tvblack/connect_server/models.LogItemsToBytes
   27.41GB  3.77% 22.01%    27.41GB  3.77%  strings.Join
   25.04GB  3.44% 25.46%    25.04GB  3.44%  bufio.NewWriterSize
   24.81GB  3.41% 28.87%    24.81GB  3.41%  bufio.NewReaderSize
   23.91GB  3.29% 32.15%    23.91GB  3.29%  regexp.(*bitState).reset
   23.06GB  3.17% 35.32%    23.06GB  3.17%  math/big.nat.make
   19.90GB  2.74% 38.06%    20.35GB  2.80%  git.tvblack.com/tvblack/connect_server/vendor/github.com/json-iterator/go.(*Iterator).readStringSlowPath
   18.58GB  2.56% 40.62%    19.12GB  2.63%  net/textproto.(*Reader).ReadMIMEHeader

哇塞 bytes.makeSlice 终于从前十中消失了,真的太棒了,还是看看 bytes.makeSlice 的其它调用情况吧。

(pprof) web bytes.makeSlice

从图中可以发现 bytes.makeSlice 的分配已经很小了, 且大多数是 http.Request.ParseForm 读取 http.Request.Body 使用 ioutil.ReadAll 原因,这次优化的效果非常的好。

看一下更直观的火焰图??吧,和优化前对比一下很明显 ioutil.ReadAll 看不到了

优化期间遇到的问题

比较惭愧在优化的过程出现了一个过失,导致生产环境2分钟故障,通过自动部署立即回滚才得以快速恢复,之后分析代码解决之后上线才完美优化,下面总结一下出现的问题吧。

在构建 http 请求时我分了两个部分优化,序列化 json 和读取 http.Response.Body 数据,保持一个观点就是尽早把 buffer 放回到缓冲池,因为 http.DefaultClient.Do(req) 是网络请求会相对耗时,在这个之前我把 buffer 放回到缓冲池中,之后读取 http.Response.Body 时在重新获取一个 buffer,大概代码如下:

package adapter

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "sync"

    "github.com/json-iterator/go"
    "github.com/sirupsen/logrus"
    "github.com/thinkeridea/go-extend/exbytes"
)

type Adapter struct {
    pool sync.Pool
}

func New() *Adapter {
    return &Adapter{
        pool: sync.Pool{
            New: func() interface{} {
                return bytes.NewBuffer(make([]byte, 4096))
            },
        },
    }
}

func (api *Adapter) Request(r *Request) (*Response, error) {
    var err error
    buffer := api.pool.Get().(*bytes.Buffer)
    buffer.Reset()
    defer func() {
        if buffer != nil {
            api.pool.Put(buffer)
            buffer = nil
        }
    }()

    e := jsoniter.NewEncoder(buffer)
    err = e.Encode(r)
    if err != nil {
        return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
    }

    data := buffer.Bytes()
    req, err := http.NewRequest("POST", "http://xxx.com", buffer)
    if err != nil {
        return nil, fmt.Errorf("http.NewRequest failed: %v", err)
    }

    req.Header.Set("User-Agent", "xxx")

    api.pool.Put(buffer)
    buffer = nil

    httpResponse, err := http.DefaultClient.Do(req)

    // ....

    buffer = api.pool.Get().(*bytes.Buffer)
    buffer.Reset()
    defer func() {
        if buffer != nil {
            api.pool.Put(buffer)
            buffer = nil
        }
    }()
    _, err = io.Copy(buffer, httpResponse.Body)
    if err != nil {
        return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
    }

    // ....

    api.pool.Put(buffer)
    buffer = nil

    // ...
    return res, nil
}

上线之后马上发生了错误 http: ContentLength=2090 with Body length 0 发送请求的时候从 buffer 读取数据发现数据不见了或者数据不够了,我去这是什么鬼,马上回滚恢复业务,然后分析 http.DefaultClient.Do(req)http.NewRequest,在调用 http.NewRequest 是并没有从 buffer 读取数据,而只是创建了一个 req.GetBody 之后在 http.DefaultClient.Do 是才读取数据,因为在 http.DefaultClient.Do 之前把 buffer 放回到缓冲池中,其它 goroutine 获取到 buffer 并进行 Reset 就发生了数据争用,当然会导致数据读取不完整了,真实汗颜,对 http.Client 了解太少,争取有空撸一遍源码。

总结

使用合适大小的 buffer 来减少内存分配,sync.Pool 可以帮助复用 buffer, 一定要自己写这些逻辑,避免使用三方包,三方包即使使用同样的技巧为了避免数据争用,在返回数据时候必然会拷贝一个新的数据返回,就像 jsoniter 虽然使用了 sync.Poolbuffer 但是返回数据时还需要拷贝,另外这种通用包并不能给一个非常贴合业务的初始 buffer 大小,过小会导致数据发生拷贝,过大会太过浪费内存。

程序中善用 buffersync.Pool 可以大大的改善程序的性能,并且这两个组合在一起使用非常的简单,并不会使代码变的复杂。

转载:

本文作者: 戚银(thinkeridea

本文链接: https://blog.thinkeridea.com/201901/go/you_ya_de_du_qu_http_qing_qiu_huo_xiang_ying_de_shu_ju.html

版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 CN协议 许可协议。转载请注明出处!

原文地址:https://www.cnblogs.com/thinkeridea/p/10324806.html

时间: 2024-08-29 13:48:46

【Go】优雅的读取http请求或响应的数据的相关文章

Ajax发送GET、POST请求和响应XML数据案例

1.新建工程 新建一个java web工程,新建一个Servlet文件 AServlet.java,用于返回get和post请求. public class AServlet extends HttpServlet { public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { System.out.println("Hello

【Java&amp;Android开源库代码剖析】のandroid-async-http(如何设计一个优雅的Android网络请求框架,同时支持同步和异步请求)开篇

在<[Java&Android开源库代码剖析]のandroid-smart-image-view>一文中我们提到了android-async-http这个开源库,本文正式开篇来详细介绍这个库的实现,同时结合源码探讨如何设计一个优雅的Android网络请求框架.做过一段时间Android开发的同学应该对这个库不陌生,因为它对Apache的HttpClient API的封装使得开发者可以简洁优雅的实现网络请求和响应,并且同时支持同步和异步请求. 网络请求框架一般至少需要具备如下几个组件:1

HTTP1.1请求和响应的基本用法

HTTP1.1是现在普遍使用的TCP/IP的应用层协议,按照Volley中的描述,可以把HTTP传输数据相关对象分为两个类,一个叫Request(请求),一个是Response(响应). 对于一个基本的Request,我们通过chrome自带的查看工具来查看一下: 在地址栏输入:chrome://net-internals/#events 我们打开百度,然后在上述页面中找到URL_REQUEST: 查看内容是: 看着上面那个两个请求内容,我们来分析一下一个HTTP请求的基本内容如何构造. HTT

HTTP协议(5)HTTP请求和响应

之前曾介绍过,所有的HTTP通信都被构造成一对HTTP请求和HTTP响应,HTTP协议的请求与响应报文都是由"首部header"和"主体body"两部分组成的.其中主体部分是请求和响应的数据,首部部分则规定了请求和响应的内容格式. 对于HTTP请求报文,主要由三部分组成:请求行.请求头.请求正文.在请求头和请求正文之间一般会有两个空行进行间隔.下图是用Burpsuite截获的请求报文.HTTP请求报文的第一行即为请求行,这个报文的请求行就是"GET / H

http请求与响应全过程

HTTP 无状态性 HTTP 协议是无状态的(stateless).也就是说,同一个客户端第二次访问同一个服务器上的页面时,服务器无法知道这个客户端曾经访问过,服务器也无法分辨不同的客户端.HTTP 的无状态特性简化了服务器的设计,使服务器更容易支持大量并发的HTTP 请求. HTTP 持久连接       HTTP1.0 使用的是非持久连接,主要缺点是客户端必须为每一个待请求的对象建立并维护一个新的连接,即每请求一个文档就要有两倍RTT 的开销.因为同一个页面可能存在多个对象,所以非持久连接可

写一个ActionFilter检测WebApi接口请求和响应

我们一般用日志记录每次Action的请求和响应,方便接口出错后排查,不过如果每个Action方法内都写操作日志太麻烦,而且客户端传递了错误JSON或XML,没法对应强类型参数,请求没法进入方法内, 把日志记录操作放在一个ActionFilter即可. [AttributeUsageAttribute(AttributeTargets.Method, Inherited = false, AllowMultiple = false)] public class ApiActionAttribute

Node.js之HTTP请求与响应

在C#.OC中也是客户端发起一个请求,服务端作出响应.我们可以把这个过程抽象理解 . 1.客户端给服务端发起请求相当于向服务端写入一个流(writable) 2.服务端读取客户端的流(readable) 3.服务端向客户端作出响应相当于向客户端写入一个流(writable) 4.客户端读取服务端的响应(readable) 整个流程分为两部分一是客户端的处理而是服务端的处理.最主要的还是客户端请求和服务端响应. 一.http.ClientRequest对象 它实现了writable流,可以使用它的

GET方式的HTTP请求,返回响应体

public static readonly string DefaultUserAgent = "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727)"; /// <summary> /// GET方式的HTTP请求 /// </summary> /// <param name="targetUri">&

HTTP请求头部+响应码

本篇博文主要介绍HTTP请求-响应的系列过程,包括四个部分,是在陆续学习中觉着之间有关联总结下来的,以便自己今后忘记后可以快速查看也为各位看到这篇文章的朋友们梳理一下知识.下面,正文开始: 作为一个前端开发人员,我们每天都在与页面打交道,那么,当你输入一个网址的时候,实际究竟会发生什么呢?下面,就先来解答一下这个问题. 一.当在浏览器输入一个网址后,实际会发生什么? 回答之前,先摘一段<淘宝技术这十年>中的"你刚才在淘宝上买了一件东西"里的一段话,对我理解这个问题有启发.