DAOS大块数据传输(RDMA单边读/写)流程要点
1. 以设置/获取池属性为例(dmg pool list -v)
2. 发送端把一段不连续的内存封装为sgl, 调用bulk_create分段注册好(crt_bulk_create(ctx, &sgl, CRT_BULK_RW, bulk))
3. 封装RPC发送给服务端(引擎), 参考(daos_rpc_send)
4. 引擎收到RPC后, 根据数据长度, 准备大块内存接收客户端的数据(crt_bulk_get_len, crt_bulk_create)
5. 执行大块数据传输(crt_bulk_transfer), 用RDMA的单边读语义, 将数据拉取(DMA)到引擎指定的数据块
6. 释放资源
源码流程分析
代码语言:javascript复制设置/获取池属性 -> src/tests/suite/daos_pool.c -> { "POOL7: set/get/list user-defined pool attributes (sync)" -> pool_attribute
daos_pool_list_attr(poh, NULL, &total_size, arg->async ? &ev : NULL) -> 查询池属性
dc_task_create(dc_pool_list_attr, NULL, ev, &task)
args->buf = buf
args->size = size -> 0
dc_task_schedule(task, true)
...
dc_pool_list_attr(tse_task_t *task)
pool_req_prepare(args->poh, POOL_ATTR_LIST, daos_task2ctx(task), &cb_args); -> 准备RPC请求
opcode = DAOS_RPC_OPCODE -> 反解析OPC
crt_req_create(crt_ctx, tgt_ep, opcode, req)
crt_bulk_create(daos_task2ctx(task), &sgl, CRT_BULK_RW, &in->pali_bulk);
crt_hg_bulk_create(&ctx->cc_hg_ctx, sgl, bulk_perm, bulk_hdl)
buf_sizes[i] = sgl->sg_iovs[i].iov_buf_len
buf_ptrs[i] = sgl->sg_iovs[i].iov_buf
hg_return_t HG_Bulk_create(hg_class_t *hg_class, hg_uint32_t count, void **buf_ptrs,const hg_size_t *buf_sizes, hg_uint8_t flags, hg_bulk_t *handle)
"Creating new bulk handle with %u segment(s)", count) -> 打印有多少分段
cb_args.pra_bulk = in->pali_bulk -> 设置bulk
return daos_rpc_send(cb_args.pra_rpc, task) -> 发送RPC到服务端
...
ds_pool_attr_list_handler(crt_rpc_t *rpc) -> 服务端收到RPC
ds_rsvc_list_attr(&svc->ps_rsvc, &tx, &svc->ps_user,in->pali_bulk, rpc, &out->palo_size) -> 拿到池属性列表参数上的bulk(客户端已注册), 服务端解释为远端bulk: remote_bulk
rc = crt_bulk_get_len(remote_bulk, &bulk_size)
d_sg_list_t sgl -> 将服务端属性设置到sgl上
rc = crt_bulk_create(rpc->cr_ctx, &sgl, CRT_BULK_RW, &local_bulk); -> 创建本地BULK, 放入sgl内容
attr_bulk_transfer(rpc, CRT_BULK_PUT, local_bulk, remote_bulk, 0, 0, bulk_size - iter_args.available) -> 服务端调用RDMA写操作将数据DMA给客户端
crt_bulk_transfer(&bulk_desc, bulk_cb, &eventual, NULL)
crt_bulk_free(local_bulk) -> 释放服务端的BULK
iv_op_ult(void *arg)
crt_iv_update_internal
crt_ivsync_rpc_issue
crt_bulk_create
dmg pool list -v
func (cmd *poolQueryCmd) Execute(_ []string)
C.daos_pool_query(cmd.cPoolHandle, rlPtr, &pinfo, nil, nil)
dc_pool_query(tse_task_t *task)
map_bulk_create(daos_task2ctx(task), &in->pqi_map_bulk, &map_buf, pool_buf_nr(pool->dp_map_sz)) -> 池创建bulk
d_iov_set(&iov, *buf, pool_buf_size((*buf)->pb_nr));
sgl.sg_nr = 1;
sgl.sg_nr_out = 0;
sgl.sg_iovs = &iov;
rc = crt_bulk_create(ctx, &sgl, CRT_BULK_RW, bulk);
return daos_rpc_send(rpc, task) 发送RPC
客户端
---------------------------
服务端
ds_pool_query_handler_v5 <- POOL_QUERY -> ds_pool_query_handler(rpc, 5)
ds_pool_query_handler(crt_rpc_t *rpc, int version)
pool_svc_lookup_leader
ds_rebuild_query
rdb_tx_begin(svc->ps_rsvc.s_db, svc->ps_rsvc.s_term, &tx)
daos_rpc_from_client(crt_rpc_t *rpc)
crt_req_src_rank_get(rpc, &srcrank)
pool_prop_read(&tx, svc, DAOS_PO_QUERY_PROP_GLOBAL_VERSION, &prop);
daos_prop_entry_get
rc = read_map_buf(&tx, &svc->ps_root, &map_buf, &map_version) -> 读取到map_buf(池映射)
rc = locate_map_buf(tx, kvs, &b, version)
size = pool_buf_size(b->pb_nr)
memcpy(*buf, b, size)
ds_pool_transfer_map_buf(map_buf, map_version, rpc, in->pqi_map_bulk, &out->pqo_map_buf_size)
crt_bulk_get_len(remote_bulk, &remote_bulk_size) -> remote_bulk_size = 4096, map_buf_size=128
d_iov_set(&map_iov, map_buf, map_buf_size) ->
map_sgl.sg_nr = 1;
map_sgl.sg_nr_out = 0;
map_sgl.sg_iovs = &map_iov;
crt_bulk_create(rpc->cr_ctx, &map_sgl, CRT_BULK_RO, &bulk) -> 用服务端的map_sgl数据, 服务端创建只读本地BULK
...
hg_bulk = (struct hg_bulk *) calloc(1, sizeof(*hg_bulk))
hg_bulk->desc.info.segment_count = count -> 1段
segments = hg_bulk->desc.segments.s
segments[i].base = (hg_ptr_t) bufs[i] -> 140039780584816
segments[i].len = lens[i] -> 128
hg_bulk->desc.info.len = lens[i]
hg_bulk_create_na_mem_descs(&hg_bulk->na_mem_descs, na_class,
segments, count, flags, (enum na_mem_type) attrs->mem_type,
attrs->device)
na_mem_handles = na_mem_descs->handles.s
hg_bulk_register(na_class, (void *) segments[i].base,
segments[i].len, flags, mem_type, device, &na_mem_handles[i],
&na_mem_serialize_sizes[i])
...
map_desc.bd_bulk_op = CRT_BULK_PUT -> 设置为服务端通过RDMA写操作(wr.opcode = IBV_WR_RDMA_WRITE), 将池map, DMA给客户端
map_desc.bd_remote_hdl = remote_bulk
map_desc.bd_local_hdl = bulk
crt_bulk_transfer(&map_desc, bulk_cb, &eventual, &map_opid) -> 传输BULK, 在: rc = bulk_cbinfo->bci_cb(&crt_bulk_cbinfo) 中执行回调
客户端发送RPC/BULK的阈值(19KB)
1. 在客户端更新对象(写/改对象)的任务函数中(dc_obj_update_task)
2. 如果所有SGL总长度达到19K(1k留给header), 或者对象是EC模式且不是对单个tgt下发的EC_SGL重组请求, 则需要使用BULK传输, 参考: if (sgls_size >= DAOS_BULK_LIMIT || (obj_is_ec(obj) && !obj_auxi->reasb_req.orr_single_tgt))
参考源码:
代码语言:javascript复制通过以下对象连接
.cpf_name =
daos_opc_t
dc_funcs[opc].task_func 客户端方法数组
dc_obj_update_task(tse_task_t *task) DAOS_OPC_OBJ_UPDATE 写 -> 客户端更新对象(写/改对象)的任务
obj_req_valid(task, args, DAOS_OBJ_RPC_UPDATE
obj_auxi = tse_task_stack_push(task, sizeof(*obj_auxi)) -> 将任务压栈
pushed_ptr = dtp->dtp_buf sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
...
dc_io_epoch_set(epoch, opc)
tse_task_stack_pop -> 将任务从栈上弹出来
poped_ptr = dtp->dtp_buf sizeof(dtp->dtp_buf) - dtp->dtp_stack_top
dc_tx_attach(args->th, obj, DAOS_OBJ_RPC_UPDATE, task) 如果事务有效(hdl.cookie == 1), 则走dtx
return dc_obj_update(task, &epoch, map_ver, args, obj) -> 提交对象更新
obj_task_init(task, DAOS_OBJ_RPC_UPDATE, map_ver, args->th, &obj_auxi, obj)
obj_task_init_common(task, opc, map_ver, th, auxi, obj)
tse_task_stack_push
shard_task_list_init(obj_auxi)
obj_auxi->is_ec_obj = obj_is_ec(obj) -> 设置EC对象标志
tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0) -> 为任务注册对象完成回调, 弹出任务参数, 重试, 错误处理等
----------------------
obj_update_sgls_dup(obj_auxi, args) -> 用户可能提供 iov_len < iov_buf_len 的 sql,这可能会给内部处理带来一些麻烦,例如 crt_bulk_create/daos_iov_left() 总是使用 iov_buf_len。 对于这种情况,我们复制 sql 并使其 iov_buf_len = iov_len
obj_auxi->dkey_hash = obj_dkey2hash(obj->cob_md.omd_id, args->dkey) -> 比如为1
if (obj_is_ec(obj)) -> 如果是EC对象(对象类属性上的封装方法为 DAOS_RES_EC ), 则重新组装对象写请求
obj_rw_req_reassemb(obj, args, NULL, obj_auxi) -> 配置了EC的对象需要重新组装, 对象的读写请求
struct obj_reasb_req *reasb_req = &obj_auxi->reasb_req -> EC请求, 重新组装 obj 请求。 用户输入的 iod/sgl 可能需要在发送到服务器之前在客户端重新组装,例如:合并相邻的recx,或者对无序的recx进行排序并生成新的sgl与之匹配; 对于EC obj,将iod/recxs拆分到每个目标,生成新的sgl与之匹配,创建oiod/siod以指定每个shard/tgt的IO req
if (!obj_auxi->req_reasbed)
obj_reasb_req_init(&obj_auxi->reasb_req, obj, args->iods, args->nr) -> 创建reasb_req并设置iod的值,从输入iod中重用缓冲区,iod_type / iod_size分配为输入iod,iod_kcsum / iod_nr / iod_recx / iod_csums / iod_eprs数组将设置为0 / NULL
daos_recx_t -> 记录是任意长度的原子 blob,它总是作为一个整体来获取/更新。 记录的大小可能会随着时间的推移而改变。 记录由以下复合键唯一标识: - 分布键(又名 dkey)表示位于同一存储目标上的一组数组。 dkey 具有任意大小。 - 属性键(又名 akey)区分各个数组。 同样,akey 具有任意大小。 - 数组中的索引区分各个记录。 索引是一个范围从零到无穷大的整数。 一系列索引标识称为范围的连续记录集。 范围内的所有记录必须具有相同的大小。 记录范围是数组内相同大小的连续记录范围。 rx_idx 是该范围的第一个数组索引,rx_nr 是该范围覆盖的记录数
reasb_req->orr_oca = obj_get_oca(obj)
size_iod = roundup(sizeof(daos_iod_t) * iod_nr, 8)
...
obj_ec_req_reasb(obj, args->iods, obj_auxi->dkey_hash, args->sgls, reasb_req, args->nr, obj_auxi->opc == DAOS_OBJ_RPC_UPDATE);
for (i = 0; i < iod_nr; i )
obj_ec_singv_req_reasb
ec_recx_array->oer_k = oca->u.ec.e_k
ec_recx_array->oer_p = oca->u.ec.e_p
if (obj_ec_singv_one_tgt(iod->iod_size, sgl, oca))
obj_ec_fail_info_parity_get
obj_ec_singv_small_idx
obj_ec_set_parity_bitmaps
obj_ec_parity_tgt_nr
obj_ec_singv_cell_bytes
obj_io_desc_init
codec = codec_get(reasb_req, obj->cob_md.omd_id) -> isal支持
reasb_req->orr_codec = obj_ec_codec_get(daos_obj_id2class(oid))
daos_array_find(ecc_array, oc_ec_codec_nr, oc_id, &ecc_sort_ops)
daos_array_find(ecc_array, oc_ec_codec_nr, oc_id, &ecc_redun_sort_ops)
obj_ec_singv_encode(codec, oca, iod, sgl, ec_recx_array) -> DAOS-7539 EC:自定义 EC 单元大小 (#5832),用户可以通过池或容器属性指定 EC 单元大小,DAOS_PROP_PO_EC_CELL_SZ,设置池的默认 EC 单元大小,DAOS_PROP_CO_EC_CELL_SZ,设置容器的默认 EC 单元大小,如果是 EC 为池和容器都设置了单元格大小,然后容器的值会覆盖池的值。 此补丁将 EC 单元大小从属性应用到客户端 EC 堆栈、服务器 I/O 处理程序、数据迁移服务和 EC 聚合服务。 - EC单元大小应为4K的倍数且小于1MB,默认单元大小仍为1MB,后续应更改修补
obj_ec_pbufs_init(recxs, c_bytes)
obj_ec_recx_encode(codec, oca, iod, sgl, recxs) -> 编码满条带
for (i = 0; i < recx_nr; i )
obj_ec_stripe_encode(iod, sgl, iov_idx, iov_off, codec, oca, cell_bytes, parity_buf)
ec_encode_data(cell_bytes, k, p, codec->ec_gftbls, data, parity_bufs) -> ec编码, isa-l 中 ec_init_tables() 的用途: https://blog.csdn.net/choumin/article/details/126898021
d_sgl_init(r_sgl, sgl->sg_nr obj_ec_parity_tgt_nr(oca)) -> 重组sgl
d_iov_set(&r_sgl->sg_iovs[iov_nr idx]
obj_reasb_req_dump(reasb_req, sgl, oca, 0, iod_idx) -> 打印EC调试信息
obj_ec_recx_scan
obj_ec_recx_cell_nr
ec_partial_tgt_recx_nrs
ec_all_tgt_recx_nrs
obj_ec_recov_tgt_recx_nrs
obj_ec_recxs_init
obj_io_desc_init
obj_ec_riod_init
obj_ec_seg_sorter_init
obj_ec_pbufs_init
obj_ec_recx_reasb -> 为EC重组iod/sgl/recx, 输入iod, sgl, recx_array, 输出riod, rsgl, oiod
recx_with_full_stripe
ec_recov_recx_seg_add
ec_data_recx_add
ec_data_seg_add
ec_parity_recx_add
ec_parity_seg_add
obj_ec_seg_pack
obj_ec_encode
obj_ec_recx_encode -> 对全条带recx_array中的数据进行编码,结果奇偶校验存储在struct obj_ec_recx_array::oer_pbufs中
obj_ec_stripe_encode -> 编码一个完整的条带,结果奇偶校验缓冲区将被填满
obj_update_shards_get
obj_shards_2_fwtgts -> 根据分片查找转发的目标
req_tgts->ort_shard_tgts = req_tgts->ort_tgts_inline -> 分片目标数组,包含 (ort_grp_nr * ort_grp_size) 个目标。 如果#targets <= OBJ_TGT_INLINE_NR 那么它指向ort_tgts_inline。 在数组中,[0, ort_grp_size - 1] 表示第一组,[ort_grp_size, ort_grp_size * 2 - 1] 表示第二组,依此类推。 如果 (ort_srv_disp == 1),则在每个组中,第一个目标是领导分片,后面的 (ort_grp_size - 1) 目标是前向非领导分片。 现在只有一种情况 (ort_grp_nr > 1) 用于对象打孔,所有其他情况均为 (ort_grp_nr == 1)
obj_shard_tgts_query -> 分片目标查询
obj_csum_update
-------------------
obj_req_get_tgts 获取对象对应的目标
obj_dkey2grpmemb
obj_dkey2grpidx
pool_map_ver = pool_map_get_version(pool->dp_map)
grp_size = obj_get_grp_size(obj)
grp_idx = d_hash_jump(hash, obj->cob_shards_nr / grp_size) how hash generate? obj with pool
obj_shards_2_fwtgts
obj_shard_tgts_query 分片目标查询
obj_shard_open
dc_obj_shard_open
pool_map_find_target 二分查找
comp_sorter_find_target(sorter, id)
daos_array_find
array_bin_search
obj_shard2tgtid
*tgt_id = obj->cob_shards->do_shards[shard].do_target_id -> dc_obj_layout 客户端对象布局
obj_shard_close(obj_shard)
obj_auxi->flags |= ORF_CONTAIN_LEADER -> 要求转发给容器leader
obj_grp_leader_get
pl_select_leader obj_get_shard
array_bin_search 二分查找 daos_obj_classes
tse_task_register_comp_cb(task, obj_comp_cb, NULL, 0)
obj_csum_update(obj, args, obj_auxi)
obj_rw_bulk_prep(obj, args->iods, args->sgls, args->nr, true, obj_auxi->req_tgts.ort_srv_disp, task, obj_auxi) -> 准备读写大块数据
sgls_size = daos_sgls_packed_size(sgls, nr, NULL) -> 计算sgl大小, 内联提取需要将 sqls 缓冲区打包到 RPC 中,因此使用它来检查是否需要批量传输(bulk)
if (sgls_size >= DAOS_BULK_LIMIT || (obj_is_ec(obj) && !obj_auxi->reasb_req.orr_single_tgt)) -> 如果所有SGL总长度达到19K(1k留给header), 或者对象是EC模式且不是对单个tgt下发的EC_SGL重组请求, 则需要使用BULK传输
obj_bulk_prep(sgls, nr, bulk_bind, bulk_perm, task, &obj_auxi->bulks)
crt_bulk_create(daos_task2ctx(task), &sgls[i], bulk_perm, &bulks[i]) -> 针对每个SGL, 创建1个BULK, 得到bulk的指针数组和bulk个数
crt_bulk_bind -> 将批量句柄绑定到本地上下文,将本地上下文的源地址与批量句柄关联起来。 它可用于将批量句柄从一台服务器转发/共享到另一台服务器,在这种情况下,批量句柄的原始地址可以即时序列化/反序列化。 示例用法:客户端向服务器 A 发送嵌入批量句柄的 RPC 请求,服务器 A 将客户端批量句柄转发到另一台服务器 B。对于该用法,客户端应调用此 API 将批量句柄与其本地上下文绑定 因此,当服务器B收到服务器A转发的反序列化的批量句柄时,服务器B就可以知道客户端的原始地址来进行批量传输。 用户应注意,绑定批量句柄会增加序列化的额外开销,因此建议谨慎使用。 在源上绑定批量句柄时,应使用 crt_bulk_bind_transfer(),因为源地址信息嵌入在句柄中
obj_req_fanout(obj, obj_auxi, dkey_hash, map_ver, epoch, shard_rw_prep, dc_obj_shard_rw, task) -> 扇出 shard_io_cb = io_cb = dc_obj_shard_rw
晓兵(ssbandjl)
博客: https://logread.cn | https://blog.csdn.net/ssbandjl | https://cloud.tencent.com/developer/user/5060293/articles
DAOS汇总: https://cloud.tencent.com/developer/article/2344030
晓兵技术杂谈(系列)
https://cloud.tencent.com/developer/user/5060293/video
欢迎对DAOS, SPDK, RDMA等高性能技术感兴趣的朋友加入DAOS技术交流(群)