zookeeper实现fifo以及并发访问删除

package cn.sniper.zookeeper;
import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class ZookeeperUtil {
 
 @Test
 public void helloword() {
  //端口默认是2181
  //String connectString = "192.168.1.231";
  String connectString = "192.168.1.231:2181";
  int sessionTimeout = 20000;
  
  try {
   ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    public void process(WatchedEvent event) {
     System.out.println(event);
    }
   });
   
   System.out.println(zk);
   zk.close();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
 
 @Test
 public void create() {
  String connectString = "192.168.1.231:2181";
  int sessionTimeout = 20000;
  
  try {
   ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    public void process(WatchedEvent event) {
     System.err.println("事件类型:" + event.getType());
    }
   });
   
   //创建节点 
   zk.create("/sniper1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
   
   zk.close();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (KeeperException e) {
   e.printStackTrace();
  }
 }
 
 @Test
 public void fifoIn() {
  String connectString = "192.168.1.231,192.168.1.232,192.168.1.233";
  int sessionTimeout = 50000;
  
  try {
   ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    public void process(WatchedEvent event) {
     System.err.println("事件类型:" + event.getType());
    }
   });
   
   //每个客户端连进来的时候,都在fifo下创建一个有序节点  模拟10个客户端连接进入
   for(int i=0; i<10; i++) {
    zk.create("/fifo/", String.valueOf(System.currentTimeMillis()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
   }
   
   zk.close();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (KeeperException e) {
   e.printStackTrace();
  }
 }
 
 @Test
 public void fifoOut() {
  String connectString = "192.168.1.231,192.168.1.232,192.168.1.233";
  int sessionTimeout = 30000;
  
  try {
   ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    public void process(WatchedEvent event) {
     System.err.println("事件类型:" + event.getType());
    }
   });
   
   List<String> children = zk.getChildren("/fifo", new Watcher() {
    public void process(WatchedEvent event) {
     System.err.println("事件类型:" + event.getType());
    }
   });
   
   //由于节点的有序性,将节点用treeSet排序一下,取得第一个元素,就可以做到先进先出队列了
   TreeSet<String> set = new TreeSet<String>(children);
   
   String child = set.first();
   
   System.err.println(child);
   
   zk.delete("/fifo/"+child, -1);
   
   zk.close();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  } catch (KeeperException e) {
   e.printStackTrace();
  }
 }
 
 /**
  * 多线程创建有序节点不存在问题
  * @throws Exception
  */
 public static void fifoInMultiThread() throws Exception {
  //int nThreads = Runtime.getRuntime().availableProcessors();
  
  int nThreads = 20;
  final ExecutorService service = Executors.newFixedThreadPool(nThreads);
  
  final ZooKeeper zk = new ZooKeeper("192.168.1.231,192.168.1.232,192.168.1.233", 30000, new Watcher() {
   public void process(WatchedEvent evet) {
    
   }
  });
  
  final long begin = System.currentTimeMillis();
  
  for(int i=0; i<1000; i++) {
   final int j = i;
   service.execute(new Runnable() {
    public void run() {
     try {
      zk.create("/fifo/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
     } catch (KeeperException e) {
      e.printStackTrace();
     } catch (InterruptedException e) {
      e.printStackTrace();
     } finally {
      if(j == 999) {
       try {
        zk.close();
        service.shutdown();
        System.err.println(System.currentTimeMillis() - begin);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }
    }
   });
  }
 }
 
 @Test
 public void size() throws Exception {
  final ZooKeeper zk = new ZooKeeper("192.168.1.231,192.168.1.232,192.168.1.233", 50000, new Watcher() {
   public void process(WatchedEvent evet) {
    
   }
  });
  
  System.out.println(zk.getChildren("/fifo", new Watcher() {
   public void process(WatchedEvent evet) {
    
   }
  }).size() + " =========================");
  
  zk.close();
 }
 
 /**
  * 多线程取出节点并且删除节点存在问题,synchronized解决方案
  * @throws Exception
  */
 public static void fifoOutMultiThread() throws Exception {
  int nThreads = 20;
  final ExecutorService service = Executors.newFixedThreadPool(nThreads);
  
  final ZooKeeper zk = new ZooKeeper("192.168.1.231,192.168.1.232,192.168.1.233", 50000, new Watcher() {
   public void process(WatchedEvent evet) {
    
   }
  });
  
  final long begin = System.currentTimeMillis();
  
  for(int i=0; i<1000; i++) {
   final int j = i;
   service.execute(new Runnable() {
    public void run() {
     try {
      synchronized (ZookeeperUtil.class) {
       List<String> children = zk.getChildren("/fifo", new Watcher() {
        public void process(WatchedEvent arg0) {
         
        }
       });
       
       if(children != null && children.size() > 0) {
        TreeSet<String> set = new TreeSet<String>(children);
        String pop = set.first();
        
        System.err.println(pop);
        
        zk.delete("/fifo/"+pop, -1);
       }
      }
     } catch (KeeperException e) {
      e.printStackTrace();
     } catch (InterruptedException e) {
      e.printStackTrace();
     } finally {
      if(j == 999) {
       try {
        zk.close();
        service.shutdown();
        System.err.println(System.currentTimeMillis() - begin);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }
    }
   });
  }
 }
 
 /**
  * 多线程取出节点并且删除节点存在问题,读写锁方式
  * @throws Exception
  */
 public static void fifoOutMultiThreadLock() throws Exception {
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  
  int nThreads = 20;
  final ExecutorService service = Executors.newFixedThreadPool(nThreads);
  
  final ZooKeeper zk = new ZooKeeper("192.168.1.231,192.168.1.232,192.168.1.233", 50000, new Watcher() {
   public void process(WatchedEvent evet) {
    
   }
  });
  
  final long begin = System.currentTimeMillis();
  
  for(int i=0; i<1000; i++) {
   final int j = i;
   service.execute(new Runnable() {
    public void run() {
     try {
      rwl.readLock().lock();
       //取得fifo下的所有直接下级节点
       List<String> children = zk.getChildren("/fifo", new Watcher() {
        public void process(WatchedEvent event) {
         
        }
       });
       
       //用treeset排好序
       TreeSet<String> set = new TreeSet<String>(children);
       //取出第一个,即最先进入的一个
       String pop = set.first();
       //判断是否存在
       Stat stat = zk.exists("/fifo/"+pop, new Watcher() {
        public void process(WatchedEvent event) {
         
        }
       });
      
      if(stat != null) {
       rwl.readLock().unlock();
       
       rwl.writeLock().lock();
        //用于多线程读的因素,有可能第一条线程读到节点还存在,紧接着第二条线程就把该节点删除了,所以,加了写锁之后还需要判断一次节点是否存在
        stat = zk.exists("/fifo/"+pop, new Watcher() {
         public void process(WatchedEvent event) {
          
         }
        });
        
        if(stat != null) {
         System.err.println(pop);
         zk.delete("/fifo/"+pop, -1);
        }
       rwl.writeLock().unlock();
       rwl.readLock().lock();
      }
      rwl.readLock().unlock();
     } catch (KeeperException e) {
      e.printStackTrace();
     } catch (InterruptedException e) {
      e.printStackTrace();
     } finally {
      if(j == 999) {
       try {
        zk.close();
        service.shutdown();
        System.err.println(System.currentTimeMillis() - begin);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      }
     }
    }
   });
  }
 }
 
 public static void main(String[] args) throws Exception {
  //fifoInMultiThread();
  //fifoOutMultiThread();
  //fifoOutMultiThreadLock();
 }
 
}
时间: 2024-11-13 09:55:04

zookeeper实现fifo以及并发访问删除的相关文章

Android 文件断点上传器[多用户并发访问]

通过TCP/IP(SOCKET)协议实现文件断点上传(实现多用户并发访问). HTTP不支持文件断点续传,所以无法使用HTTP协议. 场景: 1. 网络不稳定,导致上传失败,下次不是从头开始,而是从断点开始上传: 2. 上传大文件,无法http上传,因为web服务器考虑到安全因素,会限制文件大小,一般10+m. 此文件断点上传器使用自定义协议. 服务器为上传的文件在服务器端生成唯一的sourceid关联上传文件,当客户端上传文件时,首次的sourceid为空,服务端先判断sourceid是否为空

(实例篇)php 使用redis锁限制并发访问类示例

1.并发访问限制问题 对于一些需要限制同一个用户并发访问的场景,如果用户并发请求多次,而服务器处理没有加锁限制,用户则可以多次请求成功. 例如换领优惠券,如果用户同一时间并发提交换领码,在没有加锁限制的情况下,用户则可以使用同一个换领码同时兑换到多张优惠券. 伪代码如下: if A(可以换领)         B(执行换领)              C(更新为已换领)             D(结束) 如果用户并发提交换领码,都能通过可以换领(A)的判断,因为必须有一个执行换领(B)后,才会

大数据量高并发访问的数据库优化方法

一.数据库结构的设计 如果不能设计一个合理的数据库模型,不仅会增加客户端和服务器段程序的编程和维护的难度,而且将会影响系统实际运行的性能.所以,在一个系统开始实施之前,完备的数据库模型的设计是必须的. 在一个系统分析.设计阶段,因为数据量较小,负荷较低.我们往往只注意到功能的实现,而很难注意到性能的薄弱之处,等到系统投入实际运行一段时间后,才发现系统的性能在降低,这时再来考虑提高系统性能则要花费更多的人力物力,而整个系统也不可避免的形成了一个打补丁工程. 所以在考虑整个系统的流程的时候,我们必须

php 使用redis锁限制并发访问类

1.并发访问限制问题 对于一些需要限制同一个用户并发访问的场景,如果用户并发请求多次,而服务器处理没有加锁限制,用户则可以多次请求成功. 例如换领优惠券,如果用户同一时间并发提交换领码,在没有加锁限制的情况下,用户则可以使用同一个换领码同时兑换到多张优惠券. 伪代码如下: if A(可以换领) B(执行换领) C(更新为已换领) D(结束) 如果用户并发提交换领码,都能通过可以换领(A)的判断,因为必须有一个执行换领(B)后,才会更新为已换领(C).因此如果用户在有一个更新为已换领之前,有多少次

SpringCloud注册中心集群化及如何抗住大型系统的高并发访问

一.场景引入 本人所在的项目由于直接面向消费者,迭代周期迅速,所以服务端框架一直采用Springboot+dubbo的组合模式,每个服务由service模块+web模块构成,service模块通过公司API网关向安卓端暴 露restful接口,web模块通过dubbo服务向service模块获取数据渲染页面.测试环境dubbo的注册中心采用的单实例的zookeeper,随着时间的发现注册在zookeeper上的生产者和消费者越来越多,测试 人员经常在大规模的压测后发现zookeeper挂掉的现象

Cocos2d-x优化中多线程并发访问

多线程并发访问在Cocos2d-x引擎中用的不是很多,这主要是因为中整个结构设计没有采用多线程.源自于Objective-C的Ref对象,需要使用AutoreleasePool进行内存管理,AutoreleasePool是非线程安全的,所有不推荐在子多线程中调用Ref对象的retain(). release()和autorelease()等函数.另外,OpenGL上下文对象也是不支持线程安全的.但是有的时候我们需要异步加载一些资源,例如:加载图片纹理.声音的预处理和网络请求数据等.如果是异步加载

iOS Core data多线程并发访问的问题

大家都知道Core data本身并不是一个并发安全的架构:不过针对多线程访问带来的问题,Apple给出了很多指导:同时很多第三方的开发者也贡献了很多解决方法.不过最近碰到的一个问题很奇怪,觉得有一定的特殊性,与大家分享一下. 这个问题似乎在7.0.1以前的版本上并不存在:不过后来我升级版本到了7.0.4.app的模型很简单,主线程在前台对数据库进行读写,而后台线程不断地做扫描(只读).为此每个线程中各创建了一个NSManagedObjectContext. 这个模型其实有点奇怪,因为普遍的模型是

网站大规模并发访问的优化建议

一.服务器配置优化 我们需要根据应用服务器的性能和并发访问量的大小来规划应用服务器的数量.有一个使用原则是:单台应用服务器的性能不一定要求最好,但是数量一定要足够, 最好能有一定的冗余来保障服务器故障.特别是,在高并发访问峰期间,适当增加某些关键应用的服务器数量.比如在某些高峰查询业务上,可以使用多台服务器, 以满足用户每小时上百万次的点击量. 二.使用负载均衡技术 负载均衡技术是解决集中并发访问的核心技术,也是一种较为有效的解决网站大规模并发访问的方法.实现负载均衡技术的主要设备是负载均衡器服

高并发访问mysql时的问题(一):库存超减

如果在对某行记录的更新时不采取任何防范措施,在多线程访问时,就容易出现库存为负数的错误. 以下用php.mysql,apache ab工具举例说明: mysql表结构 CREATE TABLE `yxt_test_concurrence` ( `id` int(11) NOT NULL AUTO_INCREMENT, `value` int(11) NOT NULL COMMENT '库存', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2