5 ACE acceptor connector Proactor异步框架



ACE_Acceptor_Connector框架 完成accpet操作


[email protected]:~/ace/AceAcceptorConnector$ cat echo_server.cpp 
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Acceptor.h>
#include <ace/Acceptor.h>

class AcceptorHandler: public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> {
public:
	typedef ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> Parent;
	enum {
		BUF_SIME = 512
	};

	virtual int handle_input(ACE_HANDLE h) 
	{
		//ACE_OS::sleep(5);
		ssize_t n = peer().recv(buf, BUF_SIME);
		if (n <= 0)
			ACE_ERROR_RETURN((LM_ERROR, "%p\n", "peer().recv()"), -1);
		if (peer().send(buf, n) == -1)
			ACE_ERROR_RETURN((LM_ERROR, "%p\n", "peer().send()"), -1);
		return 0;
	}
private:
		char buf[BUF_SIME];
};

typedef ACE_Acceptor<AcceptorHandler, ACE_SOCK_Acceptor> MyAcceptor;

int main() {
	ACE_INET_Addr addr(8868);
	MyAcceptor acceptor(addr, ACE_Reactor::instance());
	ACE_Reactor::instance()->run_reactor_event_loop();
}
[email protected]:~/ace/AceAcceptorConnector$ 

编译运行:
[email protected]:~/ace/AceAcceptorConnector$ g++ echo_server.cpp -lACE -lpthread && ./a.out 

[email protected]:~/ace/AceTask$ nc localhost 8868
121
121

[email protected]:~$  nc localhost 8868
3213
3213
32131
32131



ACE_Acceptor_Connector框架 完成connector操作

[email protected]:~/ace/AceAcceptorConnector$ cat echo_client.cpp 
#include <iostream>
#include <ace/Svc_Handler.h>
#include <ace/SOCK_Stream.h>
#include <ace/SOCK_Connector.h>
#include <ace/Connector.h>

class InputHandler: public ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> 
{
public:
	typedef ACE_Svc_Handler<ACE_SOCK_Stream, ACE_NULL_SYNCH> Parent;
	enum {
		BUF_SIME = 512
	};
	virtual int open(void* a) 
	{
		if (Parent::open(a) == -1)
			return -1;
		return this->activate(THR_NEW_LWP | THR_DETACHED);
	}
	virtual int handle_input(ACE_HANDLE) //输出收到的数据
	{
		ssize_t n = peer().recv(buf, BUF_SIME);
		if (n <= 0)
			ACE_ERROR_RETURN((LM_ERROR, "%p\n","peer().recv()"), -1);
		buf[n] = 0;
		ACE_DEBUG((LM_DEBUG, "%s\n", buf));
		return 0;
	}
	virtual int svc() //从键盘读取数据
	{
		char inBuf[BUF_SIME] = "";
		while (std::cin.getline(inBuf, BUF_SIME)) 
		{
			if (peer().send(inBuf, strlen(inBuf)) == -1) 
			{
				ACE_ERROR((LM_ERROR, "%p\n", "peer().send()"));
				break;
			}
		}
		return 0;
	}

private:
		char buf[BUF_SIME];
};

typedef ACE_Connector<InputHandler, ACE_SOCK_Connector> MyConnector;

int main() 
{
	ACE_INET_Addr addr(8868, "127.0.0.1");
	MyConnector connector;

	InputHandler* p = 0;//由connector 创建 InputHandler
	if (connector.connect(p, addr) == -1)
		ACE_ERROR_RETURN((LM_ERROR, "%p\n", "connect()"), -1);

	ACE_Reactor::instance()->run_reactor_event_loop();
}
[email protected]:~/ace/AceAcceptorConnector$ 

编译运行:
[email protected]:~/ace/AceAcceptorConnector$ g++ echo_client.cpp -lACE -lpthread && ./a.out 
1234567890
qazwsx

中国上海!

Hello ACE 

^C
[email protected]:~/ace/AceAcceptorConnector$ 

