分布式模式14-State Watch

2021-12-21 16:18:41 浏览数 (1)

作者: Unmesh Joshi

译者: java达人

来源: https://martinfowler.com/articles/patterns-of-distributed-systems/

当服务器上的特定值更改时通知客户端

问题

客户端关注服务器上特定值的更改。如果客户需要不断地轮询服务器以查找更改,则很难构造其逻辑。如果客户端打开太多的服务器连接来监视更改,则可能使服务器不堪重负。

解决方案

允许客户端向服务器注册其关注点以进行特定状态更改。状态发生变化时,服务器会通知关注的客户端。客户端与服务器维护一个单一套接字通道。服务器在此通道上发送状态更改通知。客户端可能对多个值感兴趣,但是保持每个监视的连接可能会使服务器不堪重负。因此客户可以使用请求管道。

考虑一个在Consistent Core中使用的简单键值存储示例:当某个特定键的值更改或某个键被删除时,客户端可能会对此关注。该实现由两部分组成:客户端实现和服务器端实现。

客户端实现

客户端将接收键和函数,当客户端从服务器获取监视事件时函数被调用, 客户端存储方法对象以供以后调用。然后,它向服务器发送注册监视器的请求。

代码语言:javascript复制
ConcurrentHashMap<String, Consumer<WatchEvent>> watches = new ConcurrentHashMap<>();


public void watch(String key, Consumer<WatchEvent> consumer) {
    watches.put(key, consumer);
    sendWatchRequest(key);
}

private void sendWatchRequest(String key) {
    requestSendingQueue.submit(new RequestOrResponse(RequestId.WatchRequest.getId(),
            JsonSerDes.serialize(new WatchRequest(key)),
            correlationId.getAndIncrement()));
}

当在连接上收到监视器事件时,相应的消费者触发调用

代码语言:javascript复制
this.pipelinedConnection = new PipelinedConnection(address, requestTimeoutMs, (r) -> {
    logger.info("Received response on the pipelined connection "   r);
    if (r.getRequestId() == RequestId.WatchRequest.getId()) {
        WatchEvent watchEvent = JsonSerDes.deserialize(r.getMessageBodyJson(), WatchEvent.class);
        Consumer<WatchEvent> watchEventConsumer = getConsumer(watchEvent.getKey());
        watchEventConsumer.accept(watchEvent);
        lastWatchedEventIndex = watchEvent.getIndex(); //capture last watched index, in case of connection failure.
    }
    completeRequestFutures(r);
});

服务端实现

当服务器收到监视器注册请求时,它将保留接收请求的管道连接以及keys的映射。

代码语言:javascript复制
private Map<String, ClientConnection> watches = new HashMap<>();
private Map<ClientConnection, List<String>> connection2WatchKeys = new HashMap<>();

public void watch(String key, ClientConnection clientConnection) {
    logger.info("Setting watch for "   key);
    addWatch(key, clientConnection);
}

private synchronized void addWatch(String key, ClientConnection clientConnection) {
    mapWatchKey2Connection(key, clientConnection);
    watches.put(key, clientConnection);
}

private void mapWatchKey2Connection(String key, ClientConnection clientConnection) {
    List<String> keys = connection2WatchKeys.get(clientConnection);
    if (keys == null) {
        keys = new ArrayList<>();
        connection2WatchKeys.put(clientConnection, keys);
    }
    keys.add(key);
}

ClientConnection包装了与客户端连接的套接字。它具有以下结构。对于基于阻塞IO的服务器和基于非阻塞IO的服务器,此结构均相同。

代码语言:javascript复制
public interface ClientConnection {
    void write(RequestOrResponse response);
    void close();
}

单个连接上可以注册多个监听器。因此,存储从连接到监视器键列表的映射很重要。关闭客户端连接时需要使用它来删除所有关联的监视器,如下所示:

代码语言:javascript复制
  public void close(ClientConnection connection) {
        removeWatches(connection);
    }

    private synchronized void removeWatches(ClientConnection clientConnection) {
        List<String> watchedKeys = connection2WatchKeys.remove(clientConnection);
        if (watchedKeys == null) {
            return;
        }
        for (String key : watchedKeys) {
            watches.remove(key);
        }
    }

