线程池 异步I/O线程 <第三篇>

在学习异步之前先来说说异步的好处,例如对于不需要CPU参数的输入输出操作,可以将实际的处理步骤分为以下三步:

  1. 启动处理;
  2. 实际的处理,此时不需要CPU参数;
  3. 任务完成后的处理;

  以上步骤如果仅仅使用一个线程,当线程正在处理UI操作时就会出现“卡”的现象。

  如果使用异步的处理方式,则这三步处理过程涉及到两个线程,主线程中启动第一步;第一步启动后,主线程结束(如果不结束,只会让该线程处于无作为的等待状态);第二步不需要CPU参与;第二步完成之后,在第二个线程上启动第三步;完成之后第二个线程结束。这样的处理过程中没有一个线程需要处于等待状态,使得运行的线程得到充分利用。

一、CLR线程池的I/O线程

上一篇学习的都是CLR线程池的辅助线程,这次要学习的是CLR线程池的I/O线程。

I/O线程是.NET专为访问外部资源所设置的一种线程,因为访问外部资源常常要受到外界因素的影响,为了防止让主线程受影响而长期处于阻塞状态,.NET为多个I/O操作都建立起了异步方法。例如:FileStream、TCP/IP、WebRequest、WebService等等,而且每个异步方法的使用方式都非常类似,

  都是以Beginxxx开始,以Endxxx结束(APM)。对于APM来说,必须使用Endxxx结束异步,否则可能会造成资源泄露。Beginxxx实际是将线程排入线程池。

  另外还有一种基于事件的异步编程模式(EPM),支持基于事件的异步模式的类将有一个或多个后缀为Async的方法,同时还会有一个相应名为Completed后缀的事件,Async方法用于启动异步处理,而Completed事件将在异步处理完成之后通过事件来宣告异步处理的完成。注意,在使用EPM模式的时候,不管是完成了异步请求,还是在处理中出现异常,或者是终止异步处理,都必须要调用Compeleted处理程序。如:

  OpenReadAsync
  OpenReadCompleted

二、异步读写FileStream

需要在FileStream中异步调用I/O线程,必须使用以下构造函数建立FileStream对象,并把useAsync设置为true。

