java并发编程(2)线程池的使用

一、任务和执行策略之间的隐性耦合

  Executor可以将任务的提交和任务的执行策略解耦

  只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题

1.线程饥饿死锁

  类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够

  定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁

2.线程池大小

  

  注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池

    如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小

3.配置ThreadPoolExecutor线程池

实例:

  1.通过Executors的工厂方法返回默认的一些实现

  2.通过实例化ThreadPoolExecutor(.....)自定义实现

线程池的队列

  1.无界队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张

    如:单例和固定大小的线程池用的就是此种

  2.有界队列:如果新任务到达,队列满则使用饱和策略

    3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队

    SynchronousQueue直接将任务移交给工作线程

    机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务

    如:CacheThreadPool就是使用的这种策略

饱和策略:

  setRejectedExecutionHandler来修改饱和策略

  1.终止Abort(默认):抛出异常由调用者处理

  2.抛弃Discard

  3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务

  4.CallerRuns:回退任务,有调用者线程自行处理

4.线程工厂ThreadFactoy

  每当创建线程时:其实是调用了线程工厂来完成

   自定义线程工厂:implements ThreadFactory

   可以定制该线程工厂的行为:如UncaughtExceptionHandler等

  

public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) {
        this(r, DEFAULT_NAME);
    }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        //设置该线程工厂创建的线程的 未捕获异常的行为
        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t,
                                          Throwable e) {
                log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
            }
        });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created " + getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting " + getName());
        }
    }

    public static int getThreadsCreated() {
        return created.get();
    }

    public static int getThreadsAlive() {
        return alive.get();
    }

    public static boolean getDebug() {
        return debugLifecycle;
    }

    public static void setDebug(boolean b) {
        debugLifecycle = b;
    }
}

5.扩展ThreadPoolExecutor

  可以被自定义子类覆盖的方法:

  1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行

  2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行

  3.terminated:在线程池关闭时,可以用来释放资源等

二、递归算法的并行化

1.循环  

  在循环中,每次循环操作都是独立的

//串行化
    void processSequentially(List<Element> elements) {
        for (Element e : elements)
            process(e);
    }
    //并行化
    void processInParallel(Executor exec, List<Element> elements) {
        for (final Element e : elements)
            exec.execute(new Runnable() {
                public void run() {
                    process(e);
                }
            });
    }

2.迭代

   如果每个迭代操作是彼此独立的,则可以串行执行

  如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的

  

//串行 计算compute 和串行迭代
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
        for (Node<T> n : nodes) {
            results.add(n.compute());
            sequentialRecursive(n.getChildren(), results);
        }
    }
    //并行 计算compute 和串行迭代
    public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
        for (final Node<T> n : nodes) {
            exec.execute(() -> results.add(n.compute()));
            parallelRecursive(exec, n.getChildren(), results);
        }
    }
    //调用并行方法的操作
    public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
            throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
        parallelRecursive(exec, nodes, resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

  实例:

  

public class ConcurrentPuzzleSolver <P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
        this.exec = initThreadPool();
        this.seen = new ConcurrentHashMap<P, Boolean>();
        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }

    private ExecutorService initThreadPool() {
        return Executors.newCachedThreadPool();
    }

    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // 等待ValueLatch中闭锁解开,则表示已经找到答案
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();//最终主线程关闭线程池
        }
    }

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new SolverTask(p, m, n);
    }

    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
            super(pos, move, prev);
        }
        public void run() {
            //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
            //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
            if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
                return; // already solved or seen this position
            }
            if (puzzle.isGoal(pos)) {
                solution.setValue(this);
            } else {
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
}

  

时间: 2024-08-19 11:38:59

java并发编程(2)线程池的使用的相关文章

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程:线程池的使用(转)

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程之线程池

一.概述 在执行并发任务时,我们可以把任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程,只要池里有空闲的线程,任务就会分配一个线程执行.在线程池的内部,任务被插入一个阻塞队列(BlockingQueue),线程池里的线程会去取这个队列里的任务. 利用线程池有三个好处: 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行 提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,

Java并发编程:线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

Java 并发编程之线程池的使用 (二)

设置线程池的大小 如果线程池过大,那么可能会耗尽资源 ,如果过小,那么 将导致许多空闲的处理器无法工作,从而降低吞吐率. 要设置正确的线程池大小,需要分析计算环境,资源预算和任务的特性,cpu数量,内存大小,任务是计算密集型还是I/O密集型,还是二者皆可.它们是否需要像JDBC连接这样的稀缺资源,下面给出一个计算公式 N(threads)=N(cpu)*U(cpu)*(1+w/c); N(threads)是最后得到的结果大小 . N(cpu)是cpu数量,我的电脑是双核四线程,cpu的数量会是4

Java 并发编程之线程池的使用

在任务与执行策略之间的隐性耦合 Executor框架可以将任务的提交与任务的执行策略解耦开来(就是独立化).虽然Executor框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略 比如: 依赖性任务 比如依赖于执行时序,执行结果或者其他效果,那么任务就带有隐含的依赖性.此时必须小心 地维持这些执行策略以避免产生活跃性问题(死锁等造成执行困难的问题) 使用线程封闭机制的任务 与线程池相比,单线程的Executor能够对并发性做出更强的承诺,它们能确保任务不会并发

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

Java并发编程:线程池

一.为什么使用线程池 使用线程的时候直接就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 通过使用线程池可以达到这样的效果:空闲下来的线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务. 二.Java中的ThreadPoolExecutor类 首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然

Java 并发编程之线程池的使用 (三)

线程工厂 每当线程池需要创建一个线程时,都是通过线程工厂方法来完善的.默认的线程工厂方法将创建一个新的.非守护的线程,并且不包含特殊的配置信息,通过指定一个线程工厂方法,可以线程池的配置信息. 需要定制线程工厂方法的情景 : 需要为线程池里面的线程指定 个UncaughtExceptionHandler 实例化一个定制的Thread类执行调试信息的记录 需要修改线程的优先级或者守护线程的状态(这建设使用这两个功能,线程优先级会增加平台依赖性,并且导致活跃性问题,在大多数并发应用程序中,都可以使用

(转)Java并发编程:线程池的使用

原文地址: http://www.cnblogs.com/dolphin0520/p/3932921.html 一.Java中的ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类.下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: public class Thr