ACE_Reactor的notify阻塞问题

今天听到一种说法:

ACE_Reactor的notify可能会发生阻塞。windwos与linux的消息队列满了之后默认会阻塞掉。linux可以设置成异步的,但是notify队列满了之后,无论异步还是阻塞,新来的信号都会被丢失。

信号队列长度,linux下与文件句柄数一样。

今天再windwos上测试,当信号多余1023个时,notify就会阻塞。

linux下待测试……

windows测试部分代码:

// t4l.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include "ace/ACE.h"
#include "ace/Reactor.h"
#include "ace/Task_Ex_T.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/Sock_Connect.h"
#include "ace/SOCK_Connector.h"
#include "ace/Connector.h"
#include "ace/Svc_Handler.h"
#include "ace/OS.h"
#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/stringbuffer.h"

class TestEvent
    :public ACE_Task_Ex<ACE_MT_SYNCH, TestEvent>
{
public:
      virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          printf("handle_input\n");
          return 0;
      }

      int putq()
      {
          ar_->notify(this);
          return 0;
      }
      virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          printf("handle_output\n");
          return 0;
      }

      virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE)
      {
          size_ ++;
           printf("handle_exception:%d\n", size_);
        if (is_sleep_)
        {
            ACE_OS::sleep(30);
            is_sleep_ = false;
        }

         ar_->remove_handler(this, ACE_Event_Handler::EXCEPT_MASK);
         return 0;
      }

  virtual int handle_timeout (const ACE_Time_Value &current_time,
      const void *act = 0)
  {
      ACE_Time_Value atv(1,0);
      ACE_OS::sleep(atv);
      //ar_->remove_handler(this, ACE_Event_Handler::EXCEPT_MASK);
      ar_->cancel_timer(this);
      printf("handle_timeout\n");
      return 0;
  }
  virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  {
      printf("handle_close\n");
      return 0;
  }
  int open()
  {
      size_ = 0;
      is_sleep_ = true;
      ar_ = ACE_Reactor::instance();
      ar_->register_handler(this,ACE_Event_Handler::EXCEPT_MASK);
      activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED ,
          1,0,ACE_DEFAULT_THREAD_PRIORITY);
      //ACE_OS::sleep(1);
      for (int i= 0; i < 8900; i ++)
      {
          // ACE_OS::sleep(1);
          printf("opened:%d\n",i);

          putq();
      }
      printf("opened\n");
      ACE_Time_Value atv(1,0);
      ACE_Reactor::instance()->schedule_timer(this, NULL, atv, atv);
      printf("putq leave");
      return 0;
  }
  int svc()
  {
      ACE_Time_Value tv = ACE_Time_Value(1,0);
        ar_->owner(ACE_Thread::self());
        //ar_->run_reactor_event_loop();
        while (1)
        {
    //        ACE_Reactor::instance()->event_loop_done();
            ar_->handle_events(&tv);
        }
    return 0;
  }
private:
    ACE_Reactor* ar_;
    size_t  size_;
    bool is_sleep_;
};
class Server
    :public ACE_Event_Handler
{
public:
    int open(const char* addr)
    {
        ACE_INET_Addr ace_addr;
        if (-1 == ace_addr.set(addr,strlen(addr)))
        {
            return ACE_OS::last_error();
        }
        int ret = acceptor_.open(ace_addr);
        reactor_ = new ACE_Reactor;
        this->reactor(reactor_);
        this->reactor()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
        return 0;
    }

public:
    int handle_input(ACE_HANDLE fd)
    {
        printf("connector(%d) comming\n", fd);

    }
    ACE_HANDLE get_handle()
    {
        return this->acceptor_.get_handle();
    }

private:
    ACE_SOCK_Acceptor acceptor_;
    ACE_Reactor* reactor_;

};
class ConnectInst
    :public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>
{
public:
    ConnectInst()
        :timeout_(1000)
    {
    }
    ~ConnectInst()
    {
    }
private:
    int timeout_;
};

class Client
{
public:
    int open()
    {
        reactor_ = new ACE_Reactor;
        connector_.open(reactor_);
        return 0;
    }
    int connect(const char* addr, int time)
    {
        ConnectInst* inst;
        int ret = connector_.connect(inst, ACE_INET_Addr(addr));
    }
private:
    ACE_Connector<ConnectInst, ACE_SOCK_CONNECTOR> connector_;
    ACE_Reactor* reactor_;

};

