Redis 通讯协议(RESP)

2022-04-05 16:23:02 浏览数 (1)

RESP 协议

Redis 基于 RESP (Redis Serialization Protocal)协议来完成客户端和服务端通讯的。RESP 本质是一种文本协议,实现简单、易于解析。如下表所示:

类型

协议描述

实例

网络层

客户端和服务端通过 tcp/流式套接字来进行通讯,为了 防止粘包 因此命令或数据均以 rn (CRLF) 结尾

okrn

请求

*<参数数量> CR LF<参数字节数量 > CR LF<参数的数据> CR LF<参数 N 的字节数量 >CR LF<参数 N 的数据> CR LF

*2rn3rngetrn$13rnusername:1234rn。见 callSendCommond -> redis AppendConnadnArgv -> redisFromatCommandArgv

简单字符串回复

第一个字节

okrn

错误回复

第一个字节-

-ERR unknown command 'sa' rn

整数回复

第一个字节:

:0rn

批量回复

第一个字节$

$6rnfoobarrn 空回复 $-1

多条批量回复

第一个字节*

5rn:1rn:2rn:3rn:4rn$6rnfoobarrn, 空回复 0rn

如果客户端和服务端在一台机器上。那么会对通讯协议进行优化,直接走本地回环

我们可以通过 tcpdump 命令来抓取客户端和服务端请求、响应的数据包, 命令如下:

代码语言:javascript复制
# linux
tcpdump -i lo part 6379 -Ann

# mac 
tcpdump -i lo0 port 6379 -Ann

我们以一条 `set msg100 1` 这条命令测试一下 ( 我本机是 mac 环境):

代码语言:javascript复制
# 客户端 A
127.0.0.1:6379> set msg100 1
OK

服务端抓包结果如下所示:

代码语言:javascript复制
➜  ~ sudo tcpdump -i lo0 port 6379 -Ann
Password:
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
21:52:53.447885 IP 127.0.0.1.51645 > 127.0.0.1.6379: Flags [P.], seq 1564111974:1564112006, ack 169183468, win 6272, options [nop,nop,TS val 774447713 ecr 772455554], length 32: RESP "set" "msg100" "1"
E..T..@.@...............]:tf
........H.....
.)"a.
..*3
$3
set
$6
msg100
$1
1

21:52:53.447912 IP 127.0.0.1.6379 > 127.0.0.1.51645: Flags [.], ack 32, win 6376, options [nop,nop,TS val 774447713 ecr 774447713], length 0
E..4..@.@...............
...]:t......(.....
.)"a.)"a
21:52:53.528935 IP 127.0.0.1.6379 > 127.0.0.1.51645: Flags [P.], seq 1:6, ack 32, win 6376, options [nop,nop,TS val 774447793 ecr 774447713], length 5: RESP "OK"
E..9..@.@...............
...]:t......-.....
.)"..)"a OK

21:52:53.528966 IP 127.0.0.1.51645 > 127.0.0.1.6379: Flags [.], ack 6, win 6272, options [nop,nop,TS val 774447793 ecr 774447793], length 0
E..4..@.@...............]:t.
........(.....
.)"..)".

redis-cli 客户端效果:

客户端是对显示结果做了转化,在 redis-cli.c 文件中下面是它的部分源码

