Terminology
nccl: NVIDIA Collective Communications Library (NCCL), a collective communication interface
Useful links
NCCL developer page: https://developer.nvidia.com/nccl
User guide: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html
Project homepage: https://github.com/NVIDIA/nccl
Introduction
Optimized primitives for inter-GPU communication
NCCL (pronounced "Nickel") is a standalone library of standard communication routines for GPUs, implementing all-reduce, all-gather, reduce, broadcast, reduce-scatter, as well as any send/receive-based communication pattern. It is optimized to achieve high bandwidth on platforms using PCIe, NVLink, and NVSwitch, and over networks using InfiniBand Verbs or TCP/IP sockets. NCCL supports any number of GPUs installed in a single node or spread across multiple nodes, and can be used in single-process or multi-process (e.g. MPI) applications.
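For context before diving into the transport code, here is a minimal sketch of the user-facing API: a single-process, two-GPU in-place all-reduce (device count and buffer size are illustrative, error handling omitted).

#include <cuda_runtime.h>
#include <nccl.h>

int main() {
  const int nDev = 2;                  // illustrative device count
  const size_t count = 1 << 20;        // elements per GPU
  int devs[2] = {0, 1};
  ncclComm_t comms[2];
  float* buf[2];
  cudaStream_t streams[2];

  ncclCommInitAll(comms, nDev, devs);  // one communicator per GPU in this process
  for (int i = 0; i < nDev; i++) {
    cudaSetDevice(devs[i]);
    cudaMalloc((void**)&buf[i], count * sizeof(float));
    cudaStreamCreate(&streams[i]);
  }

  ncclGroupStart();                    // group the per-GPU calls to avoid deadlock
  for (int i = 0; i < nDev; i++)
    ncclAllReduce(buf[i], buf[i], count, ncclFloat, ncclSum, comms[i], streams[i]);
  ncclGroupEnd();

  for (int i = 0; i < nDev; i++) {
    cudaSetDevice(devs[i]);
    cudaStreamSynchronize(streams[i]);
    cudaFree(buf[i]);
    ncclCommDestroy(comms[i]);
  }
  return 0;
}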
Source code reading notes
Git repository: https://github.com/ssbandjl/nccl.git
Summary:
The NCCL library uses the native RDMA verbs interface to implement one-sided read/write and two-sided send/receive semantics in a minimal fashion (compared with UCX, Libfabric, or other verbs-based communication libraries), providing the high-performance, high-bandwidth inter-GPU communication needed for data transfer in large-scale AI model training.
RDMA call stack: QP connection setup
ncclResult_t ncclIbCreateQp
qpInitAttr.qp_type = IBV_QPT_RC
qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS -> 128
qpInitAttr.cap.max_recv_wr = MAX_REQUESTS -> 64
wrap_ibv_modify_qp
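Expanded into plain verbs, the QP setup above plausibly looks like the following sketch; cq, pd, and ib_port are assumed to come from the comm's verbs context, and only the attributes noted above are taken from the source (the rest are illustrative).

struct ibv_qp_init_attr qpInitAttr;
memset(&qpInitAttr, 0, sizeof(qpInitAttr));
qpInitAttr.send_cq = cq;
qpInitAttr.recv_cq = cq;
qpInitAttr.qp_type = IBV_QPT_RC;              // reliable connected QP
qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS;  // 128 outstanding sends
qpInitAttr.cap.max_recv_wr = MAX_REQUESTS;    // 64 outstanding receives
qpInitAttr.cap.max_send_sge = 1;
qpInitAttr.cap.max_recv_sge = 1;
struct ibv_qp* qp = ibv_create_qp(pd, &qpInitAttr);

struct ibv_qp_attr qpAttr;                    // move the QP to INIT; RTR/RTS follow once the peer's QPN/GID arrive
memset(&qpAttr, 0, sizeof(qpAttr));
qpAttr.qp_state = IBV_QPS_INIT;
qpAttr.pkey_index = 0;
qpAttr.port_num = ib_port;
qpAttr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE;
ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);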
RDMA one-sided write call stack (RDMA write with immediate)
ncclResult_t ncclIbIsend
NCCLCHECK(ncclIbMultiSend(comm, slot))
ncclResult_t ncclIbMultiSend
for (int r=0; r<nreqs; r++)
struct ibv_sge* sge = comm->sges+r
wr->opcode = IBV_WR_RDMA_WRITE
wr->wr.rdma.remote_addr = slots[r].addr
wr->next = wr+1
wr_id += (reqs[r] - comm->verbs.reqs) << (r*8)
immData |= (reqs[r]->send.size ? 1 : 0) << r -> encode the data size in the immediate word; for a multi-send, only a 0/1 bit per request is written to indicate whether that request carries any data
lastWr->wr_id = wr_id
lastWr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM
lastWr->send_flags = IBV_SEND_SIGNALED
NCCLCHECK(wrap_ibv_post_send(comm->qps[comm->qpIndex], comm->wrs, &bad_wr))
qp->context->ops.post_send(qp, wr, bad_wr) -> ibv_post_send
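A stripped-down version of the work-request chain that ncclIbMultiSend hands to ibv_post_send is sketched below; localBuf/sizes/lkeys/remoteAddr/rkeys are illustrative stand-ins for the comm's registered buffers and the receiver's advertised slots.

struct ibv_send_wr wrs[MAX_REQUESTS+1];
struct ibv_sge sges[MAX_REQUESTS];
memset(wrs, 0, sizeof(wrs));
for (int r = 0; r < nreqs; r++) {
  sges[r].addr   = (uintptr_t)localBuf[r];     // local registered buffer
  sges[r].length = sizes[r];
  sges[r].lkey   = lkeys[r];
  wrs[r].sg_list = &sges[r];
  wrs[r].num_sge = 1;
  wrs[r].opcode  = IBV_WR_RDMA_WRITE;          // unsignaled one-sided writes
  wrs[r].wr.rdma.remote_addr = remoteAddr[r];  // slot address advertised by the receiver
  wrs[r].wr.rdma.rkey        = rkeys[r];
  wrs[r].next    = &wrs[r+1];
}
struct ibv_send_wr* lastWr = &wrs[nreqs];      // terminating WR: no payload, carries the immediate
lastWr->wr_id      = wr_id;                    // packed request indices, decoded at completion time
lastWr->opcode     = IBV_WR_RDMA_WRITE_WITH_IMM;
lastWr->imm_data   = immData;                  // per-request size bits, as described above
lastWr->send_flags = IBV_SEND_SIGNALED;        // only this WR generates a completion
lastWr->next       = NULL;
struct ibv_send_wr* bad_wr;
ibv_post_send(qp, wrs, &bad_wr);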
main: main function
net_type: network type
ncclNet_t* ncclNets[3] = { nullptr, &ncclNetIb, &ncclNetSocket };
src/transport/net_ib.cc
Interface API implemented by the IB NIC transport
ncclNet_t ncclNetIb = {
"IB",
ncclIbInit,
ncclIbDevices,
ncclIbGetProperties,
ncclIbListen,
ncclIbConnect,
ncclIbAccept,
ncclIbRegMr,
ncclIbRegMrDmaBuf,
ncclIbDeregMr,
ncclIbIsend,
ncclIbIrecv,
ncclIbIflush,
ncclIbTest,
ncclIbCloseSend,
ncclIbCloseRecv,
ncclIbCloseListen
};
Interfaces a network transport must implement:
typedef struct {
// Name of the network (mainly for logs)
const char* name;
// Initialize the network.
ncclResult_t (*init)(ncclDebugLogger_t logFunction);
// Return the number of adapters.
ncclResult_t (*devices)(int* ndev);
// Get various device properties.
ncclResult_t (*getProperties)(int dev, ncclNetProperties_v6_t* props);
// Create a receiving object and provide a handle to connect to it. The
// handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
// between ranks to create a connection.
ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
// Connect to a handle and return a sending comm object for that peer.
// This call must not block for the connection to be established, and instead
// should return successfully with sendComm == NULL with the expectation that
// it will be called again until sendComm != NULL.
ncclResult_t (*connect)(int dev, void* handle, void** sendComm);
// Finalize connection establishment after remote peer has called connect.
// This call must not block for the connection to be established, and instead
// should return successfully with recvComm == NULL with the expectation that
// it will be called again until recvComm != NULL.
ncclResult_t (*accept)(void* listenComm, void** recvComm);
// Register/Deregister memory. Comm can be either a sendComm or a recvComm.
// Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
ncclResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
/* DMA-BUF support */
ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
ncclResult_t (*deregMr)(void* comm, void* mhandle);
// Asynchronous send to a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request);
// Asynchronous recv from a peer.
// May return request == NULL if the call cannot be performed (or would block)
ncclResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request);
// Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
// visible to the GPU
ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
// Test whether a request is complete. If size is not NULL, it returns the
// number of bytes sent/received.
ncclResult_t (*test)(void* request, int* done, int* sizes);
// Close and free send/recv comm objects
ncclResult_t (*closeSend)(void* sendComm);
ncclResult_t (*closeRecv)(void* recvComm);
ncclResult_t (*closeListen)(void* listenComm);
} ncclNet_v6_t;
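Any transport that fills in this function table can be dropped into ncclNets[]; purely as an illustration (not part of NCCL), a do-nothing stub would look like the sketch below.

static ncclResult_t stubInit(ncclDebugLogger_t logFn) { return ncclSuccess; }
static ncclResult_t stubDevices(int* ndev) { *ndev = 0; return ncclSuccess; }
static ncclResult_t stubGetProperties(int dev, ncclNetProperties_v6_t* props) { return ncclInternalError; }
static ncclResult_t stubListen(int dev, void* handle, void** listenComm) { return ncclInternalError; }
static ncclResult_t stubConnect(int dev, void* handle, void** sendComm) { return ncclInternalError; }
static ncclResult_t stubAccept(void* listenComm, void** recvComm) { return ncclInternalError; }
static ncclResult_t stubRegMr(void* comm, void* data, int size, int type, void** mhandle) { return ncclInternalError; }
static ncclResult_t stubRegMrDmaBuf(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) { return ncclInternalError; }
static ncclResult_t stubDeregMr(void* comm, void* mhandle) { return ncclSuccess; }
static ncclResult_t stubIsend(void* sendComm, void* data, int size, int tag, void* mhandle, void** request) { return ncclInternalError; }
static ncclResult_t stubIrecv(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request) { return ncclInternalError; }
static ncclResult_t stubIflush(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) { return ncclInternalError; }
static ncclResult_t stubTest(void* request, int* done, int* sizes) { return ncclInternalError; }
static ncclResult_t stubCloseSend(void* sendComm) { return ncclSuccess; }
static ncclResult_t stubCloseRecv(void* recvComm) { return ncclSuccess; }
static ncclResult_t stubCloseListen(void* listenComm) { return ncclSuccess; }

ncclNet_v6_t ncclNetStub = {
  "Stub", stubInit, stubDevices, stubGetProperties, stubListen, stubConnect, stubAccept,
  stubRegMr, stubRegMrDmaBuf, stubDeregMr, stubIsend, stubIrecv, stubIflush, stubTest,
  stubCloseSend, stubCloseRecv, stubCloseListen
};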
ncclIbRegMr -> memory registration
ncclResult_t ncclIbRegMrDmaBuf
flags = IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ
NCCLCHECKGOTO(wrap_ibv_reg_mr_iova2(&mr, verbs->pd, (void*)addr, pages*pageSize, addr, flags), res, returning)
NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning)
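Underneath the wrap_ibv_reg_mr / wrap_ibv_reg_mr_iova2 wrappers this is standard verbs memory registration over a page-aligned range; roughly (variable names illustrative):

uintptr_t addr = (uintptr_t)data & ~(pageSize-1);                      // round the base down to a page boundary
size_t pages = ((uintptr_t)data + size - addr + pageSize-1)/pageSize;  // pages covering [data, data+size)
int flags = IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ;
struct ibv_mr* mr = ibv_reg_mr(pd, (void*)addr, pages*pageSize, flags);
// mr->lkey goes into local SGEs; mr->rkey is exchanged with the peer for RDMA writes/reads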
Flush:
ncclResult_t ncclIbIflush(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) {
NCCLCHECK(wrap_ibv_poll_cq(r->verbs->cq, 4, wcs, &wrDone))
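Completion handling on this path is ordinary CQ polling; a simplified check (batch of 4 completions, matching the call above) might look like:

struct ibv_wc wcs[4];
int wrDone = ibv_poll_cq(cq, 4, wcs);   // non-blocking; returns the number of completions drained
for (int w = 0; w < wrDone; w++) {
  if (wcs[w].status != IBV_WC_SUCCESS) {
    // report ibv_wc_status_str(wcs[w].status), wcs[w].opcode and wcs[w].wr_id, then fail the request
  }
  // wcs[w].wr_id encodes which request(s) completed (see the wr_id packing in ncclIbMultiSend)
}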
ncclNet_v4_as_v6_init
collNetRegMr
External API: src/include/coll_net.h
// Translation to external API
static const char* collNetName(struct ncclComm* comm) { return comm->ncclCollNet->name; }
static ncclResult_t collNetDevices(struct ncclComm* comm, int* ndev) { NCCLCHECK(comm->ncclCollNet->devices(ndev)); return ncclSuccess; }
static ncclResult_t collNetGetProperties(struct ncclComm* comm, int dev, ncclNetProperties_t* props) { NCCLCHECK(comm->ncclCollNet->getProperties(dev, props)); return ncclSuccess; }
static ncclResult_t collNetListen(struct ncclComm* comm, int dev, void* handle, void** listenComm) { NCCLCHECK(comm->ncclCollNet->listen(dev, handle, listenComm)); return ncclSuccess; }
static ncclResult_t collNetConnect(struct ncclComm* comm, void* handles[], int nranks, int rank, void* listenComm, void** collComm) { NCCLCHECK(comm->ncclCollNet->connect(handles, nranks, rank, listenComm, collComm)); return ncclSuccess; }
static ncclResult_t collNetReduceSupport(struct ncclComm* comm, ncclDataType_t dataType, ncclRedOp_t redOp, int* supported) { NCCLCHECK(comm->ncclCollNet->reduceSupport(dataType, redOp, supported)); return ncclSuccess; }
static ncclResult_t collNetRegMr(struct ncclComm* comm, void* collComm, void* data, int size, int type, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMr(collComm, data, size, type, mhandle)); return ncclSuccess; }
/* DMA-BUF support */
static ncclResult_t collNetRegMrDmaBuf(struct ncclComm* comm, void* collComm, void* data, int size, int type, uint64_t offset, int fd, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMrDmaBuf(collComm, data, size, type, offset, fd, mhandle)); return ncclSuccess; }
static ncclResult_t collNetDeregMr(struct ncclComm* comm, void* collComm, void* mhandle) { NCCLCHECK(comm->ncclCollNet->deregMr(collComm, mhandle)); return ncclSuccess; }
static ncclResult_t collNetIallreduce(struct ncclComm* comm, void* collComm, void* sendData, void* recvData, int count, ncclDataType_t dataType, ncclRedOp_t redOp, void* sendMhandle, void* recvMhandle, void** request) {
NCCLCHECK(comm->ncclCollNet->iallreduce(collComm, sendData, recvData, count, dataType, redOp, sendMhandle, recvMhandle, request)); return ncclSuccess; }
static ncclResult_t collNetIflush(struct ncclComm* comm, void* collComm, void* data, int size, void* mhandle, void** request) { NCCLCHECK(comm->ncclCollNet->iflush(collComm, data, size, mhandle, request)); return ncclSuccess; }
static ncclResult_t collNetTest(struct ncclComm* comm, void* request, int* done, int* size) { NCCLCHECK(comm->ncclCollNet->test(request, done, size)); return ncclSuccess; }
static ncclResult_t collNetCloseColl(struct ncclComm* comm, void* collComm) { NCCLCHECK(comm->ncclCollNet->closeColl(collComm)); return ncclSuccess; }
static ncclResult_t collNetCloseListen(struct ncclComm* comm, void* listenComm) { NCCLCHECK(comm->ncclCollNet->closeListen(listenComm)); return ncclSuccess; }
static int collNetSupport(struct ncclComm* comm) { return comm->ncclCollNet != nullptr ? 1 : 0; }
struct ncclTransport collNetTransport = {
"COL",
canConnect,
{ sendSetup, sendConnect, sendFree, NULL, sendProxySetup, sendProxyConnect, sendProxyFree, sendProxyProgress },
{ recvSetup, recvConnect, recvFree, NULL, recvProxySetup, recvProxyConnect, recvProxyFree, recvProxyProgress }
};
Alignment and common math helpers:
#define DIVUP(x, y) \
    (((x)+(y)-1)/(y))
#define ROUNDUP(x, y) \
    (DIVUP((x), (y))*(y))
#define ALIGN_POWER(x, y) \
    ((x) > (y) ? ROUNDUP(x, y) : ((y)/((y)/(x))))
#define ALIGN_SIZE(size, align) \
    size = ((size + (align) - 1) / (align)) * (align);
#if !__CUDA_ARCH__
#ifndef __host__
#define __host__
#endif
#ifndef __device__
#define __device__
#endif
#endif
template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z divUp(X x, Y y) {
  return (x+y-1)/y;
}
template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z roundUp(X x, Y y) {
  return (x+y-1) - (x+y-1)%y;
}
// assumes second argument is a power of 2
template<typename X, typename Z = decltype(X()+int())>
__host__ __device__ constexpr Z alignUp(X x, int a) {
  return (x+a-1) & Z(-a);
}
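A few hand-worked values as a sanity check of these helpers:

static_assert(divUp(10, 4) == 3, "");      // (10+4-1)/4
static_assert(roundUp(10, 4) == 12, "");   // next multiple of 4 at or above 10
static_assert(alignUp(10, 8) == 16, "");   // next multiple of the power-of-two 8
// DIVUP(1048576, 65536) == 16, ROUNDUP(100000, 65536) == 131072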
Shared connection states:
enum ncclIbCommState {
ncclIbCommStateStart = 0,
ncclIbCommStateConnect = 1,
ncclIbCommStateAccept = 3,
ncclIbCommStateSend = 4,
ncclIbCommStateRecv = 5,
ncclIbCommStateConnecting = 6,
ncclIbCommStateConnected = 7,
ncclIbCommStatePendingReady = 8,
};
ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm)
if (stage->state == ncclIbCommStateSend) goto ib_send;
NCCLCHECK(ncclIbInitVerbs(dev, ctx, &comm->verbs))
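Because connect()/accept() must not block (see the ncclNet_v6_t comments above), the caller simply re-invokes them until the comm pointer becomes non-NULL, with stage->state remembering where to resume. Conceptually (the yield is illustrative; the real progress loop interleaves other work, and dev/handle come from the transport setup):

void* sendComm = NULL;
while (sendComm == NULL) {
  NCCLCHECK(ncclIbConnect(dev, handle, &sendComm));  // resumes from stage->state on every call
  if (sendComm == NULL) sched_yield();
}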
Initialization
init()
Symbol loading (libibverbs is dlopen'ed once at runtime):
ncclResult_t wrap_ibv_symbols(void) {
pthread_once(&initOnceControl,
[](){ initResult = buildIbvSymbols(&ibvSymbols); });
return initResult;
}
ncclResult_t buildIbvSymbols(struct ncclIbvSymbols* ibvSymbols)
ibvhandle=dlopen("libibverbs.so", RTLD_NOW)
ibv interfaces / verbs wrappers / symbol table
LOAD_SYM(ibvhandle, "ibv_get_device_list", ibvSymbols->ibv_internal_get_device_list);
LOAD_SYM(ibvhandle, "ibv_free_device_list", ibvSymbols->ibv_internal_free_device_list);
LOAD_SYM(ibvhandle, "ibv_get_device_name", ibvSymbols->ibv_internal_get_device_name);
LOAD_SYM(ibvhandle, "ibv_open_device", ibvSymbols->ibv_internal_open_device);
LOAD_SYM(ibvhandle, "ibv_close_device", ibvSymbols->ibv_internal_close_device);
LOAD_SYM(ibvhandle, "ibv_get_async_event", ibvSymbols->ibv_internal_get_async_event);
LOAD_SYM(ibvhandle, "ibv_ack_async_event", ibvSymbols->ibv_internal_ack_async_event);
LOAD_SYM(ibvhandle, "ibv_query_device", ibvSymbols->ibv_internal_query_device);
LOAD_SYM(ibvhandle, "ibv_query_port", ibvSymbols->ibv_internal_query_port);
LOAD_SYM(ibvhandle, "ibv_query_gid", ibvSymbols->ibv_internal_query_gid);
LOAD_SYM(ibvhandle, "ibv_query_qp", ibvSymbols->ibv_internal_query_qp);
LOAD_SYM(ibvhandle, "ibv_alloc_pd", ibvSymbols->ibv_internal_alloc_pd);
LOAD_SYM(ibvhandle, "ibv_dealloc_pd", ibvSymbols->ibv_internal_dealloc_pd);
LOAD_SYM(ibvhandle, "ibv_reg_mr", ibvSymbols->ibv_internal_reg_mr);
// Cherry-pick the ibv_reg_mr_iova2 API from IBVERBS 1.8
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_mr_iova2", ibvSymbols->ibv_internal_reg_mr_iova2, "IBVERBS_1.8");
// Cherry-pick the ibv_reg_dmabuf_mr API from IBVERBS 1.12
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_dmabuf_mr", ibvSymbols->ibv_internal_reg_dmabuf_mr, "IBVERBS_1.12");
LOAD_SYM(ibvhandle, "ibv_dereg_mr", ibvSymbols->ibv_internal_dereg_mr);
LOAD_SYM(ibvhandle, "ibv_create_cq", ibvSymbols->ibv_internal_create_cq);
LOAD_SYM(ibvhandle, "ibv_destroy_cq", ibvSymbols->ibv_internal_destroy_cq);
LOAD_SYM(ibvhandle, "ibv_create_qp", ibvSymbols->ibv_internal_create_qp);
LOAD_SYM(ibvhandle, "ibv_modify_qp", ibvSymbols->ibv_internal_modify_qp);
LOAD_SYM(ibvhandle, "ibv_destroy_qp", ibvSymbols->ibv_internal_destroy_qp);
LOAD_SYM(ibvhandle, "ibv_fork_init", ibvSymbols->ibv_internal_fork_init); <- wrap_ibv_fork_init
LOAD_SYM(ibvhandle, "ibv_event_type_str", ibvSymbols->ibv_internal_event_type_str);
RDMA NIC per-lane link speeds (Mb/s):
static int ibvSpeeds[] = {
2500, /* SDR */
5000, /* DDR */
10000, /* QDR */
10000, /* QDR */
14000, /* FDR */
25000, /* EDR */
50000, /* HDR */
100000 /* NDR */ };
Formula for computing the NIC bandwidth:
ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
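For example, a 4x EDR port reports an active_speed of 25000 Mb/s per lane and an active_width of 4, so speed = 25000 * 4 = 100000 Mb/s (100 Gb/s); a 4x HDR port works out to 200 Gb/s.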
IB async event thread:
static void* ncclIbAsyncThreadMain(void* args)
wrap_ibv_event_type_str(&str, event.event_type)) -> convert the event type to a string
wrap_ibv_ack_async_event(&event)
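Stripped of NCCL's wrappers and logging, the thread body is the standard verbs async-event loop:

static void* asyncThreadMain(void* arg) {
  struct ibv_context* ctx = (struct ibv_context*)arg;
  while (1) {
    struct ibv_async_event event;
    if (ibv_get_async_event(ctx, &event)) break;             // blocks until the device reports an event
    const char* str = ibv_event_type_str(event.event_type);  // human-readable event name
    fprintf(stderr, "NET/IB : Got async event : %s\n", str); // NCCL routes this through its WARN macro
    ibv_ack_async_event(&event);                             // must ack so the provider's event queue can drain
  }
  return NULL;
}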
Xiaobing (ssbandjl)
Blog: https://cloud.tencent.com/developer/user/5060293/articles | https://logread.cn | https://blog.csdn.net/ssbandjl
DAOS overview: https://cloud.tencent.com/developer/article/2344030
Xiaobing's technical column (series: DAOS / RDMA / UCX / Mercury / Libfabric / distributed storage, etc.)
Videos: https://cloud.tencent.com/developer/user/5060293/video
Column: https://cloud.tencent.com/developer/column/99669
Readers interested in high-performance technologies such as DAOS, SPDK, RDMA, and coroutines are welcome to join the DAOS technical discussion group.