用Python实现一个LRU缓存,不使用堆或树

2024-02-22 15:55:14 浏览数 (1)

用Python实现一个LRU缓存,不使用堆或树

译:《This is not interview advice: a priority-expiry LRU cache without heaps or trees in Python》 《这不是面试建议:在Python中实现的无堆或树的优先级到期LRU缓存》 原文地址:https://death.andgravity.com/lru-cache

我们将要Python标准库实现一个LRU(least recently used)缓存,具有优先级到期时间。这是一个常见的面食问题,但我们将远离数据结构——没有堆、没有二叉树。总之,我们会得到一个可用的方案。

要求

是的,你正在面试,你需要实现一个优先级、有过期时间的缓存(priority-expiry LRU cache)。 缓存是一种存储数据的方式,通过将耗时操作的结果保存起来,下次直接重用。缓存空间有限,不能无限制地存储数据。需要清理缓存。优先级、过期则暗示了缓存中的物品有优先级和过期时间:

  • • 当物品过期后,我们不再保存在缓存中
  • • 如果缓存满了,我们有限清理优先级低的物品,保存优先级高的物品。
  • • 其它条件相同时,我们会清理最近最少使用(LRU)的物品。

我们至少需要读/写缓存的方法:

  • • set(key, value, maxage, priority)
  • • get(key) -> value or None

同时应该还有用于清理缓存的方法:

  • • delete(key)
  • • evict(now)

最小可行方案

我们先实现一个最简单的缓存,只实现写、读两个方法(set, get)。

我们使用字典实现缓存的读写,并用Item类保存额外的信息(过期时间、优先级)。在写入的过程中,我们需要处理缓存满了的情况,因此需要实现清理元素的方法evict

代码语言:javascript复制
import time
from typing import NamedTuple


class Item(NamedTuple):
    key: object
    value: object
    expires: int
    priority: int


class Cache:
    def __init__(self, maxsize, time=time.monotonic):
        self.maxsize = maxsize
        self.time = time

        self.cache = {}

    def set(self, key, value, *, maxage=10, priority=0):
        now = self.time()

        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) >= self.maxsize:
            self.evict(now)

        expires = now   maxage
        item = Item(key, value, expires, priority)
        self.cache[key] = item

    def get(self, key):
        item = self.cache.get(key)
        if not item:
            return None

        if self.time() >= item.expires:
            return None

        return item.value

    def evict(self, now):
        if not self.cache:
            return

        key = next(iter(self.cache))
        del self.cache[key]

我们写了一个测试验证setget方法:

代码语言:javascript复制
class FakeTime:
    def __init__(self, now=0):
        self.now = now

    def __call__(self):
        return self.now


def test_basic():
    cache = Cache(2, FakeTime())

    assert cache.get('a') == None

    cache.set('a', 'A')
    assert cache.get('a') == 'A'

    cache.set('b', 'B')
    assert cache.get('a') == 'A'
    assert cache.get('b') == 'B'

    cache.set('c', 'C')
    assert cache.get('a') == None
    assert cache.get('b') == 'B'
    assert cache.get('c') == 'C'

问题:过期的物品应该先走

我们需要记录一下物品的时间,并且优先清理过期的物品。一个优先级队列可以实现这个需求。

代码语言:javascript复制
def __init__(self, maxsize, time=time.monotonic):
    ...
    self.cache = {}
    self.expires = PriorityQueue()

set中,将物品的过期时间保存在优先级队列中:

代码语言:javascript复制
def set(self, key, value, *, maxage=10, priority=0):
    now = self.time()

    if key in self.cache:
        item = self.cache.pop(key)
        self.expires.remove((item.expires, key))
        del item
    elif len(self.cache) >= self.maxsize:
        self.evict(now)

    expires = now   maxage
    item = Item(key, value, expires, priority)
    self.cache[key] = item
    self.expires.push((expires, key))

evict方法中,我们会先清空所有过期的物品;如果没有过期的,我们会清理掉一个最接近过期的:

代码语言:javascript复制
def evict(self, now):
    if not self.cache:
        return

    initial_size = len(self.cache)
    # 先清空所有过期的物品
    while self.cache:
        expires, key = self.expires.peek()
        if expires > now:
            break
        self.expires.pop()
        del self.cache[key]

    # 如果没有过期的,清理掉一个最接近过期的
    if len(self.cache) == initial_size:
        _, key = self.expires.pop()
        del self.cache[key]

问题:未定义名称 PriorityQueue

PriorityQueue尚未实现,如果你尝试运行上面的代码将会得到报错。下面我们使用列表和sort实现了一个优先级队列PriorityQueue

这个实现效率很低,但我们先不考虑效率。

代码语言:javascript复制
import pytest

class PriorityQueue:
    def __init__(self):
        self.data = []

    def push(self, item):
        self.data.append(item)
        self.data.sort()

    def peek(self):
        return self.data[0]

    def pop(self):
        rv = self.data[0]
        self.data[:1] = []
        return rv

    def remove(self, item):
        self.data.remove(item)

    def __bool__(self):
        return bool(self.data)

测试优先级队列:

代码语言:javascript复制
def test_priority_queue():
    pq = PriorityQueue()
    pq.push(1)
    pq.push(3)
    pq.push(2)

    assert pq
    assert pq.peek() == 1
    assert pq.pop() == 1
    assert pq.peek() == 2

    assert pq.remove(3) is None

    assert pq
    assert pq.peek() == 2
    with pytest.raises(ValueError):
        pq.remove(3)

    assert pq.pop() == 2

    assert not pq
    with pytest.raises(IndexError):
        pq.peek()
    with pytest.raises(IndexError):
        pq.pop()

问题:按优先级清理

