GJM : Socket TCP 通信连接(二)

上一篇中,我们编写了客户端功能。

这一篇将讲解ISocketHandler的实现。

再来回顾一下ISocketHandler接口。

public interface ISocketHandler
{
    /// <summary>
    /// 开始接收
    /// </summary>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state);
    /// <summary>
    /// 结束接收
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>接收到的数据</returns>
    byte[] EndReceive(IAsyncResult asyncResult);
    /// <summary>
    /// 开始发送
    /// </summary>
    /// <param name="data">要发送的数据</param>
    /// <param name="offset">数据偏移</param>
    /// <param name="count">发送长度</param>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state);
    /// <summary>
    /// 结束发送
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>发送是否成功</returns>
    bool EndSend(IAsyncResult asyncResult);
}

做一个类SocketHandler继承ISocketHandler接口

/// <summary>
/// Socket处理程序
/// </summary>
public class SocketHandler : ISocketHandler
{
    /// <summary>
    /// 开始接收
    /// </summary>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    public IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state)
    {

    }

    /// <summary>
    /// 结束接收
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>接收到的数据</returns>
    public byte[] EndReceive(IAsyncResult asyncResult)
    {

    }

    /// <summary>
    /// 开始发送
    /// </summary>
    /// <param name="data">要发送的数据</param>
    /// <param name="offset">数据偏移</param>
    /// <param name="count">发送长度</param>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    public IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state)
    {

    }

    /// <summary>
    /// 结束发送
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>发送是否成功</returns>
    public bool EndSend(IAsyncResult asyncResult)
    {

    }
}

增加两个属性与构造函数。

    //异步处理关系集合
    private Dictionary<IAsyncResult, SocketHandlerState> StateSet;
    //发送队列
    private List<SocketHandlerState> SendQueue;

    /// <summary>
    /// 实例化Socket处理程序
    /// </summary>
    public SocketHandler()
    {
        StateSet = new Dictionary<IAsyncResult, SocketHandlerState>();
        SendQueue = new List<SocketHandlerState>();
    }

StateSet可以保存我们的异步调用结果等数据

SendQueue用来做一个发送队列

接下来我们从发送数据开始。

由于需要用到Stream的异步方法,我们需要定义一个State类。

internal class SocketHandlerState
{
    /// <summary>
    /// 数据
    /// </summary>
    public byte[] Data { get; set; }
    /// <summary>
    /// 异步结果
    /// </summary>
    public IAsyncResult AsyncResult { get; set; }
    /// <summary>
    /// Socket网络流
    /// </summary>
    public Stream Stream { get; set; }
    /// <summary>
    /// 异步回调函数
    /// </summary>
    public AsyncCallback AsyncCallBack { get; set; }
    /// <summary>
    /// 是否完成
    /// </summary>
    public bool Completed { get; set; }
    /// <summary>
    /// 数据长度
    /// </summary>
    public int DataLength { get; set; }
}

因为我们需要返回IAsyncResult,所以我们继承该接口做一个SocketHandlerState类。

/// <summary>
/// Socket异步操作状态
/// </summary>
public class SocketAsyncResult : IAsyncResult
{
    /// <summary>
    /// 实例化Socket异步操作状态
    /// </summary>
    /// <param name="state"></param>
    public SocketAsyncResult(object state)
    {
        AsyncState = state;
        AsyncWaitHandle = new AutoResetEvent(false);
    }

    /// <summary>
    /// 获取用户定义的对象,它限定或包含关于异步操作的信息。
    /// </summary>
    public object AsyncState { get; private set; }

    /// <summary>
    /// 获取用于等待异步操作完成的 System.Threading.WaitHandle。
    /// </summary>
    public WaitHandle AsyncWaitHandle { get; private set; }

    /// <summary>
    /// 获取一个值,该值指示异步操作是否同步完成。
    /// </summary>
    public bool CompletedSynchronously { get { return false; } }

    /// <summary>
    /// 获取一个值,该值指示异步操作是否已完成。
    /// </summary>
    public bool IsCompleted { get; internal set; }
}

然后开始编写发送数据相关函数。

这里我将发送数据的大小限制为最大65535。

