Notes on using boost::asio for an HTTP client

1 Pitfalls

1.1 io_service

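boost::asio::io_service::run() keeps running until there is no work left. Calling stop() midway does not let pending work finish cleanly: run() returns as soon as possible, and per the documentation handlers that have not run yet stay queued until reset() and a later run(). The workaround I use is run_one(), i.e.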

while (keep_running)
    io_service_.run_one();

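keep_running is a bool; to stop the io_service, just set it to false. Since run_one() blocks until a handler has run, the flag is only checked between handlers.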

1.2 deadline_timer

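After async_wait() has been called, the handler is invoked whether you call deadline_timer::cancel() or destroy the deadline_timer (the error code is then boost::asio::error::operation_aborted). This is documented, of course. There are two ways to avoid dangling pointers: bind the handler to a shared_ptr, or add one more layer of wrapping. The latter suits the case where the object behind the handler cannot control its own lifetime; for sample code see the TimerHolder class in the http client section.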
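
The "one more layer" idea can be sketched without Boost. In the sketch below FakeLoop is a hypothetical stand-in for io_service (it only mimics the fact that a queued callback still runs later), and Owner, Holder and RunDemo are illustrative names, not part of asio or of the listing in this post. The point it shows: the callback keeps the holder alive through a shared_ptr, and the owner only clears a back pointer when it goes away.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for io_service: a queued callback always runs later,
// much like asio still invokes a wait handler after the timer is cancelled.
struct FakeLoop {
  std::deque<std::function<void()>> pending;
  void Post(std::function<void()> fn) { pending.push_back(std::move(fn)); }
  void RunAll() {
    while (!pending.empty()) {
      std::function<void()> fn = std::move(pending.front());
      pending.pop_front();
      fn();
    }
  }
};

struct Owner;  // Plays the role of HttpTransaction.

// The extra layer. The queued callback holds a shared_ptr to it, so the
// callback never touches freed memory even if the owner is already gone.
class Holder : public std::enable_shared_from_this<Holder> {
 public:
  explicit Holder(Owner* owner) : owner_(owner) {}
  void Schedule(FakeLoop& loop) {
    std::shared_ptr<Holder> self = shared_from_this();
    loop.Post([self]() { self->OnFired(); });
  }
  void OnOwnerGone() { owner_ = nullptr; }

 private:
  void OnFired();
  Owner* owner_;
};

struct Owner {
  explicit Owner(int* counter)
      : holder(std::make_shared<Holder>(this)), hits(counter) {}
  ~Owner() { holder->OnOwnerGone(); }  // Detach instead of trusting cancel.
  void OnFired() { ++*hits; }
  std::shared_ptr<Holder> holder;
  int* hits;
};

void Holder::OnFired() {
  if (owner_) owner_->OnFired();  // Skipped once the owner has detached.
}

// Returns how many times the owner was called back.
int RunDemo(bool destroy_owner_first) {
  FakeLoop loop;
  int hits = 0;
  std::unique_ptr<Owner> owner(new Owner(&hits));
  owner->holder->Schedule(loop);
  if (destroy_owner_first) owner.reset();
  loop.RunAll();
  return hits;
}
```

With the owner alive the callback reaches it once; with the owner destroyed first the callback still runs but does nothing, which is the behaviour the wrapper is meant to guarantee.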

1.3 async*

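This is much the same as deadline_timer::async_wait(): for async_read, async_read_until and the other functions with the async_ prefix, if the operation is stopped midway (for example by calling ip::tcp::socket::close()), the handler is still invoked and receives a boost::system::error_code that means aborted.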

1.4 ip::tcp::socket

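  1. The official example is so simple that it is actually misleading. HTTP is a fairly complex protocol; chunked encoding, for instance, has to be parsed by hand.
  2. ip::tcp::resolver may return several IPs. Handing the returned iterator straight to async_connect can easily fail, because the list may contain unusable addresses, for example an all-zero address. For a workaround see the DoResolveAndConnect() function in the http client code.
  3. A socket read may pull in more data than asked for (for example, after read_until the streambuf may hold bytes beyond the delimiter). The documentation mentions this.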
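
To give an idea of what parsing chunked encoding by hand involves, here is a minimal sketch in plain C++ that decodes a body which is already completely in memory. DecodeChunked is a hypothetical helper written for this note, not an asio function and not the incremental logic used in the listing further down; chunk extensions are skipped and trailers are ignored.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <string>

// Decodes a complete chunked body held in |raw| into |out|.
// Returns false if the input is truncated or malformed.
bool DecodeChunked(const std::string& raw, std::string* out) {
  out->clear();
  std::size_t pos = 0;
  while (true) {
    // Each chunk starts with "<hex size>[;extension]\r\n".
    std::size_t line_end = raw.find("\r\n", pos);
    if (line_end == std::string::npos) return false;  // Size line incomplete.
    std::string size_line = raw.substr(pos, line_end - pos);
    char* parse_end = NULL;
    unsigned long chunk_size = std::strtoul(size_line.c_str(), &parse_end, 16);
    if (parse_end == size_line.c_str()) return false;  // No hex digits.
    pos = line_end + 2;
    if (chunk_size == 0) return true;  // Last chunk; trailers are ignored.

    // The chunk data must be followed by its own "\r\n".
    std::size_t remaining = raw.size() - pos;
    if (chunk_size > remaining || remaining - chunk_size < 2) return false;
    out->append(raw, pos, chunk_size);
    pos += chunk_size;
    if (raw.compare(pos, 2, "\r\n") != 0) return false;
    pos += 2;
  }
}
```

A real client has to do the same thing incrementally, because the size line and the chunk data arrive in arbitrary pieces; that is what the STATE_READ_CHUNK_SIZE and STATE_READ_CHUNK_DATA states in the listing are for.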

2 http client usage

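It is wrapped in a C++ class. This is a single-threaded implementation (the io_service is run on the same thread). Calling the socket functions synchronously and using a deadline_timer to hand data back asynchronously is easier to control.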

I won't go into detail; please see the code.
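
Before the full listing, the control flow it relies on can be sketched in isolation. This is only a toy under stated assumptions: a std::deque of callbacks stands in for the io_service plus the zero-delay deadline_timer, and ToyTransaction and RunToy are made-up names. What it tries to show is the convention used by DoLoop(): a step returns true to run the next state right away, and returns false after scheduling a continuation so that the caller gets control back in between.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum State { kNone, kConnect, kSend, kRead };

class ToyTransaction {
 public:
  ToyTransaction(std::deque<std::function<void()>>* queue,
                 std::vector<std::string>* log)
      : queue_(queue), log_(log), next_state_(kNone) {}

  // Returns immediately; the work starts on the next loop iteration.
  void Start() {
    next_state_ = kConnect;
    Schedule();
  }

 private:
  // Stand-in for the zero-delay timer: run DoLoop() later, not now.
  void Schedule() { queue_->push_back([this]() { DoLoop(); }); }

  void DoLoop() {
    bool keep_going = false;
    do {
      State state = next_state_;
      next_state_ = kNone;
      switch (state) {
        case kConnect: keep_going = DoConnect(); break;
        case kSend:    keep_going = DoSend();    break;
        case kRead:    keep_going = DoRead();    break;
        default:       keep_going = false;       break;
      }
    } while (keep_going);
  }

  // true: continue with |next_state_| inside the same DoLoop() call.
  bool DoConnect() {
    log_->push_back("connect");
    next_state_ = kSend;
    return true;
  }
  // false after Schedule(): yield to the loop, resume on a later tick.
  bool DoSend() {
    log_->push_back("send");
    next_state_ = kRead;
    Schedule();
    return false;
  }
  // false without Schedule(): the transaction is finished.
  bool DoRead() {
    log_->push_back("read");
    return false;
  }

  std::deque<std::function<void()>>* queue_;
  std::vector<std::string>* log_;
  State next_state_;
};

// Drives the toy and returns the order in which things happened.
std::vector<std::string> RunToy() {
  std::deque<std::function<void()>> queue;
  std::vector<std::string> log;
  ToyTransaction trans(&queue, &log);
  trans.Start();
  log.push_back("start returned");
  while (!queue.empty()) {
    std::function<void()> fn = std::move(queue.front());
    queue.pop_front();
    log.push_back("tick");
    fn();
  }
  return log;
}
```

In the real class the blocking socket calls sit inside the DoXxx() steps, and the timer hop is what makes the delegate callbacks arrive asynchronously from the caller's point of view.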

// header
#include <assert.h>
#include <string>
#include "boost/asio.hpp"
#include "boost/make_shared.hpp"
#include "boost/thread.hpp"

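// NOTE: the base interface that declares the virtual functions overridden
// below (and the Delegate type) is not part of this listing.
class HttpRequest;
class HttpResponse;

class HttpTransaction {
 public: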

  explicit HttpTransaction(boost::asio::io_service& io);
  ~HttpTransaction();

  void Start() override;

  void Cancel() override;

  const HttpRequest* request() const override {
    return http_request_;
  }
  void set_request(const HttpRequest* request) override {
    http_request_ = request;
  }

  Delegate* delegate() const override { return delegate_; }
  void set_delegate(Delegate* delegate) override {
    delegate_ = delegate;
  }

 private:
  enum State {
    STATE_NONE,
    STATE_CONNECT,
    STATE_SEND_REQUEST,
    STATE_READ_HEADER,
    STATE_READ_BODY,
    STATE_READ_CHUNK_SIZE,
    STATE_READ_CHUNK_DATA,
    STATE_READ_UNTIL_EOF,
    STATE_CALL_ON_FINISHED,
  };

  void DoLoop();

  bool DoResolveAndConnect();
  bool DoSendRequest();
  bool DoReadHeader();
  bool DoReadChunkSize();
  bool DoReadChunkData();
  bool DoReadBody();
  bool DoReadUntilEof();
  bool DoCallOnFinished();

  void CallOnDataReceived(size_t size, size_t additional_consume_size = 0);

  const HttpRequest* http_request_;
  Delegate* delegate_;
  HttpResponse* http_response_;

  boost::asio::ip::tcp::resolver resolver_;
  boost::asio::ip::tcp::socket socket_;
  boost::shared_ptr<boost::asio::streambuf> stream_buf_;

  size_t pending_read_size_;

  State next_state_;

  bool started_;

  class TimerHolder;
  boost::shared_ptr<TimerHolder> timer_holder_;
};

#####################################################################

// implementation
#include <assert.h>
#include <stdlib.h>  // atoi, strtol
#include <string.h>
#include <algorithm>
#include "boost/algorithm/hex.hpp"
#include "boost/bind.hpp"
#include "http_transaction_impl.h"
#include "http_request.h"
#include "http_response.h"
#include "util/url.h"
#include "util/logging.h"
#include "util/http_util.h"

// TimerHolder is needed because the callback is invoked even if the timer is
// cancelled or destroyed.
class HttpTransaction::TimerHolder {
 public:
  TimerHolder(HttpTransaction* trans,
              boost::asio::io_service& io)  // NOLINT
      : trans_(trans),
        timer_(io) {}

  void Schedule() {
    timer_.expires_from_now(boost::posix_time::microseconds(0));
    timer_.async_wait(boost::bind(&TimerHolder::OnTimer, trans_->timer_holder_,
                                  boost::asio::placeholders::error));
  }

  void OnTransactionCancelled() {
    trans_ = NULL;
    timer_.cancel();
  }

 private:
  void OnTimer(const boost::system::error_code& err) {
    if (!err && trans_) {
      trans_->DoLoop();
    }
  }

  HttpTransaction* trans_;
  boost::asio::deadline_timer timer_;
};

HttpTransaction::HttpTransaction(boost::asio::io_service& io)
    : http_request_(NULL),
      delegate_(NULL),
      http_response_(NULL),
      resolver_(io),
      socket_(io),
      stream_buf_(new boost::asio::streambuf),
      pending_read_size_(0),
      next_state_(STATE_NONE),
      started_(false),
      timer_holder_(new TimerHolder(this, io)) {
}

HttpTransaction::~HttpTransaction() {
  Cancel();
}

void HttpTransaction::Start() {
  assert(!started_);
  started_ = true;
  next_state_ = STATE_CONNECT;
  timer_holder_->Schedule();
}

void HttpTransaction::Cancel() {
  next_state_ = STATE_NONE;
  timer_holder_->OnTransactionCancelled();
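  // Cancel() also runs from the destructor, so use the non-throwing overload.
  boost::system::error_code ignored;
  socket_.close(ignored);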
  if (http_response_) {
    delete http_response_;
    http_response_ = NULL;
  }
}

void HttpTransaction::DoLoop() {
  bool rv = false;
  do {
    State state = next_state_;
    next_state_ = STATE_NONE;
    switch (state) {
      case STATE_CONNECT:
        rv = DoResolveAndConnect();
        break;
      case STATE_SEND_REQUEST:
        rv = DoSendRequest();
        break;
      case STATE_READ_HEADER:
        rv = DoReadHeader();
        break;
      case STATE_READ_BODY:
        rv = DoReadBody();
        break;
      case STATE_READ_UNTIL_EOF:
        rv = DoReadUntilEof();
        break;
      case STATE_READ_CHUNK_SIZE:
        rv = DoReadChunkSize();
        break;
      case STATE_READ_CHUNK_DATA:
        rv = DoReadChunkData();
        break;
      case STATE_CALL_ON_FINISHED:
        rv = DoCallOnFinished();
        break;
      default:
        assert(0);
        break;
    }
  } while (rv);
}

bool HttpTransaction::DoResolveAndConnect() {
  URL url(http_request_->url());
  // TODO(liuhx): if url is ip address.
  boost::asio::ip::tcp::resolver::query query(
      url.host(), url.port() == 0 ? url.protocol() : url.port_string());
  boost::system::error_code err;
  boost::asio::ip::tcp::resolver::iterator it = resolver_.resolve(query, err);
  boost::asio::ip::tcp::resolver::iterator end;  // Default is end.
  if (err || it == end) {
    LOG_DEBUG(kLogTagHttpTrans, "resolve error:%s",
                      err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

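  // If every address below is skipped, |err| must not stay clear, otherwise we
  // would move on to STATE_SEND_REQUEST with an unconnected socket.
  err = boost::asio::error::host_not_found;
  do {
    LOG_INFO(kLogTagHttpTrans, "dns result:%s",
             it->endpoint().address().to_string().c_str());
    // "unspecified" means the address is "0.0.0.0" (or "::"). It may appear on
    // some machines running Apache. Please google it for more detail.
    if (!it->endpoint().address().is_unspecified()) {
      boost::system::error_code ignored;
      socket_.close(ignored);
      LOG_INFO(kLogTagHttpTrans, "connecting:%s",
               it->endpoint().address().to_string().c_str());
      socket_.connect(*it, err);
      if (!err)
        break;
    }
    ++it;
  } while (it != end);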

  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "connect error:%s",
                      err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

  next_state_ = STATE_SEND_REQUEST;
  return true;
}

bool HttpTransaction::DoSendRequest() {
  URL url(http_request_->url());
  std::ostream request_stream(stream_buf_.get());
  request_stream << http_request_->method() << " " << url.path()
                 << " HTTP/1.1\r\n";
  if (!http_request_->HasHeader("Host")) {
    request_stream << "Host: " << url.host() << "\r\n";
  }
  if (!http_request_->HasHeader("Connection")) {
    request_stream << "Connection: keep-alive\r\n";
  }
  const char* name;
  const char* value;
  void* iter = NULL;
  while (http_request_->EnumerateHeaderLines(&iter, &name, &value))
    request_stream << name << ": " << value << "\r\n";
  size_t size = 0;
  if (http_request_->body(&value, &size)) {
    if (!http_request_->HasHeader("Content-Length")) {
      request_stream << "Content-Length: " << size << "\r\n";
    }
    request_stream << "\r\n";
    request_stream.write(value, size);
  } else {
    request_stream << "\r\n";
  }

  boost::system::error_code err;
  // boost::asio::write() consumes |stream_buf_|, no need to do it ourselves.
  boost::asio::write(socket_, *stream_buf_, err);
  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "send request error:%s",
                      err.message().c_str());
    delegate_->OnError(this, err.value());
  } else {
    next_state_ = STATE_READ_HEADER;
    timer_holder_->Schedule();
  }
  return false;
}

bool HttpTransaction::DoReadHeader() {
  boost::system::error_code err;
  boost::asio::read_until(socket_, *stream_buf_, "\r\n\r\n", err);
  if (err) {
    LOG_DEBUG(kLogTagHttpTrans, "read header error:%s",
        err.message().c_str());
    delegate_->OnError(this, err.value());
    return false;
  }

  size_t size = stream_buf_->size();
  const char* data = boost::asio::buffer_cast<const char*>(stream_buf_->data());
  size_t pos = std::string(data, size).find("\r\n\r\n");
  if (pos == std::string::npos) {
    LOG_DEBUG(kLogTagHttpTrans,
                      "Can not find header end. Maybe TCP data Out-of-Order");
    delegate_->OnError(this, 1234);
    return false;
  }
  http_response_ = new HttpResponseImpl(http_request_->url(), data, pos);
  stream_buf_->consume(pos + 4);  // Skip 4 = "\r\n\r\n".
  if (http_response_->status_code() < 100) {
    LOG_DEBUG(kLogTagHttpTrans, "Header invalid");
    delegate_->OnError(this, 2345);
    return false;
  }

  if (http_response_->status_code() != 200) {
    next_state_ = STATE_CALL_ON_FINISHED;
  } else {
    const char* content_length = http_response_->GetHeader("content-length");
    if (content_length) {
      pending_read_size_ = atoi(content_length);
      next_state_ = STATE_READ_BODY;
    } else {
      const char* encoding = http_response_->GetHeader("Transfer-Encoding");
      bool isChunk = encoding && (std::string(encoding).find("chunked") !=
                                  std::string::npos);
      if (isChunk) {
        next_state_ = STATE_READ_CHUNK_SIZE;
      } else {
        next_state_ = STATE_READ_UNTIL_EOF;
      }
    }
  }
  timer_holder_->Schedule();

  delegate_->OnResponseReceived(this, *http_response_);
  return false;
}

bool HttpTransaction::DoReadBody() {
  // If content-length exists, the connection may be keep-alive. We MUST keep
  // counting |pending_read_size_| instead of reading until EOF.
  if (pending_read_size_ == 0) {
    delegate_->OnFinished(this);
    return false;
  }

  while (true) {
    // boost may read additional data beyond the condition in STATE_READ_HEADER,
    // pass the leftover data first if there is any.
    size_t size = stream_buf_->size();
    if (size) {
      next_state_ = STATE_READ_BODY;
      timer_holder_->Schedule();
      // TODO(liuhx): assert -> OnError
      assert(pending_read_size_ >= size);
      pending_read_size_ -= size;
      CallOnDataReceived(size);
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read(socket_, *stream_buf_,
                        boost::asio::transfer_at_least(1), err);
      if (err) {
        LOG_DEBUG(kLogTagHttpTrans, "read body error:%s",
            err.message().c_str());
        delegate_->OnError(this, err.value());
        break;
      }
    }
  }

  return false;
}

// About chunked encoding, refer to http://tools.ietf.org/html/rfc2616#page-25
bool HttpTransaction::DoReadChunkSize() {
  while (true) {
    // boost may read additional data beyond the condition, find "\r\n" first.
    size_t size = stream_buf_->size();
    if (size) {
      const char* data =
          boost::asio::buffer_cast<const char*>(stream_buf_->data());
      size_t index = std::string(data, size).find("\r\n");
      if (index == 0) {
        // Leftover "\r\n" of the previous chunk that had not arrived yet when
        // DoReadChunkData() consumed the data. Drop it and look again.
        stream_buf_->consume(2);
        continue;
      }
      if (index != std::string::npos) {
        pending_read_size_ = static_cast<size_t>(strtol(data, NULL, 16));
        stream_buf_->consume(index + 2);  // Skip +2 = "\r\n"
        if (pending_read_size_ == 0) {
          delegate_->OnFinished(this);
          return false;
        }
        break;
      }
    }
    boost::system::error_code err;
    boost::asio::read_until(socket_, *stream_buf_, "\r\n", err);
    if (err) {
      LOG_DEBUG(kLogTagHttpTrans, "read chunk size error:%s",
                        err.message().c_str());
      delegate_->OnError(this, err.value());
      return false;
    }
  }

  next_state_ = STATE_READ_CHUNK_DATA;
  return true;
}

bool HttpTransaction::DoReadChunkData() {
  while (true) {
    size_t size = stream_buf_->size();
    if (size) {
      bool reach_end = size >= pending_read_size_;
      size_t data_size = reach_end ? pending_read_size_ : size;
      pending_read_size_ -= data_size;
      next_state_ = reach_end ? STATE_READ_CHUNK_SIZE : STATE_READ_CHUNK_DATA;
      timer_holder_->Schedule();
      CallOnDataReceived(data_size, reach_end ? 2 : 0);  // Skip 2 = "\r\n".
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read_until(socket_, *stream_buf_, "\r\n", err);
      if (err) {
        LOG_DEBUG(kLogTagHttpTrans, "read chunk data error:%s",
                          err.message().c_str());
        delegate_->OnError(this, err.value());
        break;
      }
    }
  }

  return false;
}

bool HttpTransaction::DoReadUntilEof() {
  while (true) {
    size_t size = stream_buf_->size();
    if (size) {
      next_state_ = STATE_READ_UNTIL_EOF;
      timer_holder_->Schedule();

      const char* data =
          boost::asio::buffer_cast<const char*>(stream_buf_->data());
      boost::shared_ptr<boost::asio::streambuf> buf = stream_buf_;
      delegate_->OnDataReceived(this, data, size);
      buf->consume(size);
      break;
    } else {
      boost::system::error_code err;
      boost::asio::read(socket_, *stream_buf_,
                        boost::asio::transfer_at_least(1), err);
      if (err) {
        if (err == boost::asio::error::eof) {
          delegate_->OnFinished(this);
        } else {
          LOG_DEBUG(kLogTagHttpTrans, "%s", err.message().c_str());
          delegate_->OnError(this, err.value());
        }
        break;
      }
    }
  }

  return false;
}

bool HttpTransaction::DoCallOnFinished() {
  delegate_->OnFinished(this);
  return false;
}

void HttpTransaction::CallOnDataReceived(size_t size,
                                             size_t additional_consume_size) {
  const char* data = boost::asio::buffer_cast<const char*>(stream_buf_->data());
  // Because Delegate may delete HttpTransaction during callback, we MUST NOT
  // access member variable after callback method, instead, use the |buf|.
  boost::shared_ptr<boost::asio::streambuf> buf = stream_buf_;
  delegate_->OnDataReceived(this, data, size);
  buf->consume(size + additional_consume_size);
}
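
The decision DoReadHeader() makes about how to read the body can also be looked at as a pure function, which is easier to test than the socket code. The sketch below is an illustration only: BodyFraming and ChooseFraming are hypothetical names, the header map with lower-cased keys is an assumption of the sketch (the listing uses HttpResponse::GetHeader instead), and rejecting a non-numeric Content-Length is a choice made here, whereas the listing simply calls atoi().

```cpp
#include <cassert>
#include <cstdlib>
#include <map>
#include <string>

enum BodyFraming { kInvalid, kNoBody, kContentLength, kChunked, kUntilEof };

// |headers| maps lower-cased header names to values (assumption of this
// sketch). On kContentLength, |*content_length| receives the body size.
BodyFraming ChooseFraming(int status_code,
                          const std::map<std::string, std::string>& headers,
                          unsigned long long* content_length) {
  *content_length = 0;
  // Same shortcut as the listing: only a 200 response has its body read.
  if (status_code != 200) return kNoBody;

  std::map<std::string, std::string>::const_iterator it =
      headers.find("content-length");
  if (it != headers.end()) {
    const std::string& value = it->second;
    // Accept decimal digits only, so "-1" or "12abc" is not silently used.
    if (value.empty() ||
        value.find_first_not_of("0123456789") != std::string::npos) {
      return kInvalid;
    }
    *content_length = std::strtoull(value.c_str(), NULL, 10);
    return kContentLength;
  }

  it = headers.find("transfer-encoding");
  if (it != headers.end() && it->second.find("chunked") != std::string::npos) {
    return kChunked;
  }
  // Neither header: the body ends when the server closes the connection.
  return kUntilEof;
}
```

The sketch keeps the listing's order and checks Content-Length first. RFC 7230 gives Transfer-Encoding precedence when a response carries both headers, so a stricter client would test for chunked first.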

Copyright notice: this is an original article by the blog author and may not be reproduced without the author's permission.

Time: 2024-10-13 19:53:31
