深入一致性哈希(Consistent Hashing)算法原理,并附100行代码实现

  本文为实现分布式任务调度系统中用到的一些关键技术点分享——Consistent Hashing算法原理和Java实现,以及效果测试。

 背景介绍

  一致性Hashing在分布式系统中经常会被用到, 用于尽可能地降低节点变动带来的数据迁移开销。Consistent Hashing算法在1997年就在论文Consistenthashing and random trees中被提出。

  先来简单理解下Hash是解决什么问题。假设一个分布式任务调度系统,执行任务的节点有n台机器,现有m个job在这n台机器上运行,这m个Job需要逐一映射到n个节点中一个,这时候可以选择一种简单的Hash算法来让m个Job可以均匀分布到n个节点中,比如 hash(Job)%n ,看上去很完美,但考虑如下两种情形:

  1. n个节点中有一个宕掉了,这时候节点数量变更为n-1,此时的映射公式变成 hash(Job)%(n-1)
  2. 由于Job数量增加,需要新增机器,此时的映射公式变成 hash(Job)%(n+1)

 1、2两种情形可以看到,基本上所有的Job会被重新分配到跟节点变动前不同的节点上,意味着需要迁移几乎所有正在运行的Job,想想这样会给系统带来多大的复杂性和性能损耗。

另外还有一种情况,假设节点的硬件处理性能不完全一致,想让性能高的节点多被分配一些Job,这时候上述简单的Hash映射算法更是很难做到。

如何解决这种节点变动带来的大量数据迁移和数据不均匀分配问题呢?一致性哈希算法就很巧妙的解决了这些问题。

Consistent Hashing是一种Hashing算法,典型的特征是:在减少或者添加节点时,可以尽可能地保证已经存在Key映射关系不变,尽可能地减少Key的迁移。

Consistent Hashing算法原理,如何处理Job->Node映射过程

  1. 确定hashing值空间

  给定值空间2^32,[0,2^32]是所有hash值的取值空间,形象地描述为如下一个环(ring):

2. 节点向值空间映射

  将节点Node向这个值空间映射,取Node的Hash值,选取一个可以固定标识一个Node的属性值进行Hashing,假设以字符串形式输入,算法如下:

可以取Node标识的md5值,然后截取其中32位作为映射值。md5取值如下:

private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        byte[] bytes;
        try {
            bytes = value.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.update(bytes);
        return md5.digest();
    }

  因为映射值只需要32位即可,所以可以利用以下方式计算最终值(number取0即可):

private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[0 + number * 4] & 0xFF))
                & 0xFFFFFFFFL;
}

  把n个节点Node通过以上方式取得hash值,映射到环形值空间如下:

  算法中,将以有序Map的形式在内存中缓存每个节点的Hash值对应的物理节点信息。缓存于这个内存变量中:private final TreeMap<Long, String> virtualNodes 。

3. 数据向值空间映射

  数据Job取hash的方式跟节点Node的方式一模一样,可以使用上述md5->hash的方式同样将所有Job取得Hash映射到这个环中。

4. 数据和节点映射

 当节点和数据都被映射到这个环上后,可以设定一个规则把哪些数据hash值放在哪些节点Node Hash值上了,规则就是,沿着顺时针方向,数据hash值向后找到第一个Node Hash值即认为该数据hash值对应的数据映射到该Node上。至此,这一个从数据到节点的映射关系就确定了。

  顺时针找下一个Node Hash值算法如下:

public String select(Trigger trigger) {
        String key = trigger.toString();
        byte[] digest = md5(key);
        String node = sekectForKey(hash(digest, 0));
        return node;
    }

    private String sekectForKey(long hash) {
        String node;
        Long key = hash;
        if (!virtualNodes.containsKey(key)) {
            SortedMap<Long, String> tailMap = virtualNodes.tailMap(key);
            if (tailMap.isEmpty()) {
                key = virtualNodes.firstKey();
            } else {
                key = tailMap.firstKey();
            }
        }
        node = virtualNodes.get(key);
        return node;
    }

