java.util.concurrent.CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。
1 package cyclicbarrier; 2 3 import java.util.concurrent.BrokenBarrierException; 4 import java.util.concurrent.ConcurrentHashMap; 5 import java.util.concurrent.CyclicBarrier; 6 7 public class Total { 8 public static ConcurrentHashMap<String, Integer> result = new ConcurrentHashMap<String, Integer>(); 9 public static void main(String[] args) { 10 11 TotalService totalService = new TotalServiceImpl(); 12 CyclicBarrier barrier = new CyclicBarrier(5, 13 new TotalTask(totalService)); 14 15 // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。 16 new BillTask(new BillServiceImpl(), barrier, "北京").start(); 17 new BillTask(new BillServiceImpl(), barrier, "上海").start(); 18 new BillTask(new BillServiceImpl(), barrier, "广西").start(); 19 new BillTask(new BillServiceImpl(), barrier, "四川").start(); 20 new BillTask(new BillServiceImpl(), barrier, "黑龙江").start(); 21 22 } 23 } 24 25 /** 26 * 主任务:汇总任务 27 */ 28 class TotalTask implements Runnable { 29 private TotalService totalService; 30 31 TotalTask(TotalService totalService) { 32 this.totalService = totalService; 33 } 34 35 public void run() { 36 // 读取内存中各省的数据汇总,过程略。 37 int totalCount = totalService.count(); 38 Total.result.put("total", totalCount); 39 System.out.println("======================================="); 40 System.out.println("开始全国汇总"); 41 System.out.println("全国汇总结果:" + Total.result.get("total")); 42 } 43 } 44 45 /** 46 * 子任务:计费任务 47 */ 48 class BillTask extends Thread { 49 // 计费服务 50 private BillService billService; 51 private CyclicBarrier barrier; 52 // 代码,按省代码分类,各省数据库独立。 53 private String code; 54 55 BillTask(BillService billService, CyclicBarrier barrier, String code) { 56 this.billService = billService; 57 this.barrier = barrier; 58 this.code = code; 59 } 60 61 public void run() { 62 System.out.println("开始计算--" + code + "省--数据!"); 63 int ret = billService.bill(code); 64 Total.result.put(code, ret); 65 // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略 66 System.out.println("结果:" + ret); 67 System.out.println(code + "省已经计算完成,并通知汇总Service!"); 68 try { 69 // 通知barrier已经完成 70 barrier.await(); 71 } catch (InterruptedException e) { 72 e.printStackTrace(); 73 } catch (BrokenBarrierException e) { 74 e.printStackTrace(); 75 } 76 } 77 } 78 79 interface TotalService 80 { 81 public int count(); 82 } 83 84 class TotalServiceImpl implements TotalService 85 { 86 @Override 87 public int count() 88 { 89 int totalCount = 0; 90 for(String key : Total.result.keySet()) 91 { 92 totalCount += Total.result.get(key); 93 } 94 return totalCount; 95 } 96 } 97 98 interface BillService 99 { 100 public int bill(String code); 101 } 102 class BillServiceImpl implements BillService 103 { 104 105 @Override 106 public int bill(String code) 107 { 108 return 1; 109 } 110 }
Total.java
原文 http://blog.chinaunix.net/uid-7374279-id-4658408.html
时间: 2024-11-08 19:39:52