Redis网络连接层的过去、现状和展望

2022-12-03 12:29:20 浏览数 (3)

Redis网络连接层

Redis取自Remote Dictionary Server,顾名思义,Redis是运行在网络环境之上的。Redis目前支持3种网络连接类型:

  • TCP:默认监听TCP 6379端口,接收网络请求,提供服务。
  • Unix Socket:可以用作测试,以及使用Unix Socket做配置变更等。
  • TLS:使用TLS加密的网络连接,可以防止网络链路上被监听、劫持,更加安全。不过代价就是性能有损失。

Redis通过这3种网络连接类型的支持,满足了绝大多数的用户需求,成为了目前最流行的KV存储数据库。

过去

截至2022-Q3,Redis最新的版本是7.0.5。在当前版本中,使用了“传统”的网络连接管理方式。在redis/src/server.c中监听端口:

代码语言:javascript复制
    /* Open the TCP listening socket for the user commands. */
    if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }
    if (server.tls_port != 0 &&
        listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
        exit(1);
    }

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            (mode_t)server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
        anetCloexec(server.sofd);
    }

以及设置监听文件描述符的处理函数,开始提供网络服务:

代码语言:javascript复制
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TLS socket accept handler.");
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

从代码中,我们可以清晰地看到这几种连接类型的初始化过程和配置参数等。但是它的代价是:

  • 不能扩展:如果想要支持一个新的连接类型,那么势必要修改代码。事实上,需要修改的代码还包含另外的几处。
  • 宏的引用:因为TLS不是操作系统默认支持,需要依赖编译选项控制,那么存在TLS是否支持两种情况,就需要使用宏控制。在Redis的代码中存在多处#ifdef USE_OPENSSL的使用。
  • 代码的可维护性降低:在server.c中,不得不引用、调用TCP、TLS和Unix Socket相关的代码逻辑。

现状

连接层框架

截至2022-Q3,在Redis最新的开发分支上,支持了连接层框架(connection layer framework),它长成这样:

代码语言:javascript复制
                           uplayer
                              |
                       connection layer
                         /    |     
                       TCP   Unix   TLS

connection layer负责抽象连接类型,它要求每种连接类型具有如下的方法:

代码语言:javascript复制
typedef struct ConnectionType {
    /* connection type */
    const char *(*get_type)(struct connection *conn);
    /* connection type initialize & finalize & configure */
    void (*init)(void); /* auto-call during register */
    void (*cleanup)(void);
    int (*configure)(void *priv, int reconfigure);

    /* ae & accept & listen & error & address handler */
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    aeFileProc *accept_handler;
    int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
    int (*listen)(connListener *listener);

    /* create/close connection */
    connection* (*conn_create)(void);
    connection* (*conn_create_accepted)(int fd, void *priv);
    void (*close)(struct connection *conn);
    /* connect & accept */
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    /* IO */
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    const char *(*get_last_error)(struct connection *conn);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);

    /* pending data */
    int (*has_pending_data)(void);
    int (*process_pending_data)(void);
    /* TLS specified methods */
    sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;

上层(uplayer)通过connection layer访问Redis的各个连接类型,则可以忽略连接类型的具体实现,仅仅需要调用各个方法即可。

同时,connection layer还负责管理各个连接类型,例如一个新连接类型在使用之前,需要向Redis进行注册,参考redis/src/connection.c:

代码语言:javascript复制
int connTypeRegister(ConnectionType *ct) {
    const char *typename = ct->get_type(NULL);
    ConnectionType *tmpct;
    int type;

    /* find an empty slot to store the new connection type */
    for (type = 0; type < CONN_TYPE_MAX; type  ) {
        tmpct = connTypes[type];
        if (!tmpct)
            break;
        /* ignore case, we really don't care "tls"/"TLS" */
        if (!strcasecmp(typename, tmpct->get_type(NULL))) {
            serverLog(LL_WARNING, "Connection types %s already registered", typename);
            return C_ERR;
        }
    }

    serverLog(LL_VERBOSE, "Connection type %s registered", typename);
    connTypes[type] = ct;

    if (ct->init) {
        ct->init();
    }

    return C_OK;
}

基于此,在redis/src/server.c监听各个连接类型则变成:

代码语言:javascript复制
    /* create all the configured listener, and add handler to start to accept */
    int listen_fds = 0; 
    for (int j = 0; j < CONN_TYPE_MAX; j  ) {
        listener = &server.listeners[j];
        if (listener->ct == NULL)
            continue;

        if (connListen(listener) == C_ERR) {
            serverLog(LL_WARNING, "Failed listening on port %u (%s), aborting.", listener->port, listener->ct->get_type(NULL));
            exit(1);
        }    

        if (createSocketAcceptHandler(listener, connAcceptHandler(listener->ct)) != C_OK)
            serverPanic("Unrecoverable error creating %s listener accept handler.", listener->ct->get_type(NULL));

       listen_fds  = listener->count;
    }       

