LRU(续)

2024-02-22 15:54:43 浏览数 (1)

问题:我们的优先级队列很慢

好了,我们已经有一个完整的解决方案,是时候处理优先级队列的实现了。让我们快速回顾一下我们需要的方法:

  • • push() – 添加item
  • • peek() – 获取到期时间/优先级最低的项目/桶
  • • remove() – 删除item
  • • pop() – 未使用

我们在维基百科上看到优先级队列没有删除操作。其次,我们的delete()只能从一个队列中pop(),并且需要从另一个队列中remove()

问题就在这里:我们需要维护两个独立的优先级队列。

现在,我们将快速连续地介绍一些数据结构,如果没有准备,这可能会有点不知所措。请记住,我们并不关心它们是如何工作的(至少现在还不关心),我们只是根据规格货比三家。

如果你在文档中搜索“priority queues”,你会发现heapq

[...]供堆队列(heqp queue)算法(也称为优先级队列算法)的实现。

继续阅读,我们发现了大量关于实现优先级队列的注释,特别有趣的是使用(priority,item)元组(已经在这样做了!)并删除条目:

删除条目或更改其优先级更加困难,因为它会破坏堆结构的不变性。因此,一个可能的解决方案是将条目标记为已删除,并添加一个具有修订后的优先级的新条目。

需要这种解决方法,因为虽然可以在 O(log n) 中删除第 i 个元素,但找到它的索引是 O(n)。总而言之,我们有:

sort

heapq

push()

O(n)

O(log n)

peek()

O(1)

O(1)

pop()

O(n)

O(log n)

remove()

O(n)

O(n)

不过,通过一些缓解的假设,heapq可以工作:

  • • 我们可以假设优先级是静态的,因此存储桶永远不会被remove
  • • 无论如何,待删除的到期时间迟早会被弹出。我们可以假设大多数逐出是由于过期的项目,并且由于低优先级(即当缓存已满时)和项目更新而被逐出的项目很少见(两者都会导致待删除的条目在过期队列中累积)

bisect

bisect是找到元素的一种方法,它的时间复杂的优于O(n) :

[...]支持按排序顺序维护列表,而不必在每次插入后对列表进行排序。

这可能会改进我们幼稚的实现;悲剧的是,进一步阅读性能说明我们发现:

insort() 函数是 O(n),因为对数搜索步骤由线性时间插入步骤主导。

虽然一般来说,这比任何类型排序都要好,但我们的排序刚好与它具有相同的复杂度。(尽管如此,移动元素可能比比较元素更快)。

sort

heapq

bisect

push()

O(n)

O(log n)

O(n)

peek()

O(1)

O(1)

O(1)

pop()

O(n)

O(log n)

O(n)

remove()

O(n)

O(n)

O(n)

在文档的下方,有一个“see also”:

Sorted Collections 是一个高性能模块,它使用 bisect 来管理排序的数据集合。

Sorted Collection不在标准库中,继续前进...... ̄_(ツ)_/ ̄

优化pop()

有一个不相关的改进,适用于原始的解决方案和bisect。对于排序好的列表,pop()的复杂度是O(n),因为它在第一个元素之后向前移动所有剩下的元素;如果顺序颠倒,我们从末尾pop(),复杂度变为O(1)。所以:

sort

heapq

bisect

push()

O(n)

O(log n)

O(n)

peek()

O(1)

O(1)

O(1)

pop()

O(1)*

O(log n)

O(1)*

remove()

O(n)

O(n)

O(n)

二叉搜索树

好吧,我没有想法了,标准库中没有其他东西可以提供帮助。

我们可以将问题重述如下:我们需要一个排序的数据结构,它可以用低于O(n)的时间复杂度实现push() / remove()

我们继续阅读维基百科的优先级队列页面,看到“通常实现”一节写道:

为了提高性能,优先级队列通常基于堆,[...]

调查了一下,没有用;下一个:

或者,当使用自平衡二叉搜索树(BST)时,插入和删除需要O(log n) 时间 [...]

