StreamWriteWithTimeout类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议。因为不能公开3.x版本的源码,所以基于此版本进行学习。3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大。
/*请注意使用以下代码,需遵循GplV3协议*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;

namespace DPSBase
{
    /// <summary>
    /// 写入流的包装类---带超时判断  主要用来防止数据流写入时的死锁
    /// </summary>
    public static class StreamWriteWithTimeout
    {

        ///  把数据流 SendBuffer中数据写入到目标数据流中,每次写入的大小为 WriteBufferSize,当数据写入超时时,抛出异常。
        ///  SendBuffer  包含数据的缓冲区
        ///  bufferLength 要写入的字节数
        ///  destinationStream 目标数据流
        ///  writeBufferSize   每次成功写入的字节数
        ///  timeoutMSPerKBWrite 每KB写入的最长时间
        ///  允许写入的最小时间
        ///  返回: 每KB数据写入的平均时间
        public static double Write(byte[] sendBuffer, int bufferLength, Stream destinationStream, int writeBufferSize, double timeoutMSPerKBWrite, int minTimeoutMS)
        {
            if (sendBuffer == null) throw new ArgumentNullException("sendBuffer");
            if (destinationStream == null) throw new ArgumentNullException("destinationStream");

            int totalBytesCompleted = 0;
            Exception innerException = null;

            //信号 --无
            AutoResetEvent writeCompletedEvent = new AutoResetEvent(false);

            //写入等待时间:  (1) minTimeOutMS 写入最小时间 (2)每KB超时时间*KB数量  ====》取其中较大的值
            //如果数据大小小于缓冲区的大小,则数据长度为数据大小  否则为 数据缓冲区大小
            int writeWaitTimeMS = Math.Max(minTimeoutMS, (int)(((bufferLength < writeBufferSize ? bufferLength : writeBufferSize) / 1024.0) * timeoutMSPerKBWrite));

            System.Diagnostics.Stopwatch timerTotal = new System.Diagnostics.Stopwatch();
            timerTotal.Start();

            do
            {
                //如果  (字节数-已经完成数) 如果小于 写入缓冲区  (1)  则等于  (字节数-已经完成数)  (2) 否则,则等于缓冲区数

                int writeCountBytes = (bufferLength - totalBytesCompleted < writeBufferSize ? bufferLength - totalBytesCompleted : writeBufferSize);

                //(1) 要写入的数据  (2)totalBytesCompleted 从此位置开始写入 (3) writeCountBytes 写入的数量
                destinationStream.BeginWrite(sendBuffer, totalBytesCompleted, writeCountBytes, new AsyncCallback((state)=>
                    {
                        try
                        {
                            //(4)写入完成后
                            destinationStream.EndWrite(state);
                        }
                        catch (Exception ex)
                        {
                            innerException = ex;
                        }
                        //(5):写入完成后,发信号
                        writeCompletedEvent.Set();

                    }), null);

                //有交警  必须有信号才能通行  交警等待 writeWaitTimeMS时间
                if (!writeCompletedEvent.WaitOne(writeWaitTimeMS))
                {
//#if !WINDOWS_PHONE
//                    using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
//                        AppendStringToLogFile("WriteWithTimeLog_" + process.Id, "Write timed out after " + writeWaitTimeMS.ToString() + "ms, while writing " + writeCountBytes + " bytes.");
//#endif
                    throw new TimeoutException("Write timed out after " + writeWaitTimeMS.ToString() + "ms");
                }

                if (innerException != null)
                    throw innerException;

                //完成的写入数  +=此次的写入数
                totalBytesCompleted += writeCountBytes;
            } while (totalBytesCompleted < bufferLength);  //如果已经写入的数据< 数据数  则继续循环  写入数据 否则跳出循环

            timerTotal.Stop();

            double writeTimePerKBms = 0;
            if (bufferLength > 0)
                //计算出每KB数据的写入时间
                writeTimePerKBms = (double)timerTotal.ElapsedMilliseconds * 1024.0 / bufferLength;

//#if !WINDOWS_PHONE
//            using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
//                AppendStringToLogFile("WriteWithTimeLog_" + process.Id, "Write succeded using " + writeWaitTimeMS.ToString() + "ms, using buffer of " + sendBuffer.Length.ToString() + " bytes, average write time was " + writeTimePerKBms.ToString("0.00") + " ms/KB.  timeoutMSPerKBWrite was " + timeoutMSPerKBWrite);
//#endif
            //返回每KB数据的写入时间
            return writeTimePerKBms;
        }

        /// 此方法与上面的方法的区别是,每次写入一个缓冲区的数据后,都要从输入流中读取指定缓冲区大小的数据

        public static double Write(Stream inputStream, long inputStart, long inputLength, Stream destinationStream, int writeBufferSize, double timeoutMSPerKBWrite, int minTimeoutMS)
        {
            if (inputStream == null) throw new ArgumentException("inputStream");
            if (destinationStream == null) throw new ArgumentException("destinationStream");

            //定位好输入流的指定位置
            inputStream.Seek(inputStart, SeekOrigin.Begin);
            long totalBytesCompleted = 0;
            Exception innerException = null;
            AutoResetEvent writeCompletedEvent = new AutoResetEvent(false);

            //数据的缓冲区  如果数据的长度小于要写入的缓冲区的大小  则(1): 数据缓冲区等于 数据的长度  否则(2)数据缓冲区等于 写入缓冲区的大小
            byte[] sendBuffer = new byte[Math.Min(inputLength, writeBufferSize)];

            //计算超时时间
            int writeWaitTimeMS = Math.Max(minTimeoutMS, (int)((sendBuffer.Length / 1024.0) * timeoutMSPerKBWrite));

            System.Diagnostics.Stopwatch timerTotal = new System.Diagnostics.Stopwatch();
            timerTotal.Start();

            do
            {

                //剩余数据数 ==总数据库 -已经完成数
                long bytesRemaining = inputLength - totalBytesCompleted;

                //要读取的数据长度:   如果数据的缓冲区 大于 剩余的数据数  则等于 (1)剩余的数据数 (2)否则为缓冲区的大小
                //writeCountBytes 已读取,要写入的数据长度
                int writeCountBytes = inputStream.Read(sendBuffer, 0, (sendBuffer.Length > bytesRemaining ? (int)bytesRemaining : sendBuffer.Length));

                if (writeCountBytes <= 0)
                    break;

                if (!destinationStream.CanWrite) throw new Exception("Unable to write to provided destinationStream.");

                //写入到目标数据流中
                destinationStream.BeginWrite(sendBuffer, 0, writeCountBytes, new AsyncCallback((state) =>
                {
                    try
                    {
                        //完成写入数据  完成一次写入
                        destinationStream.EndWrite(state);
                    }
                    catch (Exception ex)
                    {
                        innerException = ex;
                    }

                    //给信号
                    writeCompletedEvent.Set();

                }), null);

                if (!writeCompletedEvent.WaitOne(writeWaitTimeMS))
                {
//#if !WINDOWS_PHONE
//                    using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
//                        AppendStringToLogFile("WriteWithTimeLog_" + process.Id, "Write timed out after " + writeWaitTimeMS.ToString() + "ms, while writing " + writeCountBytes + " bytes.");
//#endif
                    throw new TimeoutException("Write timed out after " + writeWaitTimeMS.ToString() + "ms");
                }

                if (innerException != null)
                    throw innerException;

                //已经写入的总数
                totalBytesCompleted += writeCountBytes;

            } while (totalBytesCompleted < inputLength); //如果已经写入的总数<数据库 继续执行循环

            timerTotal.Stop();

            double writeTimePerKBms = 0;
            if (inputLength > 0)
                writeTimePerKBms = (double)timerTotal.ElapsedMilliseconds * 1024.0 / inputLength;

//#if !WINDOWS_PHONE
//            using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
//                AppendStringToLogFile("WriteWithTimeLog_" + process.Id, "Write succeded using " + writeWaitTimeMS.ToString() + "ms, using buffer of " + sendBuffer.Length.ToString() + " bytes, average write time was " + writeTimePerKBms.ToString("0.00") + " ms/KB.  timeoutMSPerKBWrite was " + timeoutMSPerKBWrite);
//#endif

            return writeTimePerKBms;
        }

//在英文网站上购买 九折折扣代码: NCDN_PRCLW

//淘宝正版销售 http://shop115882994.taobao.com/ 推广期间 八折优惠

/// <summary>
        /// Locker for LogError() which ensures thread safe saves.
        /// </summary>
        static object errorLocker = new object();

        /// <summary>
        /// Appends the provided logString to end of fileName.txt. If the file does not exist it will be created.
        /// </summary>
        /// <param name="fileName">The filename to use. The extension .txt will be appended automatically</param>
        /// <param name="logString">The string to append.</param>
        static void AppendStringToLogFile(string fileName, string logString)
        {
            try
            {
                lock (errorLocker)
                {
                    using (System.IO.StreamWriter sw = new System.IO.StreamWriter(fileName + ".txt", true))
                        sw.WriteLine(DateTime.Now.Hour.ToString() + "." + DateTime.Now.Minute.ToString() + "." + DateTime.Now.Second.ToString() + "." + DateTime.Now.Millisecond.ToString() + " [" + Thread.CurrentThread.ManagedThreadId.ToString() + "] " + logString);
                }
            }
            catch (Exception)
            {
                //If an error happens here, such as if the file is locked then we lucked out.
            }
        }
    }
}
 http://www.cnblogs.com/networkcomms
http://www.networkcoms.cn 编辑
时间: 2024-11-03 01:28:20

StreamWriteWithTimeout类(NetworkComms 2.3.1源码了解和学习)的相关文章

ThreadSafeStream 类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ using System; using System.Collections.Generic; using System.Text; using System.IO; using System.Security.Cryptography; namespace

ConnectionInfo类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ /// <summary> /// 连接状态枚举类 /// </summary> public enum ConnectionState { /// <summary> /// 未定义 是连接的初始状态. /// </summ

PacketHeaderStringItems枚举类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ ///<summary> /// 字符类型的选项 PacketHeader类中会用到此枚举类型 /// </summary> public enum PacketHeaderStringItems { /// <summary> /

PacketHeaderLongItems枚举类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ /// <summary> /// Long类型的选项 PacketHeader类中会用到此枚举类型 /// </summary> public enum PacketHeaderLongItems { /// <summary>

PacketHeader类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ /// <summary> /// PacketHeader 包含发送,接收,重建数据包的相关信息 /// </summary> [ProtoContract] public sealed class PacketHeader { [Proto

StreamSendWrapper 类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ using System; using System.Collections.Generic; using System.Text; using System.IO; namespace DPSBase { /// 发送全部或者部分数据流 public cl

ProtobufSerializer类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ using System; using System.Collections.Generic; using System.Text; using ProtoBuf; using System.IO; using System.Runtime.InteropS

DataSerializerProcessorAttribute类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /*请注意使用以下代码,需遵循GplV3协议*/ //在英文网站上购买 九折折扣代码: NCDN_PRCLW using System; using System.Collections.Generic; using System.Text; namespace DPSBase { /// <summary

SentPacket类(NetworkComms 2.3.1源码了解和学习)

networkComms.net2.3.1开源版本,基于gpl V3协议.因为不能公开3.x版本的源码,所以基于此版本进行学习.3.X版本进行了诸多改进和Bug修复,使用方法上两者相差不大. /// 一个包装类用来跟踪发送的数据包 如果检验和失败需要重新发送时可以使用 /// </summary> class SentPacket { public int SendCount { get; private set; } public Packet Packet { get; private s