Trigger是对Job一次触发任务的抽象,这里可忽略关注,重写了toString方法返回一个标记一个Job的唯一标志,计算Hash值,从节点Hash值中按规则寻找。 虚拟节点后续介绍。

算法表现

    接下来就可以见识下一致性哈希基于这样的数据结构是如何发挥前文提到的优势的。

1. 节点减少时,看需要迁移的节点情况

   假设Node_1宕掉了,图中数据对象只有Job_1会被重新映射到Node_k,而其他Job_x扔保持原有映射关系不变。

2. 节点新增时

     假设新增Node_i,图中数据对象只有Job_k会被重新映射到Node_i上,其他Job_x同样保持原有映射关系不变。

算法优化-虚拟节点

    上述算法过程,会想到两个问题,第一,数据对象会不会分布不均匀,特别是新增节点或者减少节点时;第二,前文提到的如果想让部分节点多映射到一些数据对象,如何处理。虚拟节点这是解决这个问题。

将一个物理节点虚拟出一定数量的虚拟节点,分散到这个值空间上,需要尽可能地随机分散开。

  假设有4个物理节点Node,环上的每个色块代表一个虚拟节点涵盖的hash值区域,每种颜色代表一个物理节点。当物理节点较少时,虚拟节点数需要更高来确保更好的一致性表现。经测试,在物理节点为个位数时,虚拟节点可设置为160个,此时可带来较好的表现(后文会给出测试结果,160*n个总节点数情况下,如果发生一个节点变动,映射关系变化率基本为1/n,达到预期)。

  具体做算法实现时,已知物理节点,虚拟节点数设置为160,可将这160*n的节点计算出Hash值,以Hash值为key,以物理节点标识为value,以有序Map的形式在内存中缓存,作为后续计算数据对象对应的物理节点时的查询数据。代码如下,virtualNodes中缓存着所有虚拟节点Hash值对应的物理节点信息。

public ConsistentHash(List<String> nodes) {
        this.virtualNodes = new TreeMap<>();
        this.identityHashCode = identityHashCode(nodes);
        this.replicaNumber = 160;
        for (String node : nodes) {
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(node.toString() + i);
                for (int h = 0; h < 4; h++) {
                    long m = hash(digest, h);
                    virtualNodes.put(m, node);
                }
            }
        }
    }

算法测试

以上详细介绍了一致性哈希(Consistent Hashing)的算法原理和实现过程,接下来给出一个测试结果:

以10个物理节点,160个虚拟节点,1000个数据对象做测试,10个物理节点时,这1000个数据对象映射结果如下:

减少一个节点前,path_7节点数据对象个数:113
减少一个节点前,path_0节点数据对象个数:84
减少一个节点前,path_6节点数据对象个数:97
减少一个节点前,path_8节点数据对象个数:122
减少一个节点前,path_3节点数据对象个数:102
减少一个节点前,path_2节点数据对象个数:99
减少一个节点前,path_4节点数据对象个数:98
减少一个节点前,path_9节点数据对象个数:102
减少一个节点前,path_1节点数据对象个数:99
减少一个节点前,path_5节点数据对象个数:84

减少一个物理节点path_9,此时9个物理节点,原有1000个数据对象映射情况如下:

减少一个节点后,path_7节点数据对象个数:132
减少一个节点后,path_6节点数据对象个数:107
减少一个节点后,path_0节点数据对象个数:117
减少一个节点后,path_8节点数据对象个数:134
减少一个节点后,path_3节点数据对象个数:104
减少一个节点后,path_4节点数据对象个数:104
减少一个节点后,path_2节点数据对象个数:115
减少一个节点后,path_5节点数据对象个数:89
减少一个节点后,path_1节点数据对象个数:98

先从数量上对比下每个物理节点上数据对象的个数变化:

减少一个节点后,path_7节点数据对象个数从113变为132
减少一个节点后,path_6节点数据对象个数从97变为107
减少一个节点后,path_0节点数据对象个数从84变为117
减少一个节点后,path_8节点数据对象个数从122变为134
减少一个节点后,path_3节点数据对象个数从102变为104
减少一个节点后,path_4节点数据对象个数从98变为104
减少一个节点后,path_2节点数据对象个数从99变为115
减少一个节点后,path_5节点数据对象个数从84变为89
减少一个节点后,path_1节点数据对象个数从99变为98

