这个模式是分布式系统的模式

请求等待名单

根据来自其他集群节点的响应,在满足响应条件之后跟踪需要响应的客户机请求。

2022年9月07

问题

在处理客户机请求时,集群节点需要与其他集群节点通信以复制数据。来自所有其他集群节点的响应法定人数在回复客户之前。

与其他集群节点的通信是异步完成的。异步通信允许这样的模式请求管道而且批处理请求要使用。

因此,集群节点以异步方式接收和处理来自多个其他集群节点的响应。然后它需要将它们关联起来,以检查是否法定人数针对特定的客户端请求。

解决方案

集群节点维护一个映射键和回调函数的等待列表。键的选择取决于调用回调的特定条件。例如,如果需要在接收到来自其他集群节点的消息时调用它,那么它可以是相关标识符的消息。在复制日志这是高水位线.回调处理响应并决定是否可以满足客户端请求。

考虑键值存储的例子,其中数据在多个服务器上复制。在这里,法定人数可用于决定何时可以认为复制成功,从而向客户机发起响应。然后,集群节点跟踪发送到其他集群节点的请求,并为每个请求注册一个回调。每个请求都标有相关标识符,用于将响应映射到请求。然后,当收到来自其他集群节点的响应时,通知等待列表调用回调。

对于这个示例,我们将三个集群节点称为athens、byzantium和cyrene。客户端与athens连接,将“title”存储为“Microservices”。雅典在拜占庭和昔兰尼复制了它;因此,它向自己发送一个请求来存储键值,并同时向Byzantium和cyrene发送请求。为了跟踪响应,athens创建了一个WriteQuorumResponseCallback,并将其添加到每个发送的请求的等待列表中。

对于接收到的每个响应,都会调用WriteQuorumResponseCallback来处理响应。它检查是否已收到所需数量的响应。一旦从byzantium接收到响应,就会达到法定人数,并完成挂起的客户机请求。Cyrene可以稍后响应,但是响应可以不等待就发送到客户机。

代码类似于下面的示例:注意,每个集群节点维护自己的等待列表实例。等待列表跟踪键和相关的回调,并存储注册回调时的时间戳。时间戳用于检查回调函数是否需要过期,如果在预期的时间内没有收到响应。

public class RequestWaitingList {private Map pendingRequests = new ConcurrentHashMap<>();public void add(Key Key, RequestCallback callback) {pendingRequests。put(key, new CallbackDetails(callback, clock.nanoTime()));}
类CallbackDetails {RequestCallback;长createTime;public CallbackDetails(RequestCallback, RequestCallback, long createTime) {this。requestCallback = requestCallback;这一点。createTime = createTime;}公共RequestCallback getRequestCallback(){返回RequestCallback;} public long elapsedTime(long now){返回now - createTime;}}
公共接口RequestCallback {void onResponse(T r);空白onError (Throwable e);}

一旦从另一个集群节点接收到响应,就要求它处理响应或错误。

类RequestWaitingList……

public void handleResponse(Key Key, Response Response) {if (!pendingRequests.containsKey(Key)) {return;} CallbackDetails CallbackDetails = pendingRequests.remove(key);callbackDetails.getRequestCallback () .onResponse(响应);}

类RequestWaitingList……

public void handleError(int requestId, Throwable e) {CallbackDetails CallbackDetails = pendingRequests.remove(requestId);callbackDetails.getRequestCallback () .onError (e);}

等待列表可以用来处理仲裁响应,实现如下所示:

静态类WriteQuorumCallback实现了RequestCallback{私有最终int仲裁;private volatile int expectedNumberOfResponses;private volatile int receivedResponses;私有volatile int receivedErrors;私有易失性布尔值完成;private final RequestOrResponse请求;客户端连接;public WriteQuorumCallback(int totalExpectedResponses, RequestOrResponse clientRequest, ClientConnection ClientConnection){这个。expectedNumberOfResponses = totalExpectedResponses;这一点。quorum = expectedNumberOfResponses / 2 + 1; this.request = clientRequest; this.clientConnection = clientConnection; } @Override public void onResponse(RequestOrResponse response) { receivedResponses++; if (receivedResponses == quorum && !done) { respondToClient("Success"); done = true; } } @Override public void onError(Throwable t) { receivedErrors++; if (receivedErrors == quorum && !done) { respondToClient("Error"); done = true; } } private void respondToClient(String response) { clientConnection.write(new RequestOrResponse(RequestId.SetValueResponse.getId(), response.getBytes(), request.getCorrelationId())); } }

类向其他节点发送请求时,它向等待列表映射添加一个回调相关标识符发送的请求。

类ClusterNode……

private void handleSetValueClientRequestRequiringQuorum(List replicas, RequestOrResponse request, ClientConnection ClientConnection) {int totalExpectedResponses = replicas.size();RequestCallback = new WriteQuorumCallback(totalExpectedResponses, request, clientConnection);for (InetAddressAndPort replica: replicas) {int correlationId = nextRequestId();requestWaitingList。add (correlationId requestCallback);try {SocketClient客户端=新的SocketClient(副本);客户端。sendOneway(新RequestOrResponse(requesttid . setvaluerequest . getid (), request.getMessageBodyJson(), correlationId, listenAddress));} catch (IOException e) {requestWaitingList.}handleError (correlationId e);}}}

一旦收到响应,等待列表将被要求处理它:

类ClusterNode……

private void handleSetValueResponse(RequestOrResponse response) {requestWaitingList.handleResponse(response. getcorrelationid (), response);}

然后,等待列表将调用相关的WriteQuorumCallback。WriteQuorumCallback实例验证是否已接收到仲裁响应,并调用回调以响应客户机。

过期的长时间挂起的请求

有时,来自其他集群节点的响应会延迟。在这些情况下,等待列表通常有一种在超时后使请求过期的机制:

类RequestWaitingList……

私人SystemClock时钟;private ScheduledExecutorService执行器= Executors.newSingleThreadScheduledExecutor();private long expirationIntervalMillis = 2000;public RequestWaitingList(SystemClock){这个。时钟=时钟;遗嘱执行人。scheduleWithFixedDelay(this::expire, expirationIntervalMillis, expirationIntervalMillis, MILLISECONDS);} private void expire() {long now = clock.nanoTime();> expiredRequestKeys = getExpiredRequestKeys(现在);expiredRequestKeys.stream()。forEach(expiredRequestKey -> {CallbackDetails request = pendingRequests.remove(expiredRequestKey); request.requestCallback.onError(new TimeoutException("Request expired")); }); } private List getExpiredRequestKeys(long now) { return pendingRequests.entrySet().stream().filter(entry -> entry.getValue().elapsedTime(now) > expirationIntervalMillis).map(e -> e.getKey()).collect(Collectors.toList()); }

例子

(cassandra)使用异步消息传递进行节点间通信。它使用法定人数并以同样的方式异步处理响应消息。

卡夫卡使用名为。的数据结构跟踪挂起的请求(kafka-purgatory)

(etcd)控件中响应客户端请求的等待列表同样的方式

重大修改

2022年9月07:

Baidu
map