Boost.Asio Learning (9): A boost::asio network wrapper

http://www.gamedev.net/blog/950/entry-2249317-a-guide-to-getting-started-with-boostasio?pg=10

9. A boost::asio network wrapper (TCP)

Now that we understand asio and the TCP networking side of things, we can try wrapping the low-level networking code. With such a wrapper we can reuse code and concentrate on the application logic instead of spending most of our effort on network plumbing.

Important note: this code is for educational purposes only. Do not use it in a production system, because it may contain bugs. It was designed to run in a specific environment, so unexpected behavior may occur outside of that environment. I have used this code in a few projects and ran into a handful of small issues that needed fixing.

Also, the code uses std::vector, std::list, and shared_ptr, which can cause a large number of allocations and will not be appropriate in some environments. Again, the code is meant purely for teaching.

#pragma once

#ifndef NETWORK_H_
#define NETWORK_H_

//-----------------------------------------------------------------------------

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
#include <vector>
#include <list>
#include <boost/cstdint.hpp>

//-----------------------------------------------------------------------------

using boost::uint64_t;
using boost::uint32_t;
using boost::uint16_t;
using boost::uint8_t;

using boost::int64_t;
using boost::int32_t;
using boost::int16_t;
using boost::int8_t;

//-----------------------------------------------------------------------------

class Hive;
class Acceptor;
class Connection;

//-----------------------------------------------------------------------------

class Connection : public boost::enable_shared_from_this< Connection >
{
	friend class Acceptor;
	friend class Hive;

private:
	boost::shared_ptr< Hive > m_hive;
	boost::asio::ip::tcp::socket m_socket;
	boost::asio::strand m_io_strand;
	boost::asio::deadline_timer m_timer;
	boost::posix_time::ptime m_last_time;
	std::vector< uint8_t > m_recv_buffer;
	std::list< int32_t > m_pending_recvs;
	std::list< std::vector< uint8_t > > m_pending_sends;
	int32_t m_receive_buffer_size;
	int32_t m_timer_interval;
	volatile uint32_t m_error_state;

protected:
	Connection( boost::shared_ptr< Hive > hive );
	virtual ~Connection();

private:
	Connection( const Connection & rhs );
	Connection & operator =( const Connection & rhs );
	void StartSend();
	void StartRecv( int32_t total_bytes );
	void StartTimer();
	void StartError( const boost::system::error_code & error );
	void DispatchSend( std::vector< uint8_t > buffer );
	void DispatchRecv( int32_t total_bytes );
	void DispatchTimer( const boost::system::error_code & error );
	void HandleConnect( const boost::system::error_code & error );
	void HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr );
	void HandleRecv( const boost::system::error_code & error, int32_t actual_bytes );
	void HandleTimer( const boost::system::error_code & error );

private:
	// Called when the connection has successfully connected to the local
	// host.
	virtual void OnAccept( const std::string & host, uint16_t port ) = 0;

	// Called when the connection has successfully connected to the remote
	// host.
	virtual void OnConnect( const std::string & host, uint16_t port ) = 0;

	// Called when data has been sent by the connection.
	virtual void OnSend( const std::vector< uint8_t > & buffer ) = 0;

	// Called when data has been received by the connection.
	virtual void OnRecv( std::vector< uint8_t > & buffer ) = 0;

	// Called on each timer event.
	virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;

	// Called when an error is encountered.
	virtual void OnError( const boost::system::error_code & error ) = 0;

public:
	// Returns the Hive object.
	boost::shared_ptr< Hive > GetHive();

	// Returns the socket object.
	boost::asio::ip::tcp::socket & GetSocket();

	// Returns the strand object.
	boost::asio::strand & GetStrand();

	// Sets the application specific receive buffer size used. For stream
	// based protocols such as HTTP, you want this to be pretty large, like
	// 64kb. For packet based protocols, then it will be much smaller,
	// usually 512b - 8kb depending on the protocol. The default value is
	// 4kb.
	void SetReceiveBufferSize( int32_t size );

	// Returns the receive buffer size of the current object.
	int32_t GetReceiveBufferSize() const;

	// Sets the timer interval of the object. The interval is changed after
	// the next update is called.
	void SetTimerInterval( int32_t timer_interval_ms );

	// Returns the timer interval of the object.
	int32_t GetTimerInterval() const;

	// Returns true if this object has an error associated with it.
	bool HasError();

	// Binds the socket to the specified interface.
	void Bind( const std::string & ip, uint16_t port );

	// Starts an asynchronous connect.
	void Connect( const std::string & host, uint16_t port );

	// Posts data to be sent to the connection.
	void Send( const std::vector< uint8_t > & buffer );

	// Posts a recv for the connection to process. If total_bytes is 0, then
	// as many bytes as possible up to GetReceiveBufferSize() will be
	// waited for. If total_bytes is not 0, then the connection will wait for
	// exactly total_bytes before invoking OnRecv.
	void Recv( int32_t total_bytes = 0 );

	// Posts an asynchronous disconnect event for the object to process.
	void Disconnect();
};

//-----------------------------------------------------------------------------

class Acceptor : public boost::enable_shared_from_this< Acceptor >
{
	friend class Hive;

private:
	boost::shared_ptr< Hive > m_hive;
	boost::asio::ip::tcp::acceptor m_acceptor;
	boost::asio::strand m_io_strand;
	boost::asio::deadline_timer m_timer;
	boost::posix_time::ptime m_last_time;
	int32_t m_timer_interval;
	volatile uint32_t m_error_state;

private:
	Acceptor( const Acceptor & rhs );
	Acceptor & operator =( const Acceptor & rhs );
	void StartTimer();
	void StartError( const boost::system::error_code & error );
	void DispatchAccept( boost::shared_ptr< Connection > connection );
	void HandleTimer( const boost::system::error_code & error );
	void HandleAccept( const boost::system::error_code & error, boost::shared_ptr< Connection > connection );

protected:
	Acceptor( boost::shared_ptr< Hive > hive );
	virtual ~Acceptor();

private:
	// Called when a connection has connected to the server. This function
	// should return true to invoke the connection's OnAccept function if the
	// connection will be kept. If the connection will not be kept, the
	// connection's Disconnect function should be called and the function
	// should return false.
	virtual bool OnAccept( boost::shared_ptr< Connection > connection, const std::string & host, uint16_t port ) = 0;

	// Called on each timer event.
	virtual void OnTimer( const boost::posix_time::time_duration & delta ) = 0;

	// Called when an error is encountered. Most typically, this is when the
	// acceptor is being closed via the Stop function or when Listen is
	// called on an address that is not available.
	virtual void OnError( const boost::system::error_code & error ) = 0;

public:
	// Returns the Hive object.
	boost::shared_ptr< Hive > GetHive();

	// Returns the acceptor object.
	boost::asio::ip::tcp::acceptor & GetAcceptor();

	// Returns the strand object.
	boost::asio::strand & GetStrand();

	// Sets the timer interval of the object. The interval is changed after
	// the next update is called. The default value is 1000 ms.
	void SetTimerInterval( int32_t timer_interval_ms );

	// Returns the timer interval of the object.
	int32_t GetTimerInterval() const;

	// Returns true if this object has an error associated with it.
	bool HasError();

public:
	// Begin listening on the specific network interface.
	void Listen( const std::string & host, const uint16_t & port );

	// Posts the connection to the listening interface. The next client that
	// connects will be given this connection. If multiple calls to Accept
	// are made at a time, they are accepted in FIFO order.
	void Accept( boost::shared_ptr< Connection > connection );

	// Stop the Acceptor from listening.
	void Stop();
};

//-----------------------------------------------------------------------------

class Hive : public boost::enable_shared_from_this< Hive >
{
private:
	boost::asio::io_service m_io_service;
	boost::shared_ptr< boost::asio::io_service::work > m_work_ptr;
	volatile uint32_t m_shutdown;

private:
	Hive( const Hive & rhs );
	Hive & operator =( const Hive & rhs );

public:
	Hive();
	virtual ~Hive();

	// Returns the io_service of this object.
	boost::asio::io_service & GetService();

	// Returns true if the Stop function has been called.
	bool HasStopped();

	// Polls the networking subsystem once from the current thread and
	// returns.
	void Poll();

	// Runs the networking system on the current thread. This function blocks
	// until the networking system is stopped, so do not call it from a
	// single-threaded application unless you provide some other way to
	// call Stop.
	void Run();

	// Stops the networking system. All work is finished and no more
	// networking interactions will be possible afterwards until Reset is called.
	void Stop();

	// Restarts the networking system after Stop has been called. A new work
	// object is created and the shutdown flag is cleared.
	void Reset();
};

//-----------------------------------------------------------------------------

#endif

  

#include "network.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/interprocess/detail/atomic.hpp>

//-----------------------------------------------------------------------------

Hive::Hive()
: m_work_ptr( new boost::asio::io_service::work( m_io_service ) ), m_shutdown( 0 )
{
}

Hive::~Hive()
{
}

boost::asio::io_service & Hive::GetService()
{
	return m_io_service;
}

bool Hive::HasStopped()
{
	return ( boost::interprocess::detail::atomic_cas32( &m_shutdown, 1, 1 ) == 1 );
}

void Hive::Poll()
{
	m_io_service.poll();
}

void Hive::Run()
{
	m_io_service.run();
}

void Hive::Stop()
{
	if( boost::interprocess::detail::atomic_cas32( &m_shutdown, 1, 0 ) == 0 )
	{
		m_work_ptr.reset();
		m_io_service.run();
		m_io_service.stop();
	}
}

void Hive::Reset()
{
	if( boost::interprocess::detail::atomic_cas32( &m_shutdown, 0, 1 ) == 1 )
	{
		m_io_service.reset();
		m_work_ptr.reset( new boost::asio::io_service::work( m_io_service ) );
	}
}

//-----------------------------------------------------------------------------

Acceptor::Acceptor( boost::shared_ptr< Hive > hive )
: m_hive( hive ), m_acceptor( hive->GetService() ), m_io_strand( hive->GetService() ), m_timer( hive->GetService() ), m_timer_interval( 1000 ), m_error_state( 0 )
{
}

Acceptor::~Acceptor()
{
}

void Acceptor::StartTimer()
{
	m_last_time = boost::posix_time::microsec_clock::local_time();
	m_timer.expires_from_now( boost::posix_time::milliseconds( m_timer_interval ) );
	m_timer.async_wait( m_io_strand.wrap( boost::bind( &Acceptor::HandleTimer, shared_from_this(), _1 ) ) );
}

void Acceptor::StartError( const boost::system::error_code & error )
{
	if( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 0 ) == 0 )
	{
		boost::system::error_code ec;
		m_acceptor.cancel( ec );
		m_acceptor.close( ec );
		m_timer.cancel( ec );
		OnError( error );
	}
}

void Acceptor::DispatchAccept( boost::shared_ptr< Connection > connection )
{
	m_acceptor.async_accept( connection->GetSocket(), connection->GetStrand().wrap( boost::bind( &Acceptor::HandleAccept, shared_from_this(), _1, connection ) ) );
}

void Acceptor::HandleTimer( const boost::system::error_code & error )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		StartError( error );
	}
	else
	{
		OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
		StartTimer();
	}
}

void Acceptor::HandleAccept( const boost::system::error_code & error, boost::shared_ptr< Connection > connection )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		connection->StartError( error );
	}
	else
	{
		if( connection->GetSocket().is_open() )
		{
			connection->StartTimer();
			if( OnAccept( connection, connection->GetSocket().remote_endpoint().address().to_string(), connection->GetSocket().remote_endpoint().port() ) )
			{
				connection->OnAccept( m_acceptor.local_endpoint().address().to_string(), m_acceptor.local_endpoint().port() );
			}
		}
		else
		{
			StartError( error );
		}
	}
}

void Acceptor::Stop()
{
	m_io_strand.post( boost::bind( &Acceptor::HandleTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}

void Acceptor::Accept( boost::shared_ptr< Connection > connection )
{
	m_io_strand.post( boost::bind( &Acceptor::DispatchAccept, shared_from_this(), connection ) );
}

void Acceptor::Listen( const std::string & host, const uint16_t & port )
{
	boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
	boost::asio::ip::tcp::resolver::query query( host, boost::lexical_cast< std::string >( port ) );
	boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve( query );
	m_acceptor.open( endpoint.protocol() );
	m_acceptor.set_option( boost::asio::ip::tcp::acceptor::reuse_address( false ) );
	m_acceptor.bind( endpoint );
	m_acceptor.listen( boost::asio::socket_base::max_connections );
	StartTimer();
}

boost::shared_ptr< Hive > Acceptor::GetHive()
{
	return m_hive;
}

boost::asio::ip::tcp::acceptor & Acceptor::GetAcceptor()
{
	return m_acceptor;
}

int32_t Acceptor::GetTimerInterval() const
{
	return m_timer_interval;
}

void Acceptor::SetTimerInterval( int32_t timer_interval )
{
	m_timer_interval = timer_interval;
}

bool Acceptor::HasError()
{
	return ( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 1 ) == 1 );
}

//-----------------------------------------------------------------------------

Connection::Connection( boost::shared_ptr< Hive > hive )
: m_hive( hive ), m_socket( hive->GetService() ), m_io_strand( hive->GetService() ), m_timer( hive->GetService() ), m_receive_buffer_size( 4096 ), m_timer_interval( 1000 ), m_error_state( 0 )
{
}

Connection::~Connection()
{
}

void Connection::Bind( const std::string & ip, uint16_t port )
{
	boost::asio::ip::tcp::endpoint endpoint( boost::asio::ip::address::from_string( ip ), port );
	m_socket.open( endpoint.protocol() );
	m_socket.set_option( boost::asio::ip::tcp::acceptor::reuse_address( false ) );
	m_socket.bind( endpoint );
}

void Connection::StartSend()
{
	if( !m_pending_sends.empty() )
	{
		boost::asio::async_write( m_socket, boost::asio::buffer( m_pending_sends.front() ), m_io_strand.wrap( boost::bind( &Connection::HandleSend, shared_from_this(), boost::asio::placeholders::error, m_pending_sends.begin() ) ) );
	}
}

void Connection::StartRecv( int32_t total_bytes )
{
	if( total_bytes > 0 )
	{
		m_recv_buffer.resize( total_bytes );
		boost::asio::async_read( m_socket, boost::asio::buffer( m_recv_buffer ), m_io_strand.wrap( boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
	}
	else
	{
		m_recv_buffer.resize( m_receive_buffer_size );
		m_socket.async_read_some( boost::asio::buffer( m_recv_buffer ), m_io_strand.wrap( boost::bind( &Connection::HandleRecv, shared_from_this(), _1, _2 ) ) );
	}
}

void Connection::StartTimer()
{
	m_last_time = boost::posix_time::microsec_clock::local_time();
	m_timer.expires_from_now( boost::posix_time::milliseconds( m_timer_interval ) );
	m_timer.async_wait( m_io_strand.wrap( boost::bind( &Connection::DispatchTimer, shared_from_this(), _1 ) ) );
}

void Connection::StartError( const boost::system::error_code & error )
{
	if( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 0 ) == 0 )
	{
		boost::system::error_code ec;
		m_socket.shutdown( boost::asio::ip::tcp::socket::shutdown_both, ec );
		m_socket.close( ec );
		m_timer.cancel( ec );
		OnError( error );
	}
}

void Connection::HandleConnect( const boost::system::error_code & error )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		StartError( error );
	}
	else
	{
		if( m_socket.is_open() )
		{
			OnConnect( m_socket.remote_endpoint().address().to_string(), m_socket.remote_endpoint().port() );
		}
		else
		{
			StartError( error );
		}
	}
}

void Connection::HandleSend( const boost::system::error_code & error, std::list< std::vector< uint8_t > >::iterator itr )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		StartError( error );
	}
	else
	{
		OnSend( *itr );
		m_pending_sends.erase( itr );
		StartSend();
	}
}

void Connection::HandleRecv( const boost::system::error_code & error, int32_t actual_bytes )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		StartError( error );
	}
	else
	{
		m_recv_buffer.resize( actual_bytes );
		OnRecv( m_recv_buffer );
		m_pending_recvs.pop_front();
		if( !m_pending_recvs.empty() )
		{
			StartRecv( m_pending_recvs.front() );
		}
	}
}

void Connection::HandleTimer( const boost::system::error_code & error )
{
	if( error || HasError() || m_hive->HasStopped() )
	{
		StartError( error );
	}
	else
	{
		OnTimer( boost::posix_time::microsec_clock::local_time() - m_last_time );
		StartTimer();
	}
}

void Connection::DispatchSend( std::vector< uint8_t > buffer )
{
	bool should_start_send = m_pending_sends.empty();
	m_pending_sends.push_back( buffer );
	if( should_start_send )
	{
		StartSend();
	}
}

void Connection::DispatchRecv( int32_t total_bytes )
{
	bool should_start_receive = m_pending_recvs.empty();
	m_pending_recvs.push_back( total_bytes );
	if( should_start_receive )
	{
		StartRecv( total_bytes );
	}
}

void Connection::DispatchTimer( const boost::system::error_code & error )
{
	m_io_strand.post( boost::bind( &Connection::HandleTimer, shared_from_this(), error ) );
}

void Connection::Connect( const std::string & host, uint16_t port)
{
	boost::system::error_code ec;
	boost::asio::ip::tcp::resolver resolver( m_hive->GetService() );
	boost::asio::ip::tcp::resolver::query query( host, boost::lexical_cast< std::string >( port ) );
	boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve( query );
	m_socket.async_connect( *iterator, m_io_strand.wrap( boost::bind( &Connection::HandleConnect, shared_from_this(), _1 ) ) );
	StartTimer();
}

void Connection::Disconnect()
{
	m_io_strand.post( boost::bind( &Connection::HandleTimer, shared_from_this(), boost::asio::error::connection_reset ) );
}

void Connection::Recv( int32_t total_bytes )
{
	m_io_strand.post( boost::bind( &Connection::DispatchRecv, shared_from_this(), total_bytes ) );
}

void Connection::Send( const std::vector< uint8_t > & buffer )
{
	m_io_strand.post( boost::bind( &Connection::DispatchSend, shared_from_this(), buffer ) );
}

boost::asio::ip::tcp::socket & Connection::GetSocket()
{
	return m_socket;
}

boost::asio::strand & Connection::GetStrand()
{
	return m_io_strand;
}

boost::shared_ptr< Hive > Connection::GetHive()
{
	return m_hive;
}

void Connection::SetReceiveBufferSize( int32_t size )
{
	m_receive_buffer_size = size;
}

int32_t Connection::GetReceiveBufferSize() const
{
	return m_receive_buffer_size;
}

int32_t Connection::GetTimerInterval() const
{
	return m_timer_interval;
}

void Connection::SetTimerInterval( int32_t timer_interval )
{
	m_timer_interval = timer_interval;
}

bool Connection::HasError()
{
	return ( boost::interprocess::detail::atomic_cas32( &m_error_state, 1, 1 ) == 1 );
}

//-----------------------------------------------------------------------------

  

The networking library tries to provide a thread-safe, scalable wrapper that makes implementing clients and servers straightforward. The user derives custom classes from the base Connection, Acceptor, and so on. The following examples show how to use the wrapper.
In the first example we will see how to set up the classes. It is very similar to the earlier example: it simply echoes everything back to each connection.

#include "network.h"
#include <conio.h>
#include <boost/thread/mutex.hpp>

boost::mutex global_stream_lock;

class MyConnection : public Connection
{
private:

private:
	void OnAccept( const std::string & host, uint16_t port )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;
		global_stream_lock.unlock();

		Recv();
	}

	void OnConnect( const std::string & host, uint16_t port )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;
		global_stream_lock.unlock();

		Recv();
	}

	void OnSend( const std::vector< uint8_t > & buffer )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << buffer.size() << " bytes" << std::endl;
		for( size_t x = 0; x < buffer.size(); ++x )
		{
			std::cout << std::hex << std::setfill( '0' ) <<
				std::setw( 2 ) << (int)buffer[ x ] << " ";
			if( ( x + 1 ) % 16 == 0 )
			{
				std::cout << std::endl;
			}
		}
		std::cout << std::endl;
		global_stream_lock.unlock();
	}

	void OnRecv( std::vector< uint8_t > & buffer )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << buffer.size() << " bytes" << std::endl;
		for( size_t x = 0; x < buffer.size(); ++x )
		{
			std::cout << std::hex << std::setfill( '0' ) <<
				std::setw( 2 ) << (int)buffer[ x ] << " ";
			if( ( x + 1 ) % 16 == 0 )
			{
				std::cout << std::endl;
			}
		}
		std::cout << std::endl;
		global_stream_lock.unlock();

		// Start the next receive
		Recv();

		// Echo the data back
		Send( buffer );
	}

	void OnTimer( const boost::posix_time::time_duration & delta )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << delta << std::endl;
		global_stream_lock.unlock();
	}

	void OnError( const boost::system::error_code & error )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << error << std::endl;
		global_stream_lock.unlock();
	}

public:
	MyConnection( boost::shared_ptr< Hive > hive )
		: Connection( hive )
	{
	}

	~MyConnection()
	{
	}
};

class MyAcceptor : public Acceptor
{
private:

private:
	bool OnAccept( boost::shared_ptr< Connection > connection, const std::string & host, uint16_t port )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;
		global_stream_lock.unlock();

		return true;
	}

	void OnTimer( const boost::posix_time::time_duration & delta )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << delta << std::endl;
		global_stream_lock.unlock();
	}

	void OnError( const boost::system::error_code & error )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << error << std::endl;
		global_stream_lock.unlock();
	}

public:
	MyAcceptor( boost::shared_ptr< Hive > hive )
		: Acceptor( hive )
	{
	}

	~MyAcceptor()
	{
	}
};

int main( int argc, char * argv[] )
{
	boost::shared_ptr< Hive > hive( new Hive() );

	boost::shared_ptr< MyAcceptor > acceptor( new MyAcceptor( hive ) );
	acceptor->Listen( "127.0.0.1", 7777 );

	boost::shared_ptr< MyConnection > connection( new MyConnection( hive ) );
	acceptor->Accept( connection );

	while( !_kbhit() )
	{
		hive->Poll();
		Sleep( 1 );
	}

	hive->Stop();

	return 0;
}

  

Thanks to the wrapper, the code is very straightforward. All of the socket-management details are hidden away, and we only have to think about the application logic. In this example we drive the networking by calling Poll() from the main thread rather than using worker threads, but the same functionality is provided; a sketch of a multi-threaded variant follows below. The server works as expected, so after that sketch we turn to the client.
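As a point of comparison, here is a minimal sketch of how the same server could instead be driven by a pool of worker threads calling Hive::Run. It reuses the MyAcceptor and MyConnection classes from the example above; the thread count and the use of boost::thread_group are illustrative assumptions, not part of the original example.

#include "network.h"
#include <conio.h>
#include <windows.h>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

int main( int argc, char * argv[] )
{
	boost::shared_ptr< Hive > hive( new Hive() );

	boost::shared_ptr< MyAcceptor > acceptor( new MyAcceptor( hive ) );
	acceptor->Listen( "127.0.0.1", 7777 );

	boost::shared_ptr< MyConnection > connection( new MyConnection( hive ) );
	acceptor->Accept( connection );

	// Hive::Run blocks until Hive::Stop is called, so each worker thread
	// simply runs the io_service until shutdown.
	boost::thread_group workers;
	for( int i = 0; i < 2; ++i )
	{
		workers.create_thread( boost::bind( &Hive::Run, hive ) );
	}

	// Wait for a keypress, then shut the networking system down.
	while( !_kbhit() )
	{
		Sleep( 1 );
	}

	hive->Stop();
	workers.join_all();

	return 0;
}

Because every Connection and Acceptor funnels its handlers through its own strand, the callbacks need no extra locking around per-connection state even with several worker threads running the Hive.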

#include "network.h"
#include <conio.h>
#include <boost/thread/mutex.hpp>

boost::mutex global_stream_lock;

class MyConnection : public Connection
{
private:

private:
	void OnAccept( const std::string & host, uint16_t port )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;
		global_stream_lock.unlock();

		// Start the next receive
		Recv();
	}

	void OnConnect( const std::string & host, uint16_t port )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << host << ":" << port << std::endl;
		global_stream_lock.unlock();

		// Start the next receive
		Recv();

		std::string str = "GET / HTTP/1.0\r\n\r\n";

		std::vector< uint8_t > request;
		std::copy( str.begin(), str.end(), std::back_inserter( request ) );
		Send( request );
	}

	void OnSend( const std::vector< uint8_t > & buffer )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << buffer.size() << " bytes" << std::endl;
		for( size_t x = 0; x < buffer.size(); ++x )
		{
			std::cout << std::hex << std::setfill( '0' ) <<
				std::setw( 2 ) << (int)buffer[ x ] << " ";
			if( ( x + 1 ) % 16 == 0 )
			{
				std::cout << std::endl;
			}
		}
		std::cout << std::endl;
		global_stream_lock.unlock();
	}

	void OnRecv( std::vector< uint8_t > & buffer )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << buffer.size() << " bytes" << std::endl;
		for( size_t x = 0; x < buffer.size(); ++x )
		{
			std::cout << std::hex << std::setfill( '0' ) <<
				std::setw( 2 ) << (int)buffer[ x ] << " ";
			if( ( x + 1 ) % 16 == 0 )
			{
				std::cout << std::endl;
			}
		}
		std::cout << std::endl;
		global_stream_lock.unlock();

		// Start the next receive
		Recv();
	}

	void OnTimer( const boost::posix_time::time_duration & delta )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << delta << std::endl;
		global_stream_lock.unlock();
	}

	void OnError( const boost::system::error_code & error )
	{
		global_stream_lock.lock();
		std::cout << "[" << __FUNCTION__ << "] " << error << std::endl;
		global_stream_lock.unlock();
	}

public:
	MyConnection( boost::shared_ptr< Hive > hive )
		: Connection( hive )
	{
	}

	~MyConnection()
	{
	}
};

int main( int argc, char * argv[] )
{
	boost::shared_ptr< Hive > hive( new Hive() );

	boost::shared_ptr< MyConnection > connection( new MyConnection( hive ) );
	connection->Connect( "www.google.com", 80 );

	while( !_kbhit() )
	{
		hive->Poll();
		Sleep( 1 );
	}

	hive->Stop();

	return 0;
}

  

The client code simply sends an HTTP GET request to Google and dumps the response to the console in hexadecimal. Reusing the wrapper here is no harder than on the server side, which shows that server and client programming are not fundamentally different.

The network wrapper simplifies a great deal of work, but several of its design decisions deserve attention. First, on the server side there is no built-in container that holds every connection. The reason is that if the wrapper stored all connections in a container, the end user would have to lock that container for synchronized access, or the wrapper would have to implement asynchronous add/remove logic. In short, this is behavior the end user should implement themselves; not every networked program needs to work with all of its connections at once, so leaving it out keeps the wrapper as general as possible. A sketch of such user-side bookkeeping is shown below.
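A minimal sketch of what that user-side bookkeeping could look like, assuming the application wants to hold on to every live connection. The class name TrackingAcceptor and the mutex-guarded list are hypothetical, not part of the wrapper.

#include "network.h"
#include <boost/thread/mutex.hpp>
#include <list>

class TrackingAcceptor : public Acceptor
{
private:
	// Application-owned container of live connections. It is guarded by a
	// mutex because OnAccept runs on an io_service thread while the
	// application may iterate over the list from elsewhere.
	boost::mutex m_connections_lock;
	std::list< boost::shared_ptr< Connection > > m_connections;

private:
	bool OnAccept( boost::shared_ptr< Connection > connection, const std::string & host, uint16_t port )
	{
		boost::mutex::scoped_lock lock( m_connections_lock );
		m_connections.push_back( connection );
		return true;
	}

	void OnTimer( const boost::posix_time::time_duration & delta )
	{
		// Could also be used to sweep out connections that report HasError().
	}

	void OnError( const boost::system::error_code & error )
	{
	}

public:
	TrackingAcceptor( boost::shared_ptr< Hive > hive )
		: Acceptor( hive )
	{
	}
};

Removing connections again, for example when one of them errors out, is just as application-specific, which is exactly why the wrapper leaves this bookkeeping to the caller.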

Next, all interaction with a connection goes through its own dedicated strand. The strand ensures that the connection's events are executed one at a time, so we never have to lock the connection explicitly: its handlers cannot run concurrently no matter how many threads are driving the io_service. If you extend the classes with custom logic, that logic needs to follow the same design to stay thread-safe; a sketch follows below.
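As a rough sketch of what following that design means in practice: any application-defined work that touches a connection should be posted through the connection's strand, so it is serialized with the wrapper's own handlers. The function names below are hypothetical.

#include "network.h"
#include <boost/bind.hpp>

// Runs on an io_service thread, serialized with HandleSend / HandleRecv /
// HandleTimer for this connection, so it may safely touch per-connection state.
void DoCustomWork( boost::shared_ptr< Connection > connection )
{
	// ... application-specific logic ...
}

// May be called from any thread; the strand guarantees DoCustomWork will not
// run concurrently with the connection's other handlers.
void QueueCustomWork( boost::shared_ptr< Connection > connection )
{
	connection->GetStrand().post( boost::bind( &DoCustomWork, connection ) );
}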

The receive and send buffering is implemented with plain std::vector and std::list. Anyone with custom memory-management requirements will have their own allocation system and will need to modify the code accordingly; for simple programs, what is provided here is sufficient.

Finally, this wrapper's design will not suit everyone. It is only one example of what can be built on top of boost::asio. Take the ideas, adapt them to your own requirements, and above all get familiar with boost::asio itself.
