分布式系统模式10-Request Pipeline

2021-01-05 15:05:07 浏览数 (1)

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

在连接上发送多个请求而不等待前一个请求的响应,从而减少延迟。

问题

如果请求需要等待对前一个请求的响应,使用单一套接字通道在集群服务器之间通信可能会导致性能问题。为了达到更好的吞吐量和更少的延迟,服务器上的请求队列应该被填满,以确保服务器容量得到充分利用。例如,当服务器使用Singular Update Queue,处理一个请求时,它总是可以接受更多的请求,直到队列满为止。如果一次只发送一个请求,服务器的大部分容量都被不必要地浪费了。

解决方案

节点向其他节点发送请求,而不等待以前请求的响应。这是通过创建两个独立的线程来实现的,一个用于通过网络通道发送请求,另一个用于从网络通道接收响应。

发送方节点通过套接字通道发送请求,而不等待响应。

代码语言:javascript复制
class SingleSocketChannel…

  public void sendOneWay(RequestOrResponse request) throws IOException {
      var dataStream = new DataOutputStream(socketOutputStream);
      byte[] messageBytes = serialize(request);
      dataStream.writeInt(messageBytes.length);
      dataStream.write(messageBytes);
  }

启动一个单独的线程来读取响应。

代码语言:javascript复制
class ResponseThread…

  class ResponseThread extends Thread implements Logging {
      private volatile boolean isRunning = false;
      private SingleSocketChannel socketChannel;

      public ResponseThread(SingleSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      @Override
      public void run() {
          try {
              isRunning = true;
              logger.info("Starting responder thread = "   isRunning);
              while (isRunning) {
                  doWork();
              }

          } catch (IOException e) {
              getLogger().error(e); //thread exits if stopped or there is IO error
          }
      }

      public void doWork() throws IOException {
          RequestOrResponse response = socketChannel.read();
          logger.info("Read Response = "   response);
          processResponse(response);
      }

响应处理程序可以立即处理响应或将其提交到单一更新队列

请求管道有两个问题需要处理。

如果在不等待响应的情况下连续发送请求,则接受请求的节点可能会不堪重负。由于这个原因,对于一次可以保持的请求数量有一个上限。任何节点都可以向其他节点发送最大数量的请求。一旦发送了最大数量的执行中请求而没有收到响应,就不会接受更多的请求,发送方将被阻塞。限制最大数量执行中请求的一个非常简单的策略是保持一个阻塞队列来跟踪请求。队列由请求数量参数进行初始化。一旦接收到请求的响应,就会从队列中删除它,以便为更多请求腾出空间。如下面的代码所示,每个套接字连接最多可接受五个执行中请求。

代码语言:javascript复制
class RequestLimitingPipelinedConnection…

  private final Map<InetAddressAndPort, ArrayBlockingQueue<RequestOrResponse>> inflightRequests = new ConcurrentHashMap<>();
  private int maxInflightRequests = 5;
  public void send(InetAddressAndPort to, RequestOrResponse request) throws InterruptedException {
      ArrayBlockingQueue<RequestOrResponse> requestsForAddress = inflightRequests.get(to);
      if (requestsForAddress == null) {
          requestsForAddress = new ArrayBlockingQueue<>(maxInflightRequests);
          inflightRequests.put(to, requestsForAddress);
      }
      requestsForAddress.put(request);

一旦收到响应,该请求将从执行中请求队列中删除。

代码语言:javascript复制
class RequestLimitingPipelinedConnection…

  private void consume(SocketRequestOrResponse response) {
      Integer correlationId = response.getRequest().getCorrelationId();
      Queue<RequestOrResponse> requestsForAddress = inflightRequests.get(response.getAddress());
      RequestOrResponse first = requestsForAddress.peek();
      if (correlationId != first.getCorrelationId()) {
          throw new RuntimeException("First response should be for the first request");
      }
      requestsForAddress.remove(first);
      responseConsumer.accept(response.getRequest());
  }

处理故障和维护顺序保证的实现比较棘手。假设有两个正在运行的请求。第一个请求失败并重试,服务器可能在重试的第一个请求到达服务器之前已经处理了第二个请求。服务器需要某种机制来确保错误的请求被拒绝。否则,在失败和重试的情况下,总是有消息被重新排序的风险。例如,Raft总是发送每个日志条目所期望的前一个日志索引。如果前一个日志索引不匹配,服务器拒绝请求。Kafka可以允许max.in.flight.requests.per.connection 的值大于1,使用幂等生产者实现,该实现为发送给broker的每个消息批次分配唯一标识符。然后,broker可以检查传入请求的序列号,并在请求乱序时拒绝该请求。

例子

• 所有的共识算法如Zab和Raft都允许request pipeline支持。

• Kafka鼓励客户使用request pipeline来提高吞吐量。

java达人

ID:drjava

(长按或扫码识别)

0 人点赞