Java 并发编程 --- ThreadPoolExecutor(五)

使用线程池的好处

引用自 http://ifeve.com/java-threadpool/ 的说明:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java中的线程池是用ThreadPoolExecutor类来实现的. 本文就结合JDK 1.8对该类的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理。ThreadPoolExecutor结构如下图:

Executor接口

此接口提供了一种将任务提交与每个任务的运行机制分离的方法,包括线程使用,调度等的详细信息。该接口中只有execute(Runnable command)方法,用来替代通常创建或启动线程的方法。例如使用Thread创建线程

Thread thread = new Thread();
thread.start();

使用execute创建运行线程,具体的线程执行会由相应的实现类去执行(jdk默认线程池execute的实现是由ThreadPoolExecutor来实现的)

Thread thread = new Thread();
executor.execute(thread);

ExecutorService接口

ExecutorService接口提供管理终止的方法和可以生成Future的方法,用于跟踪一个或多个异步任务的进度, 它继承了Executor接口,同时增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。

shutDown() : 允许之前提交的任务继续执行(执行完后shutDown,不会再接收新的任务)

shutDownNow():立即停止正在执行的任务

invokeAll():执行给定的任务,当所有任务完成后返回任务状态和结果的Futures列表
invokeAny():执行给定的任务,返回已完成的任务的结果

submit():提交线程

AbstractExecutorService类

ExecutorService接口的默认实现,同时也是线程池实现类ThreadPoolExecutor的父类,主要看下submit()方法与invokeAll()方法:

submit:

/**不管参数是Callable还是Runable, 执行方法都一样,生成一个task,然后执行task,execute方法的具体实现在ThreadPoolExecutor中,后续分析**/
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

invokeAll :

/**代码很简单,将给定的任务线程封装成Future对象,等待所有任务执行完成,统一返回Future对象,如果出现异常,会将未完成的任务取消**/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
            /** 没有完成,阻塞**/
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

ThreadPoolExecutor类

在关注ThreadPoolExecutor之前,先来了解下线程的基本状态信息。

线程总的来说有NEW(初始)、RUNNABLE(运行)、WAITING(等待)、TIME_WAITING(超时等待)、BLOCKED(阻塞)、TERMINATED(终止)6种状态。

NEW:初始状态,线程被构建,但是还没有调用 start 方法

RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为“运行中”

BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况

   等待阻塞:运行的线程执行 wait 方法,jvm 会把当前线程放入到等待队列

   同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程放入到锁池中

   其他阻塞:运行的线程执行 Thread.sleep 或者 Thread.join 方法,或者发出了 I/O请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、           io 处理完毕则线程恢复

WAITING:等待,需要主动唤醒

TIME_WAITING:超时等待状态,超时以后自动返回.

TERMINATED:终止状态,表示当前线程执行完毕

具体的转化关系如下图:

对于线程池而言,也有五种种不同的状态,分别为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

RUNNING:运行状态,可以处理任务,并且接收任务(前提阻塞队列处于未满状态,阻塞队列一旦满了,会根据相应的饱和策略进行不同的处理)

SHUTDOWN:关闭状态,不能接收新的任务,但是能处理队列中的任务(shutdow方法)

STOP:停止状态,不能接收行的任务,不能处理队列中的任务并且会中断正在运行的任务(shutdownNow方法)

TIDYING:所有的任务都终止了,workCount为0,会进入该状态,将调用terminated方法进入TERMINATED状态

TERMINATED:terminated()方法执行完成

各个状态之间的转化关系(借用这里的图)

ThreadPoolExcecutor类有一些重要的属性:

corePoolSize:线程池中核心线程的数量

maximumPoolSize:线程池中最大线程的数量

defaultHandler:默认的线程池饱和执行策略,一般是阻塞队列满了后且没有空闲线程,再有任务提交是抛出异常,还是直接丢弃等,默认的策略是抛出异:

ctl:对线程池运行状态以及线程池中有效线程数进行记录的一个原子性int变量,主要记录两部分:线程池中的有效线程(workerCount);线程的状态(runstate)包含运行,shutdown     等状态。该变量的高3位用来记录runstate,低29位用来记录有效线程数(约5亿条)(其实这个地方与ReentReadWriteLock中的state变量相似)

COUNT_BITS:workerCount计数位数,低29位

CAPACITY:workerCount的最大值2^29 - 1

饱和策略(内部类)

ThreadPoolExecutor中提供了四种可选择的饱和策略(拒绝策略),用来处理阻塞队列已满且没有空闲线程,后续新来任务的处理

AbortPolicy:直接抛出异常(默认策略)

CallerRunsPolicy:用调用者所在的线程执行任务

CallerRunsPolicy:丢弃队列中最靠前的任务,执行该任务