只需发送长度为2的头信息即可把数据长度发送到对方。

    /// <summary>
    /// 开始发送
    /// </summary>
    /// <param name="data">要发送的数据</param>
    /// <param name="offset">数据偏移</param>
    /// <param name="count">发送长度</param>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    public IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state)
    {
        //data不能为null
        if (data == null)
            throw new ArgumentNullException("data");
        //offset不能小于0和超过data长度
        if (offset > data.Length || offset < 0)
            throw new ArgumentOutOfRangeException("offset");
        //count不能大于65535
        if (count <= 0 || count > data.Length - offset || count > ushort.MaxValue)
            throw new ArgumentOutOfRangeException("count");
        //stream不能为null
        if (stream == null)
            throw new ArgumentNullException("stream");
        //回调函数不能为null
        if (callback == null)
            throw new ArgumentNullException("callback");
        //stream异常
        if (!stream.CanWrite)
            throw new ArgumentException("stream不支持写入。");

        SocketAsyncResult result = new SocketAsyncResult(state);

        //初始化SocketHandlerState
        SocketHandlerState shs = new SocketHandlerState();
        shs.Data = data;
        shs.AsyncResult = result;
        shs.Stream = stream;
        shs.AsyncCallBack = callback;
        shs.DataLength = 0;

        //锁定SendQueue
        //避免多线程同时发送数据
        lock (SendQueue)
        {
            //添加状态
            SendQueue.Add(shs);
            //如果SendQueue数量大于1,则表示有数据尚未发送完成
            if (SendQueue.Count > 1)
                return result;
        }

        //获取数据长度
        //ushort的最大值为65535
        //转换为byte[]长度为2
        var dataLength = BitConverter.GetBytes((ushort)data.Length);
        //向对方发送长度为2的头信息,表示接下来要发送的数据长度
        stream.Write(dataLength, 0, dataLength.Length);
        //开始异步发送数据
        stream.BeginWrite(shs.Data, 0, shs.Data.Length, EndWrite, shs).AsyncWaitHandle.WaitOne();

        return result;
    }

    //stream异步结束写入
    private void EndWrite(IAsyncResult ar)
    {
        SocketHandlerState state = (SocketHandlerState)ar.AsyncState;

        //锁定StateSet
        lock (StateSet)
            StateSet.Add(state.AsyncResult, state);

        try
        {
            state.Stream.EndWrite(ar);
        }
        catch
        {
            //出现Socket异常,发送失败
            state.Completed = false;
            //允许等待线程继续
            ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
            //执行异步回调函数
            state.AsyncCallBack(state.AsyncResult);
            return;
        }
        //发送成功
        state.Completed = true;
        //允许等待线程继续
        ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
        //执行异步回调函数
        state.AsyncCallBack(state.AsyncResult);

        //锁定SendQueue
        lock (SendQueue)
        {
            SocketHandlerState prepare = null;
            //移除当前发送完成的数据
            SendQueue.Remove(state);
            //如果SendQueue还有数据存在,则继续发送
            if (SendQueue.Count > 0)
            {
                prepare = SendQueue[0];
            }
            if (prepare != null)
            {
                //获取数据长度
                //ushort的最大值为65535
                //转换为byte[]长度为2
                var dataLength = BitConverter.GetBytes((ushort)prepare.Data.Length);
                //向对方发送长度为2的头信息,表示接下来要发送的数据长度
                prepare.Stream.Write(dataLength, 0, dataLength.Length);
                //开始异步发送数据
                prepare.Stream.BeginWrite(prepare.Data, 0, prepare.Data.Length, EndWrite, prepare).AsyncWaitHandle.WaitOne();
            }
        }
    }

    /// <summary>
    /// 结束发送
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>发送是否成功</returns>
    public bool EndSend(IAsyncResult asyncResult)
    {
        //判断异步操作状态是否属于当前处理程序
        if (!StateSet.ContainsKey(asyncResult))
            throw new ArgumentException("无法识别的asyncResult。");
        SocketHandlerState state = StateSet[asyncResult];
        lock (StateSet)
            StateSet.Remove(asyncResult);
        return state.Completed;
    }

