主流程(接上篇)
代码语言:javascript
复制主流程(服务端或客户端):
1. 主函数中解析命令行参数(parse_cmd), 设置默认服务端口
2. 初始化上下文(ucs_async_context_create, 异步事件上下文用于管理定时器和FD通知), 在其中, 初始化多生产者/多消费者队列(ucs_mpmc_queue_init), 初始化非阻塞异步轮询器(ucs_async_poll_init), 初始化可重入自旋锁上下文等
3. 创建工人(uct_worker_create), 工人代表着 progress 的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用
4. 根据入参查找期望的传输层(dev_tl_lookup, 由最小延迟决定要使用的设备和传输)
5. 设置回调(uct_iface_set_am_handler), 设置服务端接收到客户端数据后的回调
6. 建立socket连接(connect_common), 服务端监听端口, 等待客户端发起socket连接
7. 客户端连接服务端后, 两边交换地址(sendrecv, 先通过socket发送和接收长度, 然后发送和接收地址, 交换地址)
8. 创建端点(uct_ep_create), 获取端点地址(uct_ep_get_address), 连接对等端点(uct_ep_connect_to_ep, 内部通过 ibv_modify_qp 设置QP状态机建立QP连接)
9. 连接建立后, 客户端调用短消息(do_am_short)/缓冲区(do_am_bcopy)/零拷贝(do_am_zcopy)发送数据
10. 显示驱动工人推进(uct_worker_progress, 该例程显式地处理任何未完成的通信操作和活动消息请求, 底层通过poll网卡完成事件,ibv_poll_cq)
11. 资源销毁(uct_ep_destroy,free其他资源等)
启动服务端源码解读
代码语言:javascript
复制启动服务端:
gdb: examples/uct_hello_world.c -> int main(int argc, char **argv)
if (parse_cmd(argc, argv, &cmd_args)) -> int parse_cmd(int argc, char * const argv[], cmd_args_t *args)
args->server_port = 13337
status = ucs_async_context_create(UCS_ASYNC_MODE_THREAD_SPINLOCK, &async) -> 创建异步执行上下文, 分配并初始化异步执行上下文。 这可用于确保安全的事件传递, 模式为线程自旋锁(可重入), 在异步对象上, 初始化多生产者和多消费者队列
async = ucs_malloc(sizeof(*async), "async context") -> 内部分配, 并记录内存分配信息
void *ptr = malloc(size)
status = ucs_async_context_init(async, mode)
ucs_trace_func("async=%p", async) -> 跟踪方法
status = ucs_mpmc_queue_init(&async->missed) -> 初始化多生产者/多消费者队列
ucs_queue_head_init(&mpmc->queue)
pthread_spin_init(&lock->lock, lock_flags) -> 初始化自旋锁, 标志: PTHREAD_PROCESS_SHARED 或 PTHREAD_PROCESS_PRIVATE
status = ucs_async_method_call(mode, context_init, async) -> .context_init = ucs_async_poll_init -> 异步轮询初始化, 不阻塞
async->last_wakeup = ucs_get_time() -> asm volatile("rdtsc" : "=a"(low), "=d"(high)) 汇编获取时间 -> UCS/ARCH/INFO:如果无法从CPU型号中读取x86 TSC值,请不要从/proc/cpuinfo中读取测量的CPU频率,因为它只能代表核心频率而不是TSC频率。 相反,通过一个短循环进行测量,当频率测量收敛或达到 1ms 时间限制时停止
ucs_async_thread_spinlock_ops.context_init(async) -> ucs_async_thread_spinlock_init -> 可重入自旋锁上下文初始化
status = uct_worker_create(async, UCS_THREAD_MODE_SINGLE, &if_info.worker) -> 创建工人(独立资源) -> UCS_CLASS_DEFINE_NAMED_NEW_FUNC(uct_worker_create -> 用宏初始化工人(类似面向对象实例化) -> 创建一个工作对象。 工人代表着progress的引擎。 可以在应用程序中创建多个进度引擎,例如供多个线程使用。 Transports 可以为每个 Worker 分配单独的通信资源,以便每个 Worker 都可以独立于其他 Worker 进行操作 -> 声明/定义一个创建类实例的函数, 初始化工人私有worker, 传输层链表
ucs_class_t *cls = &uct_priv_worker_t_class -> typedef struct uct_priv_worker
obj = ucs_class_malloc(cls)
ucs_class_t *_cls = &uct_priv_worker_t_class
uct_priv_worker_t_init -> static UCS_CLASS_INIT_FUNC(uct_priv_worker_t -> static UCS_CLASS_INIT_FUNC(uct_priv_worker_t -> 初始化父类以及当前类的传输链表
static UCS_CLASS_INIT_FUNC(uct_worker_t)
ucs_callbackq_init(&self->progress_q);
ucs_vfs_obj_add_dir(NULL, self, "uct/worker/%p", self)
status = dev_tl_lookup(&cmd_args, &if_info) -> 查找期望的传输层, 动态加载, 由最小延迟决定要使用的设备和传输, 实参为地址指针, 形参为指针
status = uct_query_components(&components, &num_components) -> 查询组件列表。 获取当前系统上可用的传输组件列表, 得到8个组件, 为每个组件添加vfs对象:uct/component/组件名
UCS_MODULE_FRAMEWORK_DECLARE(uct) -> 声明一个“框架”,它是可加载模块的特定集合的上下文。 通常特定框架中的模块提供相同内部接口的替代实现
static ucs_init_once_t ucs_framework_init_once_uct = { { { 0, 0, 0, 0, 0, 0, 0, { 0, 0 } } }, 0 } -> 互斥锁 初始化标记
UCS_MODULE_FRAMEWORK_LOAD(uct, 0) -> void ucs_load_modules 加载所有模块, self, tcp, sysv, posix, ib, rdmacm, cma, knem, ...
[in] – _flags 模块加载标志,参见 ucs_module_load_flags_t 框架中的模块由 dlopen() 加载。 模块的共享库名称为:“lib<framework>_<module>.so.<version>”,其中: - <framework> 是框架名称 - <module> 是模块名称。 框架中所有模块的列表由自动生成的 config.h 文件中的预处理器宏 <framework>_MODULES 定义,例如:#define foo_MODULES ":bar1:bar2"。 - <version> 是模块的共享库版本,由 libtool 生成。 它是从当前库 (libucs) 的完整路径中提取的。 在以下位置搜索模块共享库(按优先级顺序): 1. 当前共享库 (libucs) 目录内的“ucx”子目录 2. ${libdir}/ucx,其中 ${libdir} 是 库的安装目录 请注意,如果 libucs 是从其安装路径加载的,则 (1) 和 (2) 是同一位置。 仅当 libucs 被移动或从构建目录运行时,路径才会不同,在这种情况下,优先使用“本地库”而不是“已安装的”库。[in] – _name 框架名称(作为令牌)
ucs_load_modules("uct", uct_MODULES, &ucs_framework_init_once_uct, 0) -> 加载框架, 模块
ucs_module_loader_init_paths -> 找到路径, 然后动态加载: ucs_module_load_one(framework, module_name, flags) -> ucs_module_init(module_path, dl) -> 找到初始化方法名, 如: module_init_name, 动态打开模块: dl = dlopen(module_path, mode)
fullpath = realpath(module_path, buffer)
init_func = (init_func_t)ucs_module_dlsym_shallow module_init_name -> 找到全局初始化函数入口, 一般都没有
modules_str = ucs_strdup(modules, "modules_list") -> 从内存跟踪表中查找字符串
module_name = strtok_r(modules_str, ":", &saveptr) -> 每次取冒号分割的第一个字符串分段(字符串分割函数 strtok_r )
ucs_module_loader_add_dl_dir
动态库路径: 0x6070b0 "/home/xb/project/ucx/src/ucs/.libs/libucs.so.0"
dladdr((void*)&ucs_module_loader_state, &dl_info) -> 利用dladdr来获得so自身的路径(ucs_module_loader_state)
ucs_module_loader_state.srch_path[ucs_module_loader_state.srchpath_cnt ] = path -> 记录动态库位置
ucs_module_loader_add_install_dir
ucs_module_global_init -> 查找动态库入库函数地址: addr = dlsym(dl, symbol) -> status = init_func()
void UCS_F_CTOR uct_ib_init()
uct_component_register(&uct_ib_component) -> 注册组件
uct_tl_register(&uct_ib_component, uct_ib_tls[i]) -> 注册所有IB传输层
ucs_list_for_each uct_components_list 8个组件 -> ucs_vfs_obj_add_dir -> ucs_vfs_node_add 虚拟文件系统
component_attr.md_resources = alloca -> alloca - 分配自动释放的内存
status = uct_component_query(components[cmpt_index], &component_attr) -> 查询网卡, 拷贝内存域数据资源
status = component->query_md_resources(component, &resources, &num_resources); -> uct_md_query_single_md_resource -> 调用每个组件的查询内存域资源接口
UCS_MODULE_FRAMEWORK_LOAD(uct_ib, 0) -> 调用查询内存域资源接口中会加载对应的动态库
status = uct_md_config_read(components[cmpt_index], NULL, NULL, &md_config); -> 读取内存域配置
status = uct_config_read(&bundle, &component->md_config, env_prefix)
status = ucs_config_parser_fill_opts(config_bundle->data, entry, full_prefix, 0)
ucs_config_parser_set_default_values(opts, entry->table) -> ucs_config_sscanf_table
ucs_config_parser_get_sub_prefix(env_prefix, &sub_prefix)
ucs_config_parse_config_files()
ucs_config_apply_config_vars -> 应用环境变量, 以及自定义前缀的环境变量
for 迭代内存域资源
uct_md_open -> 重要函数, 打开内存域
status = component->md_open(component, md_name, config, &md) -> ucs_status_t uct_ib_md_open -> IB实现的内存域打开函数
ib_device_list = ibv_get_device_list(&num_devices) -> 获取所有网卡列表, 获取设备列表, 比如4个网口(网卡设备), 可通过 ibdev2netdev 查询rdma网口映射
ibv_fork_init -> 核心原理: 通过对所有已注册的MR所在内存页打MADV_DONTFORK标记,创建子进程后,MR所在内存页不会触发COW拷贝,避免了前面所说的COW带来网卡DMA内存地址不一致的问题, 但会引入额外的内存记录和查找开销(降低性能)
status = uct_ib_ops[i]->ops->open(ib_device, md_config, &md) -> static ucs_status_t uct_ib_mlx5_devx_md_open
vhca_id -> 虚拟主机通道适配器ID
ctx = uct_ib_mlx5_devx_open_device(ibv_device) -> 通过创建完成队列和事件通道来检查网卡设备是否支持事件通道
ctx = mlx5dv_open_device(ibv_device, &dv_attr) -> verbs_open_device -> rdma-core
ibv_create_cq(ctx, 1, NULL, NULL, 0)
ibv_destroy_cq(cq)
event_channel = mlx5dv_devx_create_event_channel
mlx5dv_devx_destroy_event_channel(event_channel)
md = ucs_derived_of(uct_ib_md_alloc(sizeof(*md), "ib_mlx5_devx_md", ctx)
status = uct_ib_mlx5_check_uar(md) -> 用户访问区域
uct_ib_mlx5_devx_uar_init
uct_ib_mlx5_devx_alloc_uar
mlx5dv_devx_alloc_uar
uct_ib_mlx5_devx_uar_cleanup
md->mkey_tag = 0; -> 使用间接密钥使 MR 无效
uct_ib_mlx5_devx_mr_lru_init(md)
status = uct_ib_device_query(dev, ibv_device)
uct_ib_query_device
ibv_get_device_name
ret = ibv_query_device_ex(ctx, NULL, attr)
ret = vctx->query_device_ex(context, input, attr, sizeof(*attr))
ibv_query_port
ucs_topo_resolve_sysfs_path -> "/sys/devices/pci0000:15/0000:15:04.0/0000:17:00.0" -> PCI地址
ucs_topo_get_sysfs_dev
uct_ib_device_set_pci_id
ucs_topo_get_pci_bw -> 获取PCI带宽
effective_bw = (p->bw_gbps * 1e9 / 8.0) * width * ((double)p->encoding / p->decoding) * link_utilization; -> 计算带宽
ret = mlx5dv_devx_general_cmd(ctx, in, sizeof(in), out, sizeof(out)) -> ucs_status_t uct_ib_mlx5_devx_general_cmd -> rdma-core -> 通过 devx 接口发出通用命令, 介绍 DEVX 对象及其 DV API:创建/修改/读取/销毁。 还添加了 DEVX 通用命令 API,以便能够直接从固件读取 CAP, 参考: https://patchwork.kernel.org/project/linux-rdma/patch/1539190590-31186-2-git-send-email-yishaih@mellanox.com/
status = uct_ib_mlx5_devx_query_lag(md, &lag_state) UCT_IB_MLX5_CMD_OP_QUERY_LAG -> 链路汇聚(bonding), 参考: https://docs.nvidia.com/networking/display/bluefielddpuosv385/link aggregation
md->port_select_mode = uct_ib_mlx5_devx_query_port_select(md)
uct_ib_mlx5_devx_general_cmd UCT_IB_MLX5_CMD_OP_QUERY_LAG
uct_ib_mlx5_is_xgvmi_alias_supported(ctx) -> 跨越GVMI (XGVMI) - DPU 可以代表主机常驻内存启动 RDMA 操作,仅当数据源自或目标为 DPU 内存时才涉及 DPU 内存, Guest VM ID 主机_虚机ID
uct_ib_mlx5_devx_check_odp(md, md_config, cap) -> 按需分页 (ODP) 是一种可以缓解内存注册缺点的技术。 应用程序不再需要确定地址空间的底层物理页,并跟踪映射的有效性。 相反,当页面不存在时,HCA 向操作系统请求最新的转换,并且操作系统使由于不存在页面或映射更改而不再有效的转换无效。 ODP 不支持连续页。ODP 可以进一步分为 2 个子类:显式 ODP 和隐式 ODP。显式 ODP 在显式 ODP 中,应用程序仍然注册内存缓冲区以进行通信,但此操作用于定义 IO 的访问控制而不是 pin-down 页面。 ODP 内存区域 (MR) 在注册时不需要具有有效的映射。 隐式 ODP 在隐式 ODP 中,为应用程序提供了一个特殊的内存密钥,该密钥代表其完整的地址空间。 所有引用该键的 IO 访问(受限于与该键关联的访问权限)不需要注册任何虚拟地址范围。 有关 ODP 的更多信息,请参阅了解按需寻呼 (ODP) 社区帖子
uct_ib_mlx5_devx_general_cmd
dev->atomic_align = ucs_rounddown_pow2(arg_size) -> 向下取整对齐
uct_ib_md_open_common(&md->super, ibv_device, md_config) -> ucs_status_t uct_ib_md_open_common
uct_ib_device_init -> 初始化IB设备
uct_ib_device_get_locality -> 获取cpu位置top
ucs_sys_fcntl_modfl O_NONBLOCK -> 设置fd为非阻塞模式
oldfl = fcntl(fd, F_GETFL)
ucs_async_set_event_handler uct_ib_async_event_handler -> 异步事件处理
ucs_async_method_call(mode, add_event_fd, async, event_fd, events)
ucs_async_thread_add_event_fd
ucs_async_thread_start(&thread)
ucs_event_set_add
epoll_ctl(event_set->event_fd, EPOLL_CTL_ADD, fd, &raw_event)
ucs_async_pipe_push(&thread->wakeup)
ret = write(p->write_fd, &dummy, sizeof(dummy)) -> 写0通知对端
uct_ib_md_parse_subnet_prefix -> 添加UCT_IB_SUBNET_PREFIX=fe80::以按subnet_prefix过滤IB端口
uct_ib_check_gpudirect_driver /sys/kernel/mm/memory_peers/nv_mem/version
uct_ib_check_gpudirect_driver /dev/kfd
uct_ib_md_check_dmabuf(md)
ibv_reg_dmabuf_mr(md->pd, 0, ucs_get_page_size(), 0, bad_fd, UCT_IB_MEM_ACCESS_FLAGS)
uct_ib_md_set_pci_bw(md, md_config) -> 修复: PCI 速度和系统设备 ID 设置不正确, 先从系统获取, 没找到则从底层设备获取
uct_ib_mlx5_md_port_counter_set_id_init(md)
ucs_carray_for_each(counter_set_id, md->port_counter_set_ids, sizeof(md->port_counter_set_ids)) -> 遍历c语言数组
ucs_mpool_params_reset(&mp_params) -> 重置内存池参数
mp_params.ops = &uct_ib_mlx5_dbrec_ops -> 设置池操作表
ucs_mpool_init(&mp_params, &md->dbrec_pool)
VALGRIND_CREATE_MEMPOOL(mp, 0, 0) -> valgrind内存池
uct_ib_mlx5_md_buf_alloc(md, ucs_get_page_size(), 0, &md->zero_buf, &md->zero_mem, 0, "zero umem");
先对齐内存
madvise(buf, size, MADV_DONTFORK) -> MADV_DONTFORK 在执行fork(2)后,子进程不允许使用此范围的页面。这样是为了避免COW机制导致父进程在写入页面时更改页面的物理位置
mlx5dv_devx_umem_reg(md->super.dev.ibv_context, buf, size, access_mode) -> 注册或取消注册由 devx 接口使用的用户内存。 寄存器动词公开 UMEM DEVX 对象,用于 DMA 的用户内存注册。 用于注册用户存储器的 API 将用户地址、长度和访问标志作为输入,并向用户提供一个对象作为输出,该对象保存由固件返回到该已注册存储器的 UMEM ID。 用户将在使用此内存而不是物理地址列表的设备直接命令中使用该 UMEM ID,例如在 mlx5dv_devx_obj_create 上创建 QP
uct_ib_md_parse_relaxed_order(&md->super, md_config, ksm_atomic)
uct_ib_mlx5_devx_init_flush_mr(md)
uct_ib_reg_mr(&md->super, md->zero_buf, UCT_IB_MD_FLUSH_REMOTE_LENGTH, ¶ms, UCT_IB_MEM_ACCESS_FLAGS, &md->flush_mr); -> 当 access_flags 包含 IBV_ACCESS_ON_DEMAND 时,ibv_reg_mr() 可能会失败并出现 EAGAIN。 这意味着预取由于与失效冲突而失败
UCS_PROFILE_CALL_ALWAYS(ibv_reg_mr, md->pd, address, length, access_flags) -> 调用verbs接口注册内存
ibv_reg_dmabuf_mr -> 注册内存的另一种方式
uct_ib_md_print_mem_reg_err_msg -> 如果内存为空, 打印错误
UCS_STRING_BUFFER_ONSTACK(msg, 256) -> 声明一个字符串缓冲区,该缓冲区使用现有字符串作为后备存储。 此类字符串缓冲区不会分配额外的内存,也不必进行清理,并且它还可以用于在作为函数参数传递的现有 C 字符串缓冲区上构建字符串。
uct_ib_memlock_limit_msg(&msg, err)
ucs_sys_get_effective_memlock_rlimit
getrlimit(RLIMIT_MEMLOCK, &limit_info) -> 获取内存限制
ucs_string_buffer_cstr(&msg)
c_str = ucs_array_begin(&strb->str) -> 返回字符串数组第一个元素
((&strb->str)->buffer)
uct_ib_mlx5_devx_reg_ksm_data_addr
uct_ib_mlx5_alloc_mkey_inbox(list_size, &in)
uct_ib_mlx5_devx_reg_ksm(md, atomic, iova, length, list_size -> UCT/IB/MLX5:内存注册流程部分重构,使用params struct进行多线程注册(从上层传递),使用params struct注册flush_mr,改进调试日志记录
UCT_IB_MLX5DV_SET(create_mkey_in, in, opcode, UCT_IB_MLX5_CMD_OP_CREATE_MKEY) -> 往结构体中插入一个值
mkc = UCT_IB_MLX5DV_ADDR_OF(create_mkey_in, in, memory_key_mkey_entry) -> 限制所有条目的缓冲区大小以提高性能。 将密钥长度的虚拟地址 (KLM) 与固定内存大小关联时使用 KSM
mlx5dv_devx_obj_create
static ucs_status_t uct_self_md_open -> self模块实现的打开内存域
uct_md_vfs_init(component, md, md_name) -> 用vfs节点表示组件和内存域
uct_config_release(md_config)
uct_md_query(iface_p->md, &iface_p->md_attr)
status = uct_md_attr_v2_init(md, &md_attr_v2) -> static ucs_status_t uct_self_md_query -> ucs_status_t uct_ib_md_query
md_attr->access_mem_types = UCS_BIT(UCS_MEMORY_TYPE_HOST) -> 内存访问的类型
ucs_sys_cpuset_copy(&md_attr->local_cpus, &md->dev.local_cpus) -> for -> CPU_ISSET(c, src) -> 拷贝CPU参数
uct_md_attr_from_v2(md_attr, &md_attr_v2) -> 用源(参数2)赋值给目的(参数1), 并用内存拷贝(memcpy)cpu和组件名
uct_md_query_tl_resources(iface_p->md, &tl_resources, &num_tl_resources) -> 查询传输层资源。 该例程查询 uct_md_h“内存域”以获取可用的通信资源
uct_self_query_tl_devices | uct_dc_mlx5_query_tl_devices
uct_ib_device_query_ports
uct_ib_device_port_check
uct_ib_device_port_attr
ucs_realloc
for -> 遍历可用的传输层并找出合适的传输层名
status = init_iface(tl_resources[tl_index].dev_name, tl_resources[tl_index].tl_name, cmd_args->func_am_type, iface_p) -> 按传输层名初始化接口, 入参为: 网卡设备名,传输层名,活动消息类型, 接口
status = uct_md_iface_config_read(iface_p->md, tl_name, NULL, NULL, &config) -> 读取并填充配置
status = uct_iface_open(iface_p->md, iface_p->worker, ¶ms, config, &iface_p->iface) -> 打开通信接口
uct_find_tl(md->component, params->mode.device.tl_name) -> 查找传输层
status = tl->iface_open -> static UCS_CLASS_DEFINE_NEW_FUNC(uct_rc_verbs_iface_t -> static UCS_CLASS_INIT_FUNC(uct_rc_verbs_iface_t <- .iface_open = UCS_CLASS_NEW_FUNC_NAME(_iface_class) <- UCT_TL_DEFINE_ENTRY -> 宏展开得到(uct_rc_verbs_iface_t_new)
init_attr.qp_type = IBV_QPT_RC -> 设置QP属性
UCS_CLASS_CALL_SUPER_INIT(uct_rc_iface_t -> 执行父类的构造函数(初始化) -> uct_rc_verbs_iface_t_init -> UCS_CLASS_INIT_FUNC(uct_rc_iface_t
UCS_CLASS_CALL_SUPER_INIT(uct_ib_iface_t -> UCS_CLASS_INIT_FUNC(uct_ib_iface_t
preferred_cpu = ucs_cpu_set_find_lcs(&cpu_mask)
UCS_CLASS_CALL_SUPER_INIT(uct_base_iface_t
uct_base_iface_t_init (&self->super, _myclass->superclass, -> UCS_CLASS_INIT_FUNC(uct_base_iface_t
UCS_CLASS_CALL_SUPER_INIT(uct_iface_t, ops)
uct_iface_t_init (&self->super, _myclass->superclass -> UCS_CLASS_INIT_FUNC(uct_iface_t, uct_iface_ops_t *ops)
self->ops = *ops;
UCT_CB_FLAGS_CHECK((params->field_mask
self->internal_ops = internal_ops -> UCS_CLASS_INIT_FUNC(uct_base_iface_t, uct_iface_ops_t *ops,
uct_worker_progress_init(&self->prog)
uct_iface_set_stub_am_handler(self, id);
iface->am[id].cb = uct_iface_stub_am_handler
UCS_STATS_NODE_ALLOC(&self->stats -> ucs_status_t ucs_stats_node_alloc
ucs_stats_node_new(cls, &node)
ucs_stats_node_initv(node, cls, name, ap)
ucs_stats_name_check(cls->name)
ucs_vsnprintf_safe(node->name, UCS_STAT_NAME_MAX, name, ap)
vsnprintf(buf, size, fmt, ap)
buf[size - 1] = '