动态加载连接类型

在过去的版本中,需要在Redis编译时决定是否支持TLS。得益于新的连接层框架,Redis支持:

  • 不支持TLS。
  • 静态支持TLS:make BUILD_TLS=yes,代价是redis-server始终需要链接libssl和libcrypto,尽管可能不运行。
  • 动态支持TLS:make BUILD_TLS=module即可把TLS支持编译成为redis-tls.so。如果希望使用TLS,通过redis-server --loadmodule src/redis-tls.so即可动态加载TLS,达到了“运行时加载”的效果。

同时,在代码结构上,也带来了一定的收益:几乎移除掉#ifdef USE_OPENSSL,仅在redis/src/tls.c中使用,同时重载ConnectionType:

代码语言:javascript复制
static ConnectionType CT_TLS = {
    /* connection type */
    .get_type = connTLSGetType,
    /* connection type initialize & finalize & configure */
    .init = tlsInit,
    .cleanup = tlsCleanup,
    .configure = tlsConfigure,
    /* ae & accept & listen & error & address handler */
    .ae_handler = tlsEventHandler,
    .accept_handler = tlsAcceptHandler,
    .addr = connTLSAddr,
    .listen = connTLSListen,
    /* create/close connection */
    .conn_create = connCreateTLS,
    .conn_create_accepted = connCreateAcceptedTLS,
    .close = connTLSClose,
    /* connect & accept */
    .connect = connTLSConnect,
    .blocking_connect = connTLSBlockingConnect,
    .accept = connTLSAccept,
    /* IO */
    .read = connTLSRead,
    .write = connTLSWrite,
    .writev = connTLSWritev,
    .set_write_handler = connTLSSetWriteHandler,
    .set_read_handler = connTLSSetReadHandler,
    .get_last_error = connTLSGetLastError,
    .sync_write = connTLSSyncWrite,
    .sync_read = connTLSSyncRead,
    .sync_readline = connTLSSyncReadLine,

    /* pending data */
    .has_pending_data = tlsHasPendingData,
    .process_pending_data = tlsProcessPendingData,

    /* TLS specified methods */
    .get_peer_cert = connTLSGetPeerCert,
};

以及在Redis Module的入口函数中,执行connTypeRegister向Redis注册新的连接类型:

代码语言:javascript复制
int RedisModule_OnLoad(void *ctx, RedisModuleString **argv, int argc) {
    UNUSED(argv);
    UNUSED(argc);

    /* Connection modules must be part of the same build as redis. */
    if (strcmp(REDIS_BUILD_ID_RAW, redisBuildIdRaw())) {
        serverLog(LL_NOTICE, "Connection type %s was not built together with the redis-server used.", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    if (RedisModule_Init(ctx,"tls",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    /* Connection modules is available only bootup. */
    if ((RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_SERVER_STARTUP) == 0) {
        serverLog(LL_NOTICE, "Connection type %s can be loaded only during bootup", CONN_TYPE_TLS);
        return REDISMODULE_ERR;
    }

    RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD);

    if(connTypeRegister(&CT_TLS) != C_OK)
        return REDISMODULE_ERR;

    return REDISMODULE_OK;
}

通过Redis Module机制,以及连接层的抽象和框架扩展能力,让Redis的连接类型支持更加易用、可扩展。

重写的Unix Socket连接类型

尽管Unix Socket和TCP是完全不同的连接类型,但是二者具有很大的相似性:基于一个FD(文件描述符)即可操作;支持read、write、writev等IO操作。于是Redis在代码中谨慎地判断TCP/Unix Socket,最大程度上复用了TCP的函数。

基于新的连接类型框框架,把Unix Socket支持从TCP中剥离出来,让代码拥有更好的维护性,参考redis/src/unix.c:

代码语言:javascript复制
/* ==========================================================================
 * unix.c - unix socket connection implementation
 * --------------------------------------------------------------------------
 * Copyright (C) 2022  zhenwei pi
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to permit
 * persons to whom the Software is furnished to do so, subject to the
 * following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ==========================================================================
 */
#include "server.h"
#include "connection.h"

static ConnectionType CT_Unix;

static const char *connUnixGetType(connection *conn) {
    UNUSED(conn);


    return CONN_TYPE_UNIX;
}

static void connUnixEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
    connectionTypeTcp()->ae_handler(el, fd, clientData, mask);
}

static int connUnixAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
    return connectionTypeTcp()->addr(conn, ip, ip_len, port, remote);
}

static int connUnixListen(connListener *listener) {
    int fd;
    mode_t *perm = (mode_t *)listener->priv;

    if (listener->bindaddr_count == 0)
        return C_OK;

    /* currently listener->bindaddr_count is always 1, we still use a loop here in case Redis supports multi Unix socket in the future */
    for (int j = 0; j < listener->bindaddr_count; j  ) {
        char *addr = listener->bindaddr[j];

        unlink(addr); /* don't care if this fails */
        fd = anetUnixServer(server.neterr, addr, *perm, server.tcp_backlog);
        if (fd == ANET_ERR) {
            serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL, fd);
        anetCloexec(fd);
        listener->fd[listener->count  ] = fd;
    }

    return C_OK;
}

static connection *connCreateUnix(void) {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Unix;
    conn->fd = -1;

    return conn;
}

static connection *connCreateAcceptedUnix(int fd, void *priv) {
    UNUSED(priv);
    connection *conn = connCreateUnix();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}

static void connUnixAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, max = MAX_ACCEPTS_PER_CALL;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        acceptCommonHandler(connCreateAcceptedUnix(cfd, NULL),CLIENT_UNIX_SOCKET,NULL);
    }
}