DiscardPolicy:直接丢弃

worker类(内部类)

worker类是实现线程池的重要类,它继承了AQS类并实现了Runnable接口,结构如下:

Worker内部类主要是用来将运行线程封装,维护运行任务线程中断状态的类,该类继承了AQS类并实现了Runnable接口

变量:

firstTask: 提交的任务线程;

thread: worker类封装后的线程,用来处理任务线程;

completeTasks: 完成的任务数;

构造方法:

Worker(Runnable firstTask) {   /**初始化锁的获取次数**/
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

获取锁、释放锁

从Worker类获取锁的方式可以看到worker类只会去获取独占锁,也就是说不支持重入的,这也是为什么Worker不直接使用ReentrantLock的原因,ReentrantLock是可重入的;当worker获取到锁时表明工作线程正在运行,不允许中断(可以在runWorker中查看);

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

构造方法

ThreadPoolExecutor总共有四种构造方法

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

/**所有的构造方法调用的都是该方法**/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

字段说明:
corePoolSize:线程池初始化核心线程数

maximumPoolSize:线程池最大线程数

keepAliveTime:空闲线程存活时间

workQueue:存放任务的队列(阻塞队列)

threadFactory:线程池的类型

handler:饱和处理策略

execute方法

执行给定的任务,可能是用的是新创建的线程,也可能是已存在的线程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /**获取ctl,记录workCount以及runState, 为32**/   
    int c = ctl.get();
    /**判断线程池中的线程数是否小于核心线程数**/
    if (workerCountOf(c) < corePoolSize) {
     /**添加一个工作线程线程**/
        if (addWorker(command, true))
            return;
     /**添加失败重新获取ctl**/
        c = ctl.get();
    }
   /**线程池是运行状态,并且线程成功添加到队列(线程池中线程数大于核心线程或者小于核心线程且添加线程失败)**/
    if (isRunning(c) && workQueue.offer(command)) {
     /**重新获取ctl**/
        int recheck = ctl.get();
     /**该处的二次检查是为了防止线程池被shutdown或者上次检查后有线程死亡**/
     /**重新判断线程池是否是运行状态,如果不是运行状态,将成功添加到队列中的线程从队列中移除,同时通过对应的饱和策略处理**/
        if (! isRunning(recheck) && remove(command))
       /**执行拒绝策略**/
            reject(command);
     /**如果工作线程为0,执行添加工作线程操作**/
        else if (workerCountOf(recheck) == 0)
        /**添加一个工作线程但不启动**/
            addWorker(null, false);
    }
   /** 执行到这里说有存在两种情况
     * 1.线程池是running状态,工作线程数大于核心线程数且阻塞队列已满导致添加任务失败。
     * 2.线程池不是工作状态
   **/
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
      /**获取线程池的运行状态**/
        int c = ctl.get();
        int rs = runStateOf(c);

        /** 判断是否需要添加新的线程(不在添加需要满足两个条件:rs >= shutdown; 第二个条件整体为false)
          * 1.rs >= SHUTDOWN 即线程池是shutdown、stop、tidying、terminated状态,表示线程池不在接收新的任务。
          *
          * 2.rs == SHUTDOWN 即线程池不在接收新的任务;firstTask == null 即提交执行的线程为空;!workQueue.isEmpty() 即阻塞队列不为空只要三个条件有
          *    一个不满足,则返回false。
          *   2.1. 能执行到这里表名rs一定是>=SHUTDOWN的,如果rs不是SHUTDOWN状态,线程池不会接受新的任务,以及正在处理的任务一会停掉,所以不需要添加新的
          *        工作线程。
          *   2.2. fistTask为空,没必要为该任务创建新的工作线程
          *   2.3. 阻塞队列为空,进行该判断表明rs = SHUTDOWN且阻塞队列中的任务已经处理完,不会创建新的工作线程
         **/
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            /**获取线程池中的工作线程**/
            int wc = workerCountOf(c);
            /**判断工作线程是否超限**/
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /**通过cas方法添加一个工作线程数**/
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
     /**根据firstTask创建一个工作线程**/
        w = new Worker(firstTask);
        final Thread t = w.thread;
     /**firstTask为null只创建,不启动**/
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
          /**1. 线程池是running状态
            *2. 线程池是shutdown状态并且firstTask为null
                  *满足上面任意一个条件,会去添加工作线程,对于第二个条件来说,不会去接收新的任务,但阻塞队列可能没有处理完,可以添加新的工作线程
                 **/
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
            /**线程是否已经启动**/
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
          /**启动线程**/
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            /**添加工作线程失败,进行回滚操作
              *1.将添加的工作线程从工作线程集合中移除
              *2.线程池工作线程数减一
              *3.重新执行线程池的terminate状态转换
             **/
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker方法(执行任务)

/**仅仅会在addWorker()成功时调用,内容比较简单,需要注意三个地方getTask()、beforeExecute()、afterExecute()(后两个可以自己重写)**/
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
   /** 释放锁,对应于worker类构造方法中的setState(-1), 将state状态恢复为0,允许中断
     *  线程池正在初始化任务线程时,会将锁的初始值设置为-1,这样做的目的是禁止执行前对任务进行中断
    **/
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
       /**通过getTask()方法获取任务**/
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /**判断线程/线程池是否处于中断/stop状态**/
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
              /**获得锁并运行任务**/
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
              /**释放锁,任务完成数加1**/
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {
    /**从阻塞队列中获取任务是否超时的变量设置**/
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
       /**如果线程池不是运行状态
         *1.线程是是否是stop、TIDYING、terminate状态
         *2.阻塞队列是否为空
         *满足以上条件 1||2,表明线程池不处理任务,不接受新的任务,线程池任务线程数-1
        **/
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        /**allowCoreThreadTimeOut为false表示线程池中核心线程数不需要进行超时判断**/
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

       /**获取任务(都会阻塞)
         * 如果设置了核心线程运行超时,或者是线程池中任务线程数多于核心线程数,通过pool设置超时时间获取任务。
         * 没事设置超时时间,通过take方法获取任务
         **/
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

shutdownNow方法

与shutdown方法相比,多了一个drainQueue清空阻塞队列的方法,并且所有线程进行中断操作

/**shutdown方法主要调用了四个方法**/
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        /**如果存在安全管理器,判断是否有权限interrupt权限**/
        checkShutdownAccess();
        /**设置线程池运行状态**/
        advanceRunState(STOP);
     /**中断任务线程**/
        interruptWorkers();
        /**清空阻塞队列**/
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    /**尝试将线程池设置为terminate状态**/
    tryTerminate();
    return tasks;
}

/**该方法是worker类中的方法,直接中断,与shutdown方法相比,改方法是对所有的任务线程进行中断操作,
  *shutdown方法会去先尝试获取锁,如果获取锁成功,表示当前线程正在等待任务,对于这种任务线程进行中断操作**/
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

tryTerminate方法

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /**1.线程池是否是运行状态
          *2.线程池是都是Tidying、terminate状态
          *3.线程池是否是shutdown状态,并且阻塞队列不为空
          *满足上述3个条件任意一个立即返回:
          *运行状态,线程池允许任务的处理以及添加,不能直接转换到terminate
          *shutdown状态,阻塞队列不为空,表示还在处理任务,不能直接转换到terminate
        **/
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        /**线程池为shutdown或者stop状态,且阻塞队列为空
          *如果线程池工作线程数不为0,至少中断一个工作线程, 此处可能存在getTask获取任务是一直处于阻塞的任务线程,避免队列为空,任务线程一直阻塞的情况
        **/
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            /**设置为tidying状态**/
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    /**设置成terminated状态**/
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

线程池的监控

getPoolSize() : 获取当前线程池的工作线程数量

getQueue() : 获取线程池中阻塞队列(间接获取阻塞队列中任务的数量)

getCompletedTaskCount() : 获取也完成的任务数量

getTaskCount() : 获取已运行、未运行的任务总数

getLargestPoolSize() : 线程池线程数最大值

getActiveCount():当前线程池中正在执行任务的线程数量。

getCorePoolSize() : 线程池核心线程数

常见的线程池(Executors)

Executors是线程池的工厂类,通过Executors可以创建四种不同的线程池 (newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool、newSingleThreadExecutor、newWorkStealingPool(也是一种线程池,但不是通过ThreadPoolExecutor实现,不做讨论))

阻塞队列(引用这里

SynchronousQueue:newCachedThreadPool
LinkedBlockingQueue(无界队列):基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,                    而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。                    newFixedThreadPool使用
ArrayBlockingQueue(有界队列):使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变                    得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可                    能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
                    1. 如果要想降低系统资源的消耗(包括CPU的使用率,操作系统资源的消耗,上下文环境切换的开销等), 可以设置较大的队列容量和较小的线程池容量,                        但这样也会降低线程处理任务的吞吐量。
                    2. 如果提交的任务经常发生阻塞,那么可以考虑通过调用 setMaximumPoolSize() 方法来重新设定线程池的容量。
                    3. 如果队列的容量设置的较小,通常需要将线程池的容量设置大一点,这样CPU的使用率会相对的高一些。但如果线程池的容量设置的过大,则在提交的任                       务数量太多的情况下,并发量会增加,那么线程之间的调度就是一个要考虑的问题,因为这样反而有可能降低处理任务的吞吐量。

DelayedWorkQueue : ScheduledThreadPoolExecutor使用

newFixedThreadPool

固定线程数量的线程池,corePoolSize==maximumPoolSize

1.所有工作线程都在执行任务,新来任务需要在队列中等待直到有空闲工作线程

2.工作线程在执行任务时被shutdown了,新来任务是会创建一个新的任务线程

newCachedThreadPool

可缓存线程池,corePoolSize==0, maximumPoolSize=Integer.MAX_VALUE

1.没有核心任务处理线程

2.新来任务是如果有空闲的处理线程,直接使用已有的处理线程,否则创建一个处理线程

3.当超过60s工作线程没有任务处理,将会被销毁

该线程池适合处理执行时间短,数量多的任务

newScheduledThreadPool

调度线程池,jdk中单独一个类实现,初始化对象时设置corePoolSize,maximumPoolSize=Integer.MAX_VALUE

用来设置给定延迟时间后执行

newSingleThreadExecutor

只有一个工作线程来处理任务的线程池,corePoolSize==maximumPoolSize==1

原文地址:https://www.cnblogs.com/kaneziki/p/9698781.html

时间: 2024-10-11 01:29:41

Java 并发编程 --- ThreadPoolExecutor(五)的相关文章

Java并发编程(五)ConcurrentHashMap的实现原理和源码分析

相关文章 Java并发编程(一)线程定义.状态和属性 Java并发编程(二)同步 Java并发编程(三)volatile域 Java并发编程(四)Java内存模型 前言 在Java1.5中,并发编程大师Doug Lea给我们带来了concurrent包,而该包中提供的ConcurrentHashMap是线程安全并且高效的HashMap,本节我们就来研究下ConcurrentHashMap是如何保证线程安全的同时又能高效的操作. 1.为何用ConcurrentHashMap 在并发编程中使用Has

java并发编程(五)正确使用volatile

转载请注明出处:     volatile用处说明     在JDK1.2之前,Java的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的.而随着JVM的成熟和优化,现在在多线程环境下volatile关键字的使用变得非常重要. 在当前的Java内存模型下,线程可以把变量保存在本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写.这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致. 要解决这个问题,就需要

Java并发编程(五)JVM指令重排

引言:在Java中看似顺序的代码在JVM中,可能会出现编译器或者CPU对这些操作指令进行了重新排序:在特定情况下,指令重排将会给我们的程序带来不确定的结果..... 如下代码可能的结果有哪些? public class PossibleReordering { static int x = 0, y = 0; static int a = 0, b = 0; public static void main(String[] args) throws InterruptedException {

Java并发编程(六)阻塞队列

相关文章 Java并发编程(一)线程定义.状态和属性 Java并发编程(二)同步 Java并发编程(三)volatile域 Java并发编程(四)Java内存模型 Java并发编程(五)ConcurrentHashMap的实现原理和源码分析 前言 在Android多线程(一)线程池这篇文章时,当我们要创建ThreadPoolExecutor的时候需要传进来一个类型为BlockingQueue的参数,它就是阻塞队列,在这一篇文章里我们会介绍阻塞队列的定义.种类.实现原理以及应用. 1.什么是阻塞队

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

转: 【Java并发编程】之十八:第五篇中volatile意外问题的正确分析解答(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17382679 在<Java并发编程学习笔记之五:volatile变量修饰符-意料之外的问题>一文中遗留了一个问题,就是volatile只修饰了missedIt变量,而没修饰value变量,但是在线程读取value的值的时候,也读到的是最新的数据.但是在网上查了很多资料都无果,看来很多人对volatile的规则并不是太清晰,或者说只停留在很表面的层次,一知半解. 这两天看<深入Ja

[Java 并发] Java并发编程实践 思维导图 - 第五章 基础构建模块

根据<Java并发编程实践>一书整理的思维导图.希望能够有所帮助. 第一部分: 第二部分:

java高并发编程(五)线程池

摘自马士兵java并发编程 一.认识Executor.ExecutorService.Callable.Executors /** * 认识Executor */ package yxxy.c_026; import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor { public static void main(String[] args) { new T01_MyExecutor(

《Java并发编程实战》第八章 线程池的使用 读书笔记

一.在任务与执行策略之间的隐性解耦 有些类型的任务需要明确地指定执行策略,包括: . 依赖性任务.依赖关系对执行策略造成约束,需要注意活跃性问题.要求线程池足够大,确保任务都能放入. . 使用线程封闭机制的任务.需要串行执行. . 对响应时间敏感的任务. . 使用ThreadLocal的任务. 1. 线程饥饿死锁 线程池中如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁. 2. 运行时间较长的任务 Java提供了限时版本与无限时版本.例如Thread