jedis实现分布式锁底层通信与协议

2022-06-02 13:55:03 浏览数 (1)

通过此篇文章可以了解Redis的底层通信,Redis的协议,以及自己手写与服务器通信.

在分布式锁的实现上, 基于Redis的实现是其中一种.

而具体的实现依赖包又有两个

代码语言:javascript复制
<dependency>
<groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
   <version>3.0.1</version>
</dependency>

<dependency>
<groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.10.1</version>
</dependency>

本篇文章我们就讲解第一种Jedis.

基于Jedis又有两种实现

代码语言:javascript复制
// 单机
Jedis(String host, int port, int connectionTimeout, int soTimeout)

// 集群
JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig)

不管是单机还是集群,它们的底层和服务器之间的通信都是基于java.net.Socket

比如说我们通过Jedis(String host, int port, int connectionTimeout, int soTimeout)构造了一个客户端,连接单机的服务器.

代码语言:javascript复制
public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout) {
   // 调用父类BinaryJedis
   super(host, port, connectionTimeout, soTimeout);
}
代码语言:javascript复制
public BinaryJedis(final String host, final int port, final int connectionTimeout,
     final int soTimeout) {
   // 创建一个Client,它并没有连接服务器,只是先保存了host,port. 在Client类内部有个Socket属性.
   client = new Client(host, port);
   client.setConnectionTimeout(connectionTimeout);
   client.setSoTimeout(soTimeout);
}

调用首次调用setnx向服务器发送命令时,会连接服务器

代码语言:javascript复制
public Long setnx(final String key, final String value) {
   checkIsInMultiOrPipeline();
   // 调用上面构造好的client的setnx方法
   client.setnx(key, value);
   return client.getIntegerReply();
}

最后会跟进到如下代码

代码语言:javascript复制
protected Connection sendCommand(final Command cmd, final byte[]... args) {
   try {
       // 连接服务器
       connect();
       // 发送指令
       Protocol.sendCommand(outputStream, cmd, args);
       pipelinedCommands  ;
       return this;
  } catch (JedisConnectionException ex) {
       // ...
  }
}

跟进到connect()方法

代码语言:javascript复制
public void connect() {
   // 判断socket是否已连接,如果没有连接服务器则连接服务器
   if (!isConnected()) {
       try {
           socket = new Socket();
           socket.setReuseAddress(true);
           socket.setKeepAlive(true);
           socket.setTcpNoDelay(true);
           socket.setSoLinger(true, 0);
           socket.connect(new InetSocketAddress(host, port), connectionTimeout);
           socket.setSoTimeout(soTimeout);
           // 输出流
           outputStream = new RedisOutputStream(socket.getOutputStream());
           // 输入流
           inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
           broken = true;
           throw new JedisConnectionException(ex);
      }
  }
}

这就是最熟悉的通过Socket和服务器进行通信,还有输入输出流.

连接好服务器之后,接下来就是发送命令给服务器了.

跟进到发送命令的代码

代码语言:javascript复制
private static void sendCommand(final RedisOutputStream os, final byte[] command,
     final byte[]... args) {
   // 通过输出流向服务器发送数据
   try {
       os.write(ASTERISK_BYTE);
       os.writeIntCrLf(args.length   1);
       os.write(DOLLAR_BYTE);
       os.writeIntCrLf(command.length);
       os.write(command);
       os.writeCrLf();

       for (final byte[] arg : args) {
           os.write(DOLLAR_BYTE);
           os.writeIntCrLf(arg.length);
           os.write(arg);
           os.writeCrLf();
      }
  } catch (IOException e) {
       throw new JedisConnectionException(e);
  }
}

通过输出流,将命令发送给服务器.

下面我们将这个方法里面的常量替换一下,把一些不必要的代码删除,再看下这个方法

代码语言:javascript复制
// #1
os.write('*');
os.write(args.length   1);
os.write("rn");

// #2
os.write('$');
os.write(command.length);
os.write("rn");
os.write(command);
os.write("rn");

// #3
for (final byte[] arg : args) {
   os.write('$');
   os.write(arg.length);
   os.write("rn");
   os.write(arg);
   os.write("rn");
}

