Infra 面试之数据结构五:顺序组装

2024-05-08 15:47:53 浏览数 (1)

这是我在很早之前遇到的一个题,很有意思,所以到现在仍然记得。题意借用了 TCP 的上下文,要求实现 TCP 中一个“顺序组装”的关键逻辑:

  1. 对于 TCP 层来说,IP 层的 packet 是乱序的收到。
  2. 对于应用层来说,TCP 层交付的是顺序的数据。

这个题有意思的点在于,借用了 TCP 的上下文之后,就可以先和候选人讨论一些 TCP 的基础知识,然后话锋一转,引出这道题。这样既可以考察一些基础知识,也可以考察工程代码能力。

题目

代码语言:javascript复制
struct Packet {
    size_t offset;
    size_t length;
    uint8_t *data;
};

// 实现一个“顺序交付”语义
class TCP {
 // 应用层调用:按顺序读取不超过 count 的字节数到 buf 中,并返回实际读到的字节数
 size_t read(void *buf, size_t count);
 // TCP 层回调:得到一些随机顺序的 IP 层封包
 void receive(Packet* p);
 // TCP 层回调:数据发完,连接关闭
 void finish();
};

讨论

由于多少和“实际”有些关联,所以本题目中有相当多的可以讨论的点,反过来说,如果你不进行合适的建模和简化,实现起来复杂度会非常高——四十五分钟是写不完的。

对于 Packet 结构体:

  1. offset 是否会有交叠?(只要发送数据足够多,耗尽 size_t 的范围就有可能发生)
  2. length 的长度是固定的还是变长的?

对于 read 调用:

  1. TCP:read() 是否阻塞?如果是阻塞的,是否要阻塞到凑齐要求大小(count)的数据才能返回,还是只要有一部分数据就立即返回?
  2. 如果TCP::finish() 后,TCP:read() 的返回值是什么?

对于内存问题:

  1. TCP::read 中给应用层中 buf 填充数据时,是否要进行拷贝?
  2. TCP::receivePacket::data 的内存是否要在 TCP 类中释放?

实现

这本质上是一个生产者消费者问题。我们需要维护一个线程安全的有序数据结构,生产者(TCP::receive)往里面放数据,消费者(TCP::read)从里面取数据。要求是:乱序放、顺序取、可切分。

为了简化实现,可以和面试官做如下设定。

对于 Packet 结构体:

  1. offset 没有交叠
  2. length 变长

对于 read 是否阻塞问题,可以设定:

  1. read 函数是阻塞的
  2. read 只要收到一部分数据,即使没达到 count,也可以立即返回

对于内存问题,可以设定:

  1. TCP 类不负责收到的数据的生命周期,但要求调用者保证 TCP 整个生命周期中 packet 中的数据都是有效(不能被释放)的。
  2. 给应用层的数据会拷贝到用户提供的 buf 中。

以下是代码:

代码语言:javascript复制
struct Packet{
    Packet(size_t off, size_t len, uint8_t *data):
        offset(off), length(len), data(data) {}
    size_t offset;
    size_t length;
    uint8_t *data;
};

class TCP {
public:
    TCP() {
        nextOffset_ = 0;
        finished_ = false;
    }

    size_t read(void *buf, size_t count) {
        std::unique_lock<std::mutex> lock(mu_);

        size_t leftBytes = count;
        while (leftBytes > 0) {
            if (!packets_.empty()) {
                size_t offset = packets_.begin()->first;
                auto* p = packets_.begin()->second;
                if (offset == nextOffset_) {
                    // fetch the packet
                    packets_.erase(offset);

                    // copy to the user buf
                    size_t len = std::min(p->length, leftBytes);
                    std::memcpy(buf, p->data, len);
                    leftBytes -= len;
                    nextOffset_  = len;

                    // put back the left data
                    p->length -= len;
                    if (p->length > 0) {
                        p->data  = len;
                        p->offset  = len;
                        packets_[p->offset] = p;
                    }

                    return count-leftBytes;
                }
            } else if (finished_) {
                break;
            }

            cv_.wait(lock);
        }

        // finished
        return 0;
    }

    void receive(Packet* p) {
        std::unique_lock<std::mutex> lock(mu_);
        packets_[p->offset] = p;
        cv_.notify_one();
    }

    void finish() {
        std::unique_lock<std::mutex> lock(mu_);
        finished_ = true;
        cv_.notify_one();
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    size_t nextOffset_;
    bool finished_;
    std::map<uint64_t, Packet*> packets_;
};

核心实现点包括:

  1. 使用 std::map 组织所有 IP 层封包
  2. 使用 nextOffset_ 来记录需要交付给应用层下一个偏移量
  3. 如果包被切分了,剩余的包记得放回去

测试

本题目的测试也有些意思:

  1. 如何模拟乱序
  2. 如何控制所有 IP 封包中的 data 生命周期
  3. 如何对应用层收到的数据进行校验
代码语言:javascript复制
TCP tcp;

void producer(uint8_t* data) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(50, 99);

    std::cout << "construct data..." << std::endl;
    std::vector<Packet*> packets(100);
    size_t offset = 0;
    for (size_t i = 0; i < 100;   i) {
        size_t randLen = dis(gen);
        packets[i] = new Packet(offset, randLen, data);
        data  = randLen;
        offset  = randLen;
    }

    std::cout << "total " << offset << " bytes" << std::endl;
    std::cout << "make the data unordered..." << std::endl;
    std::shuffle(packets.begin(), packets.end(), gen);

    std::cout << "give it to tcp..." << std::endl;
    for (size_t i = 0; i < 100;   i) {
        tcp.receive(packets[i]);
        std::cout << "receive data [" << packets[i]->offset << " ,"
            << packets[i]->offset packets[i]->length<< "): "
            << packets[i]->length << " bytes" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    tcp.finish();
}

void consumer(uint8_t* data) {
    size_t nBytes = 0;
    uint8_t buf[100];

    size_t offset = 0;
    while ((nBytes = tcp.read(buf, 50)) > 0) {
        auto diff = std::memcmp(data offset, buf, nBytes);
        std::stringstream ss;
        ss << "read data [" << offset << " ," << offset nBytes << "): "
            << nBytes << " bytes";
        if (!diff) {
            ss << "; verify ok";
        } else {
            ss << "; verify bad";
        }
        std::cout << ss.str() << std::endl;

        offset  = nBytes;
    }
    std::cout << "read finish" << std::endl;
}

int main() {
    uint8_t buffer[10000];
    for (size_t i = 0; i < 10000;   i) {
        buffer[i] = i & 0xff;
    }
    std::thread c(consumer, buffer);
    std::thread p(producer, buffer);

    p.join();
    c.join();
}

如果你想自己跑下代码,可以在这里[2]自取。

最后

如果还有时间,面试官可能会跟你讨论些,如果取消你做的那些假设,需要怎么样实现,不过只要能做过基础版,剩下的就都是加分项了。

参考资料

[1]

系统日知录: https://xiaobot.net/p/system-thinking

[2]

infra 程序员面试题目大全: https://github.com/DistSysCorp/infra-interview/tree/main/data_structures

0 人点赞