代码语言:javascript复制
static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
    sds out = sdsempty();
    switch (r->type) {
    case REDIS_REPLY_ERROR:
        out = sdscatprintf(out,"(error) %sn", r->str);
    break;
    case REDIS_REPLY_STATUS:
        out = sdscat(out,r->str);
        out = sdscat(out,"n");
    break;
    case REDIS_REPLY_INTEGER:
        out = sdscatprintf(out,"(integer) %lldn",r->integer);
    break;
    case REDIS_REPLY_DOUBLE:
        out = sdscatprintf(out,"(double) %sn",r->str);
    break;
    case REDIS_REPLY_STRING:
    case REDIS_REPLY_VERB:
        /* If you are producing output for the standard output we want
        * a more interesting output with quoted characters and so forth,
        * unless it's a verbatim string type. */
        if (r->type == REDIS_REPLY_STRING) {
            out = sdscatrepr(out,r->str,r->len);
            out = sdscat(out,"n");
        } else {
            out = sdscatlen(out,r->str,r->len);
            out = sdscat(out,"n");
        }
    break;
    case REDIS_REPLY_NIL:
        out = sdscat(out,"(nil)n");
    break;
    case REDIS_REPLY_BOOL:
        out = sdscat(out,r->integer ? "(true)n" : "(false)n");
    break;
    case REDIS_REPLY_ARRAY:
    case REDIS_REPLY_MAP:
    case REDIS_REPLY_SET:
    case REDIS_REPLY_PUSH:
        if (r->elements == 0) {
            if (r->type == REDIS_REPLY_ARRAY)
                out = sdscat(out,"(empty array)n");
            else if (r->type == REDIS_REPLY_MAP)
                out = sdscat(out,"(empty hash)n");
            else if (r->type == REDIS_REPLY_SET)
                out = sdscat(out,"(empty set)n");
            else if (r->type == REDIS_REPLY_PUSH)
                out = sdscat(out,"(empty push)n");
            else
                out = sdscat(out,"(empty aggregate type)n");
        } else {
            unsigned int i, idxlen = 0;
            char _prefixlen[16];
            char _prefixfmt[16];
            sds _prefix;
            sds tmp;

            /* Calculate chars needed to represent the largest index */
            i = r->elements;
            if (r->type == REDIS_REPLY_MAP) i /= 2;
            do {
                idxlen  ;
                i /= 10;
            } while(i);

            /* Prefix for nested multi bulks should grow with idxlen 2 spaces */
            memset(_prefixlen,' ',idxlen 2);
            _prefixlen[idxlen 2] = '';
            _prefix = sdscat(sdsnew(prefix),_prefixlen);

            /* Setup prefix format for every entry */
            char numsep;
            if (r->type == REDIS_REPLY_SET) numsep = '~';
            else if (r->type == REDIS_REPLY_MAP) numsep = '#';
            else numsep = ')';
            snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);

            for (i = 0; i < r->elements; i  ) {
                unsigned int human_idx = (r->type == REDIS_REPLY_MAP) ?
                                         i/2 : i;
                human_idx  ; /* Make it 1-based. */

                /* Don't use the prefix for the first element, as the parent
                 * caller already prepended the index number. */
                out = sdscatprintf(out,_prefixfmt,i == 0 ? "" : prefix,human_idx);

                /* Format the multi bulk entry */
                tmp = cliFormatReplyTTY(r->element[i],_prefix);
                out = sdscatlen(out,tmp,sdslen(tmp));
                sdsfree(tmp);

                /* For maps, format the value as well. */
                if (r->type == REDIS_REPLY_MAP) {
                    i  ;
                    sdsrange(out,0,-2);
                    out = sdscat(out," => ");
                    tmp = cliFormatReplyTTY(r->element[i],_prefix);
                    out = sdscatlen(out,tmp,sdslen(tmp));
                    sdsfree(tmp);
                }
            }
            sdsfree(_prefix);
        }
    break;
    default:
        fprintf(stderr,"Unknown reply type: %dn", r->type);
        exit(1);
    }
    return out;
}

我们也可以使用 nc 命令来替代 redis-cli 命令行:

代码语言:javascript复制
➜  ~ sudo nc 127.0.0.1 6379
set a a
 OK
get a
$1
a

错误代码

  • Redis 常见的错误代码定义如下:
代码语言:javascript复制
#define REDIS_ERR -1
#define REDIS_OK 0

/* When an error occurs, the err flag in a context is set to hold the type of
 * error that occurred. REDIS_ERR_IO means there was an I/O error and you
 * should use the "errno" variable to find out what is wrong.
 * For other values, the "errstr" field will hold a description. */
