封装Socket.BeginReceive/EndReceive以支持Timeout

Socket

.NET中的Socket类提供了网络通信常用的方法,分别提供了同步和异步两个版本,其中异步的实现是基于APM异步模式实现,即BeginXXX/EndXXX的方式。异步方法由于其非阻塞的特性,在需考虑程序性能和伸缩性的情况下,一般会选择使用异步方法。但使用过Socket提供的异步方法的同学,应该都会注意到了Socket的异步方法是无法设置Timeout的。以Receive操作为例,Socket提供了一个ReceiveTimeout属性,但该属性设置的是同步版本的Socket.Receive()方法的Timeout值,该设置对异步的Socket.BeginReceive()无效:如果对方没有返回任何消息,则BeginReceive操作将无法完成,其中提供的回调函数也将不会调用。如下示例代码所示:

private static void TestSocketBeginReceive()
{
    Socket socket = new Socket(AddressFamily.InterNetwork,
        SocketType.Dgram, ProtocolType.Udp);
    byte[] content = Encoding.ASCII.GetBytes("Hello world");

    IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
    IPEndPoint receiver = new IPEndPoint(ip, 80);

    socket.BeginSendTo(content, 0, content.Length, SocketFlags.None,
        receiver, SendToCb, socket);
    Console.WriteLine("Sent bytes: " + content.Length);
}

private static void SendToCb(IAsyncResult ar)
{
    var socket = ar.AsyncState as Socket;
    socket.EndSendTo(ar);
    byte[] buffer = new byte[1024];

    IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length,
        SocketFlags.None, null, null);
    int received = socket.EndReceive(receiveAr);
    Console.WriteLine("Received bytes: " + received);
} 

由于接收方不会返回任何消息,Socket.BeginReceive将永远不会完成,SentToCb方法中的socket.EndReceive()调用将永远阻塞,应用程序也无法得知操作的状态。

支持Timeout

在个别的应用场景下,我们希望既能使用Socket的异步通信方法,保证程序的性能,同时又希望能指定Timeout值,当操作没有在指定的时间内完成时,应用程序能得到通知,以进行下一步的操作,如retry等。以下介绍的就是一种支持Timeout的Socket异步Receive操作的实现,方式如下:

  1. 基于APM异步模式封装Socket.BeginReceive/EndReceive方法。
  2. 使用ThreadPool提供的RegisterWaitForSingleObject()方法注册一个WaitOrTimerCallback,如果指定时间内操作未完成,则结束操作,并设置状态为Timeout。
  3. 将上述封装实现为Socket的扩展方法方便调用。

以下代码简化了所有的参数检查和异常处理,实际使用中需添加相关逻辑。

AsyncResultWithTimeout

首先看一下IAsyncResult接口的实现:

public class AsyncResultWithTimeout : IAsyncResult
{
    private ManualResetEvent m_waitHandle = new ManualResetEvent(false);
    public AsyncResultWithTimeout(AsyncCallback cb, object state)
    {
        this.AsyncState = state;
        this.Callback = cb;
    }

    #region IAsyncResult

    public object AsyncState { get; private set; }
    public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } }
    public bool CompletedSynchronously { get { return false; } }
    public bool IsCompleted { get; private set; }

    #endregion

    public AsyncCallback Callback { get; private set; }
    public int ReceivedCount { get; private set; }
    public bool TimedOut { get; private set; }
    public void SetResult(int count)
    {
        this.IsCompleted = true;
        this.ReceivedCount = count;
        this.m_waitHandle.Set();

        if (Callback != null) Callback(this);
    }

    public void SetTimeout()
    {
        this.TimedOut = true;
        this.IsCompleted = true;
        this.m_waitHandle.Set();
    }
}

AsyncResultWithTimeOut类中包含了IAsyncResult接口中4个属性的实现、用户传入的AsyncCallback委托、接收到的字节数ReceivedCount以及两个额外的方法:

  1. SetResult(): 用于正常接收到消息时设置结果,标记操作完成以及执行回调。
  2. SetTimeout():当超时时,标记操作完成以及设置超时状态。

StateInfo

