二阶段提交
二阶段提交(Two-phase Commit)是一种使分布式架构下所有节点在进行事务提交时保持一致性的算法(Algorithm)。
假设
- 2个角色:协调者(Coordinator),参与者(Cohorts)。两者之间可以进行rpc。
- undo/redo:所有节点都采用预写式日志(Write-ahead Log),且日志持久化在可靠的存储设备上。
- 节点可靠:所有节点不会永久性损坏,即使损坏后仍然可以恢复。
过程:
- 第一阶段投票阶段,各参与者投票是否要继续接下来的提交操作;
- 第二阶段完成阶段(即提交执行阶段),之所以称为完成阶段,是因为无论投票结果怎样,协调者都必须在此阶段结束当前事务。
堆栈
代码分析
PrioritizedEsThreadPoolExecutor
代码语言:java复制 @Override
public void run() {
// Flags the task as started and cancels its timeout future while holding this object's monitor,
// then runs the wrapped runnable after the monitor has been released.
synchronized (this) {
// mark the task as started. This is needed for synchronization with the timeout handling
// see #scheduleTimeout()
started = true;
FutureUtils.cancel(timeoutFuture);
}
// executed outside the synchronized block
runAndClean(runnable);
}
MasterService
代码语言:java复制 logger.debug("publishing cluster state version [{}]", newClusterState.version());
// Hand the change event to the cluster state publisher together with an ack listener
// created from taskOutputs for the new state.
try {
clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
} catch (Discovery.FailedToCommitClusterStateException t) {
// The state could not be committed: log a warning with the task summary and version,
// report the failure through taskOutputs, and stop processing here.
final long version = newClusterState.version();
logger.warn(() -> new ParameterizedMessage(
"failing [{}]: failed to commit cluster state version [{}]", summary, version), t);
taskOutputs.publishingFailed(t);
return;
}
ZenDiscovery
代码语言:java复制 try {
// Publish the change, passing the configured minimum number of master nodes and the ack listener.
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
newState.version(), electMaster.minimumMasterNodes());
// While holding stateMutex: fail and clear every pending state, then rejoin.
synchronized (stateMutex) {
pendingStatesQueue.failAllStatesAndClear(
new ElasticsearchException("failed to publish cluster state"));
rejoin("zen-disco-failed-to-publish");
}
// Propagate the original failure to the caller.
throw t;
}
PublishClusterStateAction
代码语言:java复制 try {
// Delegate the actual send / wait-for-commit sequence to innerPublish.
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
serializedDiffs);
} catch (Discovery.FailedToCommitClusterStateException t) {
// A commit failure is passed through unchanged.
throw t;
} catch (Exception e) {
// try to fail committing, in case it's still ongoing
if (sendingController.markAsFailed("unexpected error", e)) {
// signal the change should be rejected
throw new Discovery.FailedToCommitClusterStateException("unexpected error", e);
} else {
// markAsFailed returned false: rethrow the original exception as is
throw e;
}
}
/**
 * Sends the cluster state of {@code clusterChangedEvent} to every node in {@code nodesToPublishTo},
 * blocks until the sending controller reports the commit, notifies {@code ackListener} of the time
 * that took, and then waits for all nodes to respond for whatever is left of the publish timeout.
 *
 * @param clusterChangedEvent event supplying the new and the previous cluster state
 * @param nodesToPublishTo    nodes the state is sent to
 * @param sendingController   tracks the commit and the publishing-timed-out flag
 * @param ackListener         notified via {@code onCommit} with the elapsed time once the commit wait returns
 * @param sendFullVersion     when {@code true}, every node is sent the full state instead of a diff
 * @param serializedStates    serialized full states keyed by version, passed on to the send helpers
 * @param serializedDiffs     serialized diffs keyed by version, passed on to the diff send helper
 */
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
                          final SendingController sendingController, final Discovery.AckListener ackListener,
                          final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
                          final Map<Version, BytesReference> serializedDiffs) {
    final ClusterState newState = clusterChangedEvent.state();
    final ClusterState oldState = clusterChangedEvent.previousState();
    final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
    final long startedAtNanos = System.nanoTime();

    // First request of the 2PC.
    for (final DiscoveryNode target : nodesToPublishTo) {
        // try and serialize the cluster state once (or per version), so we don't serialize it
        // per node when we send it over the wire, compress it while we are at it...
        // we don't send full version if node didn't exist in the previous version of cluster state
        final boolean sendFull = sendFullVersion || !oldState.nodes().nodeExists(target);
        if (sendFull) {
            sendFullClusterState(newState, serializedStates, target, publishTimeout, sendingController);
        } else {
            sendClusterStateDiff(newState, serializedDiffs, serializedStates, target, publishTimeout, sendingController);
        }
    }

    // Original note: aborts on timeout or when not enough nodes respond
    // (waitForCommit is defined elsewhere -- verify against its implementation).
    sendingController.waitForCommit(discoverySettings.getCommitTimeout());

    final long nanosUntilCommit = System.nanoTime() - startedAtNanos;
    ackListener.onCommit(TimeValue.timeValueNanos(nanosUntilCommit));

    // Second request of the 2PC (per the original note): wait for every node to respond,
    // bounded by what remains of the publish timeout.
    try {
        final long remainingNanos = Math.max(0, publishTimeout.nanos() - nanosUntilCommit);
        final BlockingClusterStatePublishResponseHandler responses = sendingController.getPublishResponseHandler();
        final boolean everyoneResponded = responses.awaitAllNodes(TimeValue.timeValueNanos(remainingNanos));
        sendingController.setPublishingTimedOut(!everyoneResponded);

        if (sendingController.getPublishingTimedOut()) {
            final DiscoveryNode[] stillPending = responses.pendingNodes();
            // everyone may have just responded
            if (stillPending.length > 0) {
                logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                    newState.version(), publishTimeout, stillPending);
            }
        }

        // The failure is logged under debug when a sending failed. we now log a summary.
        final Set<DiscoveryNode> nodesThatFailed = responses.getFailedNodes();
        if (!nodesThatFailed.isEmpty()) {
            logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
                clusterChangedEvent.state().version(), nodesThatFailed);
        }
    } catch (InterruptedException e) {
        // ignore & restore interrupt
        Thread.currentThread().interrupt();
    }
}
// 发布过程
/**
 * Sends the serialized cluster state in {@code bytes} to {@code node} under {@code SEND_ACTION_NAME}
 * and reports the outcome to {@code sendingController}.
 *
 * @param clusterState      state being published; used for log messages and for a full-state resend
 * @param bytes             serialized payload wrapped into the transport request
 * @param node              destination node
 * @param publishTimeout    publish timeout, used in the log message and passed on when resending
 * @param sendingController receives {@code onNodeSendAck} / {@code onNodeSendFailed} callbacks
 * @param sendDiffs         whether {@code bytes} is a diff; enables the full-state resend fallback
 * @param serializedStates  serialized full states keyed by version, passed on when resending
 */
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                    final DiscoveryNode node,
                                    final TimeValue publishTimeout,
                                    final SendingController sendingController,
                                    final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
    try {
        final BytesTransportRequest stateRequest = new BytesTransportRequest(bytes, node.getVersion());

        final EmptyTransportResponseHandler ackHandler = new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
            @Override
            public void handleResponse(TransportResponse.Empty response) {
                // Only worth a log line when the reply arrives after publishing was flagged as timed out.
                final boolean repliedAfterTimeout = sendingController.getPublishingTimedOut();
                if (repliedAfterTimeout) {
                    logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                        clusterState.version(), publishTimeout);
                }
                sendingController.onNodeSendAck(node);
            }

            @Override
            public void handleException(TransportException exp) {
                final boolean incompatibleVersion =
                    sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException;
                if (!incompatibleVersion) {
                    logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                    sendingController.onNodeSendFailed(node, exp);
                    return;
                }
                // A diff was sent and the failure unwraps to an incompatible-version error:
                // fall back to sending the full cluster state to this node.
                logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
            }
        };

        transportService.sendRequest(node, SEND_ACTION_NAME, stateRequest, stateRequestOptions, ackHandler);
    } catch (Exception e) {
        // Anything thrown while building or dispatching the request counts as a failed send for this node.
        logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
        sendingController.onNodeSendFailed(node, e);
    }
}