FileStream stream = new FileStream(string path,FileMode mode,FileAccess access,FileShare share,int bufferSize,bool useAsync);

  参数说明:

  1. path是文件的相对路径或绝对路径;
  2. mode确定如何打开或创建文件;
  3. access确认访问文件的方式;
  4. share确定文件如何进程共享;
  5. bufferSize是代表缓冲区大小,一般默认最小值为8,在启动异步读取或写入时,文件大小一般大于缓冲大小;
  6. userAsync代表是否启动异步I/O线程。

  注意:当使用BeginRead和BeginWrite方法在执行大量读或写时效果更好,但对于少量读/写,这些方法速度可能比同步还要慢,因为进行线程间的切换需要大量时间。

  1、异步写入

  FileStream中包含BeginWrite、EndWrite方法可以启动I/O线程进行异步写入。

  public override IAsyncResult BeginWrite(byte[] array,int offset,int numBytes,AsyncCallback,Object stateObject)
  public override void EndWrite(IAsyncResult asyncResult)

  BeginWrite返回值为IAsyncResult,使用方式与委托的BeginInvoke方法相似,最好就是使用回调函数,避免线程阻塞。

  最后两个参数还是同样的套路:

  • AsyncCallback用于绑定回调函数;
  • Object用于传递外部数据。

  要注意一点:AsyncCallback所绑定的回调函数必须是带单个IAsyncResult参数的无返回值方法。

  在例子中,把FileStream作为外部数据传递到回调函数当中,然后再回调函数中利用IAsyncResult.AsyncState获取FileStream对象,最后通过FileStream.EndWrite(IAsyncResult)结束写入。

  下面是一个异步写入的例子:

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetMaxThreads(out a, out b);
            Console.WriteLine("原有辅助线程数" + a + "   " + "原有I/O线程数" + b);

            //文件名 文件创建方式 文件权限 文件进程共享 缓冲区大小为1024 是否启动异步I/O线程为true
            FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
            //这里要注意,如果写入的字符串很小,则.Net会使用辅助线程写,因为这样比较快
            byte[] bytes = Encoding.UTF8.GetBytes("你在他乡还好吗?");
            //异步写入开始,倒数第二个参数指定回调函数,最后一个参数将自身传到回调函数里,用于结束异步线程
            stream.BeginWrite(bytes, 0, (int)bytes.Length, new AsyncCallback(Callback), stream);

            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("现有辅助线程数" + a + "   " + "现有有I/O线程数" + b);

            Console.WriteLine("主线程继续干其他活!");
            Console.ReadKey();
        }

        static void Callback(IAsyncResult result)
        {
            //显示线程池现状
            Thread.Sleep(2000);
            //通过result.AsyncState再强制转换为FileStream就能够获取FileStream对象,用于结束异步写入
            FileStream stream = (FileStream)result.AsyncState;
            stream.EndWrite(result);
            stream.Flush();
            stream.Close();
        }
    }

  输出结果如下:

  

  对于结束异步线程的方法,还是玩IAsyncResult的这一套,在启动异步写时将自身对象传递到回调函数中,在回调函数中获得自身去结束异步线程。

  这就是C#中的异步操作,从剩余线程数我们看到,异步实际上是调用线程池的线程来实现异步的。

  2、异步读取

  FileStream中可以通过使用BeginRead和EndRead调用异步I/O线程读取:

  public override IAsyncResult BeginRead(byte[] array,int offset,int numBytes,AsyncCallback userCallback,Object stateObject)
  public override int EndRead(IAsyncResult asyncResult)

  BeginRead与EndRead方法与写相似,AsyncCallback用于绑定回调函数;Object用于传递外部数据。在回调函数只需要使用IAsyncResult.AsyncState就可以获取外部数据。EndRead方法会返回从流中读取到的字节数量。

  首先定义FileData类,里面包含FileStream对象,byte[]数组和长度。然后把FileData对象作为外部数据传到回调函数,在回调函数中,把IAsyncResult.AsyncState强制转换为FileData。然后通过FileStream.EndRead(IAsyncResult)结束读取。

  最后比较一下长度,如果读取到的长度与输入的数据长度不一致,则抛出异常。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("原有辅助线程:" + a + "原有I/O线程:" + b);

            byte[] byteData = new byte[1024];
            FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
            //把FileStream对象,byte[]对象,长度等有关数据绑定到FileDate对象中,以附带属性方式送到回调函数
            Hashtable ht = new Hashtable();
            ht.Add("Length", (int)stream.Length);
            ht.Add("Stream", stream);
            ht.Add("ByteData", byteData);

            //启动异步读取,倒数第二个参数是指定回调函数,倒数第一个参数是传入回调函数中的参数
            stream.BeginRead(byteData, 0, (int)ht["Length"], new AsyncCallback(Completed), ht);

            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("现有辅助线程:" + a + "现有I/O线程:" + b);

            Console.ReadKey();
        }

        //实际参数就是回调函数
        static void Completed(IAsyncResult result)
        {
            Thread.Sleep(2000);
            //参数result实际上就是Hashtable对象,以FileStream.EndRead完成异步读取
            Hashtable ht = (Hashtable)result.AsyncState;
            FileStream stream = (FileStream)ht["Stream"];
            int length = stream.EndRead(result);
            stream.Close();
            string str = Encoding.UTF8.GetString(ht["ByteData"] as byte[]);
            Console.WriteLine(str);
            stream.Close();
        }
    }

  输出如下:

  

  注意,如果文件过小,小于缓冲区1024,那么可能会调用工作者线程而非I/O线程操作。但是根据我的观察,只是读取文件时文件过小可能会调用辅助线程操作,但是写入时不会。

  像上面就是直接用辅助线程处理了。

  IAsyncResult的作用主要有两点:

  • AsyncState属性,用来传递参数到回调函数;
  • Endxxx方法,结束异步操作方法需要此对象作为参数;

