Skip to content

Latest commit

 

History

History
163 lines (113 loc) · 10.1 KB

consistent-core.md

File metadata and controls

163 lines (113 loc) · 10.1 KB

一致性内核(Consistent Core)

原文

https://martinfowler.com/articles/patterns-of-distributed-systems/consistent-core.html

维护一个较小的内核,为大规模数据集群提供更强的一致性,这样,可以在无需实现基于 Quorum 算法的前提下协调服务器行为。

2021.1.5

问题

集群需要处理大规模的数据,就需要越来越多的服务器。对于服务器集群而言,有一些通用性的需求,比如,选择某个特定的服务器成为某个任务的主节点、管理分组成员信息、将数据分区映射到服务器上等等。这些需求都需要强一致性的保证,也就是说,要有线性一致性。实现本身还要有对失效的容忍。一种常见的方式是,使用一种基于 Quorum 且支持失效容忍的一致性算法。但是,基于 Quorum 的系统,其吞吐量会随着集群规模的变大而降低。

解决方案

实现一个三五个节点的小集群,提供线性一致性的保证,同时支持失效容忍[1]。一个单独数据集群可以使用小的一致性集群管理元数据,采用类似于租约(Lease) 之类的原语在集群范围内进行决策。这样一来,数据集群就可以增长到很大的规模,但对于某些需要强一致性保证的动作,可以使用比较小的元数据集群。

一致性内核

图1:一致性内核

一个典型一致性内核接口应该是下面这个样子:

public interface ConsistentCore {
    CompletableFuture put(String key, String value);
    List<String> get(String keyPrefix);
    CompletableFuture registerLease(String name, long ttl);
    void refreshLease(String name);
    void watch(String name, Consumer<WatchEvent> watchCallback);
}

以最低的标准看,一致性内核提供了一个简单的键值存储机制,用于存储元数据。

元数据存储

存储可以用诸如 Raft 之类的共识算法实现。它是可复制的预写日志的一个样例实现,其中的复制由领导者和追随者(Leader and Followers) 进行处理,使用 Quorum 的话,可以使用高水位标记(High-Water Mark)追踪成功的复制。

支持层级结构的存储

一致性内核通常用于存储这样的数据,比如,分组成员、跨服务器的任务分布。一种常见的使用模式是,通过前缀将元数据的类型做一个分类,比如,对于分组成员信息,键值可以存成类似于 /servers/1servers/2 等等。对于任务分配给哪些服务器,键值可以是 /tasks/task1/tasks/task2。通常来说,要读取这些数据,所有的键值上都要带上特定的前缀。比如,要读取集群中的所有服务器信息,就要读取所有与以 /servers 为前缀的键值。

下面是一个示例的用法:

服务器只要创建一个属于自己的有 /servers 前缀的键值,就可以将自身注册到一致性内核中。

client1.setValue("/servers/1", "{address:192.168.199.10, port:8000}");
client2.setValue("/servers/2", "{address:192.168.199.11, port:8000}");
client3.setValue("/servers/3", "{address:192.168.199.12, port:8000}");

客户端只要读取以 /servers 为前缀的键值,就可以获取所有集群中的服务器信息,像下面这样:

assertEquals(client1.getValue("/servers"), Arrays.asList(
  "{address:192.168.199.12, port:8000}",
  "{address:192.168.199.11, port:8000}",
  "{address:192.168.199.10, port:8000}"));

因为数据存储的层次结构属性,像 zookeeperchubby 提供了类似于文件系统的接口,其中,用户可以创建目录和文件,或是节点,有父子节点概念的那种。etcd3 有扁平的键值空间,这样它就有能力获取更大范围的键值。

处理客户端交互

一致性内核功能的一个关键需求是,客户端如何与内核进行交互。下面是客户端与一致性内核协同工作的关键方面。

找到领导者

