异步框架asyn4j的原理

启动时调用init方法

[java] view plain copy

  1. public void init(){
  2. if (!run){
  3. run = true;
  4. //工作队列
  5. workQueue = newPriorityBlockingQueue(maxCacheWork);
  6. //是否存在工作队列满处理类
  7. if (this.workQueueFullHandler != null) {
  8. //自定义线程池
  9. workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,
  10. workQueue, new AsynThreadFactory(),new AsynWorkRejectedExecutionHandler(this.workQueueFullHandler),
  11. executeWorkNum);
  12. } else {
  13. workExecutor = newAsynThreadPoolExecutor(work_thread_num, work_thread_num, 0L,TimeUnit.MILLISECONDS,
  14. workQueue, new AsynThreadFactory(),executeWorkNum);
  15. }
  16. //回调函数工作队列
  17. callBackQueue = newLinkedBlockingQueue();
  18. //自定义回调函数线程池
  19. callBackExecutor = newCallBackThreadPoolExecutor(callback_thread_num, callback_thread_num, 0L,
  20. TimeUnit.MILLISECONDS, callBackQueue,new CallBackThreadFactory(),
  21. new CallBackRejectedExecutionHandler(),callBackNum);
  22. //service启动和关闭处理器
  23. if (this.serviceHandler != null) {
  24. this.serviceHandler.setServiceStat(0);
  25. this.serviceHandler.setAsynService(this);
  26. this.serviceHandler.process();
  27. }
  28. //启动工作队列满处理器
  29. if (this.workQueueFullHandler != null) {
  30. this.workQueueFullHandler.process();
  31. }
  32. //程序关闭后的处理
  33. Runtime.getRuntime().addShutdownHook(newThread(){
  34. public void run()
  35. {
  36. AsynServiceImpl.this.close(AsynServiceImpl.closeServiceWaitTime);
  37. }
  38. });
  39. }
  40. }

AsynServiceImpl主要有两个线程池和三个处理器组成

工作线程池AsynThreadPoolExecutor

