简析一致性哈希算法及其实现

场景

业务中有N台redis缓存服务器 [0, N-1],那么对于任意一个关键字key的请求,我们都是做一个对key哈希值取模的请求分发。前台请求被分发到编号(hash(key)%N)这台服务器上。理想状态下,这样的缓存命中率是很高的。但当因为业务需求而增删服务节点数,例如添加一台服务器。那么原来被分配到k号服务器的现在会被分到(k-1)号服务器。这个时候几乎所有的缓存全部会被miss掉,造成极大的业务压力。

一致性哈希算法的目的就是,当后台增删节点时,旧的数据能够依旧计算到原来的服务器,而新数据能够按新的哈希算法分配到新服务器。

原理

一致性哈希算法把N个服务节点映射到一个环上。对每个服务节点的key值进行hash计算,获得一个长度为N的数组,再把数组首尾相连,获得一个环。我们对所有的请求节点key作hash计算,会落入该环的某一个弧段,我们可以设定每个弧段上的请求节点都以顺时针最近的Node作为服务节点。这样增加或减少服务节点,那影响的也只是一个弧段上的请求节点,而其他弧段上的请求节点则不受影响。

<<<<<<< HEAD

image-20181005001127885

image-20181005001127885

source

这实际上就是把映射环增大了。减少了数据映射关系的变动,不会像hash(i)%N那样带来全局的变动。

如果某一个弧段上的节点密度很大,导致单个服务节点过载,即热点数据问题,那我们可以在这个弧段上增加一个服务节点就可以分担压力。

改进

雪崩效应

上述的算法有一个弊端,就是如果某个服务节点因为过载而down掉,那么原本分配在它上面的负载都会被分配到顺时针下一台服务器上去,那么下一台服务器也会down掉,依次类推,所有的服务器最终都会挂掉。这就是雪崩效应。

为了解决雪崩效应,最好的办法就是,当一个服务节点挂掉的时候,分配在上面的流量被其他的服务节点共同分担。这样才能保证整个集群的可用性。因此,可以采用虚拟节点的方式来改进算法。

虚拟节点

image-20181005004151056

这里把每个服务器分为两个副本节点,每个副本节点负责一个更精细的环,如图可以得知,如果A挂掉了,那么原本分配给A的两个小环的流量会被D和C来分摊。实际上我们可以通过增加副本节点的数量来提高算法的均衡性。

实现代码

#! /usr/bin/env python
# -*- coding:utf8 -*-

import md5

class HashRing(object):
    def __init__(self, nodes=None, replicas=3):
        """
        创建一个hash环
        :nodes 
            节点 
        :replicas
            每个服务节点的副本数
        """

        self.replicas = replicas
        self.ring = {}
        self._sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def add_node(self, node):
        """
        把一个服务节点加入hash环
        """
        for i in range(0, self.replicas):
            key = self.gen_key("%s:%s" % (node, i))
            self.ring[key] = node
            self._sorted_keys.append(key)

        self._sorted_keys.sort()

    def remove_node(self, node):
        """
        从hash环删除一个服务节点
        """
        for i in range(0, self.replicas):
            key = self.gen_key("%s:%s" % (node, i))
            del self.ring[key]
            self._sorted_keys.remove(key)

    def get_node_keys(self, node):
        """
        获取一个服务节点的所有副本的虚拟key
        """
        keys = []
        for i in range(0, self.replicas):
            key = self.gen_key("%s:%s" % (node, i))
            keys.append(key)
        return key

    def get_node(self, string_key):
        """
        对于给定的key 获取对应的服务节点,返回节点
        """
        return self.get_node_pos(string_key)[0]

    def get_node_pos(self, string_key):
        """
        对于给定的key 获取对应的服务节点,返回节点和位置
        """
        if not self.ring:
            return None, None

        key = self.gen_key(string_key)
        nodes = self._sorted_keys
        for i in range(0, len(nodes)):
            node = nodes[i]
            if key <= node:
                return self.ring[node], i

        return self.ring[nodes[0]], 0

    def get_nodes(self, string_key):
        """
        对于一个给定的key ,返回一个
        Given a string key it returns the nodes as a generator that can hold the key.
        The generator is never ending and iterates through the ring starting at the correct position.
        """
        if not self.ring:
            yield None

        node, pos = self.get_node_pos(string_key)
        for key in self._sorted_keys:
            yield self.ring[key]

    def gen_key(self, key):
        """
        以md5算法来进行hash(key)运算
        """
        m = md5.new()
        m.update(key)
        return long(m.hexdigest(), 16)