可以看到基本是均匀变化,现在逐个对比每个数据对象前后映射到的物理节点,发生变化的数据对象占比情况,统计如下:

数据对象迁移比率:0.9%

该结果基本体现出一致性哈希所能带来的最佳表现,尽可能地减少节点变动带来的数据迁移。

附Java完整代码

最后附上完整的算法代码,供大家参照。代码中数据对象是以Trigger抽象,可以调整成特定场景的,即可运行测试。

package com.cronx.core.common;

import com.cronx.core.entity.Trigger;

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Created by echov on 2018/1/9.
 */
public class ConsistentHash {

    private final TreeMap<Long, String> virtualNodes;

    private final int replicaNumber;

    private final int identityHashCode;

    private static ConsistentHash consistentHash;

    public ConsistentHash(List<String> nodes) {
        this.virtualNodes = new TreeMap<>();
        this.identityHashCode = identityHashCode(nodes);
        this.replicaNumber = 160;
        for (String node : nodes) {
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(node.toString() + i);
                for (int h = 0; h < 4; h++) {
                    long m = hash(digest, h);
                    virtualNodes.put(m, node);
                }
            }
        }
    }

    private static int identityHashCode(List<String> nodes){
        Collections.sort(nodes);
        StringBuilder sb = new StringBuilder();
        for (String s: nodes
             ) {
            sb.append(s);
        }
        return sb.toString().hashCode();
    }

    public static String select(Trigger trigger, List<String> nodes) {
        int _identityHashCode = identityHashCode(nodes);
        if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) {
            synchronized (ConsistentHash.class) {
                if (consistentHash == null || consistentHash.identityHashCode != _identityHashCode) {
                    consistentHash = new ConsistentHash(nodes);
                }
            }
        }
        return consistentHash.select(trigger);
    }

    public String select(Trigger trigger) {
        String key = trigger.toString();
        byte[] digest = md5(key);
        String node = sekectForKey(hash(digest, 0));
        return node;
    }

    private String sekectForKey(long hash) {
        String node;
        Long key = hash;
        if (!virtualNodes.containsKey(key)) {
            SortedMap<Long, String> tailMap = virtualNodes.tailMap(key);
            if (tailMap.isEmpty()) {
                key = virtualNodes.firstKey();
            } else {
                key = tailMap.firstKey();
            }
        }
        node = virtualNodes.get(key);
        return node;
    }

    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[0 + number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        byte[] bytes;
        try {
            bytes = value.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.update(bytes);
        return md5.digest();
    }

}

  转自:https://my.oschina.net/yaohonv/blog/1610096

原文地址:https://www.cnblogs.com/yaohonv/p/consistent_hashing.html

时间: 2024-10-12 03:15:56

深入一致性哈希(Consistent Hashing)算法原理,并附100行代码实现的相关文章

Go语言实现一致性哈希(Consistent Hashing)算法

一致性哈希可用于解决服务器均衡问题. 用Golang简单实现了下,并加入了权重.可采用合适的权重配合算法使用. package main //一致性哈希(Consistent Hashing) //author: Xiong Chuan Liang //date: 2015-2-20 import ( "fmt" "hash/crc32" "sort" "strconv" "sync" ) const DE

一致性哈希(consistent hashing)算法

文章同步发表在博主的网站朗度云,传输门:http://www.wolfbe.com/detail/201608/341.html 1.背景 我们都知道memcached服务器是不提供分布式功能的,memcached的分布式完全是由客户端来实现的.在部署memcached服务器集群时,我们需要把缓存请求尽可能分散到不同的缓存服务器中,这样可以使得所有的缓存空间都得到利用,而且可以降低单独一台缓存服务器的压力.     最简单的一种实现是,缓存请求时通过计算key的哈希值,取模后映射到不同的memc

用于KV集群的一致性哈希Consistent Hashing机制