三、异步WebRequest

  System.Net.WebRequest是.NET为实现Internet的"请求/响应模型"而开发的一个abstract基类。它主要有三个子类:

  • FtpWebRequest,FileWebRequest使用"file://路径"的URI方式实现对本地资源和内部文件的请求/响应;
  • HttpWebRequest,FtpWebRequest使用FTP文件传输协议实现文件请求/响应;
  • FileWebRequest,HttpWebRequest用于处理HTTP的页面请求/响应;

  当使用WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作FileWebRequest、FtpWebRequest、HttpWebRequest各有其作用。由于使用方法类似,下面就用常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。

  HttpWebRequest包含由一下几个常用方法处理请求/响应:

  public override Stream GetRequest()
  public override WebResponse GetResponse()
  public override IAsyncResult BeginGetRequestStream(AsyncCallback callback,Object state)
  public override Stream EndGetRequestStream(IAsyncResult asyncResult)
  public override IAsyncResult BeginGetResponse(AsyncCallback callback,Object state)
  public override WebResponse EndGetResponse(IAsyncResult asyncResult)
  • BeginGetRequestStream、EndGetRequestStream用于异步向HttpWebRequest对象写入请求信息;
  • BeginGetResponse、EndGetResponse用于异步发送页面请求并获取返回信;

  使用异步方式操作Internet的"请求/响应",避免主线程长期处于等待状态,而操作期间异步线程是来自CLR线程池的I/O线程。

  注意:请求与响应不能使用同步与异步混合开发模式,即当请求写入使用GetRequestStream同步模式,即使响应使用BeginGetResponse异步方法,操作也与GetRequestStream方法在于同一线程内。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("原有辅助线程:" + a + "原有I/O线程:" + b);

            //使用WebRequest.Create方法建立HttpWebRequest对象
            HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create("http://www.baidu.com");
            webRequest.Method = "post";

            //对写入数据的RequestStream对象进行异步请求
            IAsyncResult result = webRequest.BeginGetResponse(new AsyncCallback(EndGetResponse), webRequest);
            Thread.Sleep(1000);
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("现有辅助线程:" + a + "现有I/O线程:" + b);

            Console.WriteLine("主线程继续干其他事!");
            Console.ReadKey();
        }

        static void EndGetResponse(IAsyncResult result)
        {
            Thread.Sleep(2000);
            //结束异步请求,获取结果
            HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState;
            WebResponse webResponse = webRequest.EndGetResponse(result);

            Stream stream = webResponse.GetResponseStream();
            StreamReader sr = new StreamReader(stream);
            string html = sr.ReadToEnd();
            Console.WriteLine(html.Substring(0,50));
        }
    }

  显示结果如下:

  

四、异步SqlCommand

  使用异步SqlCommand的时候,请注意把ConnectionString 的 Asynchronous Processing 设置为 true 。

    class Program
    {
        static void Main(string[] args)
        {
            int a, b;
            ThreadPool.GetMaxThreads(out a, out b);
            Console.WriteLine("原有辅助线程数" + a + "   " + "原有I/O线程数" + b);

            string str = "server=.;database=Test;uid=sa;pwd=123;Asynchronous Processing=true";
            SqlConnection conn = new SqlConnection(str);
            SqlCommand cmd = conn.CreateCommand();
            cmd.CommandText = "INSERT INTO Person VALUES(15,‘郭嘉‘,22)";
            conn.Open();
            cmd.BeginExecuteNonQuery(new AsyncCallback(EndCallback), cmd);
            Thread.Sleep(1000);
            ThreadPool.GetAvailableThreads(out a, out b);
            Console.WriteLine("现有辅助线程数" + a + "   " + "现有I/O线程数" + b);

            Console.WriteLine("主线程继续执行!");

            Console.ReadKey();
        }

        public static void EndCallback(IAsyncResult result)
        {
            Thread.Sleep(2000);
            SqlCommand cmd = result.AsyncState as SqlCommand;   //获得异步传入的参数
            Console.WriteLine("成功执行命令:" + cmd.CommandText);
            Console.WriteLine("本次执行影响行数为:" + cmd.EndExecuteNonQuery(result));
            cmd.Connection.Close();
        }
    }

  输出如下:

  

