Nvidia NCCL: GPU Collective Communication Interface Overview - Source Code Notes

2023-11-24 23:03:06

Terminology

nccl: NVIDIA Collective Communications Library (NCCL), NVIDIA's collective communication interface for GPUs

Useful Links

NCCL developer page: https://developer.nvidia.com/nccl

User guide: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html

Project homepage: https://github.com/NVIDIA/nccl

Introduction

Optimized primitives for inter-GPU communication

NCCL (pronounced "Nickel") is a stand-alone library of standard communication routines for GPUs, implementing all-reduce, all-gather, reduce, broadcast, reduce-scatter, as well as any send/receive based communication pattern. It has been optimized to achieve high bandwidth on platforms using PCIe, NVLink, and NVSwitch, as well as on networks using InfiniBand Verbs or TCP/IP sockets. NCCL supports an arbitrary number of GPUs installed in a single node or across multiple nodes, and can be used in either single-process or multi-process (e.g., MPI) applications.

Source Code Reading Notes

Git repository: https://github.com/ssbandjl/nccl.git

Summary:
The NCCL library uses the native RDMA verbs interface to implement one-sided read/write and two-sided send/receive communication semantics in a very lean way (compared with UCX, Libfabric, or other verbs-based communication libraries), providing the high-performance, high-bandwidth inter-GPU communication needed for data transfer in large-model AI training.

RDMA call stack - QP setup:
ncclResult_t ncclIbCreateQp
    qpInitAttr.qp_type = IBV_QPT_RC
    qpInitAttr.cap.max_send_wr = 2*MAX_REQUESTS -> 128
    qpInitAttr.cap.max_recv_wr = MAX_REQUESTS -> 64
    wrap_ibv_modify_qp
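
For context, the call stack above boils down to an ordinary verbs RC QP creation. Below is a minimal sketch of the same steps with plain libibverbs (not NCCL's exact code; error handling and the INIT/RTR/RTS transitions performed by wrap_ibv_modify_qp are omitted):

#include <string.h>
#include <infiniband/verbs.h>

// Create the reliable-connection QP described in the notes above (sketch).
static struct ibv_qp* create_rc_qp(struct ibv_pd* pd, struct ibv_cq* cq) {
  struct ibv_qp_init_attr qpInitAttr;
  memset(&qpInitAttr, 0, sizeof(qpInitAttr));
  qpInitAttr.send_cq = cq;
  qpInitAttr.recv_cq = cq;
  qpInitAttr.qp_type = IBV_QPT_RC;        // reliable connected QP
  qpInitAttr.cap.max_send_wr = 2*64;      // 2*MAX_REQUESTS in the notes (128)
  qpInitAttr.cap.max_recv_wr = 64;        // MAX_REQUESTS (64)
  qpInitAttr.cap.max_send_sge = 1;
  qpInitAttr.cap.max_recv_sge = 1;
  return ibv_create_qp(pd, &qpInitAttr);  // NCCL then drives it to RTS via wrap_ibv_modify_qp
}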


RDMA one-sided write - call stack (with immediate data)
ncclResult_t ncclIbIsend
    NCCLCHECK(ncclIbMultiSend(comm, slot))
ncclResult_t ncclIbMultiSend
    for (int r=0; r<nreqs; r++)
        struct ibv_sge* sge = comm->sges+r
        wr->opcode = IBV_WR_RDMA_WRITE
        wr->wr.rdma.remote_addr = slots[r].addr
        wr->next = wr+1
        wr_id += (reqs[r] - comm->verbs.reqs) << (r*8)
    immData |= (reqs[r]->send.size ? 1 : 0) << r -> write the size as immediate data; in the multi-send case only a 0/1 bit per request is written to indicate whether that request carried any data
    lastWr->wr_id = wr_id
    lastWr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM
    lastWr->send_flags = IBV_SEND_SIGNALED
    NCCLCHECK(wrap_ibv_post_send(comm->qps[comm->qpIndex], comm->wrs, &bad_wr))
        qp->context->ops.post_send(qp, wr, bad_wr) -> ibv_post_send
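
Only the last work request in the chain is signaled and carries the immediate data. A minimal sketch of posting such a request with plain libibverbs (not NCCL's exact code; the per-request WR chaining and byte-order details are omitted):

#include <stdint.h>
#include <string.h>
#include <infiniband/verbs.h>

// Post one signaled RDMA write that carries a 32-bit immediate (sketch).
static int post_write_with_imm(struct ibv_qp* qp, struct ibv_mr* mr,
                               void* buf, uint32_t len,
                               uint64_t remote_addr, uint32_t rkey, uint32_t imm) {
  struct ibv_sge sge;
  sge.addr   = (uintptr_t)buf;                    // local, registered source buffer
  sge.length = len;
  sge.lkey   = mr->lkey;

  struct ibv_send_wr wr, *bad_wr = NULL;
  memset(&wr, 0, sizeof(wr));
  wr.wr_id      = 1;                              // decoded from the CQE on completion
  wr.sg_list    = &sge;
  wr.num_sge    = 1;
  wr.opcode     = IBV_WR_RDMA_WRITE_WITH_IMM;     // one-sided write + 32-bit immediate
  wr.send_flags = IBV_SEND_SIGNALED;              // only the last WR in the chain is signaled
  wr.imm_data   = imm;                            // size, or one 0/1 bit per request
  wr.wr.rdma.remote_addr = remote_addr;           // advertised by the receiver's slot
  wr.wr.rdma.rkey        = rkey;
  return ibv_post_send(qp, &wr, &bad_wr);         // what wrap_ibv_post_send ends up calling
}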


main: entry point
net_type: network backend type
ncclNet_t* ncclNets[3] = { nullptr, &ncclNetIb, &ncclNetSocket };
src/transport/net_ib.cc
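
A hedged sketch of how this table appears to be consumed (based on ncclNetInit in src/init.cc; not the exact code): slot 0 is reserved for an optional external network plugin, and the first backend that initializes and reports at least one device wins, so IB is preferred over plain TCP sockets. selectedNet/logFn are illustrative locals.

ncclNet_t* selectedNet = nullptr;                 // hypothetical local, for illustration
ncclDebugLogger_t logFn = ncclDebugLog;           // NCCL's logger callback
for (int i = 0; i < 3; i++) {
  if (ncclNets[i] == nullptr) continue;                           // slot 0: external plugin, if any
  if (ncclNets[i]->init(logFn) != ncclSuccess) continue;          // backend cannot initialize
  int ndev = 0;
  if (ncclNets[i]->devices(&ndev) != ncclSuccess || ndev == 0) continue;
  selectedNet = ncclNets[i];                                      // first usable backend wins (IB before sockets)
  break;
}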

Interface API implemented by the IB NIC transport:
ncclNet_t ncclNetIb = {
  "IB",
  ncclIbInit,
  ncclIbDevices,
  ncclIbGetProperties,
  ncclIbListen,
  ncclIbConnect,
  ncclIbAccept,
  ncclIbRegMr,
  ncclIbRegMrDmaBuf,
  ncclIbDeregMr,
  ncclIbIsend,
  ncclIbIrecv,
  ncclIbIflush,
  ncclIbTest,
  ncclIbCloseSend,
  ncclIbCloseRecv,
  ncclIbCloseListen
};



Interface a network plugin must implement:
typedef struct {
  // Name of the network (mainly for logs)
  const char* name;
  // Initialize the network.
  ncclResult_t (*init)(ncclDebugLogger_t logFunction);
  // Return the number of adapters.
  ncclResult_t (*devices)(int* ndev);
  // Get various device properties.
  ncclResult_t (*getProperties)(int dev, ncclNetProperties_v6_t* props);
  // Create a receiving object and provide a handle to connect to it. The
  // handle can be up to NCCL_NET_HANDLE_MAXSIZE bytes and will be exchanged
  // between ranks to create a connection.
  ncclResult_t (*listen)(int dev, void* handle, void** listenComm);
  // Connect to a handle and return a sending comm object for that peer.
  // This call must not block for the connection to be established, and instead
  // should return successfully with sendComm == NULL with the expectation that
  // it will be called again until sendComm != NULL.
  ncclResult_t (*connect)(int dev, void* handle, void** sendComm);
  // Finalize connection establishment after remote peer has called connect.
  // This call must not block for the connection to be established, and instead
  // should return successfully with recvComm == NULL with the expectation that
  // it will be called again until recvComm != NULL.
  ncclResult_t (*accept)(void* listenComm, void** recvComm);
  // Register/Deregister memory. Comm can be either a sendComm or a recvComm.
  // Type is either NCCL_PTR_HOST or NCCL_PTR_CUDA.
  ncclResult_t (*regMr)(void* comm, void* data, int size, int type, void** mhandle);
  /* DMA-BUF support */
  ncclResult_t (*regMrDmaBuf)(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle);
  ncclResult_t (*deregMr)(void* comm, void* mhandle);
  // Asynchronous send to a peer.
  // May return request == NULL if the call cannot be performed (or would block)
  ncclResult_t (*isend)(void* sendComm, void* data, int size, int tag, void* mhandle, void** request);
  // Asynchronous recv from a peer.
  // May return request == NULL if the call cannot be performed (or would block)
  ncclResult_t (*irecv)(void* recvComm, int n, void** data, int* sizes, int* tags, void** mhandles, void** request);
  // Perform a flush/fence to make sure all data received with NCCL_PTR_CUDA is
  // visible to the GPU
  ncclResult_t (*iflush)(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request);
  // Test whether a request is complete. If size is not NULL, it returns the
  // number of bytes sent/received.
  ncclResult_t (*test)(void* request, int* done, int* sizes);
  // Close and free send/recv comm objects
  ncclResult_t (*closeSend)(void* sendComm);
  ncclResult_t (*closeRecv)(void* recvComm);
  ncclResult_t (*closeListen)(void* listenComm);
} ncclNet_v6_t;
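
Taken together, the listen/connect/accept comments above describe a polling handshake. A sketch of that contract is shown below; exchange_handle_with_peer is a hypothetical stand-in for NCCL's out-of-band bootstrap exchange, everything else follows the comments in the struct.

ncclNet_t* net = &ncclNetIb;
int dev = 0;
char handle[NCCL_NET_HANDLE_MAXSIZE];
void *listenComm = NULL, *sendComm = NULL, *recvComm = NULL;

NCCLCHECK(net->listen(dev, handle, &listenComm));    // receiver: create a handle describing how to reach it
exchange_handle_with_peer(handle);                   // hypothetical: hand the handle to the sender

while (sendComm == NULL)                             // sender: connect() never blocks,
  NCCLCHECK(net->connect(dev, handle, &sendComm));   //   so retry until the comm materializes
while (recvComm == NULL)                             // receiver: accept() follows the same rule
  NCCLCHECK(net->accept(listenComm, &recvComm));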


ncclIbRegMr -> memory registration
    ncclResult_t ncclIbRegMrDmaBuf
        flags = IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ
        NCCLCHECKGOTO(wrap_ibv_reg_mr_iova2(&mr, verbs->pd, (void*)addr, pages*pageSize, addr, flags), res, returning)
        NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning)
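
When DMA-BUF and iova2 registration are unavailable, this reduces to an ordinary ibv_reg_mr with local-write plus remote read/write access. A minimal sketch (not NCCL's exact code; NCCL page-aligns addr/size and caches registrations before reaching this point):

#include <stddef.h>
#include <infiniband/verbs.h>

// Register a page-aligned buffer so the peer can RDMA-write into it and RDMA-read from it.
static struct ibv_mr* reg_buffer(struct ibv_pd* pd, void* addr, size_t len) {
  int flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
  return ibv_reg_mr(pd, addr, len, flags);  // mr->lkey feeds the local SGE, mr->rkey goes to the peer
}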


Flush (make data received into GPU memory via NCCL_PTR_CUDA visible to the GPU, per the iflush comment above):
ncclResult_t ncclIbIflush(void* recvComm, int n, void** data, int* sizes, void** mhandles, void** request) {
    NCCLCHECK(wrap_ibv_poll_cq(r->verbs->cq, 4, wcs, &wrDone))



ncclNet_v4_as_v6_init


collNetRegMr

External API wrappers: src/include/coll_net.h
// Translation to external API
static const char* collNetName(struct ncclComm* comm) { return comm->ncclCollNet->name; }
static ncclResult_t collNetDevices(struct ncclComm* comm, int* ndev) { NCCLCHECK(comm->ncclCollNet->devices(ndev)); return ncclSuccess; }
static ncclResult_t collNetGetProperties(struct ncclComm* comm, int dev, ncclNetProperties_t* props) { NCCLCHECK(comm->ncclCollNet->getProperties(dev, props)); return ncclSuccess; }
static ncclResult_t collNetListen(struct ncclComm* comm, int dev, void* handle, void** listenComm) { NCCLCHECK(comm->ncclCollNet->listen(dev, handle, listenComm)); return ncclSuccess; }
static ncclResult_t collNetConnect(struct ncclComm* comm, void* handles[], int nranks, int rank, void* listenComm, void** collComm) { NCCLCHECK(comm->ncclCollNet->connect(handles, nranks, rank, listenComm, collComm)); return ncclSuccess; }
static ncclResult_t collNetReduceSupport(struct ncclComm* comm, ncclDataType_t dataType, ncclRedOp_t redOp, int* supported) { NCCLCHECK(comm->ncclCollNet->reduceSupport(dataType, redOp, supported)); return ncclSuccess; }
static ncclResult_t collNetRegMr(struct ncclComm* comm, void* collComm, void* data, int size, int type, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMr(collComm, data, size, type, mhandle)); return ncclSuccess; }
/* DMA-BUF support */
static ncclResult_t collNetRegMrDmaBuf(struct ncclComm* comm, void* collComm, void* data, int size, int type, uint64_t offset, int fd, void** mhandle) { NCCLCHECK(comm->ncclCollNet->regMrDmaBuf(collComm, data, size, type, offset, fd, mhandle)); return ncclSuccess; }
static ncclResult_t collNetDeregMr(struct ncclComm* comm, void* collComm, void* mhandle) { NCCLCHECK(comm->ncclCollNet->deregMr(collComm, mhandle)); return ncclSuccess; }
static ncclResult_t collNetIallreduce(struct ncclComm* comm, void* collComm, void* sendData, void* recvData, int count, ncclDataType_t dataType, ncclRedOp_t redOp, void* sendMhandle, void* recvMhandle,  void** request) {
  NCCLCHECK(comm->ncclCollNet->iallreduce(collComm, sendData, recvData, count, dataType, redOp, sendMhandle, recvMhandle, request)); return ncclSuccess; }
static ncclResult_t collNetIflush(struct ncclComm* comm, void* collComm, void* data, int size, void* mhandle, void** request) { NCCLCHECK(comm->ncclCollNet->iflush(collComm, data, size, mhandle, request)); return ncclSuccess; }
static ncclResult_t collNetTest(struct ncclComm* comm, void* request, int* done, int* size) { NCCLCHECK(comm->ncclCollNet->test(request, done, size)); return ncclSuccess; }
static ncclResult_t collNetCloseColl(struct ncclComm* comm, void* collComm) { NCCLCHECK(comm->ncclCollNet->closeColl(collComm)); return ncclSuccess; }
static ncclResult_t collNetCloseListen(struct ncclComm* comm, void* listenComm) { NCCLCHECK(comm->ncclCollNet->closeListen(listenComm)); return ncclSuccess; }

static int collNetSupport(struct ncclComm* comm) { return comm->ncclCollNet != nullptr ? 1 : 0; }





struct ncclTransport collNetTransport = {
  "COL",
  canConnect,
  { sendSetup, sendConnect, sendFree, NULL, sendProxySetup, sendProxyConnect, sendProxyFree, sendProxyProgress },
  { recvSetup, recvConnect, recvFree, NULL, recvProxySetup, recvProxyConnect, recvProxyFree, recvProxyProgress }
};



Alignment macros and common math helpers:
#define DIVUP(x, y) \
    (((x)+(y)-1)/(y))

#define ROUNDUP(x, y) \
    (DIVUP((x), (y))*(y))

#define ALIGN_POWER(x, y) \
    ((x) > (y) ? ROUNDUP(x, y) : ((y)/((y)/(x))))

#define ALIGN_SIZE(size, align) \
  size = ((size + (align) - 1) / (align)) * (align);

#if !__CUDA_ARCH__
  #ifndef __host__
    #define __host__
  #endif
  #ifndef __device__
    #define __device__
  #endif
#endif

template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z divUp(X x, Y y) {
  return (x+y-1)/y;
}

template<typename X, typename Y, typename Z = decltype(X()+Y())>
__host__ __device__ constexpr Z roundUp(X x, Y y) {
  return (x+y-1) - (x+y-1)%y;
}

// assumes second argument is a power of 2
template<typename X, typename Z = decltype(X()+int())>
__host__ __device__ constexpr Z alignUp(X x, int a) {
  return (x+a-1) & Z(-a);
}
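
A quick compile-time sanity check of these helpers, with the values worked out by hand (not from the source):

static_assert(divUp(10, 4) == 3,    "ceil(10/4)");
static_assert(roundUp(10, 4) == 12, "round 10 up to a multiple of 4");
static_assert(alignUp(10, 8) == 16, "align 10 up to 8 (power of two)");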



Common connection states (state machine):
enum ncclIbCommState {
  ncclIbCommStateStart = 0,
  ncclIbCommStateConnect = 1,
  ncclIbCommStateAccept = 3,
  ncclIbCommStateSend = 4,
  ncclIbCommStateRecv = 5,
  ncclIbCommStateConnecting = 6,
  ncclIbCommStateConnected = 7,
  ncclIbCommStatePendingReady = 8,
};



ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm)
    if (stage->state == ncclIbCommStateSend)       goto ib_send;
    NCCLCHECK(ncclIbInitVerbs(dev, ctx, &comm->verbs))



Initialization
init()
The libibverbs symbols are resolved once at runtime (pthread_once + dlopen):
ncclResult_t wrap_ibv_symbols(void) {
  pthread_once(&initOnceControl,
               [](){ initResult = buildIbvSymbols(&ibvSymbols); });
  return initResult;
}
ncclResult_t buildIbvSymbols(struct ncclIbvSymbols* ibvSymbols)
    ibvhandle=dlopen("libibverbs.so", RTLD_NOW)


ibverbs interface: verbs wrappers and the symbol table
LOAD_SYM(ibvhandle, "ibv_get_device_list", ibvSymbols->ibv_internal_get_device_list);
LOAD_SYM(ibvhandle, "ibv_free_device_list", ibvSymbols->ibv_internal_free_device_list);
LOAD_SYM(ibvhandle, "ibv_get_device_name", ibvSymbols->ibv_internal_get_device_name);
LOAD_SYM(ibvhandle, "ibv_open_device", ibvSymbols->ibv_internal_open_device);
LOAD_SYM(ibvhandle, "ibv_close_device", ibvSymbols->ibv_internal_close_device);
LOAD_SYM(ibvhandle, "ibv_get_async_event", ibvSymbols->ibv_internal_get_async_event);
LOAD_SYM(ibvhandle, "ibv_ack_async_event", ibvSymbols->ibv_internal_ack_async_event);
LOAD_SYM(ibvhandle, "ibv_query_device", ibvSymbols->ibv_internal_query_device);
LOAD_SYM(ibvhandle, "ibv_query_port", ibvSymbols->ibv_internal_query_port);
LOAD_SYM(ibvhandle, "ibv_query_gid", ibvSymbols->ibv_internal_query_gid);
LOAD_SYM(ibvhandle, "ibv_query_qp", ibvSymbols->ibv_internal_query_qp);
LOAD_SYM(ibvhandle, "ibv_alloc_pd", ibvSymbols->ibv_internal_alloc_pd);
LOAD_SYM(ibvhandle, "ibv_dealloc_pd", ibvSymbols->ibv_internal_dealloc_pd);
LOAD_SYM(ibvhandle, "ibv_reg_mr", ibvSymbols->ibv_internal_reg_mr);
// Cherry-pick the ibv_reg_mr_iova2 API from IBVERBS 1.8
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_mr_iova2", ibvSymbols->ibv_internal_reg_mr_iova2, "IBVERBS_1.8");
// Cherry-pick the ibv_reg_dmabuf_mr API from IBVERBS 1.12
LOAD_SYM_VERSION(ibvhandle, "ibv_reg_dmabuf_mr", ibvSymbols->ibv_internal_reg_dmabuf_mr, "IBVERBS_1.12");
LOAD_SYM(ibvhandle, "ibv_dereg_mr", ibvSymbols->ibv_internal_dereg_mr);
LOAD_SYM(ibvhandle, "ibv_create_cq", ibvSymbols->ibv_internal_create_cq);
LOAD_SYM(ibvhandle, "ibv_destroy_cq", ibvSymbols->ibv_internal_destroy_cq);
LOAD_SYM(ibvhandle, "ibv_create_qp", ibvSymbols->ibv_internal_create_qp);
LOAD_SYM(ibvhandle, "ibv_modify_qp", ibvSymbols->ibv_internal_modify_qp);
LOAD_SYM(ibvhandle, "ibv_destroy_qp", ibvSymbols->ibv_internal_destroy_qp);
LOAD_SYM(ibvhandle, "ibv_fork_init", ibvSymbols->ibv_internal_fork_init); <- wrap_ibv_fork_init
LOAD_SYM(ibvhandle, "ibv_event_type_str", ibvSymbols->ibv_internal_event_type_str);

RDMA NIC per-lane link speeds (Mb/s):
static int ibvSpeeds[] = {
  2500,  /* SDR */
  5000,  /* DDR */
  10000, /* QDR */
  10000, /* QDR */
  14000, /* FDR */
  25000, /* EDR */
  50000, /* HDR */
  100000 /* NDR */ };


Formula for computing the NIC bandwidth:
ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
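
For example, an EDR port (active_speed maps to 25000 Mb/s per lane in the table above) at 4x width yields 25000 * 4 = 100000 Mb/s, i.e. the familiar 100 Gb/s NIC; ncclIbWidth converts the active_width bitmask into the lane count (1x/2x/4x/8x/12x).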


IB async event thread:
static void* ncclIbAsyncThreadMain(void* args)
    wrap_ibv_event_type_str(&str, event.event_type) -> convert the event type to a string
    wrap_ibv_ack_async_event(&event)
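
A minimal sketch of that event loop with plain libibverbs (not NCCL's exact code, which routes messages through its own logger):

#include <stdio.h>
#include <infiniband/verbs.h>

// Block on the device's async event queue, print the event name, acknowledge it.
static void* async_thread_main(void* arg) {
  struct ibv_context* ctx = (struct ibv_context*)arg;
  struct ibv_async_event event;
  while (ibv_get_async_event(ctx, &event) == 0) {          // blocking wait for the next event
    fprintf(stderr, "NET/IB: got async event: %s\n",
            ibv_event_type_str(event.event_type));         // event type -> string
    ibv_ack_async_event(&event);                           // every event must be acknowledged
  }
  return NULL;
}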

Summary

The NCCL library uses the native RDMA verbs interface to implement one-sided read/write and two-sided send/receive communication semantics in a very lean way (compared with UCX, Libfabric, or other verbs-based communication libraries), providing the high-performance, high-bandwidth inter-GPU communication needed for data transfer in large-model AI training.

Xiaobing (ssbandjl)

Blog: https://cloud.tencent.com/developer/user/5060293/articles | https://logread.cn | https://blog.csdn.net/ssbandjl

DAOS digest: https://cloud.tencent.com/developer/article/2344030

Xiaobing's tech notes (series: DAOS / RDMA / UCX / Mercury / Libfabric / distributed storage, etc.)

Videos: https://cloud.tencent.com/developer/user/5060293/video

Column: https://cloud.tencent.com/developer/column/99669

Friends interested in high-performance technologies such as DAOS, SPDK, RDMA, and coroutines are welcome to join the DAOS discussion group.
