java线程加强 |
Quartz :一个调度框架(比如想实现定时器的日期切换等等)
Git Bash
定时器:Timer 定时炸弹代码如下:
public class TimerTest { public static int count = 1; @SuppressWarnings( "deprecation") public static void main(String args[]) throws InterruptedException { //静态方法不能访问内部类的实例对象,除非有了外部类对像,才可以访问内部类非静态成员,或内部类。 new Timer().schedule(new TimerTest().new MyTask(),2000);// while(true ){ System. out.println( new Date().getSeconds()); Thread. sleep(1000); } } class MyTask extends TimerTask { public void run() { count = count % 2;//控制炸弹的时间间隔 new Timer().schedule(new MyTask(), 2000 + 4000 * count);//递归调用 System. out.println("bommbing" ); } } }
线程的相互通信:
经验:要用到共同数据(包括同步锁)或共同的算法的若干个方法应该归同一个类身上,
这种设计方式正好体现了高类聚和程序的健壮性
锁是上在代表要操作的资源的类的内部方法中,而不是线程代码中。
面试题:子线程50次循环,主线程100次循环,又子线程循环50 如此循环50次:
代码如下:
package xyxysjxy.thread; public class ThreadCommunition { public static void main(String args[]) { final OutPuter op = new OutPuter(); new Thread(new Runnable() { public void run() { for(int i=0 ; i < 50 ;i ++) op.sub(); } }).start(); for(int i=0 ; i < 50 ;i ++) op.main(); } } // 要同步的方法或者资源要被封装到一个类中去,体现了高聚性 class OutPuter { private boolean flag = true; public synchronized void main() { while (flag ) { //避免了假唤醒,就像人做梦了一样。有可能是被自己的噩梦惊醒的。不是属于被别人唤醒,而是假唤醒 try { this.wait(); } catch (Exception e) { } } for (int i = 1; i <= 100; i++) { System. out.println(Thread.currentThread().getName() + "------" + i); } flag = true ; this.notify(); } public synchronized void sub() { while (!flag ) { try { this.wait(); } catch (Exception e) { } } for (int i = 1; i <= 50; i++) { System. out.println(Thread.currentThread().getName() + "===========" + i); } flag = false ; this.notify(); } }
线程范围内的共享变量:
下面代码模拟了线程内中的共享数据:
package cn.itcast.heima2; import java.util.HashMap; import java.util.Map; import java.util.Random; public class ThreadScopeShareData { private static int data = 0; private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System. out.println(Thread.currentThread().getName() + " has put data :" + data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = threadData.get(Thread.currentThread()); System. out.println("A from " + Thread.currentThread().getName() + " get data :" + data); } } static class B{ public void get(){ int data = threadData.get(Thread.currentThread()); System. out.println("B from " + Thread.currentThread().getName() + " get data :" + data); } } }
ThreadLocal:相当与一个map集合,他把每个线程当作是键,而把你想存入的数据当作值。所以在一个ThreadLocal当中
只能存在一个线程(键值对)因为map中的数据是用key作为唯一的标识的。
注册事件的理解:相当于 生活中场景:张三对李四说,李四你走的时候给我个电话。那么张三就在在李四的脑海里就注册了
一个事件(打电话,走的时候)。线程也是一样。线程在死的时候要触发什么事件也是被注册进去的。
不可能每次都询问线程你什么时候死啊?死了帮我干嘛干嘛...
对于ThreadLocal的使用看如下代码:
package cn.itcast.heima2; import java.util.HashMap; import java.util.Map; import java.util.Random; public class ThreadLocalTest { private static ThreadLocal<Integer> x = new ThreadLocal<Integer>(); private static ThreadLocal<MyThreadScopeData> myThreadScopeData = new ThreadLocal<MyThreadScopeData>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System. out.println(Thread.currentThread().getName() + " has put data :" + data); x.set(data); /* MyThreadScopeData myData = new MyThreadScopeData(); myData.setName("name" + data); myData.setAge(data); myThreadScopeData.set(myData);*/ MyThreadScopeData. getThreadInstance().setName( "name" + data); MyThreadScopeData. getThreadInstance().setAge(data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = x.get(); System. out.println("A from " + Thread.currentThread().getName() + " get data :" + data); /* MyThreadScopeData myData = myThreadScopeData.get();; System.out.println("A from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge());*/ MyThreadScopeData myData = MyThreadScopeData.getThreadInstance(); System. out.println("A from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } static class B{ public void get(){ int data = x.get(); System. out.println("B from " + Thread.currentThread().getName() + " get data :" + data); MyThreadScopeData myData = MyThreadScopeData.getThreadInstance(); System. out.println("B from " + Thread.currentThread().getName() + " getMyData: " + myData.getName() + "," + myData.getAge()); } } } class MyThreadScopeData{ private MyThreadScopeData(){} public static /*synchronized*/ MyThreadScopeData getThreadInstance(){ MyThreadScopeData instance = map.get(); if(instance == null){ instance = new MyThreadScopeData(); map.set(instance); } return instance; } //private static MyThreadScopeData instance = null;//new MyThreadScopeData(); private static ThreadLocal<MyThreadScopeData> map = new ThreadLocal<MyThreadScopeData>(); private String name; private int age ; public String getName() { return name ; } public void setName(String name) { this.name = name; } public int getAge() { return age ; } public void setAge(int age) { this.age = age; } }
多线程访问共享对象和数据方式
一个runnable对象只是封装一种操作共享数据的方式
1. 如果每个线程执行的代码相同,可以使用同一个runnable对象,这样Runnable
对像中有那个共享数据。
2. 如果每一个线程执行的代码不同,这时候需要用不同的Runnable对象:
1. 将共享数据封装在另外一个对像中,然后将这个对象传递给各个Runnnable对象
每个线程对共享数据的操作方法也分别分配到那个对像身上去完成,
这样容易实现针对该数据进行的各个操作的互斥和同信
2. 就这些Runnable对象作为某个类中的内部类,共享数据作为这个外部类中的
成员变量,每个线程对共享数据的操作方法也分配到外部类中,以便实现对共享数据进行的各个操作
的互斥和通信,作为内部类中的各个Runnable对象调用外部类的这些方法。
3. 上面两种方式的结合:将共享数据封装在另外一个对象中,每个线程对共享数据的操作方法也
分配到对象身上去完成。对象在为外部类中的成员变量或方法中的局部变量
每个线程中Runnable对象作为外部类中的成员内部类或局部内部类。
package cn.itcast.heima2; public class MultiThreadShareData { private static ShareData1 data1 = new ShareData1(); public static void main(String[] args) { //第2种情况的第二条 ShareData1 data2 = new ShareData1(); new Thread(new MyRunnable1(data2)).start(); new Thread(new MyRunnable2(data2)).start(); //第2种情况的第三条 final ShareData1 data1 = new ShareData1(); new Thread(new Runnable(){ @Override public void run() { data1.decrement(); } }).start(); new Thread(new Runnable(){ @Override public void run() { data1.increment(); } }).start(); } } class MyRunnable1 implements Runnable{ private ShareData1 data1 ; public MyRunnable1(ShareData1 data1){ this.data1 = data1; } public void run() { data1.decrement(); } } class MyRunnable2 implements Runnable{ private ShareData1 data1 ; public MyRunnable2(ShareData1 data1){ this.data1 = data1; } public void run() { data1.increment(); } } class ShareData1 /*implements Runnable*/{ /* private int count = 100; @Override public void run() { // TODO Auto-generated method stub while(true){ count--; } }*/ private int j = 0; public synchronized void increment(){ j++; } public synchronized void decrement(){ j--; } }
java5 中的线程并发库
java.util.concurrent
1. 原子性操作
2. 线程池
package xyxysjxy.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ExecutorTest { /*Executors 执行者 线程就是执行者吗,所以有执行的权利去执行任务吧 * concurrent 并发的;一致的;同时发生的 线程的并发库 * schedule 时间表;计划表;一览表 [‘ ?edju?l; * */ public static void main (String[] args) { // ExecutorService threadPool = Executors.newFixedThreadPool(3); // ExecutorService threadPool = Executors.newCachedThreadPool(); ExecutorService threadPool = Executors.newSingleThreadExecutor(); //假如线程死人,会重新开启一个新的线程 for (int j = 1; j <= 10; j++) { final int task = j; threadPool.execute( new Runnable() { public void run() { for (int i = 1; i <= 10; i++) System. out .println(Thread.currentThread().getName() + "第" + task + "任务干了第个" + i + "个"); // try{Thread.sleep(2000);}catch(Exception e ){} } }); } // threadPool.shutdown(); //threadPool.shutdownNow(); System. out .println("10个任务已经提交" ); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule( new Runnable(){ public void run(){ System. out .println("BOMBING" ); } }, 3, TimeUnit. SECONDS ); } }
Callable(runnable) & future(Future 表示异步计算的结果)
1. futrue取得的结果类型和Callable返回的结果类型必须一致,这是通过泛型来实现的
2. Callable要采用和ExecutorSERVICE的submi方法提交,返回的future对象可以取消任务。
3. completionSERVICE用于提交一组callable任务,其take方法返回已经完成的一个任务对应的future对象
将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。
使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。
例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,
然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。
好比:种了几块麦地,而后去收割。收割时则是先成熟的先收割。
package xyxysjxy.thread; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Callable_Future { @SuppressWarnings( "unchecked" ) public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { ExecutorService executorService = Executors.newFixedThreadPool(3); Future<Integer> future = executorService .submit( new Callable<Integer>() { @Override public Integer call() throws Exception { return 12; } }); System. out .println(future.get(1, TimeUnit.SECONDS )); // 等待多少秒,假如还没有完成任务就抛异常 // 提交多个任务,等待着任务的完成 ExecutorService executorService2 = Executors.newFixedThreadPool(3); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>( executorService2); for (int i = 0; i < 10; i++) { final int result = i; completionService.submit( new Callable<Integer>() { @Override public Integer call() throws Exception { Thread. sleep(1000); return result; } }); } for (int j = 0; j < 10; j++) { Future<Integer> future2 = completionService.take(); System. out .println(future2.get());// 等着数据的到来 } } }
lock & condition
readwritelock:读写锁。
class CachedData { Object data; volatile boolean cacheValid; ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock rwl.readLock().unlock(); rwl.writeLock().lock(); // Recheck state because another thread might have acquired // write lock and changed state before we did. if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock rwl.readLock().lock(); rwl.writeLock().unlock(); // Unlock write, still hold read } use(data); rwl.readLock().unlock(); } }
对condition的讲解:
package cn.itcast.heima2; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreeConditionCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); new Thread( new Runnable() { @Override public void run() { for (int i=1;i<=50;i++){ business.sub2(i); } } } ).start(); new Thread( new Runnable() { @Override public void run() { for (int i=1;i<=50;i++){ business.sub3(i); } } } ).start(); for (int i=1;i<=50;i++){ business.main(i); } } static class Business { Lock lock = new ReentrantLock(); Condition condition1 = lock .newCondition(); Condition condition2 = lock .newCondition(); Condition condition3 = lock .newCondition(); private int shouldSub = 1; public void sub2( int i){ lock .lock(); try { while (shouldSub != 2){ try { condition2 .await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j=1;j<=10;j++){ System. out .println("sub2 thread sequence of " + j + ",loop of " + i); } shouldSub = 3; condition3 .signal(); } finally { lock .unlock(); } } public void sub3( int i){ lock .lock(); try { while (shouldSub != 3){ try { condition3 .await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j=1;j<=20;j++){ System. out .println("sub3 thread sequence of " + j + ",loop of " + i); } shouldSub = 1; condition1 .signal(); } finally { lock .unlock(); } } public void main( int i){ lock .lock(); try { while (shouldSub != 1){ try { condition1 .await(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } for (int j=1;j<=100;j++){ System. out .println("main thread sequence of " + j + ",loop of " + i); } shouldSub = 2; condition2 .signal(); } finally { lock .unlock(); } } } }
Semaphore 实现信号登:(类似于多个线程多把锁只要拿了锁就可以实现同步与互斥了)
1. semaphore可以维护当前访问自身的线程个数,并提供了同步机制。
使用semaphore可以控制同时访问资源的线程个数,eg 实现一个允许的并发访问数
eg:比例10个人上厕所,就只有5个位置,那么只能是5个人先上
(其实这个是可以通过semaphore对象控制先后顺序的),
等出来了任意的一个,那么剩下的任意一个就可以接上去了。
2. 单个信号量的semaphore对象可以实现互斥锁的功能,并且可以是一个线程获得了锁,再由另外一个
线程释放锁,这可以应用于死锁的恢复的一些场合。
其他的同步工具:
1. CyclicBarrier 表示 大家彼此等待:大家一起去游玩所以规定一个地点集合只
有全部到了指定了集合点才可以一起出发,但是不一定是同时到。
package cn.itcast.heima2; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for (int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread. sleep(( long)(Math. random()*10000)); System. out .println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊" :"正在等候" )); cb.await(); Thread. sleep(( long)(Math. random()*10000)); System. out .println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊" :"正在等候" )); cb.await(); Thread. sleep(( long)(Math. random()*10000)); System. out .println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊" :"正在等候" )); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } } 2. countdownlatch 犹如倒计数器,一个裁判可以多运动员的命令的命令。一个裁判也可以等待多个运动员的结果 package cn.itcast.heima2; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for (int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { System. out .println("线程" + Thread.currentThread().getName() + "正准备接受命令" ); cdOrder.await(); System. out .println("线程" + Thread.currentThread().getName() + "已接受命令" ); Thread. sleep(( long)(Math. random()*10000)); System. out .println("线程" + Thread.currentThread().getName() + "回应命令处理结果" ); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread. sleep(( long)(Math. random()*10000)); System. out .println("线程" + Thread.currentThread().getName() + "即将发布命令" ); cdOrder.countDown(); System. out .println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果" ); cdAnswer.await(); System. out .println("线程" + Thread.currentThread().getName() + "已收到所有响应结果" ); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
3.exchange 两个线程数据的交换:两个线程都到达了才可以进行数据交换。买毒粉---卖毒粉
数据队列 BlockingQueue
0.用三个空间的队列来演示阻塞队列的功能和效果
1.可用2个具有一个空间的队列来实现同步通知的功能
2. 阻塞队列与semaphore有些类似,但是也不同,阻塞队列是一方放数据,一方取数据,
semaphore通常是同一方设置和释放信号量。
0.用三个空间的队列来演示阻塞队列的功能和效果
package cn.itcast.heima2; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); for (int i=0;i<2;i++){ new Thread(){ public void run(){ while (true ){ try { Thread. sleep(( long)(Math. random()*1000)); System. out .println(Thread.currentThread().getName() + "准备放数据!" ); queue.put(1); System. out .println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while (true ){ try { //将此处的睡眠时间分别改为100和1000,观察运行结果 Thread. sleep(1000); System. out .println(Thread.currentThread().getName() + "准备取数据!" ); queue.take(); System. out .println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据" ); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } } 1.可用2个具有一个空间的队列来实现同步通知的功能 package cn.itcast.heima2; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class BlockingQueueCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); new Thread( new Runnable() { @Override public void run() { for (int i=1;i<=50;i++){ business.sub(i); } } } ).start(); for (int i=1;i<=50;i++){ business.main(i); } } static class Business { BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); { Collections. synchronizedMap( null); try { System.out .println("xxxxxdfsdsafdsa" ); queue2 .put(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void sub( int i){ try { queue1 .put(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } for (int j=1;j<=10;j++){ System. out .println("sub thread sequece of " + j + ",loop of " + i); } try { queue2 .take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void main( int i){ try { queue2 .put(1); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } for (int j=1;j<=100;j++){ System. out .println("main thread sequece of " + j + ",loop of " + i); } try { queue1 .take(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
同步集合:
以前的集合存在线程不安全的情况,假如遇到多线程访问集合时用同步集合
以前的集合是不能在遍历的同时还进行删减的操作但是替换了同步集合就不会出现这种情况
package cn.itcast.heima2; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; public class CollectionModifyExceptionTest { public static void main(String[] args) { Collection users = new CopyOnWriteArrayList(); //new ArrayList(); users.add( new User( "张三" ,28)); users.add( new User( "李四" ,25)); users.add( new User( "王五" ,31)); Iterator itrUsers = users .iterator(); while (itrUsers.hasNext()){ System. out .println("aaaa" ); User user = (User)itrUsers.next(); if ("李四" .equals(user.getName())){ users.remove(user); //itrUsers.remove(); } else { System. out .println(user); } } } }
线程的深度加强