StateInfo类用于保存相关的状态信息,该对象会作为Socket.BeginReceive()的最后一个参数传入。当接收到消息时,接收到的字节数会保存到AsyncResult属性中,并设置操作完成。当超时时,WatchTimeOut方法会将AsyncResult设置为TimeOut状态,并通过RegisteredWaitHandle属性取消注册的WaitOrTimerCallback.

public class StateInfo
{
    public StateInfo(AsyncResultWithTimeout result, Socket socket)
    {
        this.AsycResult = result;
        this.Socket = socket;
    }

    public Socket Socket { get; private set; }
    public AsyncResultWithTimeout AsycResult { get; private set; }
    public RegisteredWaitHandle RegisteredWaitHandle { get; set; }
}

封装Socket.BeginReceive

与Socket.BeginReceive方法相比,BeginReceive2添加了一个参数timeout,可以设置该操作的超时时间,单位为毫秒。BeginReceive2中调用Socket.BeginReceive()方法,其中指定的ReceiveCb回调将在正常接收到消息后将结果保存在stateInfo对象的AsyncResult属性中,该属性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2调用Socket.BeginReceive后,在ThreadPool中注册了一个WaitOrTimerCallback委托。ThreadPool将在Receive操作完成或者Timeout时调用该委托。

public static class SocketExtension
{

    public static int EndReceive2(IAsyncResult ar)
    {
        var result = ar as AsyncResultWithTimeout;
        result.AsyncWaitHandle.WaitOne();

        return result.ReceivedCount;
    }

    public static AsyncResultWithTimeout BeginReceive2
    (
        this Socket socket,
        int timeout,
        byte[] buffer,
        int offset,
        int size,
        SocketFlags flags,
        AsyncCallback callback,
        object state
    )
    {
        var result = new AsyncResultWithTimeout(callback, state);

        var stateInfo = new StateInfo(result, socket);

        socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state);

        var registeredWaitHandle =
            ThreadPool.RegisterWaitForSingleObject(
                result.AsyncWaitHandle,
                WatchTimeOut,
                stateInfo, // 作为state传递给WatchTimeOut
                timeout,
                true);

        // stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut
        // 中unregister.
        stateInfo.RegisteredWaitHandle = registeredWaitHandle;

        return result;
    }

    private static void WatchTimeOut(object state, bool timeout)
    {
        var stateInfo = state as StateInfo;
        // 设置的timeout前,操作未完成,则设置为操作Timeout
        if (timeout)
        {
            stateInfo.AsycResult.SetTimeout();
        }

        // 取消之前注册的WaitOrTimerCallback
        stateInfo.RegisteredWaitHandle.Unregister(
            stateInfo.AsycResult.AsyncWaitHandle);
    }

    private static void ReceiveCb(IAsyncResult result)
    {
        var state = result.AsyncState as StateInfo;
        var asyncResultWithTimeOut = state.AsycResult;
        var count = state.Socket.EndReceive(result);
        state.AsycResult.SetResult(count);
    }
}

试一下

以下代码演示了如何使用BeginReceive2:

private static void TestSocketBeginReceive2()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
    byte[] content = Encoding.ASCII.GetBytes("Hello world");

    IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
    IPEndPoint receiver = new IPEndPoint(ip, 80);

    socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket);
    Console.WriteLine("Sent bytes: " + content.Length);
}

private static void SendToCb2(IAsyncResult ar)
{
    var socket = ar.AsyncState as Socket;
    socket.EndSendTo(ar);
    byte[] buffer = new byte[1024];

    AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null);
    receiveAr.AsyncWaitHandle.WaitOne();
    if (receiveAr.TimedOut)
    {
        Console.WriteLine("Operation timed out.");
    }
    else
    {
        int received = socket.EndReceive(ar);
        Console.WriteLine("Received bytes: " + received);
    }
}

输出结果如下:

上述实现是针对BeginReceive的封装,还可以以相同的方式将Send/Receive封装以支持Timeout, 或者更进一步支持retry操作。

附示例代码:下载

出处:http://www.cnblogs.com/dytes/archive/2012/08/13/SocketAsyncOpWithTimeout.html

时间: 2024-10-21 16:28:13

封装Socket.BeginReceive/EndReceive以支持Timeout的相关文章

Unity3d 封装Socket创建简单网络

