第八章、线程池的使用

线程工厂的使用:

  在创建线程时,应该要初始化它的线程名称,以便以后更好的查找错误,下面的示例展示了线程工厂的使用,创建线程是并发的,因此count使用原子类。

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class MyThreadFactory implements ThreadFactory{
    private final String poolName;
    private AtomicInteger count=new AtomicInteger(0);
    public MyThreadFactory(String poolName) {
        this.poolName=poolName;
        System.out.println("Thread name is "+poolName);
    }
    @Override
    public Thread newThread(Runnable r) {
        Thread t=new Thread(r);
        t.setName(poolName+"-"+count.incrementAndGet());
        return t;
    }
}

  测试类:

package com.company;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        ExecutorService exec=Executors.newCachedThreadPool(new MyThreadFactory("thread"));
        exec.submit(new Thread1());
        exec.submit(new Thread1());

        exec.shutdown();

    }
}
class Thread1 implements Runnable{

    public void run(){
        System.out.println("the thread is running!");
        System.out.println("my name is "+Thread.currentThread().getName());
    }
}

扩展ThreadPoolExecutor

  下面演示使用给线程增加日志和计时功能,startTime使用ThreadLocal是因为全局变量,可以发现,在这里的全局变量要保证线程安全:

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        System.out.println("在线程执行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);
        System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));

        System.out.println("在线程执行之后打印!");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在线程池关闭后打印!");
    }
}

递归算法的并行化

  这里要比较多线程和单线程递归地处理某个文件夹下全部文件的时间快慢。唯一不方便的是使用多线程处理时的时间统计,由于不知道提交了多少个处理线程,因此不能及时地关闭线程池(如果有大神,请指点一二),使用等待固定时间的方式,让线程池最终关闭。这里使用TimingThreadPool这个类来统计各个线程的执行时间,并每个线程执行完后将时间统计到totalTime中,在线程池关闭后再输出总时间。

package com.company;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by chenkaiwei on 2017/6/8.
 */
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime=new ThreadLocal<Long>();
    private final AtomicLong numTasks=new AtomicLong();
    private final AtomicLong totalTime=new AtomicLong();

    public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

    }

    @Override
    protected void beforeExecute(Thread t, Runnable r){
        startTime.set(System.nanoTime());
        super.beforeExecute(t, r);
        //System.out.println("在线程执行之前打印!");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long endTime=System.nanoTime();
        long taskTime=endTime-startTime.get();
        numTasks.incrementAndGet();
        totalTime.addAndGet(taskTime);

        //System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread().getName(),endTime,taskTime));
        //System.out.println("在线程执行之后打印!");
        //System.out.println("线程总运行时间为:"+totalTime.get()/1000000000+"秒");
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("在线程池关闭后打印!总共用时:"+totalTime.get()/1000000000+"秒");
    }
}

  主方法,其中Thread1是使用多线程来执行,可以修改TimingThreadPool的初始状态可以调整线程数量。Thread2是使用单线程处理。在代码中,我假设处理文件的时间都是1毫秒(单单遍历文件时,多线程要比单线程的慢,但加入一些处理之后,多线程就占优势了)。可以看到我的测试数据有580259个文件,使用单线程时一共用时818秒,而使用3个线程时用时798秒,4线程416秒,2线程之类的846秒。因此最合适的是3线程也就快那么20秒,但是如果处理文件不仅仅是1毫秒,而是更多,那么这两者的差距会更大。

package com.company;

import java.io.File;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public final AtomicLong atom=new AtomicLong(0);
    public static void main(String[] args) {
        ExecutorService exec=Executors.newFixedThreadPool(1);
        //ThreadPoolExecutor pool=new TimingThreadPool(1,1,2,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));
        exec.submit(new Thread1());
        exec.shutdown();
    }

    //单线程遍历某个文件夹
    public void getAllFiles(File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                getAllFiles(f);
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

    //多线程遍历某个文件夹
    public void parallelRecursive(Executor exec,File file){
        if(file.isDirectory()) {
            for (File f:file.listFiles()){
                exec.execute(new Thread(){
                    public void run(){
                        parallelRecursive(exec,f);
                    }
                });
            }
        }else {
            try{
                Thread.sleep(1);
            }catch(Exception e){
                e.printStackTrace();
            }
            atom.incrementAndGet();
        }
    }

}
class Thread1 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");

        ThreadPoolExecutor pool=new TimingThreadPool(2,2,5,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),new MyThreadFactory("findThread"));

        m.parallelRecursive(pool,file);
        try{
            Thread.sleep(818000);
            pool.shutdown();
            System.out.println("共访问【"+m.atom.get()+"】个文件");
            //System.out.println("线程池退出");
            //exec.awaitTermination(Integer.MAX_VALUE,TimeUnit.SECONDS);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    //798秒 3线程
    //816秒 4线程
}

class Thread2 implements Runnable{
    public void run(){
        Main m=new Main();
        File file=new File("/Users/chenkaiwei");
        long startTime=System.currentTimeMillis();
        m.getAllFiles(file);
        long endTime=System.currentTimeMillis();
        System.out.println("总共用时:"+(endTime-startTime)/1000+"秒");
        System.out.println("共访问【"+m.atom.get()+"】个文件");
    }
    //818秒
}
//580259
时间: 2024-11-05 01:08:54

第八章、线程池的使用的相关文章

《Java并发编程实战》第八章 线程池的使用 读书笔记

一.在任务与执行策略之间的隐性解耦 有些类型的任务需要明确地指定执行策略,包括: . 依赖性任务.依赖关系对执行策略造成约束,需要注意活跃性问题.要求线程池足够大,确保任务都能放入. . 使用线程封闭机制的任务.需要串行执行. . 对响应时间敏感的任务. . 使用ThreadLocal的任务. 1. 线程饥饿死锁 线程池中如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁. 2. 运行时间较长的任务 Java提供了限时版本与无限时版本.例如Thread

[Java Concurrency in Practice]第八章 线程池的使用

线程池的使用 8.1 在任务与执行策略之间的隐性耦合 虽然Executor框架为制定和修改执行策略提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略.有些类型的任务需要明确地执行执行策略,包括: 1. 依赖性任务:当线程池中运行任务都是独立的时,我们可以随意地修改池的长度与配置,这不会影响到性能以外的任何事情.但如果你提交到线程池中的任务依赖于其他的任务,这就会隐式地给执行策略带来了约束. 2. 非安全性任务:如果即使一个任务有线程安全性问题,只要它在单线程的环境下运行是不会有问题,如

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

内存池、进程池、线程池

首先介绍一个概念"池化技术 ".池化技术 一言以蔽之就是:提前保存大量的资源,以备不时之需以及重复使用. 池化技术应用广泛,如内存池,线程池,连接池等等.内存池相关的内容,建议看看Apache.Nginx等开源web服务器的内存池实现. 起因:由于在实际应用当中,分配内存.创建进程.线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作.           因此,当程序中需要频繁的进行内存申请释放,进程.线程创建销毁等操作时,通常会使用内存池.进程池.

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

记5.28大促压测的性能优化&mdash;线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

线程池的创建

package com.newer.cn; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test1 { public static void main(String[] args) { // 创建线程池的方式 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. E

Java底层技术系列文章-线程池框架

一.线程池结构图    二.示例 定义线程接口 public class MyThread extends Thread { @Override publicvoid run() { System.out.println(Thread.currentThread().getName() + "正在执行"); }}   1:newSingleThreadExecutor ExecutorService pool = Executors. newSingleThreadExecutor()