一、背景介绍
memcached的分布式
memcached虽然称为“分布式”缓存服务器,但服务器端并没有“分布式”功能。服务器端内存存储功能,其实现非常简单。至于memcached的分布式,则是完全由客户端程序库实现的。这种分布式是memcached的最大特点。分布的原则是由client端的api来决定的,api根据存储用的key以及已知的服务器列表,根据key的hash计算将指定的key存储到对应的服务器列表上。
memcached的分布式是什么意思?
这里多次使用了“分布式”这个词,但并未做详细解释。现在开始简单地介绍一下其原理,各个客户端的实现基本相同。
下面假设memcached服务器有node1~node3三台,应用程序要保存键名为“tokyo”“kanagawa”“chiba”“saitama”“gunma” 的数据。
图1 分布式简介:准备
首先向memcached中添加“tokyo”。将“tokyo”传给客户端程序库后,客户端实现的算法就会根据“键”来决定保存数据的memcached服务器。服务器选定后,即命令它保存“tokyo”及其值。
图2 分布式简介:添加时
同样,“kanagawa”“chiba”“saitama”“gunma”都是先选择服务器再保存。
接下来获取保存的数据。获取时也要将要获取的键“tokyo”传递给函数库。函数库通过与数据保存时相同的算法,根据“键”选择服务器。使用的算法相同,就能选中与保存时相同的服务器,然后发送get命令。只要数据没有因为某些原因被删除,就能获得保存的值。
图3 分布式简介:获取时
这样,将不同的键保存到不同的服务器上,就实现了memcached的分布式。 memcached服务器增多后,键就会分散,即使一台memcached服务器发生故障无法连接,也不会影响其他的缓存,系统依然能继续运行。
问题:
在这里我们通常使用的方法是根据 key的hash值%服务器数取余数 的方法来决定当前这个key的内容发往哪一个服务器的。这里会涉及到一个hash算法的分布问题,哈希的原理用一句话解释就是两个集合间的映射关系函数,在我们通常的应用中基本上可以理解为 在集合A(任意字母数字等组合,此处为存储用的key)里的一条记录去查找集合B(如0-2^32)中的对应记录。
服务实例本身发生变动的时候,导致服务列表变动从而会照成大量的cache数据请求会miss,几乎大部分数据会需要迁移到另外的服务实例上。这样在大型服务在线时,瞬时对后端数据库/硬盘照成的压力很可能导致整个服务的crash。
二、基本原理
Consistent Hashing的简单说明
Consistent Hashing如下所示:首先求出memcached服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。然后用同样的方法求出存储数据的键的哈希值,并映射到圆上。然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台memcached服务器上。
图4 Consistent Hashing:基本原理
从上图的状态中添加一台memcached服务器。余数分布式算法由于保存键的服务器会发生巨大变化而影响缓存的命中率,但Consistent Hashing中,只有在continuum上增加服务器的地点逆时针方向的第一台服务器上的键会受到影响。
图5 Consistent Hashing:添加服务器
因此,Consistent Hashing最大限度地抑制了键的重新分布。而且,有的Consistent Hashing的实现方法还采用了虚拟节点的思想。使用一般的hash函数的话,服务器的映射地点的分布非常不均匀。因此,使用虚拟节点的思想,为每个物理节点(服务器)在continuum上分配100~200个点。这样就能抑制分布不均匀,最大限度地减小服务器增减时的缓存重新分布。
通过下文中介绍的使用Consistent Hashing算法的memcached客户端函数库进行测试的结果是,由服务器台数(n)和增加的服务器台数(m)计算增加服务器后的命中率计算公式如下:
(1 - n/(n m)) * 100
三、hash函数的选择
在memcached的实际应用,虽然官方的版本并不支持Consistent Hashing,但是已经有了现实的Consistent Hashing实现以及虚节点的实现,第一个实现的是last.fm(国外流行的音乐平台)开发的libketama, 其中调用的hash的部分的java版本的实现(基于md5)
代码语言:javascript复制/**
* Calculates the ketama hash value for a string
* @param s
* @return
*/
public static Long md5HashingAlg(String key) {
if(md5==null) {
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
log.error( " no md5 algorythm found" );
throw new IllegalStateException( " no md5 algorythm found");
}
}
md5.reset();
md5.update(key.getBytes());
byte[] bKey = md5.digest();
long res = ((long)(bKey[3]&0xFF) << 24) | ((long)(bKey[2]&0xFF) << 16) | ((long)(bKey[1]&0xFF) << 8) | (long)(bKey[0]&0xFF);
return res;
}
四、程序实现
下面就是实现了:核心有两点,一是虚拟节点问题,一是查找时注意return 第一个node的情况;
Java的代码实现
代码语言:javascript复制import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;
public class ConsistentHash<T> {
private final HashFunction hashFunction;
private final int numberOfReplicas;
private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();
public ConsistentHash(HashFunction hashFunction,
int numberOfReplicas, Collection<T> nodes) {
this.hashFunction = hashFunction;
this.numberOfReplicas = numberOfReplicas;
for (T node : nodes) {
add(node);
}
}
public void add(T node) {
for (int i = 0; i < numberOfReplicas; i ) {
circle.put(hashFunction.hash(node.toString() ":" i),
node);
}
}
public void remove(T node) {
for (int i = 0; i < numberOfReplicas; i ) {
circle.remove(hashFunction.hash(node.toString() ":" i));
}
}
public T get(Object key) {
if (circle.isEmpty()) {
return null;
}
int hash = hashFunction.hash(key);
SortedMap<Integer, T> tailMap =
circle.tailMap(hash);
hash = tailMap.isEmpty() ?
circle.firstKey() : tailMap.firstKey();
return circle.get(hash);
}
}
Python代码的实现
代码语言:javascript复制import md5
class HashRing(object):
def __init__(self, nodes=None, replicas=3):
"""Manages a hash ring.
`nodes` is a list of objects that have a proper __str__ representation.
`replicas` indicates how many virtual points should be used pr. node,
replicas are required to improve the distribution.
"""
self.replicas = replicas
self.ring = dict()
self._sorted_keys = []
if nodes:
for node in nodes:
self.add_node(node)
def add_node(self, node):
"""Adds a `node` to the hash ring (including a number of replicas).
"""
for i in xrange(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
self.ring[key] = node
self._sorted_keys.append(key)
self._sorted_keys.sort()
def remove_node(self, node):
"""Removes `node` from the hash ring and its replicas.
"""
for i in xrange(0, self.replicas):
key = self.gen_key('%s:%s' % (node, i))
del self.ring[key]
self._sorted_keys.remove(key)
def get_node(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned.
If the hash ring is empty, `None` is returned.
"""
return self.get_node_pos(string_key)[0]
def get_node_pos(self, string_key):
"""Given a string key a corresponding node in the hash ring is returned
along with it's position in the ring.
If the hash ring is empty, (`None`, `None`) is returned.
"""
if not self.ring:
return None, None
key = self.gen_key(string_key)
nodes = self._sorted_keys
for i in xrange(0, len(nodes)):
node = nodes[i]
if key <= node:
return self.ring[node], i
return self.ring[nodes[0]], 0
def get_nodes(self, string_key):
"""Given a string key it returns the nodes as a generator that can hold the key.
The generator is never ending and iterates through the ring
starting at the correct position.
"""
if not self.ring:
yield None, None
node, pos = self.get_node_pos(string_key)
for key in self._sorted_keys[pos:]:
yield self.ring[key]
while True:
for key in self._sorted_keys:
yield self.ring[key]
def gen_key(self, key):
"""Given a string key it returns a long value,
this long value represents a place on the hash ring.
md5 is currently used because it mixes well.
"""
m = md5.new()
m.update(key)
return long(m.hexdigest(), 16)