所有的操作都要在领导者上执行,这是至关重要的,因此,客户端程序库需要先找到领导者服务器。要做到这一点,有两种可能的方式。

  • 一致性内核的追随者服务器知道当前的领导者,因此,如果客户端连接追随者,它会返回 领导者的地址。客户端可以直接连接应答中给出的领导者。值得注意的是,客户端尝试连接时,服务器可能正处于领导者选举过程中。在这种情况下,服务器无法返回领导者地址,客户端需要等待片刻,再尝试连接另外的服务器。

  • 服务器实现转发机制,将所有的客户端请求转发给领导者。这样就允许客户端连接任意的服务器。同样,如果服务器处于领导者 选举过程中,客户端需要不断重试,直到领导者选举成功,一个合法的领导者获得确认。

    类似于 zookeeper 和 etcd 这样的产品都实现了这种方式,它们允许追随者服务器处理只读请求,以免领导者面对大量客户端的只读请求时出现瓶颈。这就降低了客户端基于请求类型去连接领导者或追随者的复杂性。

一个找到领导者的简单机制是,尝试连接每一台服务器,尝试发送一个请求,如果服务器不是领导者,它给出的应答就是一个重定向的应答。

private void establishConnectionToLeader(List<InetAddressAndPort> servers) {
    for (InetAddressAndPort server : servers) {
        try {
            SingleSocketChannel socketChannel = new SingleSocketChannel(server, 10);
            logger.info("Trying to connect to " + server);
            RequestOrResponse response = sendConnectRequest(socketChannel);
            if (isRedirectResponse(response)) {
                redirectToLeader(response);
                break;
            } else if (isLookingForLeader(response)) {
                logger.info("Server is looking for leader. Trying next server");
                continue;
            } else { //we know the leader
                logger.info("Found leader. Establishing a new connection.");
                newPipelinedConnection(server);
                break;
            }
        } catch (IOException e) {
            logger.info("Unable to connect to " + server);
            //try next server
        }
    }
}

private boolean isLookingForLeader(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.LookingForLeader.getId();
}

private void redirectToLeader(RequestOrResponse response) {
    RedirectToLeaderResponse redirectResponse = deserialize(response);
    newPipelinedConnection(redirectResponse.leaderAddress);
    logger.info("Connected to the new leader "
            + redirectResponse.leaderServerId
            + " " + redirectResponse.leaderAddress
            + ". Checking connection");
}

private boolean isRedirectResponse(RequestOrResponse requestOrResponse) {
    return requestOrResponse.getRequestId() == RequestId.RedirectToLeader.getId();
}

仅仅建立 TCP 连接还不够,我们还需要知道服务器能否处理我们的请求。因此,客户端会给服务器发送一个特殊的连接请求,服务器需要响应,它是可以处理请求,还是要重定向到领导者服务器上。

private RequestOrResponse sendConnectRequest(SingleSocketChannel socketChannel) throws IOException {
    RequestOrResponse request
        = new RequestOrResponse(RequestId.ConnectRequest.getId(), "CONNECT", 0);
    try {
        return socketChannel.blockingSend(request);
    } catch (IOException e) {
        resetConnectionToLeader();
        throw e;
    }
}

如果既有的领导者失效了,同样的技术将用于识别集群中新选出的领导者。

一旦连接成功,客户端将同领导者服务器间维持一个单一 Socket 通道(Single Socket Channel)

处理重复请求

在失效的场景下,客户端可以重新连接新的 领导者,重新发送请求。但是,如果这些请求在失效的领导者之前已经处理过了,这就有可能产生重复。因此,至关重要的一点是,服务器需要有一种机制,忽略重复的请求。幂等接收者(Idempotent Receiver)模式就是用来实现重复检测的。

使用租约(Lease),可以在一组服务器上协调任务。同样的技术也可以用于实现分组成员信息和失效检测机制。

状态监控(State Watch),可以在元数据发生改变,或是基于时间的租约到期时,获得通知。

示例

众所周知,Google 使用 chubby 锁服务进行协调和元数据管理。

kafka 使用 zookeeper 管理元数据,以及做一些类似于为集群选举领导者之类的决策。Kafka 中提议的一个架构调整是在将来使用自己基于 raft 的控制器集群替换 Zookeeper。

bookkeeper 使用 Zookeeper 管理集群的元数据。

kubernetes 使用 etcd 进行协调、管理集群的元数据和分组成员信息。

所有的大数据存储和处理系统类似于 hdfssparkflink 都使用 zookeeper 实现高可用以及集群协调。