模拟一个服务器:
[email protected]:~/ace$ nc -l localhost 8868
1234567890qazwsx
中国上海!
Hello ACE 
[email protected]:~/ace$

proactor 异步echo server

[email protected]:~/ace/AceProactor$ cat proactor_echo_server.cpp 
#include "ace/Asynch_IO.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/INET_Addr.h"
#include "ace/Proactor.h"

class EchoService: public ACE_Service_Handler 
{
public:
	~EchoService() 
	{
		if (this->handle() != ACE_INVALID_HANDLE)
			ACE_OS::closesocket(this->handle());
	}
	virtual void open(ACE_HANDLE h, ACE_Message_Block&) 
	{
		handle(h);
		if (this->reader_.open(*this) != 0 || this->writer_.open(*this) != 0) 
		{
			ACE_ERROR((LM_ERROR, "%p\n", "open()"));
			delete this;
			return;
		}

		ACE_Message_Block* mb;
		ACE_NEW_NORETURN(mb, ACE_Message_Block(512));
		if (this->reader_.read(*mb, mb->space()) != 0) 
		{
			ACE_ERROR((LM_ERROR, "%p\n", "read()"));
			mb->release();
			delete this;
			return;
		}
	}
	virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result& result) 
	{
		ACE_Message_Block &mb = result.message_block();
		if (!result.success() || result.bytes_transferred() == 0) 
		{
			mb.release();
			delete this;
		} 
		else 
		{
			if (this->writer_.write(mb, mb.length()) != 0) 
			{
				ACE_ERROR((LM_ERROR, "%p\n", "write()"));
				mb.release();
			} 
			else 
			{
				ACE_Message_Block* mblk;
				ACE_NEW_NORETURN(mblk, ACE_Message_Block(512));
				this->reader_.read(*mblk, mblk->space());
			}
		}
	}
	virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result& result) 
	{
		result.message_block().release();
	}

private:
	ACE_Asynch_Read_Stream reader_;
	ACE_Asynch_Write_Stream writer_;
};

int main() 
{
	ACE_INET_Addr listen_addr(8868);
	ACE_Asynch_Acceptor<EchoService> aio_acceptor;
	if (0 != aio_acceptor.open(listen_addr, 
				0, // bytes_to_read
				0, // pass_addresses
				ACE_DEFAULT_BACKLOG, 1, // reuse_addr
				0, // proactor
				0)) // validate_new_connection
		ACE_ERROR_RETURN((LM_ERROR, "%p\n", "write()"), 1);

	ACE_Proactor::instance()->proactor_run_event_loop();
}
[email protected]:~/ace/AceProactor$ 

编译运行:
[email protected]:~/ace/AceProactor$ g++ proactor_echo_server.cpp -lACE && ./a.out 
(2710 | 140613802981248) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=1024
^C
[email protected]:~/ace/AceProactor$ 

客户端测试:
[email protected]:~/ace$ nc  localhost 8868
123
123
34567
34567
^C
[email protected]:~/ace$






时间: 2025-01-02 01:21:29

5 ACE acceptor connector Proactor异步框架的相关文章

ACE Acceptor Connector模式

在指定端口监听 下面的程序会在7905端口持续监听,其他的什么也不做 #include <iostream> using namespace std; #include "ace/INET_Addr.h" #include "ace/Svc_Handler.h" #include "ace/SOCK_Stream.h" #include "ace/SOCK_Acceptor.h" #include "ac

基于SEDA的异步框架设计与实现

基于SEDA的异步框架设计与实现 二.为什么使用SEDA 目前,面对并发环境,主流互联网服务器编程模型有两种:多线程模型以及事件驱动模型.但是这两个模型都不足以解决这个问题.我们来首先看一下这两种编程模型. 1.多线程并发模型 多线程并发模型是目前最普遍的服务器编程模型,该模型的架构如下图所示:        该模型针对每一个请求,会为其创建并分配一个线程.该线程负责这个请求的处理.该模型的优点:执行粒度是整个完整的处理流程.处理逻辑清晰,容易开发.但与此同时缺点也很明显:如果处理过程中某一步骤