KV集群的请求分发 假定N为后台服务节点数,当前台携带关键字key发起请求时,我们通常将key进行hash后采用模运算 hash(key)%N 来将请求分发到不同的节点上, 后台节点的增删会引起几乎所有key的重新映射, 这样会造成大量的数据迁移,如果数据量大的话会导致服务不可用. 一致性哈希机制 我倾向于称之为一致性哈希机制而不是算法, 因为这其实和算法没太大关系. 设计这种机制的目的是当节点增减时尽量减小重新映射的key的数量, 尽量将key还映射到原来的节点上. 而对于一致性哈希机制, 如

一致性哈希(Consistent Hashing)

传统来讲,数据的存储位置是通过hash(object)%N来计算的,这样造成的问题是如果新的机器添加进来或是某台机器down掉了,通过这种算法计算出来的存储位置会和以前的不同,造成了大量数据的迁移,如果有新的机器添加进来也会造成同样的问题,所以容错性和扩展性都不好.一致性哈希算法的主要目的是尽量减少数据的迁移. 一致性哈希假设有一个闭合圆环空间,上面有2**31个位置,数据通过特定的hash算法被分布到哈希圆环上.机器也通过特定的hash算法(输入值为机器的IP或是机器唯一的别名)放到圆环上,然

2016 -Nginx的负载均衡 - 一致性哈希 (Consistent Hash)

Nginx版本:1.9.1 算法介绍 当后端是缓存服务器时,经常使用一致性哈希算法来进行负载均衡. 使用一致性哈希的好处在于,增减集群的缓存服务器时,只有少量的缓存会失效,回源量较小. 在nginx+ats / haproxy+squid等CDN架构中,nginx/haproxy所使用的负载均衡算法便是一致性哈希. 我们举个例子来说明一致性哈希的好处. 假设后端集群包含三台缓存服务器,A.B.C. 请求r1.r2落在A上. 请求r3.r4落在B上. 请求r5.r6落在C上. 使用一致性哈希时,当

拓扑排序算法原理以及完整的C代码实现

拓扑排序定义 对一个有向无环图(Directed Acyclic Graph简称DAG)G进行拓扑排序,是将G中所有顶点排成一个线性序列,使得图中任意一对顶点u和v,若边(u,v)∈E(G),则u在线性序列中出现在v之前.通常,这样的线性序列称为满足拓扑次序(Topological Order)的序列,简称拓扑序列.简单的说,由某个集合上的一个偏序得到该集合上的一个全序,这个操作称之为拓扑排序. 基础知识 一个较大的工程往往被划分成许多子工程,我们把这些子工程称作活动(activity).在整个

_00013 一致性哈希算法 Consistent Hashing 探讨以及相应的新问题出现解决

一.业务场景 假如我们现在有12台Redis服务器(其它的什么东西也行),有很多User(用户)的数据数据从前端过来,然后往12台redis服务器上存储,在存储中就会出现一个问题,12台服务器,有可能其中几台Redis服务器上(简称集群A)存了很多的数据,然后另外几台Redis服务器(简称集群B)上存的数据很少,这样的话那 A 上的读写压力就会很大(当然,这个要看你的数据量的大小了,如果你数据量很小的话,基本无压力了,但是数据量很大,那就 ...),对于这样的问题,我们通常的解决办法是什么呢 ?

_00013 一致性哈希算法 Consistent Hashing 新的讨论,并出现相应的解决

笔者博文:妳那伊抹微笑 博客地址:http://blog.csdn.net/u012185296 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前.妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:能够转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作. qq交流群:214293307  idkey=bf80524ac3630cb09

一致性哈希算法原理设计

原文出处: 知致智之   欢迎分享原创到伯乐头条 一.前言 一致性哈希(Consistent Hashing),最早由MIT的Karger于1997年提出,主要用于解决易变的分布式Web系统中,由于宕机和扩容导致的服务震荡.现在这个算法思路被大量应用,并且在实践中得到了很大的发展. 二.算法设计 1.问题来源 一个由6台服务器组成的服务,每台Server负责存储1/6的数据,当Server1出现宕机之后,服务重新恢复可用时的场景. 如下表格可以很清楚的看到,当Server1宕机时,Hash1的服