Elasticsearch Distributed Consistency Principles Analysis (3) — Data

Alibaba Cloud
Jan 17 · 18 min read

Current Issues

Data Write Process

From the Replication Perspective: Primary -> Replica

Looking at ReplicationOperation.execute(), a write goes through three key steps: check the active shard count, write to the Primary, then forward the request to the Replicas:

```java
String activeShardCountFailure = checkActiveShardCount();
primaryResult = primary.perform(request);
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
```
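The first step, checkActiveShardCount, implements the wait_for_active_shards setting: the write is rejected up front unless enough shard copies are active. A minimal sketch of that gate, with illustrative names (not the Elasticsearch API):

```java
// Simplified wait_for_active_shards gate: null means "go ahead",
// otherwise a human-readable reason for refusing the write.
final class ActiveShardGate {
    static String checkActiveShardCount(int activeCopies, int waitForActiveShards) {
        if (activeCopies >= waitForActiveShards) {
            return null; // enough active copies, proceed with the write
        }
        return "not enough active shard copies: have [" + activeCopies
            + "], need [" + waitForActiveShards + "]";
    }
}
```

With the default wait_for_active_shards of 1, a write proceeds as long as the Primary itself is active.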
```java
private void decPendingAndFinishIfNeeded() {
    assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
    if (pendingActions.decrementAndGet() == 0) {
        finish();
    }
}
```
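The pendingActions counter follows a common fan-out pattern: it starts at 1 for the primary coordination, is incremented once per in-flight replica request, and whichever callback decrements it to zero finishes the operation. A self-contained toy version of that pattern (the class and the onFinished callback are illustrative, not Elasticsearch code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative fan-out coordinator: the operation completes only after the
// primary coordination phase and every replica callback has checked in.
class PendingActions {
    private final AtomicInteger pending = new AtomicInteger(1); // 1 = primary coordination
    private final Runnable onFinished;
    private volatile boolean finished = false;

    PendingActions(Runnable onFinished) { this.onFinished = onFinished; }

    // Called before sending the request to a replica.
    void addReplica() { pending.incrementAndGet(); }

    // Called on every response, success or failure.
    void decPendingAndFinishIfNeeded() {
        assert pending.get() > 0 : "pending action count goes below 0";
        if (pending.decrementAndGet() == 0) {
            finished = true;
            onFinished.run();
        }
    }

    boolean isFinished() { return finished; }
}
```

Because failures also call decPendingAndFinishIfNeeded, the operation always terminates, whether the replicas succeed or not.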
```java
public void execute() throws Exception {
    final String activeShardCountFailure = checkActiveShardCount();
    final ShardRouting primaryRouting = primary.routingEntry();
    final ShardId primaryId = primaryRouting.shardId();
    if (activeShardCountFailure != null) {
        finishAsFailed(new UnavailableShardsException(primaryId,
            "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
        return;
    }

    totalShards.incrementAndGet();
    pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
    primaryResult = primary.perform(request);
    primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
    final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
    if (replicaRequest != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
        }

        // We must obtain the replication group after successfully indexing into the primary to follow recovery semantics.
        // We must make sure that every operation indexed into the primary after recovery start is also replicated
        // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
        // We also must make sure to obtain the global checkpoint before the replication group to ensure that the global checkpoint
        // is valid for this replication group. If we sampled in the reverse direction, the global checkpoint might be based on a subset
        // of the sampled replication group and advanced further than what the given replication group would allow.
        // This would mean that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
        final long globalCheckpoint = primary.globalCheckpoint();
        final ReplicationGroup replicationGroup = primary.getReplicationGroup();
        markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
        performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
    }

    successfulShards.incrementAndGet(); // mark primary as successful
    decPendingAndFinishIfNeeded();
}
```
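The long comment in execute() hinges on the checkpoint model discussed later in this article: each shard copy tracks a LocalCheckpoint (every operation up to it has been processed locally), and the Primary derives the GlobalCheckpoint as the minimum local checkpoint across the in-sync copies, so it can never run ahead of the slowest copy. A toy version of that derivation (the class and method names are illustrative, not the Elasticsearch classes):

```java
import java.util.HashMap;
import java.util.Map;

// Toy checkpoint tracker: the global checkpoint never runs ahead of
// the slowest in-sync copy's local checkpoint.
class CheckpointTracker {
    private final Map<String, Long> localCheckpoints = new HashMap<>();

    // Local checkpoints only move forward, so keep the max of old and new.
    void updateLocalCheckpoint(String allocationId, long checkpoint) {
        localCheckpoints.merge(allocationId, checkpoint, Math::max);
    }

    // Global checkpoint = minimum local checkpoint over all tracked in-sync copies.
    long globalCheckpoint() {
        return localCheckpoints.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(-1L); // -1: no operations processed yet
    }
}
```

This makes the ordering constraint in the comment concrete: sample the global checkpoint first, then the replication group, so the checkpoint you ship is valid for every copy in that group.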
A successful write response reports the per-copy result in its `_shards` section:

```json
"_shards" : {
    "total" : 2,
    "failed" : 0,
    "successful" : 2
}
```
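Note that Elasticsearch does not require a Replica quorum for the write to be acknowledged: as long as the Primary succeeds, the request returns success, and Replica failures only surface in these `_shards` counts. A client that cares about replication depth can compare the counts itself; the helper below is hypothetical, not an Elasticsearch client API:

```java
// Hypothetical client-side check on the "_shards" section of a write response.
final class ShardsInfo {
    final int total;
    final int successful;
    final int failed;

    ShardsInfo(int total, int successful, int failed) {
        this.total = total;
        this.successful = successful;
        this.failed = failed;
    }

    // True when every shard copy (primary + replicas) acknowledged the write.
    boolean fullyReplicated() {
        return successful == total && failed == 0;
    }

    // True when the primary wrote the document but at least one replica did not.
    boolean partiallyReplicated() {
        return successful > 0 && successful < total;
    }
}
```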
In ReplicationOperation.java, the onFailure function handles a failure to write to a Replica node:

```java
public void onFailure(Exception replicaException) {
    logger.trace(
        (org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
            "[{}] failure while performing [{}] on replica {}, request [{}]",
            shard.shardId(), opType, shard, replicaRequest),
        replicaException);
    if (TransportActions.isShardNotAvailableException(replicaException)) {
        decPendingAndFinishIfNeeded();
    } else {
        RestStatus restStatus = ExceptionsHelper.status(replicaException);
        shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
            shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
        String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
        replicasProxy.failShardIfNeeded(shard, message,
            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
    }
}
```
It then calls failShardIfNeeded:

```java
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                              Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
    logger.warn((org.apache.logging.log4j.util.Supplier<?>)
        () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
    shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
        createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
```
shardStateAction.remoteShardFailed sends the request to the Master, which executes the ShardFailed logic for the Replica and removes the Shard from InSyncAllocation:

```java
public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
    if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
        removeAllocationId(failedShard);

        if (failedShard.primary()) {
            Updates updates = changes(failedShard.shardId());
            if (updates.firstFailedPrimary == null) {
                // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
                updates.firstFailedPrimary = failedShard;
            }
        }
    }

    if (failedShard.active() && failedShard.primary()) {
        increasePrimaryTerm(failedShard.shardId());
    }
}
```
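The effect of removing a failed copy from InSyncAllocation can be pictured as maintaining a set of allocation IDs on the Master: a write only needs acknowledgement from the copies still in the set, so a removed Replica can no longer hold back or diverge from later writes. A toy model of this bookkeeping (the class and method names are illustrative, not Elasticsearch internals):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the master-side in-sync allocation set for a single shard.
class InSyncAllocations {
    private final Set<String> inSyncIds = new HashSet<>();

    void markInSync(String allocationId) { inSyncIds.add(allocationId); }

    // Mirrors the effect of shardFailed: the failed copy stops counting.
    void shardFailed(String allocationId) { inSyncIds.remove(allocationId); }

    // A write is complete once every remaining in-sync copy has acknowledged it.
    boolean writeAcknowledged(Set<String> ackedIds) {
        return ackedIds.containsAll(inSyncIds);
    }

    int size() { return inSyncIds.size(); }
}
```

Before the removal, a write blocked on the failed Replica cannot complete; after the Master shrinks the set, the same acknowledgements are sufficient.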

From the Perspective of the Primary

PacificA Algorithm

Glossary Terms

Primary Invariant



Reconfiguration: Secondary Failure, Primary Failure, Newly Added Node

PacificA Algorithm Summary

SequenceNumber, Checkpoint, and Failure Discovery

Term and SequenceNumber

LocalCheckpoint and GlobalCheckpoint

Fast Failure Recovery

Comparison between Elasticsearch and PacificA




