Netty在Redis客户端中的应用

2022-06-02 13:54:06 浏览数 (1)

在我们日常使用Redis实现分布式锁中,依赖如下

代码语言:javascript复制
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.10.1</version>
</dependency>

在使用Redisson作为客户端,它需要与服务端进行通信,那么它的底层通信使用的是Netty.

在启动Redisson客户端时,底层Netty就已经与服务端建立好了通信(通道Channel).

简单写了一个示例代码

代码语言:javascript复制
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.util.concurrent.TimeUnit;

public class RedisClient {

   public static void main(String[] args) {

       Config config = new Config();
       config.useSingleServer().setAddress("redis://127.0.0.1:6379");

       // 单机模式
       RedissonClient redissonClient = Redisson.create(config);
       RLock redLock = redissonClient.getLock("computerLock");// 获取锁实例

       try {
           boolean isLock = redLock.tryLock(500, 1000, TimeUnit.MILLISECONDS);

           if (isLock) {
               System.out.println("获取到锁,执行业务逻辑");
          }
      } catch (Exception x) {
      } finally {
           redLock.unlock();
           System.out.println("释放锁");
      }
  }
}

以下代码摘录在源码,部分无关紧要的代码做了删减

以上代码中,一开始在执行

代码语言:javascript复制
RedissonClient redissonClient = Redisson.create(config);

时候,就会创建Netty客户端,并与服务端建立好通信.建立好通信通道之后,我们的业务代码向服务端发送的命令就是通过建立好的通信通道发送给服务端的.

代码语言:javascript复制
public static RedissonClient create(Config config) {
   // #2 创建Redisson
   Redisson redisson = new Redisson(config);
   return redisson;
}

Redisson类是最重要的类之一

代码语言:javascript复制
// 实例化Redisson
protected Redisson(Config config) {
   this.config = config;
   Config configCopy = new Config(config);
   // #3
   connectionManager = ConfigSupport.createConnectionManager(configCopy);
   evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
代码语言:javascript复制
public static ConnectionManager createConnectionManager(Config configCopy) {
   UUID id = UUID.randomUUID();

   if (configCopy.getMasterSlaveServersConfig() != null) {
       return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
  } else if (configCopy.getSingleServerConfig() != null) {
       // #4 单机模式
       return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
  } else {
       throw new IllegalArgumentException("server(s) address(es) not defined!");
  }
}

此篇文章我们以单机模式的部署进行分析的代码

代码语言:javascript复制
public SingleConnectionManager(SingleServerConfig cfg, Config config, UUID id) {
   // #5
   super(create(cfg), config, id);
}

super调用父类MasterSlaveConnectionManager构造器

代码语言:javascript复制
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
   this(config, id);
   this.config = cfg;

   initTimer(cfg);
   // #6 内部创建Netty客户端
   initSingleEntry();
}
代码语言:javascript复制
protected void initSingleEntry() {
   try {
       MasterSlaveEntry entry;
       if (config.checkSkipSlavesInit()) {
           entry = new SingleEntry(this, config);
      } else {
           entry = createMasterSlaveEntry(config);
      }
       // #7
       RFuture<RedisClient> f = entry.setupMasterEntry(config.getMasterAddress());
       f.syncUninterruptibly();

  } catch (RuntimeException e) {
       stopThreads();
       throw e;
  }
}
代码语言:javascript复制
public RFuture<RedisClient> setupMasterEntry(URI address) {
   // #8 创建RedisClient
   RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
   // #9
   return setupMasterEntry(client);
}

在#8处会创建RedisClient,通过名字可以猜到,它是一个客户端对象,在它的内部有一个用于连接服务端的Netty的Bootstrap对象

代码语言:javascript复制
private RedisClient(RedisClientConfig config) {
   RedisClientConfig copy = new RedisClientConfig(config);
   channels = new DefaultChannelGroup(copy.getGroup().next());
   // 创建Bootstrap
   bootstrap = createBootstrap(copy, Type.PLAIN);
   pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);

   this.commandTimeout = copy.getCommandTimeout();
}

在#9处便会通过Bootstrap对象连接服务端了

代码语言:javascript复制
private RFuture<RedisClient> setupMasterEntry(final RedisClient client) {
   final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
   RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
   addrFuture.addListener(new FutureListener<InetSocketAddress>() {

       @Override
       public void operationComplete(Future<InetSocketAddress> future) throws Exception {
           masterEntry = new ClientConnectionsEntry(
               client,
               config.getMasterConnectionMinimumIdleSize(),
               config.getMasterConnectionPoolSize(),
               config.getSubscriptionConnectionMinimumIdleSize(),
               config.getSubscriptionConnectionPoolSize(),
               connectionManager,
               NodeType.MASTER);

           // #10 内部会连接服务端
           RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);

           if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
               RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
          }
      }
  });

   return result;
}
代码语言:javascript复制
public RFuture<Void> add(final ClientConnectionsEntry entry) {
   final RPromise<Void> promise = new RedissonPromise<Void>();
   promise.addListener(new FutureListener<Void>() {
       @Override
       public void operationComplete(Future<Void> future) throws Exception {
           if (future.isSuccess()) {
               entries.add(entry);
          }
      }
  });
   // #11 内部会连接服务端
   initConnections(entry, promise, true);
   return promise;
}
代码语言:javascript复制
private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {
   final int minimumIdleSize = getMinimumIdleSize(entry);
   final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
   int startAmount = Math.min(50, minimumIdleSize);
   final AtomicInteger requests = new AtomicInteger(startAmount);
   
   for (int i = 0; i < startAmount; i  ) {
       // #12 创建连接
       createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
  }
}
代码语言:javascript复制
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,
           final int minimumIdleSize, final AtomicInteger initializedConnections) {

   // #13 获取连接
   acquireConnection(entry, new Runnable() {
       @Override
       public void run() {
           RPromise<T> promise = new RedissonPromise<T>();
           // #14 创建连接
           createConnection(entry, promise);
           promise.addListener(new FutureListener<T>() {
               @Override
               public void operationComplete(Future<T> future) throws Exception {

              }
          });
      }
  });

}
代码语言:javascript复制
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
   // #15 连接
   RFuture<T> connFuture = connect(entry);
}
代码语言:javascript复制
protected RFuture<T> connect(ClientConnectionsEntry entry) {
   return (RFuture<T>) entry.connect();
}
代码语言:javascript复制
public RFuture<RedisConnection> connect() {
   // #16 这里的client就是之前创建的RedisClient
   RFuture<RedisConnection> future = client.connectAsync();
   
   return future;
}
代码语言:javascript复制
public RFuture<RedisConnection> connectAsync() {
   final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

   RFuture<InetSocketAddress> addrFuture = resolveAddr();
   addrFuture.addListener(new FutureListener<InetSocketAddress>() {
       @Override
       public void operationComplete(Future<InetSocketAddress> future) throws Exception {
           // #17 连接服务端
           ChannelFuture channelFuture = bootstrap.connect(future.getNow());
      }
  });

   return f;
}

在#17处通过之前创建的Bootstrap对象,调用connect方法连接服务端,底层就是通过Netty连接服务端的.

至此客户端就与服务端建立了连接,之后需要发送给服务端的命令,都通过这个建立好的连接发送出去.

最后系统中会多出许多'redisson-netty-1-x'命名的线程.它们都是已经和服务端建立好了连接,随时都可以进行通信.

0 人点赞