带你一步步实现线程池异步回调

转载请注明出处

作者:晓渡
文章地址:https://greatestrabit.github.io/2016/03/29/callback/

1.字面意义上的回调

字面意思上理解回调,就是A调用B,B回过头来再调用A,即是回调.既然是这样,当然就要求A中有B,B中有A.如下:

class A {
	/**
	 * 提出问题
	 * @author [email protected]
	 * @param b
	 * @param question
	 */
	public void ask(final B b, final String question) {
		b.answer(this, question);
	}

	/**
	 * 处理结果
	 * @author [email protected]
	 * @param answer
	 */
	public void processResult(final String answer) {
		System.out.println(answer);
	}
}

class B {
	/**
	 * 计算结果
	 * @author [email protected]
	 * @param a
	 * @param question
	 */
	public void answer(final A a, final String question) {
		if (question.equals("What is the answer to life, the universe and everything?")) {
			a.processResult("42");
		}
	}
}

/**
 * 相互调用
 * @author [email protected]
 *
 */
public class SyncObjectCallback {
	public static void main(final String[] args) {
		B b = new B();
		A a = new A();

		a.ask(b, "What is the answer to life, the universe and everything?");
	}
}

2.面向对象的回调

上面的写法中,B的对象只在方法中被传递了.实际上,这个B对象后来又调用了A中的方法,它的作用应该不止局限在一个方法中,而应该是A的一个部分.也就是,上面的写法不够”面向对象”,让我们来改造一下:

class A {
	private final B b;

	public A(final B b) {
		this.b = b;
	}

	public void ask(final String question) {
		this.b.answer(this, question);
	}

	public void processResult(final String answer) {
		System.out.println(answer);
	}
}

class B {
	public void answer(final A a, final String question) {
		if (question.equals("What is the answer to life, the universe and everything?")) {
			a.processResult("42");
		}
	}
}

/**
 * 面向对象的相互调用
 * @author [email protected]
 *
 */
public class SyncOOCallback {
	public static void main(final String[] args) {
		B b = new B();
		A a = new A(b);
		a.ask("What is the answer to life, the universe and everything?");
	}
}

3.面向接口的回调

上面的两个例子,估计没人会承认也是回调吧.因为并没什么卵用.不过这个流程对于理解回调是很重要的.其实回调真正有用的地方,在于它的”预测”能力.
我们扩展想象一下.假设上面例子中的B,为A提供了很多服务之后突然觉醒,想为更多的对象提供服务,这样一来,B就变成了Server.而且还要制定规则.规则是什么呢,就是要Server提供服务可以,对方一定要有一个recvAnswer接口供Server调用才行,这样Server才能把结果传回给Client.具体如何制定规则呢?通过Interface.如下:

/**
 * 发出请求着需要实现的接口,要实现处理结果的方法
 * @author [email protected]
 *
 */
public interface IClient {
	void recvAnswer(String answer);
}
  
/**
 * 响应请求者,即提供服务者
 * @author [email protected]
 *
 */
public class Server {
	public void answer(final IClient client, final String question) {
		if (question.equals("What is the answer to life, the universe and everything?")) {
			calclating();
			client.recvAnswer("42");
		}
	}

