使用ZooKeeper实现的FIFO队列,这个队列是分布式的。
package fifo; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 使用ZooKeeper实现的FIFO队列 * @author lisg * */ public class ZKFIFO { private static final String HOSTS = "vm1"; private ZooKeeper zk = null; private static final String PARENT_PATH = "/fifo"; private static final String SEQ_PREFIX = "seq-"; public ZKFIFO() { try { final CountDownLatch cdl = new CountDownLatch(1); zk = new ZooKeeper(HOSTS, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if(KeeperState.SyncConnected.equals(event.getState())) { cdl.countDown(); } } }); cdl.await(); //创建父节点 Stat stat = zk.exists(PARENT_PATH, false); if(stat == null) { zk.create(PARENT_PATH, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { System.out.println("zookeeper集群连接失败!"); e.printStackTrace(); } } /** * 在父节点下创建顺序子节点 * @param data */ public void push(String data) { if(data == null) { data = ""; } try { zk.create(PARENT_PATH + "/" + SEQ_PREFIX, data.getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); } catch (Exception e) { e.printStackTrace(); } } /** * 删除字第一个子节点,并返回它的值 * @return */ public String pop() { try { final List<String> children = zk.getChildren(PARENT_PATH, false); if(children.isEmpty()) { return null; } Collections.sort(children); String firstChildPath = PARENT_PATH + "/" + children.get(0); final byte[] data = zk.getData(firstChildPath, false, null); zk.delete(firstChildPath, -1); return new String(data, "UTF-8"); } catch (Exception e) { e.printStackTrace(); } return null; } public void close() { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { final ZKFIFO fifo = new ZKFIFO(); /* for(int i=0; i<10; i++) { new Thread() { public void run() { fifo.push("data-" + UUID.randomUUID().toString().replace("-", "")); }; }.start(); } */ System.out.println(fifo.pop()); fifo.close(); } }
需要改进的地方:
1)zookeeper异常处理、重试
时间: 2024-12-21 04:24:07