MQTT---HiveMQ源码详解(十七)Cluster-Consistent Hashing Ring & Node Lifecycle

源博客地址:http://blog.csdn.net/pipinet123


MQTT交流群:221405150


Consistent Hashing Ring

基本上只要做Cluster,都会使用到一致性Hash环,具体作用此处就不细讲,我们只了解HiveMQ怎么用它,怎么实现它,这样实现能够带来什么好处。

  • HiveMQ没有Master/Slave,它只由JGroup View(详情请查阅JGroup)第一个node作为Coordinator,这样就可以达到一个node也可以做集群(虽然这样的集群没有什么卵用)。
  • HiveMQ采用两个一致性Hash环,来解决脑裂问题,以及脑裂后merge的问题。
  • 每个node 500个虚拟节点,来增加node变化带来的动荡问题。
  • Primary环:排除joining的node,即只添加RUNNING状态的node。
  • Minority环:包含joining的node,即添加JOINING、RUNNING、MERGING状态的node。
  • 它的hash算法由net.openhft.hashing.LongHashFunction.xx_r39()提供

ConsistentHashingRing源码

相对来说比较简单,我就不一行一行写注释了,网上针对一致性hash环实现各种版本到处都是,详细讲解也到处都是。


@Singleton
public class ConsistentHashingRing {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsistentHashingRing.class);
    private final String name;
    public static final int NODE_BUCKET_COUNT = 500;
    private final LongHashFunction hashFunction;
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    @VisibleForTesting
    final NavigableMap<Long, String> buckets;
    @VisibleForTesting
    final ConcurrentHashMap<String, String> bucketNodes = new ConcurrentHashMap<>();
    final Set<String> nodes = Sets.newConcurrentHashSet();

    public ConsistentHashingRing(String name, LongHashFunction hashFunction) {
        this.name = name;
        this.buckets = new ConcurrentSkipListMap();
        this.hashFunction = hashFunction;
    }

    public void add(@NotNull String node) {
        Preconditions.checkNotNull(node, "Name must not be null");
        LOGGER.trace("Add node {} to the {}.", node, this.name);
        Lock lock = this.readWriteLock.writeLock();
        lock.lock();
        try {
            for (int bucketIndex = 0; bucketIndex < NODE_BUCKET_COUNT; bucketIndex++) {
                long bucketHash = this.hashFunction.hashChars(node + bucketIndex);
                if (this.buckets.containsKey(bucketHash)) {
                    if (this.buckets.get(bucketHash).compareTo(node + 1) > 0) {
                        this.buckets.put(bucketHash, node + bucketIndex);
                        this.nodes.add(node);
                        this.bucketNodes.put(node + bucketIndex, node);
                    }
                } else {
                    this.buckets.put(bucketHash, node + bucketIndex);
                    this.nodes.add(node);
                    this.bucketNodes.put(node + bucketIndex, node);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public void remove(@NotNull String node) {
        Preconditions.checkNotNull(node, "Name must not be null");
        LOGGER.trace("Remove node {} from the {}.", node, this.name);
        Lock lock = this.readWriteLock.writeLock();
        lock.lock();
        try {
            for (int bucketIndex = 0; bucketIndex < NODE_BUCKET_COUNT; bucketIndex++) {
                long bucketHash = this.hashFunction.hashChars(node + bucketIndex);
                this.buckets.remove(bucketHash);
                this.bucketNodes.remove(node + bucketIndex);
            }
            this.nodes.remove(node);
        } finally {
            lock.unlock();
        }
    }

    public Set<String> getReplicaNodes(@NotNull String key, int replicateCount) {
        Preconditions.checkNotNull(key, "key must not be null");
        int nodeCount = this.nodes.size();
        if (replicateCount > nodeCount - 1) {
            LOGGER.trace("There are not enough buckets in the consistent hash ring for {} replicas.", replicateCount);
            replicateCount = nodeCount - 1;
        }
        String bucket = getBucket(key);
        long bucketHash = this.hashFunction.hashChars(bucket);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        Set<String> buckets = new HashSet<>();
        try {
            for (Map.Entry<Long, String> entry = this.buckets.higherEntry(bucketHash);
                 buckets.size() < replicateCount;
                 entry = this.buckets.higherEntry(entry.getKey())) {
                if (entry == null) {
                    entry = this.buckets.firstEntry();
                }
                if (!this.bucketNodes.get(entry.getValue()).equals(this.bucketNodes.get(bucket))) {
                    buckets.add(this.bucketNodes.get(entry.getValue()));
                }
            }
            return buckets;
        } finally {
            lock.unlock();
        }
    }

    public Set<String> getNodes() {
        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            return builder.addAll(this.nodes).build();
        } finally {
            lock.unlock();
        }
    }

    public String getBucket(@NotNull String key) {
        Preconditions.checkNotNull(key, "key must not be null");
        if (this.buckets.isEmpty()) {
            throw new IllegalStateException("Consistent hash ring is empty.");
        }
        long keyHash = this.hashFunction.hashChars(key);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            Map.Entry<Long, String> entry = this.buckets.ceilingEntry(keyHash);
            if (entry != null) {
                return entry.getValue();
            }
            return this.buckets.ceilingEntry(Long.MIN_VALUE).getValue();
        } finally {
            lock.unlock();
        }
    }

    public String getNode(@NotNull String key) {
        Preconditions.checkNotNull(key, "key must not be null");
        if (this.buckets.isEmpty()) {
            throw new IllegalStateException("Consistent hash ring is empty.");
        }
        long keyHash = this.hashFunction.hashChars(key);
        Lock lock = this.readWriteLock.readLock();
        lock.lock();
        try {
            Map.Entry<Long, String> entry = this.buckets.ceilingEntry(keyHash);
            if (entry != null) {
                return this.bucketNodes.get(entry.getValue());
            }
            return this.bucketNodes.get(this.buckets.ceilingEntry(Long.MIN_VALUE).getValue());
        } finally {
            lock.unlock();
        }
    }
}

Node Lifecycle

其实了解了上面HiveMQ Cluster的基础之后,再来看node的生命周期,就是一件简单的事情了。

废话少说,我们直接上状态变化图。

各种状态简介

UNKNOWN

当JGroup通知新的node连接,但在本地不存在,则该node状态标记为UNKNOWN

NOT_JOINED

当node连接上JGroup后,若它不是唯一的node,则它将自己主动标记为NOT_JOINED

JOINING

当node将自己的状态更新至Cluster完成后,它将自己主动标记为JOINING

MERGE_MINORITY

当脑裂后与Coordinator在同组的其他node都将被标记为MERGE_MINORITY;或者加入Primary Group失败后它将自己主动标记为MERGE_MINORITY

MERGING

MERGE_MINORITY会一直去尝试主动将自己标记为MERGING

RUNNING

当MERGING成功后,node将会进行Replicate操作,当Replicate操作完成,就主动将自己标记为RUNNING

SHUTTING_DOWN/SHUTDOWN_FINISHED/DEAD

这三种状态在源码中未被使用,但HiveMQ还这样定义,或许是保留吧,反正博主未搞懂,不过不重要,不懂就算了,^_^。

时间: 2024-10-13 20:16:54

MQTT---HiveMQ源码详解(十七)Cluster-Consistent Hashing Ring & Node Lifecycle的相关文章

Android编程之Fragment动画加载方法源码详解

上次谈到了Fragment动画加载的异常问题,今天再聊聊它的动画加载loadAnimation的实现源代码: Animation loadAnimation(Fragment fragment, int transit, boolean enter, int transitionStyle) { 接下来具体看一下里面的源码部分,我将一部分一部分的讲解,首先是: Animation animObj = fragment.onCreateAnimation(transit, enter, fragm

Java concurrent AQS 源码详解

一.引言 AQS(同步阻塞队列)是concurrent包下锁机制实现的基础,相信大家在读完本篇博客后会对AQS框架有一个较为清晰的认识 这篇博客主要针对AbstractQueuedSynchronizer的源码进行分析,大致分为三个部分: 静态内部类Node的解析 重要常量以及字段的解析 重要方法的源码详解. 所有的分析仅基于个人的理解,若有不正之处,请谅解和批评指正,不胜感激!!! 二.Node解析 AQS在内部维护了一个同步阻塞队列,下面简称sync queue,该队列的元素即静态内部类No

深入Java基础(四)--哈希表(1)HashMap应用及源码详解

继续深入Java基础系列.今天是研究下哈希表,毕竟我们很多应用层的查找存储框架都是哈希作为它的根数据结构进行封装的嘛. 本系列: (1)深入Java基础(一)--基本数据类型及其包装类 (2)深入Java基础(二)--字符串家族 (3)深入Java基础(三)–集合(1)集合父类以及父接口源码及理解 (4)深入Java基础(三)–集合(2)ArrayList和其继承树源码解析以及其注意事项 文章结构:(1)哈希概述及HashMap应用:(2)HashMap源码分析:(3)再次总结关键点 一.哈希概

Spring IOC源码详解之容器依赖注入

Spring IOC源码详解之容器依赖注入 上一篇博客中介绍了IOC容器的初始化,通过源码分析大致了解了IOC容器初始化的一些知识,先简单回顾下上篇的内容 载入bean定义文件的过程,这个过程是通过BeanDefinitionReader来完成的,其中通过 loadBeanDefinition()来对定义文件进行解析和根据Spring定义的bean规则进行处理 - 事实上和Spring定义的bean规则相关的处理是在BeanDefinitionParserDelegate中完成的,完成这个处理需

Spring IOC源码详解之容器初始化

Spring IOC源码详解之容器初始化 上篇介绍了Spring IOC的大致体系类图,先来看一段简短的代码,使用IOC比较典型的代码 ClassPathResource res = new ClassPathResource("beans.xml"); DefaultListableBeanFactory factory = new DefaultListableBeanFactory(); XmlBeanDefinitionReader reader = new XmlBeanDe

IntentService源码详解

IntentService可以做什么: 如果你有一个任务,分成n个子任务,需要它们按照顺序完成.如果需要放到一个服务中完成,那么IntentService就会使最好的选择. IntentService是什么: IntentService是一个Service(看起来像废话,但是我第一眼看到这个名字,首先注意的是Intent啊.),所以如果自定义一个IntentService的话,一定要在AndroidManifest.xml里面声明. 从上面的"可以做什么"我们大概可以猜测一下Inten

Android View 事件分发机制源码详解(View篇)

前言 在Android View 事件分发机制源码详解(ViewGroup篇)一文中,主要对ViewGroup#dispatchTouchEvent的源码做了相应的解析,其中说到在ViewGroup把事件传递给子View的时候,会调用子View的dispatchTouchEvent,这时分两种情况,如果子View也是一个ViewGroup那么再执行同样的流程继续把事件分发下去,即调用ViewGroup#dispatchTouchEvent:如果子View只是单纯的一个View,那么调用的是Vie

butterknife源码详解

butterknife源码详解 作为Android开发者,大家肯定都知道大名鼎鼎的butterknife.它大大的提高了开发效率,虽然在很早之前就开始使用它了,但是只知道是通过注解的方式实现的,却一直没有仔细的学习下大牛的代码.最近在学习运行时注解,决定今天来系统的分析下butterknife的实现原理. 如果你之前不了解Annotation,那强烈建议你先看注解使用. 废多看图: 从图中可以很直观的看出它的module结构,以及使用示例代码. 它的目录和我们在注解使用这篇文章中介绍的一样,大体

Android ArrayMap源码详解

尊重原创,转载请标明出处    http://blog.csdn.net/abcdef314159 分析源码之前先来介绍一下ArrayMap的存储结构,ArrayMap数据的存储不同于HashMap和SparseArray,在上一篇<Android SparseArray源码详解>中我们讲到SparseArray是以纯数组的形式存储的,一个数组存储的是key值一个数组存储的是value值,今天我们分析的ArrayMap和SparseArray有点类似,他也是以纯数组的形式存储,不过不同的是他的