接下来是接收数据的相关方法。

    /// <summary>
    /// 开始接收
    /// </summary>
    /// <param name="stream">Socket网络流</param>
    /// <param name="callback">回调函数</param>
    /// <param name="state">自定义状态</param>
    /// <returns>异步结果</returns>
    public IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state)
    {
        //stream不能为null
        if (stream == null)
            throw new ArgumentNullException("stream");
        //回调函数不能为null
        if (callback == null)
            throw new ArgumentNullException("callback");
        //stream异常
        if (!stream.CanRead)
            throw new ArgumentException("stream不支持读取。");

        SocketAsyncResult result = new SocketAsyncResult(state);

        //初始化SocketHandlerState
        SocketHandlerState shs = new SocketHandlerState();
        shs.Data = new byte[2];
        shs.AsyncResult = result;
        shs.Stream = stream;
        shs.AsyncCallBack = callback;
        shs.Completed = true;
        //开始异步接收长度为2的头信息
        //该头信息包含要接收的主要数据长度
        stream.BeginRead(shs.Data, 0, 2, EndRead, shs);
        return result;
    }

    //stream异步结束读取
    private void EndRead(IAsyncResult ar)
    {
        SocketHandlerState state = (SocketHandlerState)ar.AsyncState;
        int dataLength;
        try
        {
            dataLength = state.Stream.EndRead(ar);
        }
        catch
        {
            dataLength = 0;
        }
        //dataLength为0则表示Socket断开连接
        if (dataLength == 0)
        {
            lock (StateSet)
                StateSet.Add(state.AsyncResult, state);
            //设定接收到的数据位空byte数组
            state.Data = new byte[0];
            //允许等待线程继续
            ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
            //执行异步回调函数
            state.AsyncCallBack(state.AsyncResult);
            return;
        }

        //如果是已完成状态,则表示state.Data的数据是头信息
        if (state.Completed)
        {
            //设定状态为未完成
            state.Completed = false;
            //已接收得数据长度为0
            state.DataLength = 0;
            //获取主要数据长度
            var length = BitConverter.ToUInt16(state.Data, 0);
            //初始化数据的byte数组
            state.Data = new byte[length];
            try
            {
                //开始异步接收主要数据
                state.Stream.BeginRead(state.Data, 0, length, EndRead, state);
            }
            catch
            {
                //出现Socket异常
                lock (StateSet)
                    StateSet.Add(state.AsyncResult, state);
                state.Data = new byte[0];
                ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
                state.AsyncCallBack(state.AsyncResult);
            }
            return;
        }
        //接收到主要数据
        else
        {
            //判断是否接收了完整的数据
            if (dataLength + state.DataLength != state.Data.Length)
            {
                //增加已接收数据长度
                state.DataLength += dataLength;
                try
                {
                    //继续接收数据
                    state.Stream.BeginRead(state.Data, state.DataLength, state.Data.Length - state.DataLength, EndRead, state);
                }
                catch
                {
                    //出现Socket异常
                    lock (StateSet)
                        StateSet.Add(state.AsyncResult, state);
                    state.Data = new byte[0];
                    ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
                    state.AsyncCallBack(state.AsyncResult);
                    return;
                }
                return;
            }
            //接收完成
            state.Completed = true;
            lock (StateSet)
                StateSet.Add(state.AsyncResult, state);
            ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set();
            state.AsyncCallBack(state.AsyncResult);
        }
    }

    /// <summary>
    /// 结束接收
    /// </summary>
    /// <param name="asyncResult">异步结果</param>
    /// <returns>接收到的数据</returns>
    public byte[] EndReceive(IAsyncResult asyncResult)
    {
        //判断异步操作状态是否属于当前处理程序
        if (!StateSet.ContainsKey(asyncResult))
            throw new ArgumentException("无法识别的asyncResult。");
        SocketHandlerState state = StateSet[asyncResult];
        lock (StateSet)
            StateSet.Remove(asyncResult);
        return state.Data;
    }

至此,SocketHandler的功能已经实现。

下一篇将为大家讲解服务器端的实现。

原文地址:http://www.cnblogs.com/Kation/archive/2013/03/06/2947145.html

时间: 2025-01-15 00:59:19

GJM : Socket TCP 通信连接(二)的相关文章

java socket报文通信(二)报文的封装

昨天我们谈了怎么建立socket通信的服务端和客户端,今天我们就来谈一谈怎么封装报文. 什么是报文这里我就不在阐述了,不清楚的朋友可以自己去查资料.我们今天要谈的报文主要友以下几个部分组成: 3位同步校验位+8位报文长度+报文头+报文体+32位MD5校验位 基本格式如下: 0X110X120X1300000232<?xml version="1.0" encoding="GBK"?><ROOT><Code>0204</Cod