这就是我们要找的!(也可能是你的面试官。

sort

heapq

bisect

BST

push()

O(n)

O(log n)

O(n)

O(log n)

peek()

O(1)

O(1)

O(1)

O(log n)

pop()

O(1)*

O(log n)

O(1)*

O(log n)

remove()

O(n)

O(n)

O(n)

O(log n)

但是标准库中没有自平衡二叉搜索树BST,而且我确定很难实现一个: 当我尝试做一棵红黑树时,两个小时后它仍然有错误!

经过一番谷歌搜索,我们发现:除其他外,bintrees,一个成熟的库,提供各种二叉搜索树......除了:

Bintrees Development Stopped 请改用 sortedcontainers:https://pypi.python.org/pypi/sortedcontainers

听起来很熟悉,不是吗?

Sorted Containers

让我们回到bisect文档推荐的Sorted Collections.

我记得,我现在想起来了......我不咸,因为红黑树花了两个小时才实施。我很咸,因为经过这么长时间,我发现了 Sorted Containers,这是一个纯 Python 库,在实践中比用 C 实现的花哨的自平衡二叉搜索树更快!

它有广泛的基准来证明这一点,并为我们自己的用例、优先级队列模拟了工作负载基准——所以是的,虽然面试的答案是“自平衡 BST”,但实际答案是Sorted Containers。 它是如何工作的?

Sorted Containers 内部实现基于几个观察结果。首先是 Python 的列表很快,非常快。[...] 其次是 bisect.insort 很快。这有点违反直觉,因为它涉及移动列表中的一系列项目。但现代处理器在这方面做得很好。 但是,仅使用一个列表和 bisect.insort 在列表长度超过一万时将产生缓慢行为。因此,Sorted List 的实现使用列表的列表(a list of lists)来存储元素。[...]

还有一个与树的比较,我将总结一下:更少的内存分配、更好的缓存位置、更低的内存开销和更快的迭代。 我认为这让你对它的工作原理和原因有一个体面的了解,只要稍加修修补补,你就可以自己实现它。

问题:Sorted Containers不在标准库中

但是 Sorted Containers 不在标准库中,我们也不想自己实现它。不过,我们确实从中学到了一些东西:

sort

heapq

bisect

bisect < 10k

BST

push()

O(n)

O(log n)

O(n)

O(log n)

O(log n)

peek()

O(1)

O(1)

O(1)

O(1)

O(log n)

pop()

O(1)*

O(log n)

O(1)*

O(1)*

O(log n)

remove()

O(n)

O(n)

O(n)

O(log n)

O(log n)

不过,我们仍然可以做一些假设:

  • • 我们真的需要超过 10k 的优先级吗?可能不是,让我们将它们限制在 10k。
  • • 我们真的需要超过 10k 的到期时间吗?或许?使用 1 秒的粒度,我们最多只能表示 2.7 小时;10 秒需要我们长达 27 小时,这可能刚刚起作用。 除了最长时间之外,另一个问题是细粒度太低,尤其是对于短时间——四舍五入 1 到 10 秒比四舍五入 2001 到 2010 秒要糟糕得多。这就引出了一个问题——
  • • 如果项目在 2010 秒而不是 2001 秒内过期,这真的重要吗?可能不是,但我们需要一种方法来舍入细粒度更小的值。

对数时间

1、2、4、8、...怎么样?四舍五入到 2 的幂会降低粒度,但时间实际上并不是从零开始的。我们通过四舍五入到 2 的幂倍数来解决这个问题。 让我们直观地了解它是如何工作的:

math.ceil的作用是向上取整,即大于或等于 x 的最小的整数。

代码语言:javascript复制
ceil((2000    1) /  1) *  1 = 2001
ceil((2000    2) /  2) *  2 = 2002
ceil((2000    3) /  4) *  4 = 2004
ceil((2000    4) /  4) *  4 = 2004
ceil((2000   15) / 16) * 16 = 2016
ceil((2000   16) / 16) * 16 = 2016
ceil((2000   17) / 32) * 32 = 2048

到目前为止一切顺利,修改初始时间还能正常运行吗?

代码语言:javascript复制
ceil((2013    1) /  1) *  1 = 2014
ceil((2013    2) /  2) *  2 = 2016
ceil((2013    3) /  4) *  4 = 2016
ceil((2013    4) /  4) *  4 = 2020
ceil((2013   15) / 16) * 16 = 2032
ceil((2013   16) / 16) * 16 = 2032
ceil((2013   17) / 32) * 32 = 2048

对齐的幂的美妙之处在于,在相对恒定的到期时间内,桶的数量随着时间的推移(大致)保持不变——因为从一开始就移除了紧密包装的桶,新的桶在结束时填补了稀疏桶之间的空白。

好了,让我们把它写成代码:

代码语言:javascript复制
def log_bucket(now, maxage):
    next_power = 2 ** math.ceil(math.log2(maxage))
    expires = now   maxage
    bucket = math.ceil(expires / next_power) * next_power
    return bucket
代码语言:javascript复制
>>> [log_bucket(0, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[1, 2, 4, 4, 16, 16, 32]
>>> [log_bucket(2000, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[2001, 2002, 2004, 2004, 2016, 2016, 2048]
>>> [log_bucket(2013, i) for i in [1, 2, 3, 4, 15, 16, 17]]
[2014, 2016, 2016, 2020, 2032, 2032, 2048]

看起来不错!

有两个误差来源——首先是取整maxage ,当它等于 2a 1 时最差,其次是取整expiry,当它等于 2b 1 时最差。极端情况下会有接近200%的误差 :

代码语言:javascript复制
>>> log_bucket(0, 17)  # (32 - 17) / 17 ~= 88%
32
>>> log_bucket(0, 33)  # (64 - 33) / 33 ~= 94%
64
>>> log_bucket(16, 17)  # (64 - 31) / 17 ~= 182%
64
>>> log_bucket(32, 33)  # (128 - 64) / 33 ~= 191%
128

191%的误差是相当多的;在我们开始修复它之前,让我们确认一下我们的推理。

代码语言:javascript复制
def error(now, maxage, *args):
    """log_bucket() error."""
    bucket = log_bucket(now, maxage, *args)
    return (bucket - now) / maxage - 1

def max_error(now, max_maxage, *args):
    """Worst log_bucket() error for all maxages up to max_maxage."""
    return max(
        error(now, maxage, *args)
        for maxage in range(1, max_maxage)
    )

def max_error_random(n, *args):
    """Worst log_bucket() error for random inputs, out of n tries."""
    max_now = int(time.time()) * 2
    max_maxage = 3600 * 24 * 31
    rand = functools.partial(random.randint, 1)
    return max(
        error(rand(max_now), rand(max_maxage), *args)
        for _ in range(n)
    )
代码语言:javascript复制
>>> max_error(0, 10_000)
0.9997558891736849
>>> max_error(2000, 10_000)
1.9527896995708156
>>> max_error_random(10_000_000)
1.9995498725910554

看起来我们的猜想是正确的。 那么,我们如何让误差变小呢?我们不是取整到 2 的下一次幂,而是取它的1/2,或1/4,或1/8......

代码语言:javascript复制
def log_bucket(now, maxage, shift=0):
    next_power = 2 ** max(0, math.ceil(math.log2(maxage) - shift))
    expires = now   maxage
    bucket = math.ceil(expires / next_power) * next_power
    return bucket

它似乎正在工作:

代码语言:javascript复制
>>> for s in range(5):
...     print([log_bucket(0, i, s) for i in [1, 2, 3, 4, 15, 16, 17]])
...
[1, 2, 4, 4, 16, 16, 32]
[1, 2, 4, 4, 16, 16, 32]
[1, 2, 3, 4, 16, 16, 24]
[1, 2, 3, 4, 16, 16, 20]
[1, 2, 3, 4, 15, 16, 18]
代码语言:javascript复制
>>> for s in range(10):
...     e = max_error_random(1_000_000, s)
...     print(f'{s} {e:6.1%}')
...
0 199.8%
1  99.9%
2  50.0%
3  25.0%
4  12.5%
5   6.2%
6   3.1%
7   1.6%
8   0.8%
9   0.4%

使用 shift=7 ,误差小于 2%。我想知道那是多少桶......

代码语言:javascript复制
def max_buckets(max_maxage, *args):
    """Number of buckets to cover all maxages up to max_maxage."""
    now = time.time()
    buckets = {
        log_bucket(now, maxage, *args)
        for maxage in range(1, max_maxage)
    }
    return len(buckets)
代码语言:javascript复制
>>> max_buckets(3600 * 24, 7)
729
>>> max_buckets(3600 * 24 * 31, 7)
1047
>>> max_buckets(3600 * 24 * 365, 7)
1279

一整年需要用一千多桶存储,还不错!

在使用其中任何一个之前,我们需要将到期时间expires转换为存储桶。这看起来很像 Priority Buckets 代码,唯一值得注意的部分是eviction()

这篇文章已经很长了,所以这里省略了一些。完整代码见文末“结论”的上方。

代码语言:javascript复制
while self.cache:
    expires = self.expires_order.peek()
    if expires > now:
        break
    #expires_bucket = self.expires_buckets[expires]
    for key in list(self.expires_buckets[expires]):
        self.delete(key)

现在我们使用log_bucket()。既然我们可以做到了无限的时间,为什么不也拥有无限的priorities呢?毕竟,有了锤子,一切都是钉子。(当然,不太可能需要那么多的优先级)

bisect, redux

是时候修复该优先级队列了。 我们使用 insort() 来添加优先级,使用 operator.neg 来保持列表的反转:

代码语言:javascript复制
def push(self, item):
    bisect.insort(self.data, item, key=operator.neg)

我们更新 peek()pop() 来处理相反的顺序:

代码语言:javascript复制
def peek(self):
    return self.data[-1]

def pop(self):
    return self.data.pop()

最后,PriorityQueueremove() 改编了 Searching Sorted Lists 中的index()

代码语言:javascript复制
def remove(self, item):
    i = bisect.bisect_left(self.data, -item, key=operator.neg)
    if i != len(self.data) and self.data[i] == item:
        del self.data[i]
        return
    raise ValueError

就是这样,我们完成了!

完整代码: https://death.andgravity.com/_file/lru-cache/50-bisect.py

结论

任何期望您在一小时内实现这一点的人都是妄想。对于理性的面试官来说,解释你会使用什么以及为什么应该足够了,尽管如果你以前没有解决过这种问题,这可能会很困难。 撇开胡说八道的面试不谈,对时间复杂性有基本的了解是很有用的。 但是,O所说的内容和实践中发生的事情可能会有很大不同。一定要测量程序运行的时间,要考虑O的限制——有时,O(n)中的n足够小,你不必做理论上正确的事情。 你不需要知道如何实现所有的数据结构,这就是(软件)库和维基百科的用途。但是,了解可用的内容以及何时使用它是很有用的。

好的库值得学习:Python标准库文档已经涵盖了我们需要的许多实用知识,Sorted Containers 也是如此。

0 人点赞