这是我在很早之前遇到的一个题,很有意思,所以到现在仍然记得。题意借用了 TCP 的上下文,要求实现 TCP 中一个“顺序组装”的关键逻辑:
- 对于 TCP 层来说,IP 层的 packet 是乱序的收到。
- 对于应用层来说,TCP 层交付的是顺序的数据。
这个题有意思的点在于,借用了 TCP 的上下文之后,就可以先和候选人讨论一些 TCP 的基础知识,然后话锋一转,引出这道题。这样既可以考察一些基础知识,也可以考察工程代码能力。
题目
代码语言:javascript复制struct Packet {
size_t offset;
size_t length;
uint8_t *data;
};
// 实现一个“顺序交付”语义
class TCP {
// 应用层调用:按顺序读取不超过 count 的字节数到 buf 中,并返回实际读到的字节数
size_t read(void *buf, size_t count);
// TCP 层回调:得到一些随机顺序的 IP 层封包
void receive(Packet* p);
// TCP 层回调:数据发完,连接关闭
void finish();
};
讨论
由于多少和“实际”有些关联,所以本题目中有相当多的可以讨论的点,反过来说,如果你不进行合适的建模和简化,实现起来复杂度会非常高——四十五分钟是写不完的。
对于 Packet 结构体:
- offset 是否会有交叠?(只要发送数据足够多,耗尽 size_t 的范围就有可能发生)
- length 的长度是固定的还是变长的?
对于 read 调用:
TCP:read()
是否阻塞?如果是阻塞的,是否要阻塞到凑齐要求大小(count)的数据才能返回,还是只要有一部分数据就立即返回?- 如果
TCP::finish()
后,TCP:read()
的返回值是什么?
对于内存问题:
TCP::read
中给应用层中 buf 填充数据时,是否要进行拷贝?TCP::receive
中Packet::data
的内存是否要在 TCP 类中释放?
实现
这本质上是一个生产者消费者问题。我们需要维护一个线程安全的有序数据结构,生产者(TCP::receive
)往里面放数据,消费者(TCP::read
)从里面取数据。要求是:乱序放、顺序取、可切分。
为了简化实现,可以和面试官做如下设定。
对于 Packet 结构体:
- offset 没有交叠
- length 变长
对于 read 是否阻塞问题,可以设定:
- read 函数是阻塞的
- read 只要收到一部分数据,即使没达到 count,也可以立即返回
对于内存问题,可以设定:
- TCP 类不负责收到的数据的生命周期,但要求调用者保证 TCP 整个生命周期中 packet 中的数据都是有效(不能被释放)的。
- 给应用层的数据会拷贝到用户提供的 buf 中。
以下是代码:
代码语言:javascript复制struct Packet{
Packet(size_t off, size_t len, uint8_t *data):
offset(off), length(len), data(data) {}
size_t offset;
size_t length;
uint8_t *data;
};
class TCP {
public:
TCP() {
nextOffset_ = 0;
finished_ = false;
}
size_t read(void *buf, size_t count) {
std::unique_lock<std::mutex> lock(mu_);
size_t leftBytes = count;
while (leftBytes > 0) {
if (!packets_.empty()) {
size_t offset = packets_.begin()->first;
auto* p = packets_.begin()->second;
if (offset == nextOffset_) {
// fetch the packet
packets_.erase(offset);
// copy to the user buf
size_t len = std::min(p->length, leftBytes);
std::memcpy(buf, p->data, len);
leftBytes -= len;
nextOffset_ = len;
// put back the left data
p->length -= len;
if (p->length > 0) {
p->data = len;
p->offset = len;
packets_[p->offset] = p;
}
return count-leftBytes;
}
} else if (finished_) {
break;
}
cv_.wait(lock);
}
// finished
return 0;
}
void receive(Packet* p) {
std::unique_lock<std::mutex> lock(mu_);
packets_[p->offset] = p;
cv_.notify_one();
}
void finish() {
std::unique_lock<std::mutex> lock(mu_);
finished_ = true;
cv_.notify_one();
}
private:
std::mutex mu_;
std::condition_variable cv_;
size_t nextOffset_;
bool finished_;
std::map<uint64_t, Packet*> packets_;
};
核心实现点包括:
- 使用
std::map
组织所有 IP 层封包 - 使用
nextOffset_
来记录需要交付给应用层下一个偏移量 - 如果包被切分了,剩余的包记得放回去
测试
本题目的测试也有些意思:
- 如何模拟乱序
- 如何控制所有 IP 封包中的 data 生命周期
- 如何对应用层收到的数据进行校验
TCP tcp;
void producer(uint8_t* data) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(50, 99);
std::cout << "construct data..." << std::endl;
std::vector<Packet*> packets(100);
size_t offset = 0;
for (size_t i = 0; i < 100; i) {
size_t randLen = dis(gen);
packets[i] = new Packet(offset, randLen, data);
data = randLen;
offset = randLen;
}
std::cout << "total " << offset << " bytes" << std::endl;
std::cout << "make the data unordered..." << std::endl;
std::shuffle(packets.begin(), packets.end(), gen);
std::cout << "give it to tcp..." << std::endl;
for (size_t i = 0; i < 100; i) {
tcp.receive(packets[i]);
std::cout << "receive data [" << packets[i]->offset << " ,"
<< packets[i]->offset packets[i]->length<< "): "
<< packets[i]->length << " bytes" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
tcp.finish();
}
void consumer(uint8_t* data) {
size_t nBytes = 0;
uint8_t buf[100];
size_t offset = 0;
while ((nBytes = tcp.read(buf, 50)) > 0) {
auto diff = std::memcmp(data offset, buf, nBytes);
std::stringstream ss;
ss << "read data [" << offset << " ," << offset nBytes << "): "
<< nBytes << " bytes";
if (!diff) {
ss << "; verify ok";
} else {
ss << "; verify bad";
}
std::cout << ss.str() << std::endl;
offset = nBytes;
}
std::cout << "read finish" << std::endl;
}
int main() {
uint8_t buffer[10000];
for (size_t i = 0; i < 10000; i) {
buffer[i] = i & 0xff;
}
std::thread c(consumer, buffer);
std::thread p(producer, buffer);
p.join();
c.join();
}
如果你想自己跑下代码,可以在这里[2]自取。
最后
如果还有时间,面试官可能会跟你讨论些,如果取消你做的那些假设,需要怎么样实现,不过只要能做过基础版,剩下的就都是加分项了。
参考资料
[1]
系统日知录: https://xiaobot.net/p/system-thinking
[2]
infra 程序员面试题目大全: https://github.com/DistSysCorp/infra-interview/tree/main/data_structures