Photon服务器引擎(二)socket/TCP/UDP基础及Unity聊天室的实现

Photon服务器引擎(二)socket/TCP/UDP基础及Unity聊天室的实现 我们平时说的最多的socket是什么呢,实际上socket是对TCP/IP协议的封装,Socket本身并不是协议,而是一个调用接口(API). 通过Socket,我们才能使用TCP/IP协议.实际上,Socket跟TCP/IP协议没有必然的联系.Socket编程接口在设计的时候,就希望也能适应其他的网络协议.所以说,Socket的出现只是使得程序员更方便地使用TCP/IP协议栈而已,是对TCP/IP协议的抽象,

TCP/IP,http,socket,长连接,短连接——小结(转)

概要: 之前对这几个概念有点糊涂,查阅了些资料,稍微概括下他们的区别吧.如有错误,请拍~~~ 先看图: TCP/IP是什么? TCP/IP是个协议组,可分为三个层次:网络层.传输层和应用层.    在网络层有IP协议.ICMP协议.ARP协议.RARP协议和BOOTP协议.    在传输层中有TCP协议与UDP协议.    在应用层有FTP.HTTP.TELNET.SMTP.DNS等协议. Socket是什么呢? Socket是应用层与TCP/IP协议族通信的中间软件抽象层,一组接口,把复杂的T

27.Socket,TCP,UDP,HTTP基本通信原理

Socket,TCP,UDP,HTTP基本通信原理(摘自百度): TCP.UDP,HTTP 底层通信都是通过 socket 套接字实现 网络上不同的计算机,也可以通信,那么就得使用网络套接字(socket). socket就是在不同计算机之间进行通信的一个抽象. 他工作于TCP/IP协议中应用层和传输层之间的一个抽象  如图所示: 1.Socket 是对 TCP/IP 协议族的一种封装,是应用层与TCP/IP协议族通信的中间软件抽象层.从设计模式的角度看来,Socket其实就是一个门面模式,它把

Socket简单学习之Tcp通信

Socket网络通信的简单学习 建立Tcp通信 服务器端 using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; namespace TcpSocket { //服务器端 class Program { static void Main

进程对象的其他方法、守护进程、使用多进程实现 socket tcp协议 server端的并发(抢票程序)、队列、进程之间的通信(IPC)

# 进程对象的其他方法 from multiprocessing import Process import time class MyProcess(Process): def __init__(self, a, b): # 为了给子进程传递参数 super().__init__() self.a = a self.b = b def run(self): print("子进程开始执行") time.sleep(2) print("子进程结束", self.a,

Java Socket编程----通信是这样炼成的

Java最初是作为网络编程语言出现的,其对网络提供了高度的支持,使得客户端和服务器的沟通变成了现实,而在网络编程中,使用最多的就是Socket.像大家熟悉的QQ.MSN都使用了Socket相关的技术.下面就让我们一起揭开Socket的神秘面纱. Socket编程 一.网络基础知识(参考计算机网络)            关于计算机网络部分可以参考相关博客:           <TCP/IP协议栈及OSI参考模型详解> http://wangdy.blog.51cto.com/3845563/

极限优化:php巧用tcp长连接

极限优化:php巧用tcp长连接 提交 我的评论 加载中 已评论 极限优化:php巧用tcp长连接 2015-01-23 架构师之路 架构师之路 架构师之路 微信号 功能介绍 通往架构师之路,悠远而漫长,一路上,我们同行. 上一期,和大家分享了YouTube系统架构,本期将和大家分享一个大并发下php使用tcp长连接访问后端的优化方法. php巧用TCP长连接优化 一.面向人群如果你的站点架构满足以下几点,那么本文的优化方案会非常适合你:1)使用php等脚本语言作为开发语言2)需要连接后端服务,

Java网络编程之TCP通信

一.概述 Socket类是Java运行clientTCP操作的基础类,这个类本身使用代码通过主机操作系统的本地TCP栈进行通信. Socket类的方法会建立和销毁连接,设置各种Socket选项. ServerSocket类是Java执行server端操作的基础类,该类执行于server,监听入站TCP连接.每一个socketserver监听server的某个port.当远程主机的client尝试连接此port时.server就被唤醒.并返回一个表示两台主机之间socket的正常Socket对象.