北京又在打雷下大雨了,学习Unity以来,越来越感兴趣,情不自禁的想要学习更多知识 这次自己搭建一个Socket模块,比较基础,适合新手学习,详细介绍Socket的搭建过程,同样会把详细过程在代码里进行注释~ 在搭建Socket过程中,需要创建以下几个常用的方法: 1.创建套接字(socket) 2.绑定Ip和端口 3.监听方法 4.接收客户端请求的方法 5.收发消息的方法 创建SocketManger管理类 把客户端与服务端代码都写在新建的SocketManger里,并把SocketMange

Socket.IO介绍:支持WebSocket、用于WEB端的即时通讯的框架

一.基本介绍 WebSocket是HTML5的一种新通信协议,它实现了浏览器与服务器之间的双向通讯.而Socket.IO是一个完全由JavaScript实现.基于Node.js.支持WebSocket的协议用于实时通信.跨平台的开源框架,它包括了客户端的JavaScript和服务器端的Node.js. Socket.IO除了支持WebSocket通讯协议外,还支持许多种轮询(Polling)机制以及其它实时通信方式,并封装成了通用的接口,并且在服务端实现了这些实时机制的相应代码.Socket.I

golang(4):编写socket服务,简单支持命令

本文的原文连接是: http://blog.csdn.net/freewebsys/article/details/46881213 转载请必须注明出处! 1,socket服务 使用golang开发socket服务还是非常简单的. socket的库都封装好了. 参考文档: https://github.com/astaxie/build-web-application-with-golang/blob/master/zh/08.1.md 2,简单例子 package main import (

夺命雷公狗—angularjs—15—内置封装好的计时器$interval和$timeout

这里其实和js源生的效果是一样的,代码如下所示: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="js/angular.min.js"></script> </head> <body ng-

js原生封装getClassName()方法-ie不支持getElementsByClassName,所以要自己实现获取类名为className的所有元素

<html> <head> <script type="text/javascript"> window.onload = function() { var topMenus = getClass('li','topMenu'); for(var i=0;i < topMenus.length; i++) { alert(topMenus[i].innerHTML); } } function getClass(tagName,classNam

cross socket接口封装

cross socket是DELPHI跨操作系统的SOCKET通讯库,支持WINDOWS,LINUX,MACOS操作系统. 经过封装的接口,调用异常简单. procedure TForm1.Button2Click(Sender: TObject); //REST查询 begin var url, sql1, sql2: string; sql1 := TNetEncoding.URL.Encode('select * from tgoods'); sql2 := TNetEncoding.UR

Linux组件封装(八)——Socket的封装

我们要封装Socket,首先我们需要了解Socket需要哪些要素: 1) 首先,一个套接字创建后,需要绑定一块网卡的IP,以及连接的对口号,所以我们先封装InetAddr. 在class中,仅有的一个私有成员就是struct sockaddr_in类型的一个对象,我们需要将该对象的几种赋值与创建封装到类中,这样,我们仅需传递相应的IP与port即可获得一个addr. 在这里,我们为了方便获得该addr的IP及port,封装几个将addr转化为IP及port的函数,这样我们仅需调用函数即可. 然后

Java Socket Timeout 总结

原文出处:囚兔 摘要: Java的网络编程Socket常常用于各种网络工具,比如数据库的jdbc客户端,redis客户端jedis,各种RPC工具java客户端,这其中存在一些参数来配置timeout,但是之前一直对timeout的理解还不清晰,所以会导致使用这些网络工具的时候有点迷茫.在此做个总结. 1. Socket timeout Java socket有如下两种timeout: 建立连接timeout,暂时就叫 connect timeout: 读取数据timeout,暂时就叫so ti

C#之Raw Socket实现网络封包监视

同Winsock1相比,Winsock2最明显的就是支持了Raw Socket套接字类型,使用Raw Socket,可把网卡设置成混杂模式,在这种模式下,我们可以收到网络上的IP包,当然包括目的不是本机的IP包,通过原始套接字,我们也可以更加自如地控制Windows下的多种协议,而且能够对网络底层的传输机制进行控制.  在本文例子中,nbyte.BasicClass命名空间实现了RawSocket类,它包含了我们实现数据包监视的核心技术.在实现这个类之前,需要先写一个IP头结构,来暂时存放一些有