Synchronizing Distributed Applications: Harnessing the Power of Distributed Systems
Imagine a symphony of computers, each playing its part in perfect harmony, even if they’re scattered across the globe. Synchronization in distributed applications is like the conductor ensuring that every note is played in perfect sync, no matter where the musicians are located. It’s the secret sauce that turns a collection of distant machines into a coordinated masterpiece.
Why is synchronization important in distributed systems?
Synchronization in distributed systems is like teamwork: it makes sure all the computers work together smoothly. It keeps shared data consistent, prevents conflicts when many computers try to act at once, and makes sure operations happen in the right order. With synchronization, the computers can communicate, share resources, and stay on the same page, even if the network is slow or some computers fail.
What are the types of Synchronization Mechanisms?
There are several types of synchronization mechanisms. To name a few:
1. Locks (Mutexes): A mutex is a mutual exclusion lock, a basic synchronisation primitive that lets only one process or thread hold the lock at a time, which restricts access to a shared resource. Although mutexes work well for avoiding race conditions, misusing them can lead to deadlocks or reduced concurrency. In a distributed setting, mutual exclusion can be implemented with algorithms such as Lamport's distributed mutual exclusion algorithm, which orders requests using Lamport timestamps, or the Ricart-Agrawala algorithm.
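As a minimal single-process sketch of the idea, the example below guards a shared counter with a `ReentrantLock` from the Java standard library. The class name `LockedCounter` and the helper `runWorkers` are hypothetical names chosen for illustration; a lock shared between machines would need a coordination protocol on top of this.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class: a counter guarded by a mutual exclusion lock.
class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    void increment() {
        lock.lock();            // only one thread can hold the lock at a time
        try {
            count++;            // critical section
        } finally {
            lock.unlock();      // always release, even if the body throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts `threads` workers that each increment the counter `perThread` times.
    static int runWorkers(int threads, int perThread) {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

Without the lock, concurrent `count++` operations could overwrite each other and the final total could come out lower than expected.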
2. Semaphores: A semaphore is an integer counter used to control access to a set of resources. Semaphores are useful for limiting the number of processes that can use a shared resource at the same time. A semaphore provides two atomic operations for process synchronisation:
- Wait (P)
- Signal (V)
Types of semaphore:
- Binary semaphore
- Counting semaphore
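A mutex behaves much like a binary semaphore, so either can be used to implement mutual exclusion. The main difference is that a mutex is normally released by the thread that acquired it, whereas a semaphore can be signalled by any thread.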
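The sketch below shows a counting semaphore limiting how many threads use a resource at once, using `java.util.concurrent.Semaphore`. The class `ConnectionLimiter` and its `demo` helper are hypothetical; `acquireUninterruptibly` plays the role of Wait (P) and `release` the role of Signal (V).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: at most `maxConcurrent` threads may run `work` at once.
class ConnectionLimiter {
    private final Semaphore permits;
    private final AtomicInteger inUse = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    ConnectionLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    void use(Runnable work) {
        permits.acquireUninterruptibly();           // Wait (P): take a permit or block
        try {
            int now = inUse.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);  // remember the highest concurrency seen
            work.run();
        } finally {
            inUse.decrementAndGet();
            permits.release();                      // Signal (V): return the permit
        }
    }

    int peakUsage() {
        return peak.get();
    }

    private static void pause() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs `threads` workers through the limiter and reports the peak concurrency.
    static int demo(int maxConcurrent, int threads) {
        ConnectionLimiter limiter = new ConnectionLimiter(maxConcurrent);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> limiter.use(ConnectionLimiter::pause));
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return limiter.peakUsage();
    }
}
```

Constructing the semaphore with a single permit would make it behave as a binary semaphore, which is the case the text compares to a mutex.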
3. Monitors: A monitor is a construct that combines shared data and the synchronisation primitives that protect it into a single unit. It wraps the shared data and the procedures that operate on it, and lets only one thread execute inside the monitor at a time, so only one thread can access the data at a time.
A monitor has a simple syntax, similar to a class definition. In Java, it can be written as follows:
public class MonitorName {
    // Variables declaration
    private int sharedVariable1;
    private String sharedVariable2;

    public MonitorName() {
        // Initializing code
        sharedVariable1 = 0;
        sharedVariable2 = "";
    }

    public synchronized void p1() {
        // Procedure p1 code
        // ...
    }

    public synchronized void p2() {
        // Procedure p2 code
        // ...
    }

    public synchronized void pn() {
        // Procedure pn code
        // ...
    }

    public synchronized void conditionVariableWait() throws InterruptedException {
        // Wait for a condition to be satisfied
        wait();
    }

    public synchronized void conditionVariableSignal() {
        // Signal that a condition has been met
        notify();
    }

    public synchronized void conditionVariableSignalAll() {
        // Signal that a condition has been met for all waiting threads
        notifyAll();
    }
}
In short, a monitor is a class-like construct containing variable declarations, condition variables, procedures (functions), and an initialization block, used together for process synchronization.
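To make the monitor idea concrete, here is a small sketch of a bounded buffer written as a Java monitor. The class `BoundedBuffer` and the `transfer` helper are hypothetical names for illustration. Each `wait()` sits inside a `while` loop so the condition is re-checked after every wake-up, which is the usage the `Object.wait` documentation recommends.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical example: a fixed-capacity buffer implemented as a monitor.
class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();                 // buffer full: wait until a consumer makes room
        }
        items.addLast(item);
        notifyAll();                // wake threads waiting for an item
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();                 // buffer empty: wait until a producer adds an item
        }
        T item = items.removeFirst();
        notifyAll();                // wake threads waiting for free space
        return item;
    }

    // A producer thread puts 1..n; the calling thread takes n items and sums them.
    static int transfer(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```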
4. Message Passing: In a distributed system, processes communicate by sending each other messages. Message passing is the mechanism that nodes (computers or processes) use to exchange information and coordinate their actions. It involves sending and receiving messages between nodes to achieve goals such as coordination, synchronization, and data sharing. With the right protocols, messages are delivered reliably and in the right order.
Types of Message Passing:
- Synchronous message passing
- Asynchronous message passing
- Hybrids
Properly designed message-passing protocols and frameworks, such as message queues, can facilitate synchronization between distributed components by providing reliable and, depending on configuration, ordered delivery of messages.
Some popular message queue implementations are RabbitMQ, Kafka, and Amazon SQS.
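As an in-process stand-in for a message queue, the sketch below passes messages between two threads through a `LinkedBlockingQueue`. It does not use RabbitMQ, Kafka, or SQS; the class `InMemoryChannel` is a hypothetical illustration of asynchronous, ordered delivery from a single sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example: one sender thread, one receiver, FIFO delivery.
class InMemoryChannel {
    static List<String> sendAndReceive(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Asynchronous send: the sender enqueues and moves on without
        // waiting for the receiver to pick the message up.
        Thread sender = new Thread(() -> {
            for (String message : messages) {
                queue.add(message);
            }
        });
        sender.start();

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < messages.size(); i++) {
                received.add(queue.take());   // blocks until a message is available
            }
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

Swapping in a `SynchronousQueue` and sending with `put` would make each send wait for a matching receive, which corresponds to synchronous message passing.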
5. Atomic Operations: Atomic operations are operations that are executed in their entirety without interruption. In distributed systems, atomic operations are crucial for ensuring that certain sequences of operations occur as a single, indivisible unit. Techniques like Two-Phase Commit (2PC) and Three-Phase Commit (3PC) are used to implement atomic transactions in distributed databases and systems.
Two-Phase Commit (2PC) Protocol:
The essence of two phase commit, unsurprisingly, is that it carries out an update in two phases:
- the first, prepare, asks each node if it’s able to promise to carry out the update
- the second, commit, actually carries it out.
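The two phases can be sketched as follows. This is a deliberately simplified, in-memory illustration: the `Participant` interface, `SimpleCoordinator`, and `RecordingParticipant` are hypothetical names, and the sketch leaves out logging, timeouts, and failure recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract for the sketch.
interface Participant {
    boolean prepare(String txId);   // phase 1: vote yes (true) or no (false)
    void commit(String txId);       // phase 2: apply the update
    void rollback(String txId);     // phase 2 alternative: discard the update
}

class SimpleCoordinator {
    // Returns true if the transaction committed, false if it was rolled back.
    static boolean run(String txId, List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        // Phase 1: ask every participant whether it can promise to commit.
        for (Participant p : participants) {
            if (p.prepare(txId)) {
                prepared.add(p);
            } else {
                // One "no" vote aborts the transaction for everyone who voted yes.
                for (Participant q : prepared) {
                    q.rollback(txId);
                }
                return false;
            }
        }
        // Phase 2: everyone voted yes, so tell all participants to commit.
        for (Participant p : participants) {
            p.commit(txId);
        }
        return true;
    }
}

// Test double that records the calls it receives.
class RecordingParticipant implements Participant {
    private final boolean vote;
    final List<String> log = new ArrayList<>();

    RecordingParticipant(boolean vote) {
        this.vote = vote;
    }

    public boolean prepare(String txId) {
        log.add("prepare");
        return vote;
    }

    public void commit(String txId) {
        log.add("commit");
    }

    public void rollback(String txId) {
        log.add("rollback");
    }
}
```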
Benefits And Use-Cases Of 2PC:
- Data integrity and consistency across the distributed systems.
- Provides atomicity and durability properties in transactions.
- Suitable for various applications, including databases, distributed applications, and blockchain.
Limitation and Challenges of 2PC:
- Performance issues as it requires multiple message exchanges between nodes.
- Blocking problems during failures, leading to resource unavailability.
- Scalability issues in large-scale distributed systems.
Consider a distributed key-value store implementation, which works as follows:
The client creates a unique ID called a “transaction identifier” (TransactionRef) and records when the transaction started. This ID, along with the start time, helps prevent deadlocks, as explained later. The client uses this ID to keep track of the transaction across all cluster nodes. When the client sends requests to other nodes, it includes this ID with each request.
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class TransactionRef {
    private UUID transactionId;
    private long startTime;

    public TransactionRef(long startTime) {
        // Generate a unique transaction ID
        this.transactionId = UUID.randomUUID();
        // Record the start time
        this.startTime = startTime;
    }
    // Other methods and class content...
}

class TransactionClient {
    private SystemClock clock;
    private TransactionRef currentTransaction;
    private ReplicaMapper replicaMapper;
    // Node acting as coordinator; chosen lazily from the first key used
    private TransactionalKVStore coordinator;

    public TransactionClient(ReplicaMapper replicaMapper, SystemClock systemClock) {
        // Set the system clock
        this.clock = systemClock;
        // Create a new transaction reference
        this.currentTransaction = new TransactionRef(clock.now());
        // Initialize the replica mapper
        this.replicaMapper = replicaMapper;
    }

    // Pick the coordinator on first use and tell it the transaction has started.
    // The coordinator-side calls (beginTransaction, addToTransaction, commit) are
    // not shown on TransactionalKVStore in this article.
    private void maybeBeginTransaction(String key) {
        if (coordinator == null) {
            coordinator = replicaMapper.getServerFor(key);
            coordinator.beginTransaction(currentTransaction);
        }
    }

    // Request to retrieve data
    public CompletableFuture<String> get(String key) {
        maybeBeginTransaction(key);
        coordinator.addToTransaction(currentTransaction, key);
        TransactionalKVStore kvStore = replicaMapper.getServerFor(key);
        return kvStore.get(currentTransaction, key);
    }

    // Request to store data
    public void put(String key, String value) {
        maybeBeginTransaction(key);
        coordinator.addToTransaction(currentTransaction, key);
        TransactionalKVStore kvStore = replicaMapper.getServerFor(key);
        kvStore.put(currentTransaction, key, value);
    }

    // Called once the client has read without conflicts and has written
    // all the key values
    public CompletableFuture<Boolean> commit() {
        return coordinator.commit(currentTransaction);
    }
}
Before storing anything, the client tells the transaction coordinator that the transaction has started. The coordinator is picked dynamically when the client begins a “get” or “put” operation with a particular key.
The transaction coordinator watches over the transaction and writes down every change in a log. This way, if something goes wrong, all the details are safe and can be used.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class TransactionCoordinator {
    // Map to store transaction metadata
    private Map<TransactionRef, TransactionMetadata> transactions = new ConcurrentHashMap<>();
    // Transaction log for recording entries
    private WriteAheadLog transactionLog;
    // Note: systemClock, transactionTimeoutMs, logger, sequence(), serverFor(), and
    // rollbackTransaction() are used below but are not shown in this excerpt.

    // Start a new transaction
    public void startTransaction(TransactionRef transactionRef) {
        // Create transaction metadata
        TransactionMetadata txnMetadata = new TransactionMetadata(transactionRef, systemClock, transactionTimeoutMs);
        // Serialize and write metadata to the transaction log
        transactionLog.writeEntry(txnMetadata.serialize());
        // Store metadata in the transactions map
        transactions.put(transactionRef, txnMetadata);
    }

    // Add a key to the ongoing transaction
    public synchronized void addKeyToTransaction(TransactionRef transactionRef, String key) {
        TransactionMetadata metadata = transactions.get(transactionRef);
        if (!metadata.containsKey(key)) {
            metadata.addKey(key);
            transactionLog.writeEntry(metadata.serialize());
        }
    }

    // Commit the transaction
    public CompletableFuture<Boolean> commitTransaction(TransactionRef transactionRef) {
        TransactionMetadata metadata = transactions.get(transactionRef);
        metadata.markPreparingToCommit(transactionLog);
        // Phase 1: ask every participant to prepare
        List<CompletableFuture<Boolean>> allPrepared = sendPrepareRequestToParticipants(transactionRef);
        CompletableFuture<List<Boolean>> futureList = sequence(allPrepared);
        return futureList.thenApply(result -> {
            // Any "no" vote rolls the whole transaction back
            if (!result.stream().allMatch(r -> r)) {
                logger.info("Rolling back = " + transactionRef);
                rollbackTransaction(transactionRef);
                return false;
            }
            // Phase 2: everyone is prepared, so tell them to commit
            metadata.markPrepared(transactionLog);
            sendCommitMessageToParticipants(transactionRef);
            metadata.markCommitComplete(transactionLog);
            return true;
        });
    }

    // Send prepare requests to participants
    public List<CompletableFuture<Boolean>> sendPrepareRequestToParticipants(TransactionRef transactionRef) {
        TransactionMetadata transactionMetadata = transactions.get(transactionRef);
        var transactionParticipants = getParticipants(transactionMetadata.getParticipatingKeys());
        return transactionParticipants.keySet()
                .stream()
                .map(server -> server.handlePrepare(transactionRef))
                .collect(Collectors.toList());
    }

    // Send commit messages to participants
    private void sendCommitMessageToParticipants(TransactionRef transactionRef) {
        TransactionMetadata transactionMetadata = transactions.get(transactionRef);
        var participantsForKeys = getParticipants(transactionMetadata.getParticipatingKeys());
        // commitTransaction on the store is not shown in this excerpt
        participantsForKeys.forEach((kvStore, keys) -> kvStore.commitTransaction(transactionRef, keys));
    }

    // Group the participating keys by the server responsible for each key
    private Map<TransactionalKVStore, List<String>> getParticipants(List<String> participatingKeys) {
        return participatingKeys.stream()
                .map(k -> Pair.of(serverFor(k), k))
                .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
    }
}

class TransactionMetadata {
    // Reference to the transaction
    private TransactionRef transactionRef;
    // List of keys participating in the transaction
    private List<String> participatingKeys = new ArrayList<>();
    // Status of the transaction
    private TransactionStatus transactionStatus;
    // Constructors, getters, setters, and other methods...
}
In the cluster, one node acts as a coordinator for the client’s transactions. This node usually stores data for the first key used by the client.
The client tells the coordinator about all the keys it’s using in the transaction. The coordinator keeps a list of these keys. This list helps us know which cluster nodes are part of the transaction.
Each key’s data is often replicated using a replicated log. This means the leader server for a key might change during the transaction. So, instead of keeping track of server addresses, the coordinator keeps track of keys, and the keys determine which cluster nodes are involved in the transaction.
When the client needs to get or store data for a key, it talks directly to the server responsible for that key, as determined by the partitioning strategy. Importantly, it doesn’t go through the coordinator. This avoids sending data twice over the network, first to the coordinator and then to the server.
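One possible way to map a key to its server is simple hash partitioning, sketched below. This is an assumption for illustration only: the article does not say which strategy its `ReplicaMapper` uses, and the class `HashReplicaMapper` with its string server names is hypothetical. Production systems often prefer consistent hashing or range partitioning so that adding a node moves fewer keys.

```java
import java.util.List;

// Hypothetical mapper: picks a server by hashing the key.
class HashReplicaMapper {
    private final List<String> servers;

    HashReplicaMapper(List<String> servers) {
        this.servers = servers;
    }

    String getServerFor(String key) {
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(key.hashCode(), servers.size());
        return servers.get(index);
    }
}
```

Because the mapping depends only on the key and the server list, the client and the coordinator can both work out which node owns a key without asking each other.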
class TransactionalKVStore {
    // Note: the fields kv, ongoingTransactions, lockManager, wal, and logger, the
    // sequence() helper, and the writeToWAL(TransactionMarker) overload are used
    // below but are not shown in this excerpt.

    // Buffer a write; it stays pending until the transaction is prepared
    public void put(TransactionRef transactionRef, String key, String value) {
        TransactionState state = getOrCreateTransactionState(transactionRef);
        state.addPendingUpdate(key, value);
    }

    // Retrieve data within a transaction, taking a read lock first
    public CompletableFuture<String> get(TransactionRef transactionRef, String key) {
        CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(transactionRef, key, LockMode.READ);
        return lockFuture.thenApply(lockedTransactionRef -> {
            // Make sure the transaction is registered on this node
            getOrCreateTransactionState(lockedTransactionRef);
            return kv.get(key);
        });
    }

    // Create or retrieve the transaction state
    private synchronized TransactionState getOrCreateTransactionState(TransactionRef transactionRef) {
        TransactionState state = this.ongoingTransactions.get(transactionRef);
        if (state == null) {
            state = new TransactionState();
            this.ongoingTransactions.put(transactionRef, state);
        }
        return state;
    }

    // Phase 1 on the participant: lock the keys, log the pending updates, vote
    public synchronized CompletableFuture<Boolean> handlePrepare(TransactionRef txn) {
        try {
            TransactionState state = getTransactionState(txn);
            if (state.isPrepared()) {
                return CompletableFuture.completedFuture(true); // already prepared.
            }
            if (state.isAborted()) {
                return CompletableFuture.completedFuture(false); // aborted by another transaction.
            }
            Optional<Map<String, String>> pendingUpdates = state.getPendingUpdates();
            CompletableFuture<Boolean> prepareFuture = prepareUpdates(txn, pendingUpdates);
            return prepareFuture.thenApply(ignored -> {
                Map<String, Lock> locksHeldByTxn = lockManager.getAllLocksFor(txn);
                state.markPrepared();
                writeToWAL(new TransactionMarker(txn, locksHeldByTxn, TransactionStatus.PREPARED));
                return true;
            });
        } catch (TransactionException | WriteConflictException e) {
            logger.error(e);
        }
        return CompletableFuture.completedFuture(false);
    }

    // Acquire write locks for the pending keys, then write the updates to the log
    private CompletableFuture<Boolean> prepareUpdates(TransactionRef txn, Optional<Map<String, String>> pendingUpdates) {
        if (pendingUpdates.isPresent()) {
            Map<String, String> pendingKVs = pendingUpdates.get();
            CompletableFuture<List<TransactionRef>> lockFuture = acquireLocks(txn, pendingKVs.keySet());
            return lockFuture.thenApply(ignored -> {
                writeToWAL(txn, pendingKVs);
                return true;
            });
        }
        return CompletableFuture.completedFuture(true);
    }

    TransactionState getTransactionState(TransactionRef txnRef) {
        return ongoingTransactions.get(txnRef);
    }

    private void writeToWAL(TransactionRef txn, Map<String, String> pendingUpdates) {
        for (String key : pendingUpdates.keySet()) {
            String value = pendingUpdates.get(key);
            wal.writeEntry(new SetValueCommand(txn, key, value).serialize());
        }
    }

    private CompletableFuture<List<TransactionRef>> acquireLocks(TransactionRef txn, Set<String> keys) {
        List<CompletableFuture<TransactionRef>> lockFutures = new ArrayList<>();
        for (String key : keys) {
            CompletableFuture<TransactionRef> lockFuture = lockManager.acquire(txn, key, LockMode.READWRITE);
            lockFutures.add(lockFuture);
        }
        return sequence(lockFutures);
    }
    // Other methods and class content...
}
The requests lock the keys: “get” requests take a “read” lock, and “put” requests take a “write” lock. Read locks are acquired when the values are read. In the code above, write locks are acquired later, during the prepare step, just before the pending updates are written to the log.
The locks are held until the transaction completes, not released as soon as each request finishes. This approach, where locks are released only when the transaction either commits or rolls back, is a strict form of “two-phase locking.” It’s crucial for providing the “serializable” isolation level.
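The locking discipline described above can be sketched with a small in-memory lock table. The class `TxnLockTable` and its methods are hypothetical, and the sketch fails fast instead of queueing waiters; a real lock manager would also need a deadlock policy, for example one based on the transaction start times mentioned earlier.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical lock table: locks are only ever released by releaseAll().
class TxnLockTable {
    private final Map<String, String> writer = new HashMap<>();        // key -> txn holding the write lock
    private final Map<String, Set<String>> readers = new HashMap<>();  // key -> txns holding read locks

    synchronized boolean tryRead(String txn, String key) {
        String w = writer.get(key);
        if (w != null && !w.equals(txn)) {
            return false;                       // another transaction is writing this key
        }
        readers.computeIfAbsent(key, k -> new HashSet<>()).add(txn);
        return true;
    }

    synchronized boolean tryWrite(String txn, String key) {
        String w = writer.get(key);
        if (w != null && !w.equals(txn)) {
            return false;                       // another transaction holds the write lock
        }
        for (String reader : readers.getOrDefault(key, new HashSet<>())) {
            if (!reader.equals(txn)) {
                return false;                   // another transaction is still reading
            }
        }
        writer.put(key, txn);
        return true;
    }

    // Called once, at commit or rollback: the only place locks are released.
    synchronized void releaseAll(String txn) {
        writer.values().removeIf(txn::equals);
        for (Set<String> r : readers.values()) {
            r.remove(txn);
        }
    }
}
```

Because there is no per-request unlock, a transaction's locks cover everything it has read or written until it finishes, which is what prevents other transactions from seeing intermediate state.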
The coordinator first records that the transaction is “preparing to commit.” Then it does two things:
- It asks every participant whether it is ready to commit (the prepare phase).
- If every participant says yes, it records the transaction as “prepared” and tells everyone to commit. If any participant says no, it rolls the transaction back.
2PC is a blocking atomic commitment protocol: it can get stuck while participants are trying to agree on whether a transaction should be committed. This happens when participants have voted yes in the “prepare” phase and the coordinator, which makes the final decision, fails before announcing it. In such cases, the prepared participants can’t commit or abort on their own and must hold their locks until the coordinator recovers. In simpler terms, if something goes wrong with the coordinator, participants can’t complete the transaction.
In real-world use, the XA Specification (which guides the implementation of the two-phase commit protocol) allows a participant to decide on its own to commit or roll back after the “prepare” phase, even if that contradicts what the transaction manager decided. When this happens, the protocol can’t reliably continue, and it stops. The transaction ends up in a “heuristic” state, requiring manual intervention. This practical change to the protocol reduces the risk of permanently locking up resources, but it can leave the participants inconsistent with each other.
Two common enhancements, the “presumed abort” and “presumed commit” optimizations, simplify recovery and reduce logging and message overhead. When a recovering coordinator finds no record of a transaction’s outcome, presumed abort treats the transaction as aborted, while presumed commit treats it as committed, and the participants act accordingly.
Three-Phase Commit (3PC) Protocol:
The 3PC protocol is like an improved version of the two-phase commit, with an extra step. It’s designed to deal with the problem in two-phase commit where participants get stuck if the coordinator fails. It is a non-blocking atomic commitment protocol, provided that failures can be detected reliably with timeouts and the network does not partition.
The 3PC protocol splits the “prepare” state into two: “waiting” and “pre-commit.” With this split, participants can work out the outcome among themselves instead of waiting for a failed coordinator to return.
When a participant is in the “pre-commit” state, it means that all participants have acknowledged the coordinator’s query, and they are ready to commit. This makes the “pre-commit” state “committable.” This structure allows participants to collectively decide the overall result of the transaction if the coordinator fails.
It’s important to note that being “non-blocking” in 3PC doesn’t mean participants aren’t blocked during processing. In practical terms, such as in a database, local transactions must start, and locks are placed, which can temporarily block other operations until the entire 3PC process is complete. “Non-blocking” means that the protocol can continue despite failures.
The 3PC defines the following states:
- initial (the 3PC processing is starting),
- waiting (participant is able to commit; it received the canCommit message from the coordinator),
- pre-commit (participant is ready to commit; it received the preCommit message from the coordinator),
- committed (participant has committed, as commanded by the coordinator),
- aborted (participant has aborted, as commanded by the coordinator).
Timeouts are handled in each phase as follows:
- Phase 1: A participant aborts if it doesn’t hear from the coordinator in time. The coordinator sends an abort to all participants if any participant fails to respond.
- Phase 2: If the coordinator times out waiting for a participant, it assumes the participant crashed and tells everyone to abort. If a participant times out waiting for the coordinator, the participants elect a new coordinator.
- Phase 3: If a participant fails to hear from the coordinator, it can contact any other participant for the result.
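The participant side of these rules can be sketched as a small state machine. The class `ThreePhaseParticipant`, the `TimeoutAction` enum, and the mapping of phases to states (phase 1 is waiting for canCommit, phase 2 is waiting for preCommit, phase 3 is waiting for the final commit) are assumptions made for this illustration, not part of a specific implementation.

```java
// Hypothetical participant state machine for the 3PC states listed above.
class ThreePhaseParticipant {
    enum State { INITIAL, WAITING, PRE_COMMIT, COMMITTED, ABORTED }
    enum TimeoutAction { ABORT, ELECT_NEW_COORDINATOR, ASK_OTHER_PARTICIPANTS, NONE }

    private State state = State.INITIAL;

    State state() {
        return state;
    }

    void onCanCommit() {
        require(State.INITIAL);
        state = State.WAITING;          // voted yes, waiting for preCommit
    }

    void onPreCommit() {
        require(State.WAITING);
        state = State.PRE_COMMIT;       // everyone voted yes, waiting for the final commit
    }

    void onDoCommit() {
        require(State.PRE_COMMIT);
        state = State.COMMITTED;
    }

    void onAbort() {
        if (state != State.COMMITTED) {
            state = State.ABORTED;
        }
    }

    // What the participant does when the coordinator stays silent for too long.
    TimeoutAction onCoordinatorTimeout() {
        switch (state) {
            case INITIAL:
                state = State.ABORTED;  // phase 1: give up and abort
                return TimeoutAction.ABORT;
            case WAITING:
                return TimeoutAction.ELECT_NEW_COORDINATOR;     // phase 2
            case PRE_COMMIT:
                return TimeoutAction.ASK_OTHER_PARTICIPANTS;    // phase 3
            default:
                return TimeoutAction.NONE;  // already committed or aborted
        }
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```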
Benefits And Use-Cases Of 3PC:
- Improved fault tolerance: 3PC reduces the chance of failures affecting the entire system.
- Data consistency: Ensuring transactions are atomic and durable across distributed systems.
- Reduced blocking risk: Participants can reach a decision without the coordinator, so a coordinator failure is less likely to leave resources locked.
Limitation And Challenges Of 3PC:
- More messages: The extra phase means more messages, which can slow down performance and use up network resources.
- Still blocking: Although it’s less likely to block than 2PC, it can still block or produce inconsistent outcomes in some situations, such as network partitions.
- Complexity: It’s a more complicated protocol than 2PC, making it harder to implement and maintain.
6. Clock Synchronization:
In a distributed system, it’s important to have synchronized clocks across different nodes to maintain consistency. Algorithms like the Network Time Protocol (NTP) and the Berkeley Algorithm help synchronize the clocks of distributed nodes.
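The core arithmetic of the Berkeley Algorithm can be sketched in a few lines: a master collects clock readings, averages them, and tells each node how much to adjust. The class `BerkeleySync` is a hypothetical illustration that ignores network delay and outlier filtering, both of which a real deployment has to handle.

```java
// Hypothetical sketch of the averaging step in the Berkeley Algorithm.
class BerkeleySync {
    // clocks[i] is node i's current reading (for example, in minutes).
    // Returns the correction each node should add to its own clock.
    static long[] adjustments(long[] clocks) {
        long sum = 0;
        for (long c : clocks) {
            sum += c;
        }
        long average = sum / clocks.length;
        long[] result = new long[clocks.length];
        for (int i = 0; i < clocks.length; i++) {
            result[i] = average - clocks[i];    // positive: move forward, negative: move back
        }
        return result;
    }
}
```

For readings of 180, 205, and 170 the average is 185, so the nodes are told to adjust by +5, -20, and +15 respectively.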
Types of Clock Synchronization:
- Physical clock synchronization
- Logical clock synchronization
- Mutual exclusion synchronization
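Logical clock synchronization can be illustrated with a Lamport clock, which orders events with a counter instead of wall-clock time. The class `LamportClock` below is a minimal sketch with hypothetical method names: the counter increases on every local event or send, and on receive it jumps past the timestamp carried by the message.

```java
// Hypothetical minimal Lamport logical clock.
class LamportClock {
    private long time = 0;

    // Call for a local event or just before sending a message;
    // the returned value is the timestamp to attach to the message.
    synchronized long tick() {
        time = time + 1;
        return time;
    }

    // Call when a message carrying `remoteTime` arrives.
    synchronized long receive(long remoteTime) {
        time = Math.max(time, remoteTime) + 1;
        return time;
    }

    synchronized long now() {
        return time;
    }
}
```

If one event causally precedes another, for example a send and its matching receive, the earlier event always gets the smaller timestamp.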