int main(int argc, char* argv[])
{
    TestEvent* te = new TestEvent();

    te->open();
    ACE_Reactor::instance()->run_reactor_event_loop();
    te->wait();
    system("pause");
    return 0;
}
时间: 2024-12-08 22:50:15

ACE_Reactor的notify阻塞问题的相关文章

多线程之Java线程阻塞与唤醒

线程的阻塞和唤醒在多线程并发过程中是一个关键点,当线程数量达到很大的数量级时,并发可能带来很多隐蔽的问题.如何正确暂停一个线程,暂停后又如何在一个要求的时间点恢复,这些都需要仔细考虑的细节.在Java发展史上曾经使用suspend().resume()方法对于线程进行阻塞唤醒,但随之出现很多问题,比较典型的还是死锁问题.如下代码,主要的逻辑代码是主线程启动线程mt一段时间后尝试使用suspend()让线程挂起,最后使用resume()恢复线程.但现实并不如愿,执行到suspend()时将一直卡住

Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具

锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = false; private Thread lockingThread = null; public synchronized void lock() throws InterruptedExpection { while(isLocked){ wait(); } isLocked = true; loc

线程通信(生产者与消费者问题)

1.线程通信的必要性 多线程不仅共享资源,而且相互牵制向前运行. 2.线程通信的方法(都是在Object中定义) 3个方法: 1)    wait() 可运行转入阻塞状态,放锁 2)    notify() 阻塞转入可运行状态,获得锁 3)    notifyAll() 所有调用wait方法而被挂起的线程重新启动,有个条件:wait与notifyAll必须是属于同一个对象 必须在同步方法或同步代码块中使用 3.共享资源类(仓库) 注:共享资源(产品),牵制信息(产品有无)  package co

深入解析线程池的使用

为什么需要线程池 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务.任务执行完毕后,线程退出,这就是是"即时创建,即 时销毁"的策略.尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,

wait/notify模拟阻塞队列

程序代码如下: public class MyQueue<E> {     //1. 提供集合容器     private List<E> list = new ArrayList<E>();     //2. 提供计数器     private AtomicInteger counter = new AtomicInteger();     //3. 提供 上限 下限     private int MAX;     private int MIN = 0;     

(转) Java线程同步阻塞, sleep(), suspend(), resume(), yield(), wait(), notify()

为了解决对共享存储区的访问冲突,Java 引入了同步机制.但显然不够,因为在任意时刻所要求的资源不一定已经准备好了被访问,反过来,同一时刻准备好了的资源也可能不止一个. 为解决访问控制问题,Java 引入阻塞机制.阻塞指的是暂停一个Java线程同步的执行以等待某个条件发生(如某资源就绪). sleep():允许指定以毫秒为单位的一段时间作为参数,它使得线程在指定的时间内进入阻塞状态,不能得到CPU 时 间,指定的时间一过,线程重新进入可执行状态.典型地,sleep() 被用在等待某个资源就绪的情

Java 多线程学习笔记:wait、notify、notifyAll的阻塞和恢复

前言:昨天尝试用Java自行实现生产者消费者问题(Producer-Consumer Problem),在coding时,使用到了Condition的await和signalAll方法,然后顺便想起了wait和notify,在开发中遇到了一个问题:wait.notify等阻塞和恢复的时机分别是什么?在网上Google了很久各种博文后,发现几乎没有人提到这个点.最后在官方文档中才找到了相应的介绍. (一)准备 按照惯例应该是要先介绍一下wait.notify和notifyAll的基础知识.我找到了

16_Queue_利用wait()和notify()编写一个阻塞队列

[BlockingQueue] 阻塞队列,支持阻塞的机制,阻塞地放入和得到数据.我们来自行实现LinkedBlockingQueue下面的两个简单的方法put()和take(). [ put ] 把一个Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续. [ take ] 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking

Java阻塞中断和LockSupport

在介绍之前,先抛几个问题. Thread.interrupt()方法和InterruptedException异常的关系?是由interrupt触发产生了InterruptedException异常? Thread.interrupt()会中断线程什么状态的工作? RUNNING or BLOCKING? 一般Thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么? LockSupport.park()和unpark(),与object.wait()和notify()的区