在自动驾驶中,需要大量的sensor信息上传到服务器进行训练。即使在车辆的行驶过程中也需要相关的sensor信息进行融合,感知。而sensor的信息可能来自不同的域,这样就需要高速稳定的网络来提供基础服务。对RX 和TX 提供了下面技术,
- 接收侧:
- RSS是网卡驱动支持的多队列属性,队列通过中断绑定到不同的CPU,以实现流量负载。
- RPS是以软件形式实现流量在不同CPU之间的分发。
- RFS是报文需要在用户态处理时,保证处理的CPU与内核相同,防止缓存miss而导致的消耗。
- LRO 和 GRO,多个报文组成一个大包上送协议栈。
- 发送侧:
- XPS 软件多队列发送。
- TSO 是利用网卡来对大数据包进行自动分段,降低CPU负载的技术。
- GSO 是协议栈分段功能。分段之前判断是否支持TSO,支持则推迟到网卡分段。 如果TSO开启,GSO会自动开启。
- UFO 类似TSO,不过只针对UDP报文。
RSS(Receive Side Scaling)
当前多数据网卡支持多个接收和发送队列(multi-queue),在接收方,NIC 可以将不同的 packet 分发给不同的 CPU。NIC 通过一个 filter将每个 packet 分到不同的 flows 中,每个 flow 的 packet 都被分到同一个接收队列中,而每个接收队列可以由一个独立的 CPU 来处理。
RSS 实现
这里的 filter 一般是一个 hash 函数,它以网络数据包的头文件为key,比如说以 IP 地址和 TCP 端口的 4 元组为 key 进行 hash。RSS 最常见的硬件实现是一个 128-entry 的 indirectiontable,每个entry 存储了一个 queue number。一个packet 所属的接收队列是由 hash (通常是Toeplitz hash) 计算出来的低 7bit 作为key,从 indirection table 中拿到queue number。有一些更高级的网卡支持 programmable filter,比如对 80 端口的 webserver 映射到固定的接收队列。这种 n-tuple 可以通过 ethtool 的 --config-ntuple 配置
设置多队列 IRQ 绑核
每一个接收队列都有自己的 IRQ number,NIC 通过 IRQ 通知 CPU 数据包到来,对于 PCIe 类型设备使用 MSI-X 类型中断。我们可以通过配置 /proc/irq/IRQ_NUMBER/smp_affinity 来配置 IRQ 与 CPU 的affinity。
适用场景
为了CPU load平衡在设置 RSS 的时候,可以将不同网卡队列的 IRQ 均分到不同 CPU,实现每个CPU 处理各自的硬中断。
RPS (Receive Packet Steering)
RPS是 RSS 的软件实现。因为是软件实现,所以 RPS 在 data path 的后半段实现,而 RSS 是直接在中断前就通过硬件分发给不同的网卡队列。RPS 相对于 RSS 由以下特点:
- RPS 可以被用在任何 NIC 上,不依赖于 NIC 的硬件能力
- software filter 可以很容易的加入来对新的协议进行 hash,而 RSS 需要 NIC 硬件实现 filter
- RPS 不会增大硬件的 interrupt rate,除了 IPIs(Inter-Processor Interrupts)
RPS 实现
RPS 在网络中断后半部被调用,在 netif_rx_internal (传统中断模式)或者 netif_receive_skb_internal (NAPI模式下),如果使能了RPS,则调用 get_rps_cpu 选择合适的cpu,将 skb 放入此 CPU 的 backlog 队列中,然后 waking up the CPU for processing。这样就可以让多个CPU 来处理协议栈的工作,避免一个 CPU 负载过大。
代码语言:javascript复制//设备接收队列,和 RPS 相关成员是 rps_map
/* This structure contains an instance of an RX queue. */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
struct rps_map __rcu *rps_map;
struct rps_dev_flow_table __rcu *rps_flow_table;
#endif
struct kobject kobj;
struct net_device *dev;
} ____cacheline_aligned_in_smp;
//存放配置RPS的值,假如/sys/class/net/(dev)/queues/rx-(n)/rps_cpus=f,则
/len=4, cpus指向额外分配的内存数组,每个元素保存一个cpu值
struct rps_map {
unsigned int len;
struct rcu_head rcu;
u16 cpus[0];
};
设置 /sys/class/net/<dev>/queues/rx-<n>/rps_cpus 时:
代码语言:javascript复制store_rps_map
for_each_cpu_and(cpu, mask, cpu_online_mask)
map->cpus[i ] = cpu;
if (i)
map->len = i;
//将map赋值到queue->rps_map中,在get_rps_cpu中会使用到
rcu_assign_pointer(queue->rps_map, map);
if (map)
//使能RPS
static_key_slow_inc(&rps_needed);
RPS选择 CPU 的第一步是计算 flow 的 hash。这个hash 作为一致性hash,可以直接使用 hardware 算出来并保存在 skb 的 hash,一般也是 RSS使用的 hash (即 computed Toeplitzhash)。如果没有硬件算出来的 hash 的话,可以使用软件计算 hash。
代码语言:javascript复制static inline __u32 skb_get_hash(struct sk_buff *skb)
{
if (!skb->l4_hash && !skb->sw_hash)
__skb_get_hash(skb);
return skb->hash;
}
get_rps_cpu 函数中也涉及到了 RFS 的流程,这里先忽略 RFS 流程,只关注RPS相关的。因为 RPS 设置的是某个队列对应的 CPU 列表,所以需要先获取队列id,再获取此队列对应的 CPU 列表。
代码语言:javascript复制/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
struct netdev_rx_queue *rxqueue;
struct rps_map *map;
struct rps_dev_flow_table *flow_table;
struct rps_sock_flow_table *sock_flow_table;
int cpu = -1;
u16 tcpu;
u32 hash;
//skb->queue_mapping 1 记录了skb从哪个队列接收上来
if (skb_rx_queue_recorded(skb)) {
//获取接收skb队列index
u16 index = skb_get_rx_queue(skb);
if (unlikely(index >= dev->real_num_rx_queues)) {
WARN_ONCE(dev->real_num_rx_queues > 1,
"%s received packet on queue %u, but number "
"of RX queues is %un",
dev->name, index, dev->real_num_rx_queues);
goto done;
}
//根据index,获取rxqueue
rxqueue = dev->_rx index;
} else
//这里应该是queue 0吧
rxqueue = dev->_rx;
map = rcu_dereference(rxqueue->rps_map);
if (map) {
//如果rps_map只配置了一个CPU,并且没有配置rps_flow_table,
//并且rps_sock_flow_entries 配置的这个cpu在线,则直接使用这个cpu。
//如果这个cpu不在线,则返回-1.
if (map->len == 1 &&
!rcu_access_pointer(rxqueue->rps_flow_table)) {
tcpu = map->cpus[0];
if (cpu_online(tcpu))
cpu = tcpu;
goto done;
}
//没有配置 rps_map,也没有配置 rps_flow_table
} else if (!rcu_access_pointer(rxqueue->rps_flow_table)) {
goto done;
}
skb_reset_network_header(skb);
//根据skb获取hash值。如果在RSS模式下,可以直接使用网
//卡计算的hash值,否则需要根据数据包信息计算一个
hash = skb_get_hash(skb);
if (!hash)
goto done;
//flow_table 和 sock_flow_table 是RFS的流程,暂时忽略
flow_table = rcu_dereference(rxqueue->rps_flow_table);
sock_flow_table = rcu_dereference(rps_sock_flow_table);
if (flow_table && sock_flow_table) {
/* ... */
}
//如果没有在设备流表rps_flow_table和全局流表
//rps_sock_flow_table中找到目标cpu,则使用hash在
//rps_map中找一个cpu即可。
if (map) {
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {
cpu = tcpu;
goto done;
}
}
done:
return cpu;
}
RPS 配置
RPS 要求 kernel 开启了 CONFIG_RPS 的选项,这对于 SMP 系统是默认的。为了打开 RPS 的能力,需要通过 sysfs 配置
1 | /sys/class/net/<dev>/queues/rx-<n>/rps_cpus |
---|
对于单队列网卡,RPS 将会设置 rps_cpus 到接收中断的 CPU 的同一个 memory domain 中,这里的 memory domain 说的是一个 CPU 集合
RPS Flow Limit
RPS 在不同 CPU 之间分发 packet,但是,如果一个 flow 特别大,会出现单个 CPU 被打爆,而其他 CPU 无事可做(饥饿)的状态。因此引入了 flow limit 特性,放到一个 backlog 队列的属于同一个 flow 的包的数量不能超过一个阈值。这可以保证即使有一个很大的 flow 在大量收包 ,小 flow 也能得到及时的处理。
默认,flow limit 功能是关掉的。要打开 flow limit,需要指定一个 bitmap(类似于 RPS 的 bitmap)。
1 | /proc/sys/net/core/flow_limit_cpu_bitmap |
---|
监控:由于 input_pkt_queue 打满或 flow limit 导致的丢包,在/proc/net/softnet_stat 里面的 dropped 列计数。
如果使用了 RPS,或者驱动调用了 netif_rx,那增加 netdev_max_backlog 可以改善在 enqueue_to_backlog 里的丢包:
RFS(Receive Flow Steering)
从 RPS 选择 CPU 方法可知,就是使用 skb 的hash 随机选择一个 CPU,没有考虑到应用层运行在哪个CPU 上,如果执行软中断的 CPU 和运行应用层的 CPU 不是同一个 CPU ,势必会降低 CPU Cache 命中率,降低性能。一般来说,高性能场景下都会为应用设置 CPU Affinity,将应用和 CPU绑核。
为了解决这个问题,RFS 通过指派应用程序所在的 CPU 来在内核态处理报文,以此来增加 CPU 的缓存命中率。RFS 主要是通过两个流表来实现的:
- 设备流表,记录的是上次在内核态处理该流中报文的 CPU
- 全局的socket流表,记录的是流中的报文渴望被处理的目标 CPU
原理是将运行应用的 CPU 保存到一个表中,在 get_rps_cpu 时,从这个表中获取 CPU,即可保证执行软中断的 CPU 和运行应用层的 CPU 是同一个 CPU。
全局 socket 流表
全局socket流表 rps_sock_flow_table 的定义如下:
代码语言:javascript复制/*
* The rps_sock_flow_table contains mappings of flows to the last CPU
* on which they were processed by the application (set in recvmsg).
*/
struct rps_sock_flow_table {
unsigned int mask;
u16 ents[0]; // 弹性数组,key 是 flow hash,value 是流渴望被处理的CPU,也就是应用所在的CPU
};
mask成员存放的就是ents这个柔性数组的大小,该值也是通过配置文件的方式指定的,相关的配置文件为 /proc/sys/net/core/rps_sock_flow_entries,可以通过 sysctl 修改 net.core.rps_sock_flow_entries 配置:
rps_sock_flow_table 是一个全局的数据流表,这个表中包含了数据流渴望被处理的CPU。这个 CPU 是当前处理流中报文的应用程序所在的CPU。全局socket流表会在调 recvmsg,sendmsg (特别是 inet_accept(), inet_recvmsg(), inet_sendmsg(), inet_sendpage() and tcp_splice_read()),被设置或者更新。
全局socket流表会在调用 recvmsg()等函数时被更新,而在这些函数中是通过调用函数 sock_rps_record_flow() 来更新或者记录流表项信息的,而sock_rps_record_flow() 中最终又是调用函数 rps_record_sock_flow() 来更新 ents 柔性数组的,该函数实现如下:
代码语言:javascript复制static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
u32 hash)
{
if (table && hash) {
unsigned int cpu, index = hash & table->mask;
/* We only give a hint, preemption can change cpu under us */
/*当前CPU*/
cpu = raw_smp_processor_id();
/*ents存放当前cpu*/
if (table->ents[index] != cpu)
table->ents[index] = cpu;
}
}
设备流表
代码语言:javascript复制struct netdev_rx_queue {
struct rps_map __rcu *rps_map;
/*设备流表*/
struct rps_dev_flow_table __rcu *rps_flow_table;
struct kobject kobj;
struct net_device *dev;
} ____cacheline_aligned_in_smp;
struct rps_dev_flow_table {
unsigned int mask;
struct rcu_head rcu;
struct rps_dev_flow flows[0]; //弹性数组
};
struct rps_dev_flow {
u16 cpu; /* 处理该流的cpu */
u16 filter;
unsigned int last_qtail; /* sd->input_pkt_queue队列的尾部索引,即该队列长度 */
};
struct rps_dev_flow 类型弹性数组大小由配置文件 /sys/class/net/(dev)/queues/rx-(n)/rps_flow_cnt 进行指定的。这个表可以记录之前 cpu backlog上数据包何时处理完,等数据包都处理完后就可以将流迁移到新的CPU 上了,这样就可以避免调度到新的 CPU 时候出现乱序。
代码语言:javascript复制//设置 /sys/class/net/<dev>/queues/rx-<n>/rps_flow_cnt 时
store_rps_dev_flow_table_cnt
table->mask = mask;
for (count = 0; count <= mask; count )
table->flows[count].cpu = RPS_NO_CPU;
RFS 实现
代码语言:javascript复制/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
struct netdev_rx_queue *rxqueue;
struct rps_map *map;
struct rps_dev_flow_table *flow_table;
struct rps_sock_flow_table *sock_flow_table;
int cpu = -1;
u16 tcpu;
u32 hash;
/* ... */
map = rcu_dereference(rxqueue->rps_map);
/* ... */
skb_reset_network_header(skb);
hash = skb_get_hash(skb);
if (!hash)
goto done;
flow_table = rcu_dereference(rxqueue->rps_flow_table);
sock_flow_table = rcu_dereference(rps_sock_flow_table);
if (flow_table && sock_flow_table) {
u16 next_cpu;
struct rps_dev_flow *rflow;
// tcpu记录的是处理数据包的cpu
rflow = &flow_table->flows[hash & flow_table->mask];
tcpu = rflow->cpu;
// next_cpu 记录的是运行 application 的 cpu
next_cpu = sock_flow_table->ents[hash & sock_flow_table->mask];
/*
* If the desired CPU (where last recvmsg was done) is
* different from current CPU (one in the rx-queue flow
* table entry), switch if one of the following holds:
* - Current CPU is unset (equal to RPS_NO_CPU).
* - Current CPU is offline.
* - The current CPU's queue tail has advanced beyond the
* last packet that was enqueued using this table entry.
* This guarantees that all previous packets for the flow
* have been dequeued, thus preserving in order delivery.
*/
if (unlikely(tcpu != next_cpu) &&
(tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
((int)(per_cpu(softnet_data, tcpu).input_queue_head -
rflow->last_qtail)) >= 0)) {
tcpu = next_cpu;
rflow = set_rps_cpu(dev, skb, rflow, next_cpu);
}
if (tcpu != RPS_NO_CPU && cpu_online(tcpu)) {
*rflowp = rflow;
cpu = tcpu;
goto done;
}
}
// 如果没有在设备流表rps_flow_table和全局流表rps_sock_flow_table中找到目标cpu,
// 则使用hash在rps_map中找一个cpu即可。
if (map) {
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {
cpu = tcpu;
goto done;
}
}
done:
return cpu;
}
更新 rflow->cpu 为 next_cpu,并且记录 next_cpu 队列的 input_queue_head 到 rflow->last_qtail 中,后续数据包入队到 next_cpu 队列上时,rflow->last_qtail 都会加1,通过判断 input_queue_head 和 rflow->last_qtail 来判断 next_cpu 队列是否为空。
代码语言:javascript复制static struct rps_dev_flow *
set_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow *rflow, u16 next_cpu)
{
if (next_cpu != RPS_NO_CPU) {
rflow->last_qtail =
per_cpu(softnet_data, next_cpu).input_queue_head;
}
rflow->cpu = next_cpu;
return rflow;
}
Accelebrated RFS
RFS 是将 skb 放在运行应用的 CPU 的 backlog 中处理的,而且我们知道默认情况下哪个 CPU 处理硬件中断,就由哪个 CPU 处理软件中断,即 who trigger, who run,那能不能通过网卡的 fdir 功能(流重定向) 将数据流重定向到运用应用的 CPU 所处理的队列上呢?这就是 Accelerated RFS 的作用。
aRFS 之于 RFS 就像 RSS 之于 RPS,是一种硬件加速的负载均衡机制,直接将 flows 发送给接收 packet 的应用所在的 CPU。具体的实现是,网络协议站会调用驱动中的 ndo_rx_flow_steer 来将 flow 分发到 desired hardwarequeue。每次在 rps_dev_flow_table 的 flow entry 更新后,网络协议栈会调用 ndo_rx_flow_steer 。
除了使能RFS的两个表,没其他需要使能的,前提是网卡驱动得支持函数 ndo_rx_flow_steer,不过貌似支持的网卡没几个。
XPS
前面的几种技术都是接收方向的,XPS是针对发送方向的,即从网卡发送出去时,如果有多个发送队列,选择使用哪个队列。
可通过如下命令设置,此命令表示运行在f指定的cpu上的应用调用socket发送的数据会从网卡的tx-n队列发送出去。
1 | echo f > /sys/class/net/<dev>/queues/tx-<n>/xps_cpus |
---|
虽然设置的是设备的tx queue对应的cpu列表,但是转换到代码中保存的是每个cpu可使用的queue列表。因为查找xps_cpus时,肯定是已知cpu id,寻找从哪个tx queue发送。
选择 tx queue 时,优先选择 xps_cpu 指定的 queue,如果没有指定就使用 skb hash 计算出来一个。当然也不是每个报文都得经过这个过程,只有 socket的第一个报文需要,选择出 queue 后,将此queue设置到 sk->sk_tx_queue_mapping,后续报文直接获取 sk_tx_queue_mapping 即可。
通常 RPS 和 XPS 同id的队列选择的CPU相同,这也是防止不同CPU切换时性能消耗。
Linux通过配置文件的方式指定哪些cpu核参与到报文的分发处理,配置文件存放的路径是:/sys/class/net/(dev)/queues/tx-(n)/xps_cpus。
GRO
Large Receive Offloading (LRO) 是一个硬件优化,Generic ReceiveOffloading (GRO) 是 LRO 的一种软件实现。
两种方案的主要思想都是:通过合并“足够类似”的包来减少传送给网络栈的包数,这有助于减少 CPU 的使用量。例如,考虑大文件传输的场景,包的数量非常多,大部分包都是一段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送到网络栈。GRO 使协议层只需处理一个 header,而将包含大量数据的整个大包送到用户程序。
这类优化方式的缺点是信息丢失:包的 option 或者 flag 信息在合并时会丢失。这也是为什么大部分人不使用或不推荐使用LRO 的原因。
LRO 的实现,一般来说对合并包的规则非常宽松。GRO 是 LRO 的软件实现,但是对于包合并的规则更严苛。如果用 tcpdump 抓包,有时会看到机器收到了看起来不现实的、非常大的包, 这很可能是系统开启了 GRO。接下来会看到,tcpdump 的抓包点(捕获包的 tap )在GRO 之后。
GSO/TSO
计算机网络上传输的数据基本单位是离散的网包,既然是网包,就有大小限制,这个限制就是 MTU(Maximum Transmission Unit)的大小,(以太网)一般是1500字节(这里的MTU所指的是无需分段的情况下,可以传输的最大IP报文(包含IP头部,但不包含协议栈更下层的头部))。比如我们想发送很多数据出去,经过os协议栈的时候,会自动帮你拆分成几个不超过MTU的网包。然而,这个拆分是比较费计算资源的(比如很多时候还要计算分别的checksum),由 CPU 来做的话,往往会造成使用率过高。
那可不可以把这些简单重复的操作 offload 到网卡上呢?于是就有了 LSO(Large Segment Offload ),在发送数据超过 MTU 限制的时候(太容易发生了),OS 只需要提交一次传输请求给网卡,网卡会自动的把数据拿过来,然后进行切片,并封包发出,发出的网包不超过 MTU 限制。
现在基本上用不到 LSO,已经有更好的替代。
- TSO (TCP Segmentation Offload): 是一种利用网卡来对大数据包进行自动分段,降低CPU负载的技术。 其主要是延迟分段。
- GSO (Generic Segmentation Offload): GSO是协议栈是否推迟分段,在发送到网卡之前判断网卡是否支持TSO,如果网卡支持TSO则让网卡分段,否则协议栈分完段再交给驱动。 如果TSO开启,GSO会自动开启。
以下是TSO和GSO的组合关系:
- GSO开启, TSO开启:协议栈推迟分段,并直接传递大数据包到网卡,让网卡自动分段。
- GSO开启, TSO关闭:协议栈推迟分段,在最后发送到网卡前才执行分段。
- GSO关闭, TSO开启:同GSO开启, TSO开启。
- GSO关闭, TSO关闭:不推迟分段,在tcp_sendmsg中直接发送MSS大小的数据包。
开启GSO/TSO
驱动程序在注册网卡设备的时候默认开启GSO: NETIF_F_GSO
是否推迟分段
GSO/TSO是否开启是保存在 dev->features 中,而设备和路由关联,当我们查询到路由后就可以把配置保存在sock中。
比如在 tcp_v4_connect 和 tcp_v4_syn_recv_sock 都会调用 sk_setup_caps 来设置 GSO/TSO 配置。
GSO的数据包长度
对紧急数据包或 GSO/TSO 都不开启的情况,才不会推迟发送,默认使用当前MSS。开启GSO后,tcp_send_mss 返回 mss 和单个 skb 的 GSO 大小,为 mss 的整数倍。需要注意的是,只要开启了GSO,即使硬件不支持TSO,也会设置NETIF_F_TSO,使得sk_can_gso(sk)在GSO开启或者TSO开启的时候都返回true。