static void connUnixClose(connection *conn) {
    connectionTypeTcp()->close(conn);
}

static int connUnixAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    return connectionTypeTcp()->accept(conn, accept_handler);
}

static int connUnixWrite(connection *conn, const void *data, size_t data_len) {
    return connectionTypeTcp()->write(conn, data, data_len);
}

static int connUnixWritev(connection *conn, const struct iovec *iov, int iovcnt) {
    return connectionTypeTcp()->writev(conn, iov, iovcnt);
}

static int connUnixRead(connection *conn, void *buf, size_t buf_len) {
    return connectionTypeTcp()->read(conn, buf, buf_len);
}

static int connUnixSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    return connectionTypeTcp()->set_write_handler(conn, func, barrier);
}

static int connUnixSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return connectionTypeTcp()->set_read_handler(conn, func);
}

static const char *connUnixGetLastError(connection *conn) {
    return strerror(conn->last_errno);
}

static ssize_t connUnixSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncWrite(conn->fd, ptr, size, timeout);
}

static ssize_t connUnixSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncRead(conn->fd, ptr, size, timeout);
}

static ssize_t connUnixSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
    return syncReadLine(conn->fd, ptr, size, timeout);
}

static ConnectionType CT_Unix = {
    /* connection type */
    .get_type = connUnixGetType,

    /* connection type initialize & finalize & configure */
    .init = NULL,
    .cleanup = NULL,
    .configure = NULL,

    /* ae & accept & listen & error & address handler */
    .ae_handler = connUnixEventHandler,
    .accept_handler = connUnixAcceptHandler,
    .addr = connUnixAddr,
    .listen = connUnixListen,

    /* create/close connection */
    .conn_create = connCreateUnix,
    .conn_create_accepted = connCreateAcceptedUnix,
    .close = connUnixClose,
    /* connect & accept */
    .connect = NULL,
    .blocking_connect = NULL,
    .accept = connUnixAccept,

    /* IO */
    .write = connUnixWrite,
    .writev = connUnixWritev,
    .read = connUnixRead,
    .set_write_handler = connUnixSetWriteHandler,
    .set_read_handler = connUnixSetReadHandler,
    .get_last_error = connUnixGetLastError,
    .sync_write = connUnixSyncWrite,
    .sync_read = connUnixSyncRead,
    .sync_readline = connUnixSyncReadLine,

    /* pending data */
    .has_pending_data = NULL,
    .process_pending_data = NULL,
};

int RedisRegisterConnectionTypeUnix()
{
    return connTypeRegister(&CT_Unix);
}

由于Unix Socket实现较为简单,且复用了大量的TCP连接代码,unix.c中仅使用了少量的代码实现,从中依然可以窥探一个连接类型具有的基本属性:

  • 重载ConnectionType连接类型。
  • 连接类型变量和重载函数为static类型,对外不做任何暴露。
  • 向连接层注册。
  • 事实上,也可以把Unix Socket支持编译成为一个动态链接库,以loadmodule的方式动态加载。Redis的Maintainer Oran认为Unix Socket是一个基础的连接类型,不需要额外的链接和宏控制,因此始终使用静态编译支持。

展望

得益于Redis连接层框架和Module机制,向Redis中增加一个新的连接类型变得更加容易。RDMA是一种高性能的网络技术,iWARP和RoCE v2也让数据中心的以太网络支持了RDMA,近年来也变得更加流行。因此,是不是可以让Redis跑在RDMA上呢?在测试中,在1KB的KV情况下,Redis Over RDMA技术让Redis单核性能达到~450K QPS,大约是相同环境下的TCP性能的2.5倍(~180K QPS)。目前Redis Over RDMA的Pull Request正在社区接受Review,也欢迎提建议、捉BUG。

1 人点赞