#define REDIS_ERR_IO 1 /* Error in read or write */
#define REDIS_ERR_EOF 3 /* End of file */
#define REDIS_ERR_PROTOCOL 4 /* Protocol error */
#define REDIS_ERR_OOM 5 /* Out of memory */
#define REDIS_ERR_TIMEOUT 6 /* Timed out */
#define REDIS_ERR_OTHER 2 /* Everything else... */

#define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
#define REDIS_REPLY_DOUBLE 7
#define REDIS_REPLY_BOOL 8
#define REDIS_REPLY_MAP 9
#define REDIS_REPLY_SET 10
#define REDIS_REPLY_ATTR 11
#define REDIS_REPLY_PUSH 12
#define REDIS_REPLY_BIGNUM 13
#define REDIS_REPLY_VERB 14

/* Default max unused reader buffer. */
#define REDIS_READER_MAX_BUF (1024*16)

/* Default multi-bulk element limit */
#define REDIS_READER_MAX_ARRAY_ELEMENTS ((1LL<<32) - 1)
  • 字符串错误信息 - 共享对象
代码语言:javascript复制
void createSharedObjects(void) {
    int j;

    /* Shared command responses */
    shared.crlf = createObject(OBJ_STRING,sdsnew("rn"));
    shared.ok = createObject(OBJ_STRING,sdsnew(" OKrn"));
    shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0rnrn"));
    shared.czero = createObject(OBJ_STRING,sdsnew(":0rn"));
    shared.cone = createObject(OBJ_STRING,sdsnew(":1rn"));
    shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0rn"));
    shared.pong = createObject(OBJ_STRING,sdsnew(" PONGrn"));
    shared.queued = createObject(OBJ_STRING,sdsnew(" QUEUEDrn"));
    shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2rn$1rn0rn*0rn"));
    shared.space = createObject(OBJ_STRING,sdsnew(" "));
    shared.colon = createObject(OBJ_STRING,sdsnew(":"));
    shared.plus = createObject(OBJ_STRING,sdsnew(" "));

    /* Shared command error responses */
    shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
        "-WRONGTYPE Operation against a key holding the wrong kind of valuern"));
    shared.err = createObject(OBJ_STRING,sdsnew("-ERRrn"));
    shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
        "-ERR no such keyrn"));
    shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
        "-ERR syntax errorrn"));
    shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
        "-ERR source and destination objects are the samern"));
    shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
        "-ERR index out of rangern"));
    shared.noscripterr = createObject(OBJ_STRING,sdsnew(
        "-NOSCRIPT No matching script. Please use EVAL.rn"));
    shared.loadingerr = createObject(OBJ_STRING,sdsnew(
        "-LOADING Redis is loading the dataset in memoryrn"));
    shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
        "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.rn"));
    shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
        "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.rn"));
    shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
        "-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.rn"));
    shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
        "-READONLY You can't write against a read only replica.rn"));
    shared.noautherr = createObject(OBJ_STRING,sdsnew(
        "-NOAUTH Authentication required.rn"));
    shared.oomerr = createObject(OBJ_STRING,sdsnew(
        "-OOM command not allowed when used memory > 'maxmemory'.rn"));
    shared.execaborterr = createObject(OBJ_STRING,sdsnew(
        "-EXECABORT Transaction discarded because of previous errors.rn"));
    shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
        "-NOREPLICAS Not enough good replicas to write.rn"));
    shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
        "-BUSYKEY Target key name already exists.rn"));

    /* The shared NULL depends on the protocol version. */
    shared.null[0] = NULL;
    shared.null[1] = NULL;
    shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1rn"));
    shared.null[3] = createObject(OBJ_STRING,sdsnew("_rn"));

    shared.nullarray[0] = NULL;
    shared.nullarray[1] = NULL;
    shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1rn"));
    shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_rn"));

    shared.emptymap[0] = NULL;
    shared.emptymap[1] = NULL;
    shared.emptymap[2] = createObject(OBJ_STRING,sdsnew("*0rn"));
    shared.emptymap[3] = createObject(OBJ_STRING,sdsnew("%0rn"));

    shared.emptyset[0] = NULL;
    shared.emptyset[1] = NULL;
    shared.emptyset[2] = createObject(OBJ_STRING,sdsnew("*0rn"));
    shared.emptyset[3] = createObject(OBJ_STRING,sdsnew("~0rn"));

    for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j  ) {
        char dictid_str[64];
        int dictid_len;

        dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
        shared.select[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),
                "*2rn$6rnSELECTrn$%drn%srn",
                dictid_len, dictid_str));
    }
    shared.messagebulk = createStringObject("$7rnmessagern",13);
    shared.pmessagebulk = createStringObject("$8rnpmessagern",14);
    shared.subscribebulk = createStringObject("$9rnsubscribern",15);
    shared.unsubscribebulk = createStringObject("$11rnunsubscribern",18);
    shared.psubscribebulk = createStringObject("$10rnpsubscribern",17);
    shared.punsubscribebulk = createStringObject("$12rnpunsubscribern",19);

    /* Shared command names */
    shared.del = createStringObject("DEL",3);
    shared.unlink = createStringObject("UNLINK",6);
    shared.rpop = createStringObject("RPOP",4);
    shared.lpop = createStringObject("LPOP",4);
    shared.lpush = createStringObject("LPUSH",5);
    shared.rpoplpush = createStringObject("RPOPLPUSH",9);
    shared.lmove = createStringObject("LMOVE",5);
    shared.blmove = createStringObject("BLMOVE",6);
    shared.zpopmin = createStringObject("ZPOPMIN",7);
    shared.zpopmax = createStringObject("ZPOPMAX",7);
    shared.multi = createStringObject("MULTI",5);
    shared.exec = createStringObject("EXEC",4);
    shared.hset = createStringObject("HSET",4);
    shared.srem = createStringObject("SREM",4);
    shared.xgroup = createStringObject("XGROUP",6);
    shared.xclaim = createStringObject("XCLAIM",6);
    shared.script = createStringObject("SCRIPT",6);
    shared.replconf = createStringObject("REPLCONF",8);
    shared.pexpireat = createStringObject("PEXPIREAT",9);
    shared.pexpire = createStringObject("PEXPIRE",7);
    shared.persist = createStringObject("PERSIST",7);
    shared.set = createStringObject("SET",3);
    shared.eval = createStringObject("EVAL",4);

    /* Shared command argument */
    shared.left = createStringObject("left",4);
    shared.right = createStringObject("right",5);
    shared.pxat = createStringObject("PXAT", 4);
    shared.px = createStringObject("PX",2);
    shared.time = createStringObject("TIME",4);
    shared.retrycount = createStringObject("RETRYCOUNT",10);
    shared.force = createStringObject("FORCE",5);
    shared.justid = createStringObject("JUSTID",6);
    shared.lastid = createStringObject("LASTID",6);
    shared.default_username = createStringObject("default",7);
    shared.ping = createStringObject("ping",4);
    shared.setid = createStringObject("SETID",5);
    shared.keepttl = createStringObject("KEEPTTL",7);
    shared.load = createStringObject("LOAD",4);
    shared.createconsumer = createStringObject("CREATECONSUMER",14);
    shared.getack = createStringObject("GETACK",6);
    shared.special_asterick = createStringObject("*",1);
    shared.special_equals = createStringObject("=",1);
    shared.redacted = makeObjectShared(createStringObject("(redacted)",10));

    for (j = 0; j < OBJ_SHARED_INTEGERS; j  ) {
        shared.integers[j] =
            makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
        shared.integers[j]->encoding = OBJ_ENCODING_INT;
    }
    for (j = 0; j < OBJ_SHARED_BULKHDR_LEN; j  ) {
        shared.mbulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"*%drn",j));
        shared.bulkhdr[j] = createObject(OBJ_STRING,
            sdscatprintf(sdsempty(),"$%drn",j));
    }
    /* The following two shared objects, minstring and maxstrings, are not
     * actually used for their value but as a special object meaning
     * respectively the minimum possible string and the maximum possible
     * string in string comparisons for the ZRANGEBYLEX command. */
    shared.minstring = sdsnew("minstring");
    shared.maxstring = sdsnew("maxstring");
}

