最近一直在学习 Rust。想起以前每次学习新语言,都会实现一个俄罗斯方块来验证对语言的掌握,但是从来没有尝试去实现其AI。正好这次碰到这个挑战,所以没有多想就使用 Rust 来做此题了。
题目分析
题目之前的大佬已经分析得很完善了,这里不再赘述。
算法
我采用的方法也是常规深度搜索(DFS)。即,在当前局面下,计算出下一个方块的所有可能放置方式,然后根据消行和计分规则得到当前局面的子局面,以及子局面的分数。以此类推,可以得到游戏的完整状态树。最后,只需要找到通往最高分的子局面的路径即可。
但是由于方块共有10000块,简单计算就知道,没有足够的空间和时间完成完整的状态遍历。所以后面使用了一系列方法来优化这个过程:
- 限制深度: 最容易想到的办法,不做任何优化,只是将限制一下深度,即可向下搜索。此时,AI看起来就有点意思了,但是还是容易死。但是搜索的深度只能在5块以下。
- 方向去重:由于不是每个方块都有四个方向的,所以如果方向相同就不重复搜索了。如,只有T,J,L才有四向,I,S,Z是两向,O是一向。
- 评估函数:之前子局面的得分就是依据计分规则算得的,而深度又不是特别够(5个方块不消一行是完全可能的),就会造成所有子树得到的分数基本是一样的,这样,这一块就会乱放。因此,最终考虑了两个扣分规则。如果有空洞,扣分;提升了高度,扣分。做到此时,AI基本就不容易死了。
- 提升方块高度:由于计分规则消行得分是以当前局面下已经有方块成正比的。因此,如果完全消完无法取得高分。所以修改了扣分规则,如果高度小于10行,不扣分,大于10行,再扣分;最初扣是线性扣分,但是特别容易累得过高,然后失败。所以最终的扣分改成了平方扣分:
sum = sum - (height - 10)^2
. - 剪枝,提高深度:有了评估函数,就可以在不搜索到最深的局面时,得到一个子局面的得分,对于得分太低的子局面,就不必搜索了。因此,可以对子局面进行排序,选择得分Top N进行搜索。最初尝试的是10个子局面,在深度到7层时,勉强可以搜索,但是还比较慢。考虑了一下,深度越深,搜索空间其实越小,所以相应地也可以少选一些子局面来搜索。所以最后剪枝策略时:
2 * tree_height
. 做到这一步时,第一次提交了成绩 66w. 此时深度为7。
工程优化
到上面为止,算法的整体框架就完成了,后面做的努力基本就是在调整评估函数和并行优化.
评估函数
最后选择的评函数为:
代码语言:txt复制 self.evaluated =
min_occupied.iter().enumerate().fold(0i64, |a, (_col, b)| {
//const LVL: i64 = 5;
let b = *b as i64;
//if _col < 8 {
let b = b - avg_occupied;
a - b * b
// if b < LVL {
// let b = (LVL - b) as i64;
// a - b * b
// } else {
// // a b
// a
// }
// } else {
// if avg_occupied > 6 {
// let b = 20 - b;
// a - b * b
// } else {
// let b = b - avg_occupied;
// a - b * b
// }
// }
//}) space_count.iter().fold(0i64, |a, b| a - std::cmp::min(6, *b) * 15)
}) space_count.iter().fold(0i64, |a, b| a - b * 20)
occupied_grid as i64 * 4
match full_rows.len() {
1 => occupied_grid,
2 => occupied_grid * 3,
3 => occupied_grid * 6,
4 => occupied_grid * 10,
_ => 0,
} as i64;
故意保留了注释,说明了当时尝试过的一些思路:
- 最终高度的判定,选择的是方差最小。尝试过分段判定,或者在右侧留空。
- 空洞扣分,最终权重选择的是20。如果太低,容易留空;太高,容易不消行,被顶死。当然,后面还是加了另一个优化,才能做到权重20。后面再说。
- 由于局面中所有的格子会影响到消行扣分,因此,每个被占用的格子有个基础分,权重最后选择的是4。
- 根据计分规则得到的消行得分
并行优化
这一部分其实是我在这个游戏中消耗时间最多的。为了 make rust compiler happy. 几乎重写了搜索的数据结构。最后使用 crossbeam
的 deque
和 scope
完成了搜索树的 BFS 处理。关键代码如下:
fn find_task<T>(
local: &Worker<T>,
global: &Injector<T>,
stealers: &[Stealer<T>],
) -> Option<T> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
global
.steal()
//.steal_batch_and_pop(local)
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
由于,子树不是完全相同的,有些子树快,有些慢,所以必须要支持一个工程线程可以取得(steal)其它工程线程的任务,才能保证并行加速的最大化。最开始只是根据第一层子节点,每个节点一个线程进行搜索。CPU占有率无法超过500%,大部分时间200%左右。
代码语言:txt复制 pub fn search(
mut init: AIState,
depth: usize,
min_row: usize,
) -> Option<AIState> {
let end_level = init.level depth;
let injector: Injector<AIState> = Injector::new();
let mut workers = Vec::new();
let mut stealers = Vec::new();
let (tx, rx) = channel::unbounded();
for _ in 0..THREADS {
let worker = Worker::new_fifo();
workers.push(worker);
}
for worker in workers.iter() {
stealers.push(worker.stealer());
}
let global = &injector;
let stealers = &stealers[0..];
let mut roots = init.search_next_step(min_row)?;
for (index, state) in roots.iter_mut().enumerate() {
state.root_index = index as i8;
injector.push(*state);
}
let rt = thread::scope(|s| {
for (_i, local) in workers.into_iter().enumerate() {
let tx = tx.clone();
s.spawn(move |_| {
while let Some(mut state) = BFS::find_task(&local, global, &stealers)
{
if state.level < end_level {
if let Some(states) = state.search_next_step(min_row) {
//let width = std::cmp::min(14, (end_level - state.level) * 2);
let width = std::cmp::max(3, end_level - state.level);
for mut sub_state in states.into_iter().take(width) {
//for mut sub_state in states.into_iter() {
sub_state.evaluated = state.evaluated;
local.push(sub_state);
}
} else {
tx.send(state).unwrap();
}
} else {
tx.send(state).unwrap();
}
}
drop(tx);
});
}
drop(tx);
let result_join = s.spawn(|_| {
let mut chosen: Option<AIState> = None;
for state in rx.iter() {
chosen = chosen
.and_then(|selected| {
if state.evaluated > selected.evaluated {
Some(state)
} else {
Some(selected)
}
})
.or_else(|| Some(state));
}
chosen
});
result_join.join().unwrap()
})
.unwrap()?;
Some(roots[rt.root_index as usize])
}
这里需要注意的一点,就是 crossbeam::thread::scope
的使用。如果没有它,injector
和 stealers
传入到线程闭包中都会让编译器UNHAPPY!!
最后两个小优化
搜索回溯
在最后一天尝试取得高分时,会去尝试评估函数的各种参数,为了高分,参数选得比较大时,会碰到无法完成10000块的情况。回看录像,可以看到有更好的走法,但是由于深度不够,陷入局部最优解,然后把自己玩死。最后解决这个办法是,先使用低的搜索深度向后走,如果死了,就回退两倍深度的方块,再使用 1深度的搜索尝试。在走过难点后,再回退用低深度向后走。最终使用的参数是,默认8层,最多回退使用12层。最终这个方法做到104w
特殊方块优化
由于方块序列提前可以计算得出,所以可以提前判定I方块在何时出现。考虑到I方块是唯一可以完成四行同时消的方块。因此,我们可以对I方块前的若干方块提高深度进行搜索。这个优化是在比赛快结束时想到的。最开始选择的参数是提前10块用11层搜索。但跑了一个多小时后发现,预计要3个小时才能完成,预计得分109w左右。但已经过了最后提交时间了,所以不得不忍痛放弃。所以改成提前9块用10层搜索,最终半小时取得107w的结果, 剩下的时间已经不够其它尝试了,遂躺平,等待最后的结果。
一些心得
- 最好能够快速迭代。为了等待一个结果,花很长时间,浪费优化代码的时间就不值得了。之前有一次,等待了5小时,最后发现没有搜索出结果。既没有得到结果,时间又浪费了,沉没成本太高了。
- Rust 很安全,但确实需要对各个概念理解得比较清楚时,才会比较顺手。有些专题,互联网能找到的资料不多,需要自己尝试,感觉突然回到了前互联网时代学习编程一样。
- 最后感谢组委会,感谢龙哥,给到技术人员展示自己的舞台.