使用Reactive Streams (https://www.reactive-streams.org/) 此处的示例显示将事件直接写入管道连接。在应用程序级别具有某种类型的背压非常有用。如果产生了很多事件,则控制事件的发送速率非常重要。使事件的生产者和消费者保持同步是重要的考虑因素。etcd中此问题的一个示例(https://github.com/etcd-io/etcd/issues/11906)说明这些注意事项在生产中很重要。[reactive-streams] 使用背压作为首要概念,使代码更容易编写。像rsocket这样的协议提供了一种结构化的方法来实现此目的。

当服务器上发生诸如为key设置值之类的特定事件时,服务器会通过构造相关的WatchEvent来通知所有注册的客户端

代码语言:javascript复制
private synchronized void notifyWatchers(SetValueCommand setValueCommand, Long entryId) {
    if (!hasWatchesFor(setValueCommand.getKey())) {
        return;
    }
    String watchedKey = setValueCommand.getKey();
    WatchEvent watchEvent = new WatchEvent(watchedKey,
                                setValueCommand.getValue(),
                                EventType.KEY_ADDED, entryId);
    notify(watchEvent, watchedKey);
}

private void notify(WatchEvent watchEvent, String watchedKey) {
    List<ClientConnection> watches = getAllWatchersFor(watchedKey);
    for (ClientConnection pipelinedClientConnection : watches) {
        try {
            String serializedEvent = JsonSerDes.serialize(watchEvent);
            getLogger().trace("Notifying watcher of event "
                      watchEvent  
                    " from "
                      server.getServerId());
            pipelinedClientConnection
                    .write(new RequestOrResponse(RequestId.WatchRequest.getId(),
                            serializedEvent));
        } catch (NetworkException e) {
            removeWatches(pipelinedClientConnection); //remove watch if network connection fails.
        }
    }
}

要注意的关键事情之一是,可以从客户端请求处理代码和客户端连接处理代码(关闭连接)中并发访问与监视相关的状态。因此,所有访问监视状态的方法都需要使用锁进行保护。

在层次结构存储中的监控

Consistent Core主要支持分层存储。可以在父节点或键的前缀上设置监视器。对子节点的任何更改都会触发在父节点上设置的监视器。对于每个事件,Consistent Core会遍历路径以检查父路径上是否有监视器设置,并将事件发送给所有这些监视器。

代码语言:javascript复制
List<ClientConnection> getAllWatchersFor(String key) {
    List<ClientConnection> affectedWatches = new ArrayList<>();
    String[] paths = key.split("/");
    String currentPath = paths[0];
    addWatch(currentPath, affectedWatches);
    for (int i = 1; i < paths.length; i  ) {
        currentPath = currentPath   "/"   paths[i];
        addWatch(currentPath, affectedWatches);
    }
    return affectedWatches;
}

private void addWatch(String currentPath, List<ClientConnection> affectedWatches) {
    ClientConnection clientConnection = watches.get(currentPath);
    if (clientConnection != null) {
        affectedWatches.add(clientConnection);
    }
}

这样就可以将监视器设置在key前缀(例如“servers”)上。使用此前缀创建的任何key(例如“ servers / 1”,“ servers / 2”)都将触发此监视器。

由于要调用的函数的映射是使用键前缀存储的,因此遍历层次结构以找到客户端接收到事件时要调用的函数也很重要。一种替代方法是将事件触发的路径与事件一起发送,以便客户端知道导致事件发送的监视器。

处理连接失败

客户端和服务器之间的连接随时可能失败。在某些用例中,这是有问题的,因为客户端在断开连接时可能会错过某些事件。例如,集群控制器可能会对某些节点是否发生故障感兴趣,这由某些key的删除事件指示。客户端需要将收到的最后一个事件告知服务器。当客户端再次重置监视器时,客户端发送最后收到的事件编号。服务器应从该事件号开始发送其已记录的所有事件。

在Consistent Core客户端中,可以在客户端重新建立与leader的连接时完成。

Kafka中基于拉模式的设计 在监视器的典型设计中,服务器将监视器事件推送给客户端。[kafka]遵循端到端基于拉模式的设计。在其新架构中,Kafka broker将定期从Controller Quorum(它本身就是Consistent Core的一个例子)中提取元数据日志。基于偏移量的拉取机制允许客户端像其他任何Kafka消费者一样,从最后一个已知偏移量中读取事件,从而避免事件丢失。

代码语言:javascript复制
private void connectToLeader(List<InetAddressAndPort> servers) {
    while (isDisconnected()) {
        logger.info("Trying to connect to next server");
        waitForPossibleLeaderElection();
        establishConnectionToLeader(servers);
    }
    setWatchesOnNewLeader();
}

private void setWatchesOnNewLeader() {
    for (String watchKey : watches.keySet()) {
        sendWatchResetRequest(watchKey);
    }
}

private void sendWatchResetRequest(String key) {
    pipelinedConnection.send(new RequestOrResponse(RequestId.SetWatchRequest.getId(),
            JsonSerDes.serialize(new SetWatchRequest(key, lastWatchedEventIndex)), correlationId.getAndIncrement()));
}

服务器对发生的每个事件进行编号。例如,如果服务器是Consistent Core,则它以严格的顺序存储所有状态更改,并且每个更改都用“Write-Ahead Log”中讨论的日志索引编号,然后客户端可以从特定指标以下位置开始请求事件。

从键值存储库派生事件

查看键值存储库的当前状态来生成事件,如果它还对发生的每个更改进行编号并存储每个编号值。

当客户端重新建立与服务器的连接时,它可以再次设置监视器,并发送最后看到的更改编号。然后,服务器可以将其与存储的值进行比较,如果它大于客户端发送的值,则服务器可以将事件重新发送给客户端。从键值存储中派生事件可能会有些尴尬,因为需要猜测事件。它可能会错过一些事件。-例如,如果创建了key然后又将其删除-在客户端断开连接时,将丢失create事件。

代码语言:javascript复制
private synchronized void eventsFromStoreState(String key, long stateChangesSince) {
    List<StoredValue> values = getValuesForKeyPrefix(key);
    for (StoredValue value : values) {
        if (values == null) {
            //the key was probably deleted send deleted event
            notify(new WatchEvent(key, EventType.KEY_DELETED), key);
        } else if (value.index > stateChangesSince) {
            //the key/value was created/updated after the last event client knows about
            notify(new WatchEvent(key, value.getValue(), EventType.KEY_ADDED, value.getIndex()), key);
        }
    }
}

[zookeeper]使用这种方法。默认情况下,zookeeper中的监视器也是一次性触发器。触发事件后,如果客户端想要接收其他事件,则需要再次设置监视器。在重新设置监视器之前,可能会错过一些事件,因此客户端需要确保他们读到了最近状态,这样他们不会错过任何更新。

存储事件历史

保留过去事件的历史记录并从事件历史记录中回复客户端会更容易。这种方法的问题在于需要将事件历史记录限制为比如1000个事件。如果客户端断开连接的时间较长,则可能会错过超过1,000个事件窗口的事件。使用Google guava的EvictingQueue的简单实现如下:

代码语言:javascript复制
public class EventHistory implements Logging {
    Queue<WatchEvent> events = EvictingQueue.create(1000);
    public void addEvent(WatchEvent e) {
        getLogger().info("Adding "   e);
        events.add(e);
    }

    public List<WatchEvent> getEvents(String key, Long stateChangesSince) {
        return this.events.stream()
                .filter(e -> e.getIndex() > stateChangesSince && e.getKey().equals(key))
                .collect(Collectors.toList());
    }
}

当客户端重新建立起连接,重新设置监控时,事件可以从历史中发送。

代码语言:javascript复制
private void sendEventsFromHistory(String key, long stateChangesSince) {
    List<WatchEvent> events = eventHistory.getEvents(key, stateChangesSince);
    for (WatchEvent event : events) {
        notify(event, event.getKey());
    }
}

使用多版本存储

跟踪所有更改,可以使用多版本存储。它跟踪每个key的所有版本,并可以轻松地从所请求的版本中获取所有更改。

[etcd]版本3开始使用此方法

例子

[zookeeper]可以在节点上设置监视器。

诸如[kafka]之类的产品将其用于组成员身份和集群成员的故障检测。

[etcd]具有监视器实现,在[kubernetes]资源监视实现中有大量使用。

0 人点赞