时间: 2024-08-07 16:40:34

线程池 异步I/O线程 <第三篇>的相关文章

带你一步步实现线程池异步回调

转载请注明出处 作者:晓渡文章地址:https://greatestrabit.github.io/2016/03/29/callback/ 1.字面意义上的回调 字面意思上理解回调,就是A调用B,B回过头来再调用A,即是回调.既然是这样,当然就要求A中有B,B中有A.如下: class A { /**  * 提出问题  * @author [email protected]  * @param b  * @param question  */ public void ask(final B b

java 线程池——异步任务

一.简单粗暴的线程 最原始的方式,当我们要并行的或者异步的执行一个任务的时候,我们会直接使用启动一个线程的方式,如下面所示: new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub 这里放你要执行的方法 } }).start(); 但是像上面或者类似这种每次来都是用new 一个Thread出来的方式存在着很多的弊端,如下面: 每次new Thread新建对象性能差:

使用Android新式LruCache缓存图片,基于线程池异步加载图片

import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import a

Java ExecutorServic线程池(异步)

相信大家都在项目中遇到过这样的情况,前台需要快速的显示,后台还需要做一个很大的逻辑.比如:前台点击数据导入按钮,按钮后的服务端执行逻辑A,和逻辑B(执行大量的表数据之间的copy功能),而这时前台不能一直等着,要返回给前台,告诉正在处理中就行了.这里就需要用到异步了. 点击按钮 -> 逻辑A ->逻辑B(异步) -> 方法结束. 到底,项目需求明确了,就引入了ExecutorServic线程池. Java通过Executors提供四种线程池,分别为:newCachedThreadPool

高并发的epoll+线程池,业务在线程池内

我们知道,服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指"I/O线程",即负责I/O操作,协调分配任务的"管理线程",而实际的请求和任务通常交由所谓"工作者线程"处理.通常多线程模型下,每个线程既是I/O线程又是工作者线程.所以这里讨论的是,单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型.我所在的项目中的server代码中,这种模型随处可见.它还有个名字,叫"半同步/半异步"模型,同时,这种

线程池的优点及线程池的创建方式

什么是线程池 Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池. 在开发过程中,合理地使用线程池能够带来3个好处.第一:降低资源消耗.通过重复利用机制已降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配.调优和监控.但是,要做到合理利用线程池,必须对其实现原理了如指掌.

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

Thread 线程池中可用的线程数量

GetAvaliableThread(out workerThreadCount,out iocompletedThreadCount)   函数居然一次返回了两个变量.线程池里的线程按照公用被分成了两大类:工作线程和IO线程,或者IO完成线程,前者用于执行普通的操作,后者专用于异步IO,比如文件和网络请求,注意,分类并不说明两种线程本身有差别,线程就是线程,是一种执行单元,从本质上来讲都是一样的,线程池这样分类 Thread 线程池中可用的线程数量

线程池原理以及自定义线程池

第一部分:对线程池的需求分析 /* 8.1 线程池原理 一个完整的线程池应该具备如下要素: 1.任务队列:用于缓存提交的任务 2.线程数量管理功能:可通个三个参数实现: init:创建时初始的线程数量 max:线程池自动扩充时最大的线程数量 core:空闲时但是需要释放线程,但是也要维护一定数量的活跃线程 3.任务拒绝策略: 4.线程工程:主要用于个性化定制线程,比如设置守护线程.设置线程名称等 5.QueueSize:任务队列主要存放Runnable,防止内存溢出,需要有limit数量限制 6