[java] view plain copy

  1. public classAsynThreadPoolExecutor
  2. extends ThreadPoolExecutor
  3. {
  4. private AtomicLong executeWorkNum;

回调函数线程池CallbackThreadPoolExecutor

[java] view plain copy

  1. public classCallBackThreadPoolExecutor
  2. extends ThreadPoolExecutor
  3. {
  4. private AtomicLong callBackNum;

AsynThreadPoolExecutor的executeWorkNum,统计任务的执行数

AsynThreadPoolExecutor的callBackNum,统计回调函数的执行数

ServiceHandler处理器(启动或关闭)

WorkQueueFullHandler处理器(任务数超过maxCacheWork)

ErrorAsynWorkHandler处理器(任务执行异常)

执行时调用addWork方法

[java] view plain copy

  1. public void addWork(Object tagerObject, Stringmethod, Object[] params, AsynCallBack asynCallBack, WorkWeight weight, booleancache)
  2. {
  3. if ((tagerObject == null) || (method ==null)) {
  4. throw newIllegalArgumentException("target name is null or  target method name is null");
  5. }
  6. Object target = null;
  7. //如果对象是String
  8. if(tagerObject.getClass().isAssignableFrom(String.class)){
  9. addWorkWithSpring(tagerObject.toString(),method, params, asynCallBack, weight);
  10. return;
  11. }
  12. //如果对象是class
  13. if ((tagerObject instanceof Class)) {
  14. String classKey =((Class)tagerObject).getSimpleName();
  15. //是否缓存,一个class缓存一个默认的对象,防止重复创建
  16. if (cache)
  17. {
  18. target = targetCacheMap.get(classKey);
  19. if (target == null)
  20. {
  21. target =newObject((Class)tagerObject);
  22. targetCacheMap.put(classKey, target);
  23. }
  24. }
  25. else
  26. {
  27. target = newObject((Class)tagerObject);
  28. }
  29. }
  30. else
  31. {
  32. target = tagerObject;
  33. }
  34. if (target == null) {
  35. throw newIllegalArgumentException("target object is null");
  36. }
  37. //封装成异步任务
  38. AsynWork anycWork = newAsynWorkEntity(target, method, params, asynCallBack, weight);
  39. addAsynWork(anycWork);
  40. }

[java] view plain copy

  1. public voidaddAsynWork(AsynWork asynWork){
  2. if (!run) {
  3. throw new Asyn4jException("asynservice is stop or no start!");
  4. }
  5. if (asynWork == null) {
  6. throw newIllegalArgumentException("asynWork is null");
  7. }
  8. try<span style="font-family: Arial, Helvetica, sans-serif;">{</span>

[java] view plain copy

  1. //通过semaphore来获取许可权限
  2. if(this.semaphore.tryAcquire(addWorkWaitTime, TimeUnit.MILLISECONDS))
  3. {
  4. WorkProcessor workProcessor = newWorkProcessor(asynWork, this);
  5. workExecutor.execute(workProcessor);
  6. totalWork.incrementAndGet();
  7. }
  8. else{
  9. log.warn("work queue is full,addwork to cache queue");
  10. this.workQueueFullHandler.addAsynWork(asynWork);
  11. }
  12. }
  13. catch (InterruptedException e)
  14. {
  15. log.error(e);
  16. }
  17. }

通过semaphore来控制最大的访问线程数,maxCacheWork就是semaphore的数量,也是工作队列的数量

这里的目的是控制工作队列的数量不能超过maxCacheWork(也可以通过用上限的queue来代替)

如果超过队列maxCacheWork,就用workQueueFullHandler去处理,处理方式同线程池的拒绝策略处理器(

newAsynWorkRejectedExecutionHandler(this.workQueueFullHandler))不过线程池的拒绝策略没用到(前提是队列的有上限的队列),

工作任务WorkProcessor(内含AsynWorkEntity)

[java] view plain copy

  1. public void run() {
  2. Thread currentThread =Thread.currentThread();
  3. if (this.asynWork.getThreadName() != null){
  4. setName(currentThread,this.asynWork.getThreadName());
  5. }
  6. AsynCallBack result = null;
  7. try{
  8. result = this.asynWork.call();
  9. if (result != null) {
  10. ApplicationContext.getCallBackExecutor().execute(result);
  11. }
  12. }
  13. catch (Throwable throwable){
  14. if(this.applicationContext.getErrorAsynWorkHandler() != null) {
  15. this.applicationContext.getErrorAsynWorkHandler().addErrorWork(this.asynWork,throwable);
  16. }
  17. }
  18. finally{
  19. this.applicationContext.getSemaphore().release();
  20. }
  21. }

首先修改线程名称,然后调用asynWork的call方法,如果有回调函数,就执行,如果有异常就执行ErrorAsynWorkHandler,最后Semaphore释放一个许可

AsynWorkEntity

[java] view plain copy

  1. public AsynCallBack call()
  2. throws Exception {
  3. if (this.target == null) {
  4. throw new RuntimeException("targetobject is null");
  5. }
  6. Class clazz = this.target.getClass();
  7. String methodKey =MethodUtil.getClassMethodKey(clazz, this.params, this.method);
  8. Method targetMethod =(Method)methodCacheMap.get(methodKey);
  9. if (targetMethod == null){
  10. targetMethod =MethodUtil.getTargetMethod(clazz, this.params, this.method);
  11. if (targetMethod != null) {
  12. methodCacheMap.put(methodKey,targetMethod);
  13. }
  14. }
  15. if (targetMethod == null) {
  16. throw newIllegalArgumentException("target method is null");
  17. }
  18. Object result =targetMethod.invoke(this.target, this.params);
  19. if (this.anycResult != null) {
  20. this.anycResult.setInokeResult(result);
  21. }
  22. return this.anycResult;
  23. }

通过反射技术调用用具体方法,也用缓存技术,anycResult就是你定义的回调函数,没有就返回null

关闭时调用close方法

[java] view plain copy

    1. publicvoid close(long waitTime){
    2. if (run){
    3. run = false;
    4. try {
    5. workExecutor.awaitTermination(waitTime,TimeUnit.MILLISECONDS);
    6. callBackExecutor.awaitTermination(0L,TimeUnit.MILLISECONDS);
    7. }
    8. catch (InterruptedException e)
    9. {
    10. log.error(e);
    11. }
    12. //关闭工作线程和回调函数线程
    13. workExecutor.shutdown();
    14. callBackExecutor.shutdown();
    15. //service关闭处理器
    16. if (this.serviceHandler != null)
    17. {
    18. this.serviceHandler.setAsynWorkQueue(workQueue);
    19. this.serviceHandler.setCallBackQueue(callBackQueue);
    20. this.serviceHandler.setServiceStat(1);
    21. this.serviceHandler.process();
    22. }
    23. }
    24. }
时间: 2024-10-03 20:53:17

异步框架asyn4j的原理的相关文章

jQuery异步框架探究1:jQuery._Deferred方法

jQuery异步框架应用于jQuery数据缓存模块.jQuery ajax模块.jQuery事件绑定模块等多个模块,是jQuery的基础功能之一.实际上是jQuery实现的一个异步处理框架,从本质上讲与java aio没有区别,所以需要从更抽象层面的"异步处理"的视角分析解读该模块.这个部分与dom功能关系不大,是独立部分,可以看作是jQuery工具系列之一. 与异步框架相关的方法定义于jQuery类的静态方法中.只有三个方法,但是功能和应用及其强大!本篇详细讲解第一个方法jQuery

基于SEDA的异步框架设计与实现

基于SEDA的异步框架设计与实现 二.为什么使用SEDA 目前,面对并发环境,主流互联网服务器编程模型有两种:多线程模型以及事件驱动模型.但是这两个模型都不足以解决这个问题.我们来首先看一下这两种编程模型. 1.多线程并发模型 多线程并发模型是目前最普遍的服务器编程模型,该模型的架构如下图所示:        该模型针对每一个请求,会为其创建并分配一个线程.该线程负责这个请求的处理.该模型的优点:执行粒度是整个完整的处理流程.处理逻辑清晰,容易开发.但与此同时缺点也很明显:如果处理过程中某一步骤

无废话Android之内容观察者ContentObserver、获取和保存系统的联系人信息、网络图片查看器、网络html查看器、使用异步框架Android-Async-Http(4)

1.内容观察者ContentObserver 如果ContentProvider的访问者需要知道ContentProvider中的数据发生了变化,可以在ContentProvider 发生数据变化时调用getContentResolver().notifyChange(uri, null)来通知注册在此URI上的访问者,例子如下: private static final Uri URI = Uri.parse("content://person.db"); public class

CodeIgniter框架的缓存原理分解

用缓存的目的:(手册上叙述如下,已经写得很清楚了) Codeigniter 支持缓存技术,以达到最快的速度. 尽管CI已经相当高效了,但是网页中的动态内容.主机的内存CPU 和数据库读取速度等因素直接影响了网页的加载速度. 依靠网页缓存,你的网页可以达到近乎静态网页的加载速度,因为他们将程序输出的结果保存到硬盘上了. 如何用: 启用缓存功能,只需要将下面的代码放入你的任何一个控制器(controller)的方法(function)内: $this->output->cache(1); 其中 n

浅议事件异步处理底层实现原理

//主类 package cn.com.likeshow.bluetoothchat; import android.os.Bundle; import android.app.Activity; import android.view.Menu; import android.widget.LinearLayout; import android.widget.TextView; public class MainActivity extends Activity { @Override pr

Spring框架和MVC原理

Spring框架和MVC原理 目录 Spring框架 SpringMVC工作原理 参考资料 回到顶部 Spring框架 Spring当前框架有20个jar包,大致可以分为6大模块: Core Container AOP and Instrumentation Messaging Data Access/Integration Web Test Spring框架提供了非常丰富的功能,因此整个架构也很庞大. 在我们实际的应用开发中,并不一定要使用所有的功能,而是可以根据需要选择合适的Spring模块

几个常用的异步框架和网络访问框架区分对比

Part1: 由于在我们的程序中,不允许一些耗时的任务在主线程中出现,主要是为了防止阻塞主线程而导致的 Anr(Application not Responding),一些耗时任务主要包括: 网络访问,缓慢的磁盘操作,比较耗时的算法 当我们的主线程在一定时间里对某一事件的处理超过一定时间后会主线程会崩溃报ANR, 通常的解决方案:采用子线程技术来将耗时任务与主线程进行脱离 1.handler机制 只需要将UI更新参数在子线程中使用sendMessage发送到定义好的Handler里的handle

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现 前言 RxJava 是一款基于 Java VM 实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架.RxJava 官方目前同时维护了两个版本,分别是 1.x 和 2.x,区别是它们使用不同的 group id 和 namespaces. 版本 group id namespaces v1.x io.reactivex io.reactivex v2.x io.reactivex.rxjava2 rx 本系列的

好程序员web前端分享MVVM框架Vue实现原理

好程序员web前端分享MVVM框架Vue实现原理,Vue.js是当下很火的一个JavaScript MVVM库,它是以数据驱动和组件化的思想构建的.相比于Angular.js和react.js更加简洁.更易于理解的API,使得我们能够快速地上手并使用Vue.js. ? 1.什么是MVVM呢? MVVM的简写是Model-View-ViewModel. 在过去的10年里面,我们已经把很多传统的服务端代码放到了浏览器中,这样就产生了成千上万行的javascript代码,它们连接了HTML 和CSS文