命令对象

redis 命令是使用的是 redisCommand 数据结构来管理的。

代码语言:javascript复制
typedef void redisCommandProc(client *c);
// 函数指针类型,指向命令实现函数
typedef int redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    // 限制命令的个数。-N 表示至少 N 个参数,包含命令本身
    int arity;
    // 字符串方式设置命令的属性之间运用 | 运算,程序内部自动解析,函数 populateCommandTable
    char *sflags;   /* Flags as string representation, one char per flag. */
    // 将 flags 字符串类型转换成整数,多个属性
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls, rejected_calls, failed_calls;
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};

针对 sflag 标示,这里可以看看 《redis 设计与实现》

flag 记录的是 flag 值与 sflag 进行运算的结果,见 populateCommandTable 函数

代码语言:javascript复制
    for (int j = 0; j < argc; j  ) {
        char *flag = argv[j];
        if (!strcasecmp(flag,"write")) {
            c->flags |= CMD_WRITE|CMD_CATEGORY_WRITE;
        } else if (!strcasecmp(flag,"read-only")) {
            c->flags |= CMD_READONLY|CMD_CATEGORY_READ;
        } else if (!strcasecmp(flag,"use-memory")) {
            c->flags |= CMD_DENYOOM;
        } else if (!strcasecmp(flag,"admin")) {
            c->flags |= CMD_ADMIN|CMD_CATEGORY_ADMIN|CMD_CATEGORY_DANGEROUS;
        } else if (!strcasecmp(flag,"pub-sub")) {
            c->flags |= CMD_PUBSUB|CMD_CATEGORY_PUBSUB;
        } else if (!strcasecmp(flag,"no-script")) {
            c->flags |= CMD_NOSCRIPT;
        } else if (!strcasecmp(flag,"random")) {
            c->flags |= CMD_RANDOM;
        } else if (!strcasecmp(flag,"to-sort")) {
            c->flags |= CMD_SORT_FOR_SCRIPT;
        } else if (!strcasecmp(flag,"ok-loading")) {
            c->flags |= CMD_LOADING;
        } else if (!strcasecmp(flag,"ok-stale")) {
            c->flags |= CMD_STALE;
        } else if (!strcasecmp(flag,"no-monitor")) {
            c->flags |= CMD_SKIP_MONITOR;
        } else if (!strcasecmp(flag,"no-slowlog")) {
            c->flags |= CMD_SKIP_SLOWLOG;
        } else if (!strcasecmp(flag,"cluster-asking")) {
            c->flags |= CMD_ASKING;
        } else if (!strcasecmp(flag,"fast")) {
            c->flags |= CMD_FAST | CMD_CATEGORY_FAST;
        } else if (!strcasecmp(flag,"no-auth")) {
            c->flags |= CMD_NO_AUTH;
        } else if (!strcasecmp(flag,"may-replicate")) {
            c->flags |= CMD_MAY_REPLICATE;
        } else {
            /* Parse ACL categories here if the flag name starts with @. */
            uint64_t catflag;
            if (flag[0] == '@' &&
                (catflag = ACLGetCommandCategoryFlagByName(flag 1)) != 0)
            {
                c->flags |= catflag;
            } else {
                sdsfreesplitres(argv,argc);
                return C_ERR;
            }
        }
    }

具体命令比较多

代码语言:javascript复制
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},
}

以 set 为例子 {"set",setCommand,-3, "write use-memory @string", 0,NULL,1,1,1,0,0,0}

参考资料

  • 《Redis 设计与实现》黄健宏

0 人点赞