一致性hash实现

2019-07-01 11:44:14 浏览数 (1)

缘起

一致性hash也算是接触到过很多回了,不过一直没有自己真正去实现过,今天在一致性hash在分布式系统中的应用 看到了一个简单的Python版本实现,正好这两天在学习scala,尝试着用scala实现了一版。

一致性hash

分布式系统

一致性hash一般都是在分布式系统中应用,分布式系统的目的之二就是提高系统可用性和容量,可用性要求我们在机器挂掉之后能快速恢复,容量要求我们要用多台机器来存储数据,一致性hash提供了一种思想,一种如何将请求,数据均匀分布到各台机器上的一种思想。

使用场景

  • KV系统
  • 缓存系统
  • 负载均衡系统

原理

先来看一张图

上面这张图是一致性hash的简单描述,这是一个有2^32个位置的圆,每个node根据ID(可以是主机名也可以是ip)分布到环上,在数据过来之后,将数据取hash值,向后找到离自己最近的那个节点,就由这个节点来提供具体的服务了。

在节点数较少的时候,因为node也是通过计算hash来确认自己在环上的位置,想要保证均匀分布还是很困难的。我们可以通过加入虚拟节点来保证,每个节点有n个replica,这样可以做到相对均匀。

原文是使用Python实现的,在这我使用scala实现一下,也算是动手实践一下scala编程。

注:不要使用Object的hashCode方法,在只有i变化时,生成的hash也是连续的,我使用md5摘要后做了一下变通

代码语言:javascript复制
import java.security.MessageDigest

import scala.collection.mutable

/**
  * Created by liufengquan on 2018/4/13.
  */
class ConsistentHash(virtualNodes: Int) {
  val servers: mutable.Set[String] = mutable.Set()

  /** store virtual node's hash */
  private var hashRing = List[Int]()
  /** virtual node's hash -> node identity*/
  private var hashServerMap = mutable.Map[Int, String]()

  /**
    * put a node, that is a server in pool
    * @param ip
    */
  def addNode(ip: String): Unit = {
    servers  = ip
    (1 to virtualNodes).foreach(i => {
      val key = getKeyHash(ip   ":"   i)
      hashRing = key :: hashRing
      hashServerMap  = key -> ip
    })

    hashRing = hashRing.sorted
  }

  /**
    * remove a node
    * @param ip
    */
  def removeNode(ip: String): Unit = {
    servers -= ip
    (1 to virtualNodes).foreach(i => {
      val key = getKeyHash(ip   "i")
      hashRing = hashRing.filter(_ != key)
      hashServerMap = hashServerMap.filter(_._1 != key)
    })
  }

  /**
    * get a node according to a key
    * @param key
    * @return
    */
  def getNode(key: String): String = {
    require(servers != null && servers.nonEmpty)

    val hash = getKeyHash(key)
    val avail = hashRing.filter(_ > hash)
    if (avail.nonEmpty) hashServerMap(avail.head)
    else hashServerMap(hashRing.head)
  }

  def getServers: String = servers mkString ":"

  /**
    * naive hashCode has bad randomness,
    * use md5 and do a little exchange
    * @param key
    * @return
    */
  private def getKeyHash(key: String): Int = {
    val digest = MessageDigest.getInstance("MD5")

    val bytes = digest.digest(key.getBytes("UTF-8"))

    bytes.map(_.toInt).reduceLeft(_ * 11   _)
  }
}

object ConsistentHash {
  val servers = List("10.10.187.1", "10.10.187.2", "10.10.187.3", "10.10.187.4", "10.10.187.5")

  def consistentHashTest(replica: Int): Unit = {
    val consistentHash = new ConsistentHash(replica)

    var map = mutable.Map[String, List[String]]()
    val text = """The following examples show some safe operations to drop or change columns. Dropping the final column in a table lets Impala ignore the data causing any disruption to existing data files. Changing the type of a column works if existing data values can be safely converted to the new type. The type conversion rules depend on the file format of the underlying table. For example, in a text table, the same value can be interpreted as a STRING or a numeric value, while in a binary format such as Parquet, the rules are stricter and type conversions only work between certain sizes of integers."""

    servers.foreach(consistentHash.addNode)

    servers.foreach(map  = _ -> List[String]())

    text.split(" ").foreach(s => {
      val server = consistentHash.getNode(s)
      map(server) = s :: map(server)
    })

    println(map mkString "n")
  }

  def main(args: Array[String]): Unit = {
    require(args.length > 0)
    consistentHashTest(args(0).toInt)
  }
}

0 人点赞