用Python实现一个LRU缓存,不使用堆或树
译:《This is not interview advice: a priority-expiry LRU cache without heaps or trees in Python》 《这不是面试建议:在Python中实现的无堆或树的优先级到期LRU缓存》 原文地址:https://death.andgravity.com/lru-cache
我们将要Python标准库实现一个LRU(least recently used)
缓存,具有优先级
和到期时间
。这是一个常见的面食
问题,但我们将远离数据结构——没有堆、没有二叉树。总之,我们会得到一个可用的方案。
要求
是的,你正在面试,你需要实现一个优先级、有过期时间的缓存(priority-expiry LRU cache)。 缓存是一种存储数据的方式,通过将耗时操作的结果保存起来,下次直接重用。缓存空间有限,不能无限制地存储数据。需要清理缓存。优先级、过期则暗示了缓存中的物品有优先级和过期时间:
- • 当物品过期后,我们不再保存在缓存中
- • 如果缓存满了,我们有限清理优先级低的物品,保存优先级高的物品。
- • 其它条件相同时,我们会清理最近最少使用(LRU)的物品。
我们至少需要读/写缓存的方法:
- • set(key, value, maxage, priority)
- • get(key) -> value or None
同时应该还有用于清理缓存的方法:
- • delete(key)
- • evict(now)
最小可行方案
我们先实现一个最简单的缓存,只实现写、读两个方法(set
, get
)。
我们使用字典实现缓存的读写,并用Item类保存额外的信息(过期时间、优先级)。在写入的过程中,我们需要处理缓存满了的情况,因此需要实现清理元素的方法evict
。
import time
from typing import NamedTuple
class Item(NamedTuple):
key: object
value: object
expires: int
priority: int
class Cache:
def __init__(self, maxsize, time=time.monotonic):
self.maxsize = maxsize
self.time = time
self.cache = {}
def set(self, key, value, *, maxage=10, priority=0):
now = self.time()
if key in self.cache:
self.cache.pop(key)
elif len(self.cache) >= self.maxsize:
self.evict(now)
expires = now maxage
item = Item(key, value, expires, priority)
self.cache[key] = item
def get(self, key):
item = self.cache.get(key)
if not item:
return None
if self.time() >= item.expires:
return None
return item.value
def evict(self, now):
if not self.cache:
return
key = next(iter(self.cache))
del self.cache[key]
我们写了一个测试验证set
和get
方法:
class FakeTime:
def __init__(self, now=0):
self.now = now
def __call__(self):
return self.now
def test_basic():
cache = Cache(2, FakeTime())
assert cache.get('a') == None
cache.set('a', 'A')
assert cache.get('a') == 'A'
cache.set('b', 'B')
assert cache.get('a') == 'A'
assert cache.get('b') == 'B'
cache.set('c', 'C')
assert cache.get('a') == None
assert cache.get('b') == 'B'
assert cache.get('c') == 'C'
问题:过期的物品应该先走
我们需要记录一下物品的时间,并且优先清理过期的物品。一个优先级队列可以实现这个需求。
代码语言:javascript复制def __init__(self, maxsize, time=time.monotonic):
...
self.cache = {}
self.expires = PriorityQueue()
在set
中,将物品的过期时间保存在优先级队列中:
def set(self, key, value, *, maxage=10, priority=0):
now = self.time()
if key in self.cache:
item = self.cache.pop(key)
self.expires.remove((item.expires, key))
del item
elif len(self.cache) >= self.maxsize:
self.evict(now)
expires = now maxage
item = Item(key, value, expires, priority)
self.cache[key] = item
self.expires.push((expires, key))
在evict
方法中,我们会先清空所有过期的物品;如果没有过期的,我们会清理掉一个最接近过期的:
def evict(self, now):
if not self.cache:
return
initial_size = len(self.cache)
# 先清空所有过期的物品
while self.cache:
expires, key = self.expires.peek()
if expires > now:
break
self.expires.pop()
del self.cache[key]
# 如果没有过期的,清理掉一个最接近过期的
if len(self.cache) == initial_size:
_, key = self.expires.pop()
del self.cache[key]
问题:未定义名称 PriorityQueue
PriorityQueue
尚未实现,如果你尝试运行上面的代码将会得到报错。下面我们使用列表和sort
实现了一个优先级队列PriorityQueue
。
代码语言:javascript复制这个实现效率很低,但我们先不考虑效率。
import pytest
class PriorityQueue:
def __init__(self):
self.data = []
def push(self, item):
self.data.append(item)
self.data.sort()
def peek(self):
return self.data[0]
def pop(self):
rv = self.data[0]
self.data[:1] = []
return rv
def remove(self, item):
self.data.remove(item)
def __bool__(self):
return bool(self.data)
测试优先级队列:
代码语言:javascript复制def test_priority_queue():
pq = PriorityQueue()
pq.push(1)
pq.push(3)
pq.push(2)
assert pq
assert pq.peek() == 1
assert pq.pop() == 1
assert pq.peek() == 2
assert pq.remove(3) is None
assert pq
assert pq.peek() == 2
with pytest.raises(ValueError):
pq.remove(3)
assert pq.pop() == 2
assert not pq
with pytest.raises(IndexError):
pq.peek()
with pytest.raises(IndexError):
pq.pop()
问题:按优先级清理
我们希望清理缓存是优先级低的缓存先被清理。为此在 __init__()
中,为优先级添加另一个优先级队列:
self.cache = {}
self.expires = PriorityQueue()
self.priorities = PriorityQueue()
在set()
中,将新物品添加到优先级队列:
self.cache[key] = item
self.expires.push((expires, key))
self.priorities.push((priority, key))
并且删除已缓存物品:
代码语言:javascript复制if key in self.cache:
item = self.cache.pop(key)
self.expires.remove((item.expires, key))
self.priorities.remove((item.priority, key))
在evict()
中,从优先级队列中删除过期的物品:
self.expires.pop()
item = self.cache.pop(key)
self.priorities.remove((item.priority, key))
最后,如果没有过期,删除优先级最低的那个:
代码语言:javascript复制if len(self.cache) == initial_size:
_, key = self.priorities.pop()
item = self.cache.pop(key)
self.expires.remove((item.expires, key))
测试优先级功能:
代码语言:javascript复制def test_priorities():
cache = Cache(2, FakeTime())
cache.set('a', 'A', priority=1)
cache.set('b', 'B', priority=0)
assert cache.get('a') == 'A'
assert cache.get('b') == 'B'
cache.set('c', 'C')
assert cache.get('a') == 'A'
assert cache.get('b') == None
assert cache.get('c') == 'C'
def test_update_priorities():
cache = Cache(2, FakeTime())
cache.set('a', 'A', priority=1)
cache.set('b', 'B', priority=0)
cache.set('b', 'Y', priority=2)
cache.set('c', 'C')
assert cache.get('a') == None
assert cache.get('b') == 'Y'
assert cache.get('c') == 'C'
问题:我们在三个地方删除Item
我们每次删除Item时,都要在cache, expires, priorities三个地方删除,这样很容易不一致。为了保持一致性,我们添加一个delete
方法:
def delete(self, key):
*_, expires, priority = self.cache.pop(key)
self.expires.remove((expires, key))
self.priorities.remove((priority, key))
set()
中删除已缓存的物品缩短为:
if key in self.cache:
self.delete(key)
同样,evict()
中:
while self.cache:
expires, key = self.expires.peek()
if expires > now:
break
self.delete(key)
if len(self.cache) == initial_size:
_, key = self.priorities.peek()
self.delete(key)
问题:最近最少使用
那么如何实现最近最少使用呢?
其它条件相同时(过期时间,优先级级别),我们会清理最近最少使用(LRU)的物品。
我们可以参考现有的实现functools.lru_cache()
。
lru_cache()
委托给 lru_cache_wrapper()
,后者设置了一堆可供嵌套函数使用的变量。变量中有一个cache字典和一个双向链表,其中节点是 [prev, next, key, value]
列表。
这就是答案——双链表允许在O(1)中使用跟踪物品:每次使用节点时,将其从当前位置删除并将其放在“最近使用”的一端;另一端的任何东西都将是最近使用最少的物品。请注意,我们需要为每个优先级提供一个双向链表。
在lru_cache
文档的下面,我们看到了OrderedDict
。
OrderedDict 算法可以比 dict 更好地处理频繁的重新排序操作。OrderedDict 有一种 move_to_end() 方法可以有效地将元素重新定位到端点。内部实现是一个双向链表
看来我们可以用move_to_end()
实现我们的需求。因此,我们需要为每个优先级创建一个 OrderedDict
(读作:双链表:) ),但仍然需要跟踪最低优先级:
在__init__
中:
self.cache = {}
self.expires = PriorityQueue()
self.priority_buckets = {} #
self.priority_order = PriorityQueue() # 记录最低优先级。
set()
中处理优先级的操作变得有点复杂。我们为每个优先级创建一个OrderedDict
(如果没有),并将当前key
记录在OrderedDict
中:
priority_bucket = self.priority_buckets.get(priority)
if not priority_bucket:
priority_bucket = self.priority_buckets[priority] = OrderedDict()
self.priority_order.push(priority)
priority_bucket[key] = None # 不需要存储值。只要记录key。
现在我们终于可以在evict()
中删除最近使用最少的物品:
if len(self.cache) == initial_size:
priority = self.priority_order.peek()
priority_bucket = self.priority_buckets.get(priority)
key = next(iter(priority_bucket)) # 最久没访问的物品
self.delete(key)
在delete()
中 ,我们小心翼翼地摆脱了空桶:
priority_bucket = self.priority_buckets[priority]
del priority_bucket[key]
if not priority_bucket:
del self.priority_buckets[priority]
self.priority_order.remove(priority)
现有测试再次通过,我们可以添加一个新的测试:
代码语言:javascript复制def test_lru():
cache = Cache(2, FakeTime())
# 得到一个容量为2的缓存。
cache.set('a', 'A')
cache.set('b', 'B')
# cache.priority_buckets {0: ['a', 'b']}
cache.get('a') == 'A' # ['b', 'a']
cache.set('c', 'C') # ['a', 'c']
assert cache.get('a') == 'A'
assert cache.get('b') == None
assert cache.get('c') == 'C'
这个测试失败了,因为我们没有在get
时修改最近访问字典。要使它通过,只需在get()
的return之前调用move_to_end()
(将最近访问的key移到最后):
self.priority_buckets[item.priority].move_to_end(key)
return item.value