无废话Android之内容观察者ContentObserver、获取和保存系统的联系人信息、网络图片查看器、网络html查看器、使用异步框架Android-Async-Http(4)

1.内容观察者ContentObserver 如果ContentProvider的访问者需要知道ContentProvider中的数据发生了变化,可以在ContentProvider 发生数据变化时调用getContentResolver().notifyChange(uri, null)来通知注册在此URI上的访问者,例子如下: private static final Uri URI = Uri.parse("content://person.db"); public class

jQuery异步框架探究1:jQuery._Deferred方法

jQuery异步框架应用于jQuery数据缓存模块.jQuery ajax模块.jQuery事件绑定模块等多个模块,是jQuery的基础功能之一.实际上是jQuery实现的一个异步处理框架,从本质上讲与java aio没有区别,所以需要从更抽象层面的"异步处理"的视角分析解读该模块.这个部分与dom功能关系不大,是独立部分,可以看作是jQuery工具系列之一. 与异步框架相关的方法定义于jQuery类的静态方法中.只有三个方法,但是功能和应用及其强大!本篇详细讲解第一个方法jQuery

几个常用的异步框架和网络访问框架区分对比

Part1: 由于在我们的程序中,不允许一些耗时的任务在主线程中出现,主要是为了防止阻塞主线程而导致的 Anr(Application not Responding),一些耗时任务主要包括: 网络访问,缓慢的磁盘操作,比较耗时的算法 当我们的主线程在一定时间里对某一事件的处理超过一定时间后会主线程会崩溃报ANR, 通常的解决方案:采用子线程技术来将耗时任务与主线程进行脱离 1.handler机制 只需要将UI更新参数在子线程中使用sendMessage发送到定义好的Handler里的handle

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现 前言 RxJava 是一款基于 Java VM 实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架.RxJava 官方目前同时维护了两个版本,分别是 1.x 和 2.x,区别是它们使用不同的 group id 和 namespaces. 版本 group id namespaces v1.x io.reactivex io.reactivex v2.x io.reactivex.rxjava2 rx 本系列的

Tornado中异步框架的使用

tornado的同步框架与其他web框架相同都是处理先来的请求,如果先来的请求阻塞,那么后面的请求也会处理不了.一直处于等待过程中.但是请求一旦得到响应,那么: 请求发送过来后,将需要的本站资源直接返回给客户端 请求发送过来后,本站没有需要的资源,从其它站点获取过来,再返回给客户端 一.Tornado中的同步框架 1.本站资源直接返回 import tornado.web import time class LoginHandler(tornado.web.RequestHandler): de

Android异步框架RxJava 1.x系列(三) - 线程调度器Scheduler

前言 RxJava 事件的发出和消费都在同一个线程,基于同步的观察者模式.观察者模式的核心是后台处理,前台回调的异步机制.要实现异步,需要引入 RxJava 的另一个概念 - 线程调度器 Scheduler. 正文 在不指定线程的情况下,RxJava 遵循的是线程不变的原则.即在哪个线程调用 subscribe() 方法,就在哪个线程生产事件:在哪个线程生产事件,就在哪个线程消费事件.如果需要切换线程,就需要用到线程调度器 Scheduler. 1. 几种Scheduler介绍 在 RxJava

Android异步框架RxJava 1.x系列(二) - 事件及事件序列转换原理

前言 在介绍 RxJava 1.x 线程调度器之前,首先引入一个重要的概念 - 事件序列转换.RxJava 提供了对事件序列进行转换的支持,这是它的核心功能之一. 正文 1. 事件序列转换定义 所谓转换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列,有点类似 Java 1.8 中的流处理. 2. 事件序列转换API 首先看一个 map() 的例子: Observable.just("images/logo.png") // 输入类型 String .map(