在说这段代码之前,我们要说下Redis的协议.

互联网的通信是基于协议的,我们熟悉的TCP/IP协议,Dubbo通信的dubbo协议,Zookeeper的zookeeper协议,RocketMQ通信的自身应用层协议.没有协议,那么客户端和服务器就不能通信,彼此'听不懂'对方在说什么.

那么Redis客户端和服务器之间要想彼此知道对方说的什么,那么它们之间也有通过协议通信,这就是Redis协议.

具体协议如下

代码语言:javascript复制
*<number of arguments> CR LF
$<number of bytes of argument 1> CR LF
<argument data> CR LF
...
$<number of bytes of argument N> CR LF
<argument data> CR LF

比如我们要向服务器发送SET mykey myvalue这个命令,那么转换成协议之后,具体的内容如下

代码语言:javascript复制
*3
$3
SET
$5
mykey
$7
myvalue

并不是我不把它们写成一行,而是在它们彼此之间有'rn',也就是回车换行. 我们简单介绍下它

代码语言:javascript复制
*3

*这个符号是固定的,那么后面这个3是什么意思呢,3表示后面有3个内容(或者说命令由3部分组成),细心的读者也看到了,整个命令中有3个符号,每个符号代表一个内容.

代码语言:javascript复制
$3
SET

$符号是固定的,后面的3表示后面有3个字符,因为SET就是3个字符组成的

代码语言:javascript复制
$5
mykey

$符号是固定的,后面的5表示后面有5个字符,因为mykey就是5个字符组成的

代码语言:javascript复制
$7
myvalue

$符号是固定的,后面的7表示后面有7个字符,因为myvalue就是7个字符组成的

协议的编解码也是一个话题. Redis的协议是基于长度的,通过长度就可以准确的知道,命令的开始在哪里,结束又在哪里. 基于长度的协议有很多,比如Dubbo或者RocketMQ的协议,它们将数据发送给对方之后,对方就是通过基于长度的解码器,将数据解码出来.

相信读者朋友应该明白了Redis的协议.那么我们只要通过Socket的输出流将这些协议内容发送给服务器就可以了,服务器基于协议,就能'读懂'我们发送给它的命令是什么了.

我们再回到上面的那段代码.

代码语言:javascript复制
// #1
os.write('*');
os.write(args.length   1);
os.write("rn");

// #2
os.write('$');
os.write(command.length);
os.write("rn");
os.write(command);
os.write("rn");

// #3
for (final byte[] arg : args) {
   os.write('$');
   os.write(arg.length);
   os.write("rn");
   os.write(arg);
   os.write("rn");
}

相信这个时候,你再来看这段代码应该就明白了. 就是平铺直叙的将协议'翻译'成代码.

所以说,当我们需要和服务器通信的时候,也未必是必须依赖Redis的依赖包,我们完成可以自己通过Socket与服务器直接通信. 比如下面这段简单的代码,就可以直接和Redis通信了,当然它很简单.这里只是提供给你一个思路.

代码语言:javascript复制
import java.io.IOException;
import java.net.Socket;

public static void connectRedis(String key, String value) throws IOException {

   Socket client = new Socket(host, port);

   // 执行 set key value命令
   StringBuilder command = new StringBuilder();

   String number = "*3"   CRLF;
   command.append(number);

   String cmd = "$3"   CRLF   "SET"   CRLF;
   command.append(cmd);

   cmd = "$"   key.getBytes().length   CRLF   key   CRLF;
   command.append(cmd);

   cmd = "$"   value.getBytes().length   CRLF   value   CRLF;
   command.append(cmd);

   // 向服务器发送命令
   client.getOutputStream().write(command.toString().getBytes());

   // 接收服务器响应
   byte[] response = new byte[1024];
   client.getInputStream().read(response);
   System.out.println(new String(response, 0, response.length));

}

在开篇我们也讲到,Redis分布式锁的实现有jedis和redisson两种. jedis的底层通信是直接基于Socket, 而redisson的底层与服务器通信是基于Netty.

0 人点赞