	private void calclating() {
		try {
			Thread.sleep(new Random().nextInt(5000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
  
/**
 * 发出请求者,同时要处理请求结果
 * @author [email protected]
 *
 */
public class ClientSync implements IClient {
	private final Server server;

	public ClientSync(final Server server) {
		this.server = server;
	}

	public void ask(final String question) {
		this.server.answer(this, question);
	}

	@Override
	public void recvAnswer(final String answer) {
		System.out.println(answer);
	}
}
  
/**
 * 面向接口的同步回调
 * @author [email protected]
 *
 */
public class SyncInterfaceCallback {
	/**
	 * 使用内部类来实现的方式
	 * @author [email protected]
	 */
	private static void innerMain() {
		Server server = new Server();
		server.answer(new IClient() {
			@Override
			public void recvAnswer(final String answer) {
				System.out.println(answer);
			}
		}, "What is the answer to life, the universe and everything?");
	}

	public static void main(final String[] args) {
		Server server = new Server();
		ClientSync client = new ClientSync(server);
		client.ask("What is the answer to life, the universe and everything?");

		innerMain();
	}
}

注意,接口IClient实际上应该是属于Server端的,它是由Server制定的,需要Client来实现的接口,虽然看上去它跟Client很近.
为什么说有”预测”能力呢?想象另一个场景.Server现在是一个底层服务,这个底层服务知道迟早有一天会有高层服务来讨要数据,但是数据如何向上传递呢?底层可以承诺,只有你实现IClient接口,我就会调用其中的recvAnswer方法,把数据传上来.现在底层也可以调用高层的方法,算是有”预测”能力吧?

4.异步回调

上面的调用都是同步的.假设Server计算结果需要较长的时间,你一定希望它能在一个单独的线程中被执行,这是就可以把ask方法的调用用线程包装一下:

public class ClientAsync implements IClient {
	private final Server server;

	public ClientAsync(final Server server) {
		this.server = server;
	}

	/**
	 * 在线程中发出请求
	 * @author [email protected]
	 * @param question
	 */
	public void ask(final String question) {
		new Thread(new Runnable() {
			@Override
			public void run() {
				server.answer(ClientAsync.this, question);
			}
		}).start();
	}

	@Override
	public void recvAnswer(final String answer) {
		System.out.println(answer);
	}
}
  
/**
 * 基于接口的异步回调,每次建立新的线程
 * @author [email protected]
 *
 */
public class AsyncInterfaceCallback {
	/**
	 * 使用内部类的实现方式,此处可见回调地狱
	 * @author [email protected]
	 */
	private static void innerMain() {
		Server server = new Server();

		new Thread(new Runnable() {
			@Override
			public void run() {
				server.answer(new IClient() {
					@Override
					public void recvAnswer(final String answer) {
						System.out.println(answer);
					}
				}, "What is the answer to life, the universe and everything?");
			}
		}).start();
		System.out.println("asked ! waiting for the answer...");
	}

	public static void main(final String[] args) {
		Server server = new Server();
		ClientAsync client = new ClientAsync(server);
		client.ask("What is the answer to life, the universe and everything?");
		System.out.println("asked ! waiting for the answer...");

		innerMain();
	}
}

5.线程池异步回调

每次建立新的线程耗费资源巨大,为了重用线程,使用线程池管理异步调用,这时候就要求Client不仅要实现IClient接口,还要同时是一个任务,才能被线程池执行,如下:

/**
 * 专门用来执行请求的任务,供线程池调用
 * @author [email protected]
 *
 */
public class ClientRunnable implements IClient, Runnable {
	private final Server server;
	private final String question;
	private final int id;

	public ClientRunnable(final Server server, final String question, final int id) {
		this.server = server;
		this.question = question;
		this.id = id;
	}

	@Override
	public void recvAnswer(final String answer) {
		System.out.println("clinet " + this.id + " got answer: " + answer);
	}

	@Override
	public void run() {
		server.answer(ClientRunnable.this, this.question);
	}
}
  
/**
 * 基于线程池的异步回调
 * @author [email protected]
 *
 */
public class ThreadpoolCallback {
	public static void main(final String[] args) {
		ExecutorService es = Executors.newCachedThreadPool();

		Server server = new Server();

		for (int i = 0; i < 100; i++) {
			ClientRunnable cr = new ClientRunnable(server, "What is the answer to life, the universe and everything?",
					i);
			es.execute(cr);
			System.out.println("client " + i + " asked !");
		}

		es.shutdown();
	}
}

至此,我们就实现了线程池异步回调.

完整源码请移步:github

时间: 2025-01-04 15:24:10

带你一步步实现线程池异步回调的相关文章

使用Android新式LruCache缓存图片,基于线程池异步加载图片

import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import a

进程池、线程池及回调函数使用

一.线程池与进程池 池表示容器 线程就是装线程的容器 为什么要装到容器中 可以避免频繁的创建和销毁(进程/线程)来的资源开销 可以限制同时存在的线程数量 以保证服务器不会应为资源不足而导致崩溃 帮我们管理了线程的生命周期 管理了任务的分配 import os import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import activeCount,enum

线程池 异步I/O线程 &lt;第三篇&gt;

在学习异步之前先来说说异步的好处,例如对于不需要CPU参数的输入输出操作,可以将实际的处理步骤分为以下三步: 启动处理: 实际的处理,此时不需要CPU参数: 任务完成后的处理: 以上步骤如果仅仅使用一个线程,当线程正在处理UI操作时就会出现“卡”的现象. 如果使用异步的处理方式,则这三步处理过程涉及到两个线程,主线程中启动第一步:第一步启动后,主线程结束(如果不结束,只会让该线程处于无作为的等待状态):第二步不需要CPU参与:第二步完成之后,在第二个线程上启动第三步:完成之后第二个线程结束.这样

ThreadPool.QueueUserWorkItem引发的血案,线程池异步非正确姿势导致程序闪退的问题

ThreadPool是.net System.Threading命名空间下的线程池对象.使用QueueUserWorkItem实现对异步委托的先进先出有序的回调.如果在回调的方法里面发生异常则应用程序会出现闪退.当然是指不处理那个异常的情况下.这不公司的CMS在生产环境频频出现闪退的情况.该死的是,原来用老机器配置不高的情况下没有出现过.换了更好的新机器后出现的. // // 摘要: // 将方法排入队列以便执行,并指定包含该方法所用数据的对象.此方法在有线程池线程变得可用时执行. // //

Android AsyncTask内部线程池异步执行任务机制简要分析

如下分析针对的API 25的AsyncTask的源码: 使用AsyncTask如果是调用execute方法则是同步执行任务,想要异步执行任务可以直接调用executeOnExecutor方法,多数情况下我们会使用AsyncTask内部静态的线程池, THREAD_POOL_EXECUTOR,这里并不是要分析AsyncTask内部的流程,而是简单介绍下线程池的工作流程.可以看到THREAD_POOL_EXECUTOR的配置如下: new ThreadPoolExecutor( CORE_POOL_

Java ExecutorServic线程池(异步)

相信大家都在项目中遇到过这样的情况,前台需要快速的显示,后台还需要做一个很大的逻辑.比如:前台点击数据导入按钮,按钮后的服务端执行逻辑A,和逻辑B(执行大量的表数据之间的copy功能),而这时前台不能一直等着,要返回给前台,告诉正在处理中就行了.这里就需要用到异步了. 点击按钮 -> 逻辑A ->逻辑B(异步) -> 方法结束. 到底,项目需求明确了,就引入了ExecutorServic线程池. Java通过Executors提供四种线程池,分别为:newCachedThreadPool

基于线程池异步抓取

from multiprocessing.dummy import Pool #线程池模块 #必须只可以有一个参数 def my_requests(url): return requests.get(url=url,headers=headers).text start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', ] p

java 线程池——异步任务

一.简单粗暴的线程 最原始的方式,当我们要并行的或者异步的执行一个任务的时候,我们会直接使用启动一个线程的方式,如下面所示: new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub 这里放你要执行的方法 } }).start(); 但是像上面或者类似这种每次来都是用new 一个Thread出来的方式存在着很多的弊端,如下面: 每次new Thread新建对象性能差:

ThreadPoolExecutor线程池和ProcessPoolExecutor进程池

ProcessPoolExecutor线程池 1.为什么需要线程池呢,如果创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,所以线程池的思想就是:每个线程各分配一个任务,剩下的任务皮队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行 2.标准库concurrent.futures模块,它提供了 ProcessPoolExecutor和ThreadPoolExecutor两个类,实现了对threading和multi