Java多线程与并发库高级应用 学习笔记 10-16课

Callable与Future的介绍

package Thread;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFuture {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        Future<String> future = threadPool.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "hi~";
            }
        });
        System.out.println("wait for result");
        try {
            System.out.println("result:" + future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
        CompletionService<String> completionService = new ExecutorCompletionService<String>(
                threadPool2);

        //2333 传说中的睡眠排序。
        for (int i = 0; i < 10; i++) {
            final int seq = i;
            completionService.submit(new Callable<String>() {

                @Override
                public String call() throws Exception {
                    int wait = new Random().nextInt(5000);
                    Thread.sleep(wait);
                    return seq + "等待时间:" + wait;
                }
            });
        }

        //异步提交结果
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(completionService.take().get());
            } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            }
        }
    }
}

Java并发编程:Lock

package Thread;

import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

//这例子有点渣
public class ReadWriteLockTest {

    public static void main(String[] args) {
        final Queue3 queue3 = new Queue3();
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    while (true) {
                        queue3.get();
                    }
                }
            }).start();

            new Thread(new Runnable() {

                @Override
                public void run() {
                    while (true) {
                        queue3.put(new Random().nextInt(10000));
                    }
                }
            }).start();
        }
    }
}

class Queue3 {
    private Object data = null;
    ReadWriteLock rwl = new ReentrantReadWriteLock();

    public void get() {
        rwl.readLock().lock();

        try {
            System.out.println(Thread.currentThread().getName()
                    + " be ready to read data!");
            Thread.sleep((long) (Math.random() * 1000));
            System.out.println(Thread.currentThread().getName()
                    + "have read data :" + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.readLock().unlock();
        }
    }

    public void put(Object data) {

        rwl.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()
                    + " be ready to write data!");
            Thread.sleep((long) (Math.random() * 1000));
            this.data = data;
            System.out.println(Thread.currentThread().getName()
                    + " have write data: " + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            rwl.writeLock().unlock();
        }

    }
}

Java线程(九):Condition-线程通信更高效的方式

package Thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionCommunication {

    /**
     * @param args
     */
    public static void main(String[] args) {

        final Business business = new Business();
        new Thread(
                new Runnable() {

                    @Override
                    public void run() {

                        for(int i=1;i<=50;i++){
                            business.sub(i);
                        }

                    }
                }
        ).start();

        for(int i=1;i<=50;i++){
            business.main(i);
        }

    }

    static class Business {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
          private boolean bShouldSub = true;
          public /* synchronized*/ void sub(int i){
              lock.lock();
              try{
                  while(!bShouldSub){
                      try {
                          //this.wait();
                        condition.await();
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                  }
                    for(int j=1;j<=10;j++){
                        System.out.println("sub thread sequence of " + j + ",loop of " + i);
                    }
                  bShouldSub = false;
                  //this.notify();
                  condition.signal();
              }finally{
                  lock.unlock();
              }
          }

          public  void main(int i){
              lock.lock();
              try{
                 while(bShouldSub){
                          try {
                            condition.await();
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                      }
                    for(int j=1;j<=100;j++){
                        System.out.println("main thread sequence of " + j + ",loop of " + i);
                    }
                    bShouldSub = true;
                    condition.signal();
          }finally{
              lock.unlock();
          }
      }

    }
}
package Thread;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {

    final Lock lock = new ReentrantLock();
    final Condition write = lock.newCondition();
    final Condition read = lock.newCondition();
    final static int capacity = 100;
    final static Queue<Integer> queue = new ArrayBlockingQueue<Integer>(
            capacity);

    public void put(int x) {
        lock.lock();
        try {
            while (queue.size() == capacity) {// 写数据缓存满了
                write.await();
            }
            queue.add(x);
            read.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public int get() {
        lock.lock();
        int x = 0;
        try {
            while (queue.isEmpty()) {
                read.await();
            }
            x = queue.remove();
            read.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.lock();
            return x;
        }
    }
/*
 * 假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,
 * 那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,
 * 如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。
 */
}

Java多线程-新特征-信号量Semaphore

package Thread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 信号量
 *
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        // 线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);
        // 模拟20个客户端访问
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        Thread.sleep((long) (Math.random() * 10000));
                        // 访问完后,释放
                        semp.release();
                        // availablePermits()指的是当前信号灯库中有多少个可以被使用
                        System.out.println("-----------------"
                                + semp.availablePermits());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            exec.execute(run);
        }
        // 退出线程池
        exec.shutdown();
    }
}

CyclicBarrier介绍

package Thread;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/*
 * 等待子线程全部完成后,执行主线程任务
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {

            @Override
            public void run() {
                System.out.println("main thread start");
            }
        });

        for (int i = 0; i < 3; i++) {
            new ThreadTest(cyclicBarrier).start();
        }
    }
}

class ThreadTest extends Thread {
    private CyclicBarrier cyclicBarrier;

    public ThreadTest(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(500));
            System.out.println(Thread.currentThread().getName()
                    + " sub thread have done!");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

CountDownLatch浅析

作用和Cyclicbarrier没有什么本质区别。

时间: 2024-12-28 04:57:46

Java多线程与并发库高级应用 学习笔记 10-16课的相关文章

Java多线程与并发库高级应用 学习笔记 1-9课

来源XXX,免得打广告嫌疑. http://www.cnblogs.com/whgw/archive/2011/10/03/2198506.html 今天看了文章才发现创建线程最佳方式为实现Runnable接口,之前的习惯要改鲁. http://blog.csdn.net/imzoer/article/details/8500670 Java中Timer的用法 package timer; import java.util.Calendar; import java.util.Timer; im

Java多线程与并发库高级应用 学习笔记 16-22课 +面试题

java.util.concurrent.Exchanger应用范例与原理浅析--转载 package Thread; import java.util.Random; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static

Java多线程与并发库高级应用之公共屏障点CyclicBarrier

一个小队去登山,每位队员登山的速度不同.山上有几个集合点,在每一集合点处,先到达的队员只有等后面的队员全部到达集合点后才能继续向下一个集合点出发. JDK1.5提供的CyclicBarrier模拟了这种情况.每一个线程相当于一个登山队员,CyclicBarrier相当于山上的集合点.只有等所有线程都执行到了CyclicBarrier后才可以继续向下执行. CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程

Java多线程与并发库高级应用之信号量Semaphore

JDK1.5提供了一个计数信号量Semaphore类.Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目,并提供了同步机制. Semaphore提供了两个构造器来创建对象: 1)Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的Semaphore. 2)Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的Semaphore.如果此信号量保证在争用时按先进先出的顺序授予许可,则为

Java多线程与并发库高级应用之阻塞队列BlockingQueue

JDK1.5提供了阻塞队列接口BlockingQueue,它是一个有界阻塞队列.BlockingQueue实现是线程安全的,可以安全地与多个生产者和多个使用者一起使用. 使用时用其实现类 ArrayBlockingQueue,它一个由数组支持的有界阻塞队列.此队列按 FIFO(先进先出)原则对元素进行排序.队列的头部 是在队列中存在时间最长的元素.队列的尾部是在队列中存在时间最短的元素.新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素. 这是一个典型的"有界缓存区",固定

Java多线程与并发库高级应用之线程数据交换Exchanger

JDK1.5提供了Exchanger用于两个线程的数据交换.两个线程先后到达交换点,先到达的线程会等待后到达的线程,然后两个线程互相交换数据,交换后双方持对方的数据. Exchanger只提供了一个构造器: Exchanger():创建一个新的Exchanger. Exchanger中也只有两个方法: V exchange(V x): 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象. V exchange(V x, long timeout,

Java多线程与并发库高级应用之倒计时计数器

CountDownLatch 类是一个倒计时计数器,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数初始化 CountDownLatch.由于调用了countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞.之后,会释放所有等待的线程,await 的所有后续调用都将立即返回. CountDownLatch 是一个通用同步工具,它有很多用途.将计数1初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 c

Java多线程与并发库高级应用-java5线程并发库

java5 中的线程并发库 主要在java.util.concurrent包中 还有 java.util.concurrent.atomic子包和java.util.concurrent.lock子包

Java多线程与并发库高级应用

1.传统线程机制的回顾 1.1创建线程的两种传统方式 在Thread子类覆盖的run方法中编写运行代码 // 1.使用子类,把代码放到子类的run()中运行 Thread thread = new Thread() { @Override public void run() { while (true) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.p