我们希望清理缓存是优先级低的缓存先被清理。为此在 __init__() 中,为优先级添加另一个优先级队列:

代码语言:javascript复制
self.cache = {}
self.expires = PriorityQueue()
self.priorities = PriorityQueue()

set()中,将新物品添加到优先级队列:

代码语言:javascript复制
self.cache[key] = item
self.expires.push((expires, key))
self.priorities.push((priority, key))

并且删除已缓存物品:

代码语言:javascript复制
if key in self.cache:
    item = self.cache.pop(key)
    self.expires.remove((item.expires, key))
    self.priorities.remove((item.priority, key))

evict()中,从优先级队列中删除过期的物品:

代码语言:javascript复制
self.expires.pop()
item = self.cache.pop(key)
self.priorities.remove((item.priority, key))

最后,如果没有过期,删除优先级最低的那个:

代码语言:javascript复制
if len(self.cache) == initial_size:
    _, key = self.priorities.pop()
    item = self.cache.pop(key)
    self.expires.remove((item.expires, key))

测试优先级功能:

代码语言:javascript复制
def test_priorities():
    cache = Cache(2, FakeTime())

    cache.set('a', 'A', priority=1)
    cache.set('b', 'B', priority=0)
    assert cache.get('a') == 'A'
    assert cache.get('b') == 'B'

    cache.set('c', 'C')
    assert cache.get('a') == 'A'
    assert cache.get('b') == None
    assert cache.get('c') == 'C'

def test_update_priorities():
    cache = Cache(2, FakeTime())

    cache.set('a', 'A', priority=1)
    cache.set('b', 'B', priority=0)
    cache.set('b', 'Y', priority=2)

    cache.set('c', 'C')
    assert cache.get('a') == None
    assert cache.get('b') == 'Y'
    assert cache.get('c') == 'C'

问题:我们在三个地方删除Item

我们每次删除Item时,都要在cache, expires, priorities三个地方删除,这样很容易不一致。为了保持一致性,我们添加一个delete方法:

代码语言:javascript复制
def delete(self, key):
    *_, expires, priority = self.cache.pop(key)
    self.expires.remove((expires, key))
    self.priorities.remove((priority, key))

set()中删除已缓存的物品缩短为:

代码语言:javascript复制
if key in self.cache:
    self.delete(key)

同样,evict()中:

代码语言:javascript复制
while self.cache:
    expires, key = self.expires.peek()
    if expires > now:
        break
    self.delete(key)

    if len(self.cache) == initial_size:
        _, key = self.priorities.peek()
        self.delete(key)

问题:最近最少使用

那么如何实现最近最少使用呢?

其它条件相同时(过期时间,优先级级别),我们会清理最近最少使用(LRU)的物品。

我们可以参考现有的实现functools.lru_cache()lru_cache()委托给 lru_cache_wrapper(),后者设置了一堆可供嵌套函数使用的变量。变量中有一个cache字典和一个双向链表,其中节点是 [prev, next, key, value] 列表。 这就是答案——双链表允许在O(1)中使用跟踪物品:每次使用节点时,将其从当前位置删除并将其放在“最近使用”的一端;另一端的任何东西都将是最近使用最少的物品。请注意,我们需要为每个优先级提供一个双向链表。

lru_cache文档的下面,我们看到了OrderedDict

OrderedDict 算法可以比 dict 更好地处理频繁的重新排序操作。OrderedDict 有一种 move_to_end() 方法可以有效地将元素重新定位到端点。内部实现是一个双向链表

看来我们可以用move_to_end()实现我们的需求。因此,我们需要为每个优先级创建一个 OrderedDict(读作:双链表:) ),但仍然需要跟踪最低优先级:

__init__中:

代码语言:javascript复制
self.cache = {}
self.expires = PriorityQueue()
self.priority_buckets = {} # 
self.priority_order = PriorityQueue() # 记录最低优先级。

set()中处理优先级的操作变得有点复杂。我们为每个优先级创建一个OrderedDict(如果没有),并将当前key记录在OrderedDict中:

代码语言:javascript复制
priority_bucket = self.priority_buckets.get(priority)
if not priority_bucket: 
    priority_bucket = self.priority_buckets[priority] = OrderedDict()
    self.priority_order.push(priority)
priority_bucket[key] = None # 不需要存储值。只要记录key。

现在我们终于可以在evict()中删除最近使用最少的物品:

代码语言:javascript复制
if len(self.cache) == initial_size:
    priority = self.priority_order.peek()
    priority_bucket = self.priority_buckets.get(priority)
    key = next(iter(priority_bucket)) # 最久没访问的物品
    self.delete(key)

delete()中 ,我们小心翼翼地摆脱了空桶:

代码语言:javascript复制
priority_bucket = self.priority_buckets[priority]
del priority_bucket[key]
if not priority_bucket:
    del self.priority_buckets[priority]
    self.priority_order.remove(priority)

现有测试再次通过,我们可以添加一个新的测试:

代码语言:javascript复制
def test_lru():
    cache = Cache(2, FakeTime())
    # 得到一个容量为2的缓存。
    cache.set('a', 'A') 
    cache.set('b', 'B')
    # cache.priority_buckets  {0: ['a', 'b']}
    cache.get('a') == 'A' #       ['b', 'a']

    cache.set('c', 'C') #         ['a', 'c']

    assert cache.get('a') == 'A' 
    assert cache.get('b') == None
    assert cache.get('c') == 'C'

这个测试失败了,因为我们没有在get时修改最近访问字典。要使它通过,只需在get()的return之前调用move_to_end()(将最近访问的key移到最后):

代码语言:javascript复制
self.priority_buckets[item.priority].move_to_end(key)
return item.value

0 人点赞