缘起
一致性hash也算是接触到过很多回了,不过一直没有自己真正去实现过,今天在一致性hash在分布式系统中的应用 看到了一个简单的Python版本实现,正好这两天在学习scala,尝试着用scala实现了一版。
一致性hash
分布式系统
一致性hash一般都是在分布式系统中应用,分布式系统的目的之二就是提高系统可用性和容量,可用性要求我们在机器挂掉之后能快速恢复,容量要求我们要用多台机器来存储数据,一致性hash提供了一种思想,一种如何将请求,数据均匀分布到各台机器上的一种思想。
使用场景
- KV系统
- 缓存系统
- 负载均衡系统
原理
先来看一张图
上面这张图是一致性hash的简单描述,这是一个有2^32个位置的圆,每个node根据ID(可以是主机名也可以是ip)分布到环上,在数据过来之后,将数据取hash值,向后找到离自己最近的那个节点,就由这个节点来提供具体的服务了。
在节点数较少的时候,因为node也是通过计算hash来确认自己在环上的位置,想要保证均匀分布还是很困难的。我们可以通过加入虚拟节点来保证,每个节点有n个replica,这样可以做到相对均匀。
原文是使用Python实现的,在这我使用scala实现一下,也算是动手实践一下scala编程。
代码语言:javascript复制注:不要使用Object的hashCode方法,在只有i变化时,生成的hash也是连续的,我使用md5摘要后做了一下变通
import java.security.MessageDigest
import scala.collection.mutable
/**
* Created by liufengquan on 2018/4/13.
*/
class ConsistentHash(virtualNodes: Int) {
val servers: mutable.Set[String] = mutable.Set()
/** store virtual node's hash */
private var hashRing = List[Int]()
/** virtual node's hash -> node identity*/
private var hashServerMap = mutable.Map[Int, String]()
/**
* put a node, that is a server in pool
* @param ip
*/
def addNode(ip: String): Unit = {
servers = ip
(1 to virtualNodes).foreach(i => {
val key = getKeyHash(ip ":" i)
hashRing = key :: hashRing
hashServerMap = key -> ip
})
hashRing = hashRing.sorted
}
/**
* remove a node
* @param ip
*/
def removeNode(ip: String): Unit = {
servers -= ip
(1 to virtualNodes).foreach(i => {
val key = getKeyHash(ip "i")
hashRing = hashRing.filter(_ != key)
hashServerMap = hashServerMap.filter(_._1 != key)
})
}
/**
* get a node according to a key
* @param key
* @return
*/
def getNode(key: String): String = {
require(servers != null && servers.nonEmpty)
val hash = getKeyHash(key)
val avail = hashRing.filter(_ > hash)
if (avail.nonEmpty) hashServerMap(avail.head)
else hashServerMap(hashRing.head)
}
def getServers: String = servers mkString ":"
/**
* naive hashCode has bad randomness,
* use md5 and do a little exchange
* @param key
* @return
*/
private def getKeyHash(key: String): Int = {
val digest = MessageDigest.getInstance("MD5")
val bytes = digest.digest(key.getBytes("UTF-8"))
bytes.map(_.toInt).reduceLeft(_ * 11 _)
}
}
object ConsistentHash {
val servers = List("10.10.187.1", "10.10.187.2", "10.10.187.3", "10.10.187.4", "10.10.187.5")
def consistentHashTest(replica: Int): Unit = {
val consistentHash = new ConsistentHash(replica)
var map = mutable.Map[String, List[String]]()
val text = """The following examples show some safe operations to drop or change columns. Dropping the final column in a table lets Impala ignore the data causing any disruption to existing data files. Changing the type of a column works if existing data values can be safely converted to the new type. The type conversion rules depend on the file format of the underlying table. For example, in a text table, the same value can be interpreted as a STRING or a numeric value, while in a binary format such as Parquet, the rules are stricter and type conversions only work between certain sizes of integers."""
servers.foreach(consistentHash.addNode)
servers.foreach(map = _ -> List[String]())
text.split(" ").foreach(s => {
val server = consistentHash.getNode(s)
map(server) = s :: map(server)
})
println(map mkString "n")
}
def main(args: Array[String]): Unit = {
require(args.length > 0)
consistentHashTest(args(0).toInt)
}
}