Distributed Architecture

Santosh P.
Aug 6, 2024


“A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions across compute resources by passing messages to one another”.

Distributed computing is a fundamental approach in modern computing that enables the coordination and collaboration of multiple interconnected systems or nodes to achieve a common goal. This paradigm is widely used in various fields, particularly where large-scale data processing, high availability, fault tolerance, and scalability are critical. It is also the foundation of high-performance computing and supercomputer infrastructure. Here are some major areas where distributed computing is predominantly used:

  1. Cloud computing infrastructure.
  2. Big data processing.
  3. Content delivery networks (CDNs).
  4. Distributed database.
  5. Financial apps.
  6. High performance Computing.
  7. Social networking sites.
  8. Edge computing.

and many more. In the era of modern computing, distributed architecture is at the core of computing infrastructure. Some of the major design patterns for distributed computing are:

State machine

A state machine in distributed systems is a model used to describe the behavior of a system where the system can be in one of a finite number of states, and it transitions between these states based on certain inputs or events. State machines are fundamental in building reliable, consistent, and fault-tolerant distributed systems, as they provide a clear and deterministic way to manage system behavior across multiple nodes.

Key concepts of state machines in distributed systems:

  • States: the system can exist in one of many possible states.
  • Transitions: how the system moves from one state to another, triggered by events or conditions such as receiving a message or a timeout.
  • Determinism: given the same initial state and sequence of inputs, all nodes should transition through the same states in the same order. This ensures consistency across the system.
  • State replication: state machines are often replicated across multiple nodes. Ensuring that each node's state machine transitions in lockstep with the others is critical for consistency and fault tolerance.

package main

import "fmt"

// State defines a state of the state machine
type State string

const (
StateInit State = "INIT"
StateStarted State = "STARTED"
StateStopped State = "STOPPED"
)
// Event defines an event that triggers state transitions
type Event string
const (
EventStart Event = "START"
EventStop Event = "STOP"
)
// StateMachine defines the structure of our state machine
type StateMachine struct {
state State
}
// NewStateMachine initializes the state machine with an initial state
func NewStateMachine(initialState State) *StateMachine {
return &StateMachine{
state: initialState,
}
}
// Transition handles state transitions based on the given event
func (sm *StateMachine) Transition(event Event) {
switch sm.state {
case StateInit:
if event == EventStart {
sm.state = StateStarted
}
case StateStarted:
if event == EventStop {
sm.state = StateStopped
}
case StateStopped:
if event == EventStart {
sm.state = StateStarted
}
}
}
// GetState returns the current state of the state machine
func (sm *StateMachine) GetState() State {
return sm.state
}
func main() {
sm := NewStateMachine(StateInit)
fmt.Println("Current State:", sm.GetState()) // INIT
sm.Transition(EventStart)
fmt.Println("Current State:", sm.GetState()) // STARTED
sm.Transition(EventStop)
fmt.Println("Current State:", sm.GetState()) // STOPPED
sm.Transition(EventStart)
fmt.Println("Current State:", sm.GetState()) // STARTED
}

Request and Response Pattern

Command query responsibility segregation pattern (CQRS):

The Command Query Responsibility Segregation (CQRS) pattern is an architectural pattern that separates read and write operations for a data store. With CQRS, requests are split into two major categories: commands and queries. Commands represent the write operations (create, update, delete) that are responsible for changing the state of an application. Queries represent the read operations that retrieve data without modifying it. In this way, the CQRS pattern helps maximize performance, scalability, and security. It is often used with the event sourcing pattern, where application state is stored as a sequence of events and the current state is derived by replaying those events. More information about the event sourcing pattern is given later.

// define the data model
package main

import (
	"fmt"
	"sync"
)

// Order represents an order entity
type Order struct {
ID string
Item string
Amount int
}

// create a command handler. Command represent the create/update/delete.
// CommandHandler handles commands that modify the order data
type CommandHandler struct {
orders map[string]Order
mutex sync.Mutex
}

// NewCommandHandler creates a new CommandHandler
func NewCommandHandler() *CommandHandler {
return &CommandHandler{
orders: make(map[string]Order),
}
}

// CreateOrder creates a new order
func (h *CommandHandler) CreateOrder(id, item string, amount int) {
h.mutex.Lock()
defer h.mutex.Unlock()

h.orders[id] = Order{
ID: id,
Item: item,
Amount: amount,
}

fmt.Printf("Order created: %v\n", h.orders[id])
}

// UpdateOrder updates an existing order
func (h *CommandHandler) UpdateOrder(id, item string, amount int) {
h.mutex.Lock()
defer h.mutex.Unlock()

if order, exists := h.orders[id]; exists {
order.Item = item
order.Amount = amount
h.orders[id] = order
fmt.Printf("Order updated: %v\n", h.orders[id])
} else {
fmt.Printf("Order with ID %s not found\n", id)
}
}

// Create a query handler
// QueryHandler handles queries that retrieve the order data
type QueryHandler struct {
orders map[string]Order
mutex sync.Mutex
}

// NewQueryHandler creates a new QueryHandler
func NewQueryHandler(orders map[string]Order) *QueryHandler {
return &QueryHandler{
orders: orders,
}
}

// GetOrder retrieves an order by ID
func (h *QueryHandler) GetOrder(id string) (Order, bool) {
h.mutex.Lock()
defer h.mutex.Unlock()

order, exists := h.orders[id]
return order, exists
}

// main function

func main() {
// Initialize command handler
commandHandler := NewCommandHandler()

// Create some orders
commandHandler.CreateOrder("1", "Laptop", 2)
commandHandler.CreateOrder("2", "Phone", 5)

// Update an order
commandHandler.UpdateOrder("1", "Laptop", 3)

// Initialize query handler with the same order map
queryHandler := NewQueryHandler(commandHandler.orders)

// Query for a specific order
order, exists := queryHandler.GetOrder("1")
if exists {
fmt.Printf("Queried Order: %v\n", order)
} else {
fmt.Println("Order not found")
}
}

Splitter pattern

Splits a single message into multiple parts, processes them independently, and often routes them to different destinations.

package main

import "fmt"

// Splitter splits a request into multiple parts
func Splitter(request string) []string {
return []string{"Part1: " + request, "Part2: " + request}
}

func main() {
request := "OriginalRequest"
parts := Splitter(request)
for _, part := range parts {
fmt.Println(part)
}
}

In the splitter pattern, the message is split based on certain criteria; for example, "Part1: request" is one message and "Part2: request" is another message.

Segregator pattern

Segregates different types of messages based on specific criteria, routing them to appropriate processing units. These patterns help in managing complex request/response handling in distributed systems by breaking down and routing messages efficiently. Using the segregator pattern, a large batch job can be split into smaller tasks (it is quite similar to the splitter pattern).

package main

import "fmt"

// Segregator routes the request based on type
func Segregator(request string) string {
if request == "TypeA" {
return "Handled by TypeAProcessor"
}
return "Handled by TypeBProcessor"
}

func main() {
request := "TypeA"
result := Segregator(request)
fmt.Println(result)
}

Gateway aggregation or Aggregator pattern

Combines data from multiple services or sources into a single response, ensuring that clients receive a unified and coherent result. This pattern is also useful in microservices architectures where a single client request requires data from multiple services.

package main

import (
	"fmt"
	"sync"
)

// Service1 simulates a microservice response
func Service1() string {
return "Data from Service 1"
}

// Service2 simulates another microservice response
func Service2() string {
return "Data from Service 2"
}

// Aggregator collects responses from multiple services
func Aggregator() string {
var wg sync.WaitGroup
var response1, response2 string

wg.Add(2)
go func() {
defer wg.Done()
response1 = Service1()
}()
go func() {
defer wg.Done()
response2 = Service2()
}()
wg.Wait()

return response1 + "; " + response2
}

func main() {
aggregatedResponse := Aggregator()
fmt.Println("Aggregated Response:", aggregatedResponse)
}

Data Distribution Pattern

In distributed system architecture, data distribution is the most critical flow. Some common patterns used for efficient data distribution are described below.

Data partitioning or chunking

Dividing data into partitions, which are then distributed across nodes. This approach is particularly useful in situations where processing large datasets or handling large files in a single operation could be inefficient or lead to excessive memory usage. Partitioning the data and distributing the partitions across multiple compute resources so they can be processed in parallel is far more efficient.

package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
)

// ChunkSize defines the size of each chunk to read in bytes
const ChunkSize = 1024 // 1 KB

func main() {
filePath := "largefile.txt" // Replace with your large file path

// Open the file for reading
file, err := os.Open(filePath)
if err != nil {
fmt.Println("Error opening file:", err)
return
}
defer file.Close()

// Create a buffered reader
reader := bufio.NewReader(file)

// Process the file in chunks
for {
// Create a buffer to hold the chunk data
buffer := make([]byte, ChunkSize)

// Read a chunk of data from the file
bytesRead, err := reader.Read(buffer)
if err != nil && err != io.EOF {
fmt.Println("Error reading file:", err)
return
}

// If EOF is reached, break the loop
if bytesRead == 0 {
break
}

// Process the chunk (for example, print it out)
processChunk(buffer[:bytesRead])

// Break the loop if EOF is reached
// ensure all data is processed.
if err == io.EOF {
break
}
}

fmt.Println("File processing completed.")
}

// processChunk is a placeholder function to process each chunk of data
func processChunk(chunk []byte) {
fmt.Println("Processing chunk:")
fmt.Println(string(chunk))
}

Sharding (Horizontal partitioning)

Data is partitioned across multiple nodes such that each node contains a subset of the data. Each partition is called a shard. Distributes data across multiple databases or multiple storage nodes to improve performance and scalability. For sharding, more information can be found here.

Replication

Replication in a distributed system refers to the process of copying and maintaining multiple instances of the same data across different servers or nodes. This ensures data availability, fault tolerance, and load distribution.

package main

import (
	"fmt"
	"sync"
)

// Node represents a single node in the system
type Node struct {
Name string
Data map[string]string
mu sync.RWMutex
}

// NewNode creates a new Node
func NewNode(name string) *Node {
return &Node{
Name: name,
Data: make(map[string]string),
}
}

// Set stores a key-value pair in the node
func (n *Node) Set(key, value string) {
n.mu.Lock()
defer n.mu.Unlock()
n.Data[key] = value
fmt.Printf("Data stored in %s: %s => %s\n", n.Name, key, value)
}

// Get retrieves the value for a given key from the node
func (n *Node) Get(key string) (string, bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, exists := n.Data[key]
return value, exists
}

// Replicate replicates the data across a set of nodes
func Replicate(nodes []*Node, key, value string) {
for _, node := range nodes {
node.Set(key, value)
}
}

func main() {
// Create nodes
node1 := NewNode("Node1")
node2 := NewNode("Node2")
node3 := NewNode("Node3")

// Group nodes together
nodes := []*Node{node1, node2, node3}

// Simulate storing a key-value pair and replicating it across nodes
key := "user123"
value := "John Doe"

// Replicate the data to all nodes
Replicate(nodes, key, value)

// Verify the data replication by fetching the value from each node
for _, node := range nodes {
if v, exists := node.Get(key); exists {
fmt.Printf("Node %s has value for %s: %s\n", node.Name, key, v)
} else {
fmt.Printf("Node %s does not have value for %s\n", node.Name, key)
}
}
}

Replication strategy

Replication strategies define how data is copied and maintained across multiple servers or nodes in a distributed system. Replication strategies are essential for performance improvement, data redundancy, high availability, and disaster recovery in distributed systems. Type of replication strategies:

  1. Single Leader Replication
  2. Master-Slave (One primary node handles writes; replicas handle reads.)
  3. Multi-Leader Replication (Multiple nodes can accept writes, providing better fault tolerance and availability.)
  4. Leaderless replication (there’s no central figure directing traffic. Instead, clients themselves, or a coordinator, send writes directly to several replicas. )
  5. Quorum-based Replication (Uses a consensus algorithm to ensure data consistency across nodes. Reads and writes must reach a majority ).
  6. Chain Replication (Nodes are arranged in a chain. Write operations propagate from the head to the tail, ensuring high throughput and strong consistency.)
  7. Gossip based Replication (Nodes randomly exchange state information (or “gossip”) with other nodes to propagate updates.)

A system might use a combination of these strategies, depending on the specific needs of different parts of the system. For example, a system that requires strong consistency might use quorum-based replication for critical data, whereas gossip-based replication is preferred where eventual consistency is acceptable.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Node represents a single node in the system
type Node struct {
Name string
Data map[string]string
Peers []*Node
mu sync.RWMutex
quorum int
}

// NewNode creates a new Node
func NewNode(name string, quorum int) *Node {
return &Node{
Name: name,
Data: make(map[string]string),
quorum: quorum,
}
}

// Set stores a key-value pair in the node
func (n *Node) Set(key, value string) {
n.mu.Lock()
defer n.mu.Unlock()
n.Data[key] = value
fmt.Printf("%s set: %s => %s\n", n.Name, key, value)
}

// Get retrieves the value for a given key from the node
func (n *Node) Get(key string) (string, bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, exists := n.Data[key]
return value, exists
}

// Gossip shares this node's data with a random peer
func (n *Node) Gossip() {
	if len(n.Peers) == 0 {
		return
	}
	peer := n.Peers[rand.Intn(len(n.Peers))]
	// Copy the data under the read lock, then release it before writing to the
	// peer, so two nodes gossiping to each other cannot deadlock.
	n.mu.RLock()
	snapshot := make(map[string]string, len(n.Data))
	for key, value := range n.Data {
		snapshot[key] = value
	}
	n.mu.RUnlock()
	for key, value := range snapshot {
		peer.Set(key, value)
	}
	fmt.Printf("%s gossiped data to %s\n", n.Name, peer.Name)
}

// StartGossiping starts the gossip protocol in the background
func (n *Node) StartGossiping() {
go func() {
for {
n.Gossip()
time.Sleep(time.Second * time.Duration(rand.Intn(5)+1))
}
}()
}

// AddPeer adds a peer to this node
func (n *Node) AddPeer(peer *Node) {
n.Peers = append(n.Peers, peer)
}

func main() {
// Create nodes
node1 := NewNode("Node1", 2)
node2 := NewNode("Node2", 2)
node3 := NewNode("Node3", 2)
node4 := NewNode("Node4", 2)

// Add peers (create the network)
node1.AddPeer(node2)
node1.AddPeer(node3)
node2.AddPeer(node1)
node2.AddPeer(node4)
node3.AddPeer(node1)
node3.AddPeer(node4)
node4.AddPeer(node2)
node4.AddPeer(node3)

// Start gossiping
node1.StartGossiping()
node2.StartGossiping()
node3.StartGossiping()
node4.StartGossiping()

// Simulate setting a key-value pair
node1.Set("user123", "Alice")

// Wait for gossip to propagate
time.Sleep(10 * time.Second)

// Check if the data has been propagated to all nodes
for _, node := range []*Node{node1, node2, node3, node4} {
if value, exists := node.Get("user123"); exists {
fmt.Printf("%s has user123: %s\n", node.Name, value)
} else {
fmt.Printf("%s does not have user123\n", node.Name)
}
}
}

How to sync data across replicas

There are two main ways to keep the data in replicas in sync.

Read Repair: Imagine a client requests data from several nodes. If it gets a stale response from, say, Replica 3, it can immediately write back the updated value to that replica.

Anti-Entropy: Databases continuously run background processes to find discrepancies between replicas. They then sync the data, but not in any particular order. Unlike the replication log in leader-based systems, anti-entropy may result in significant delays before data is copied. For instance, if a rarely updated user profile is changed, it might take some time for all replicas to show the new information.
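
To make read repair concrete, here is a minimal sketch of the idea (my own illustration, not tied to any particular database): each replica stores a value together with a write timestamp, a read collects the value from every replica, keeps the newest one, and writes it back to any replica holding a stale or missing copy.

package main

import (
	"fmt"
	"sync"
	"time"
)

// VersionedValue pairs a value with the time it was written.
type VersionedValue struct {
	Value     string
	Timestamp time.Time
}

// Replica is a single copy of the data.
type Replica struct {
	mu   sync.RWMutex
	data map[string]VersionedValue
}

func NewReplica() *Replica {
	return &Replica{data: make(map[string]VersionedValue)}
}

func (r *Replica) Write(key string, v VersionedValue) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// Only accept the write if it is newer than what we already have.
	if cur, ok := r.data[key]; !ok || v.Timestamp.After(cur.Timestamp) {
		r.data[key] = v
	}
}

func (r *Replica) Read(key string) (VersionedValue, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.data[key]
	return v, ok
}

// ReadWithRepair reads from all replicas, returns the freshest value,
// and repairs any replica that returned a stale or missing copy.
func ReadWithRepair(replicas []*Replica, key string) (VersionedValue, bool) {
	var newest VersionedValue
	found := false
	for _, r := range replicas {
		if v, ok := r.Read(key); ok {
			if !found || v.Timestamp.After(newest.Timestamp) {
				newest = v
				found = true
			}
		}
	}
	if !found {
		return VersionedValue{}, false
	}
	// Read repair: push the newest value back to stale replicas.
	for _, r := range replicas {
		r.Write(key, newest)
	}
	return newest, true
}

func main() {
	r1, r2, r3 := NewReplica(), NewReplica(), NewReplica()
	old := VersionedValue{Value: "v1", Timestamp: time.Now().Add(-time.Minute)}
	fresh := VersionedValue{Value: "v2", Timestamp: time.Now()}

	// Replica 3 missed the latest write.
	r1.Write("user123", fresh)
	r2.Write("user123", fresh)
	r3.Write("user123", old)

	v, _ := ReadWithRepair([]*Replica{r1, r2, r3}, "user123")
	fmt.Println("Read returned:", v.Value)
	repaired, _ := r3.Read("user123")
	fmt.Println("Replica 3 after repair:", repaired.Value)
}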

Quorums for reading and writing

The quorum pattern ensures that a sufficient number of nodes agree on an operation before it is considered successful. Data is typically replicated across multiple nodes to ensure reliability and availability, and the quorum pattern helps coordinate read and write operations across these replicas. In this way, quorums help strike a balance between availability and consistency.

Imagine you have a distributed database with five replicas. In this system, to ensure consistency, you define quorums for both reading and writing operations. Let’s say you set a write quorum (W) of 3 and a read quorum (R) of 3. This means that for a write operation to be considered successful, at least 3 out of the 5 replicas must acknowledge the write. Similarly, for a read operation to be reliable, it must gather data from at least 3 replicas.

Now, consider a scenario where a client writes data to the system. The write request is sent to all five replicas, but only needs acknowledgments from any 3 to proceed. This ensures that even if 2 replicas are down or slow, the system can still function effectively. Similarly for read operation, where the client will query at least 3 replicas and use the most recent data among the responses.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Node represents a single node in the distributed system
type Node struct {
ID int
Data map[string]string
mu sync.RWMutex
}

// SetValue simulates a write operation on a node
func (n *Node) SetValue(key, value string) {
n.mu.Lock()
defer n.mu.Unlock()
n.Data[key] = value
}

// GetValue simulates a read operation on a node
func (n *Node) GetValue(key string) (string, bool) {
n.mu.RLock()
defer n.mu.RUnlock()
value, exists := n.Data[key]
return value, exists
}

// QuorumWrite performs a write operation with a write quorum
func QuorumWrite(nodes []*Node, key, value string, W int) bool {
var wg sync.WaitGroup
ack := make(chan bool, len(nodes))
for _, node := range nodes {
wg.Add(1)
go func(n *Node) {
defer wg.Done()
n.SetValue(key, value)
ack <- true
}(node)
}

wg.Wait()
close(ack)

// Count the acknowledgements
successCount := 0
for range ack {
successCount++
if successCount >= W {
return true
}
}
return false
}

// QuorumRead performs a read operation with a read quorum
func QuorumRead(nodes []*Node, key string, R int) (string, bool) {
var wg sync.WaitGroup
reads := make(chan string, len(nodes))
for _, node := range nodes {
wg.Add(1)
go func(n *Node) {
defer wg.Done()
if value, exists := n.GetValue(key); exists {
reads <- value
}
}(node)
}

wg.Wait()
close(reads)

// Collect the reads
readCount := make(map[string]int)
for value := range reads {
readCount[value]++
if readCount[value] >= R {
return value, true
}
}
return "", false
}

func main() {
// Simulate a distributed system with 5 nodes
nodes := []*Node{
{ID: 1, Data: make(map[string]string)},
{ID: 2, Data: make(map[string]string)},
{ID: 3, Data: make(map[string]string)},
{ID: 4, Data: make(map[string]string)},
{ID: 5, Data: make(map[string]string)},
}

// Quorum parameters (N = 5 replicas in total)
W := 3 // Write quorum
R := 3 // Read quorum

// Perform a write operation
if QuorumWrite(nodes, "user123", "Alice", W) {
fmt.Println("Write successful with quorum")
} else {
fmt.Println("Write failed to achieve quorum")
}

// Simulate some delay to mimic network latency
time.Sleep(time.Second)

// Perform a read operation
if value, exists := QuorumRead(nodes, "user123", R); exists {
fmt.Printf("Read successful with quorum: %s\n", value)
} else {
fmt.Println("Read failed to achieve quorum")
}
}

If W + R > N (the total number of replicas), the system guarantees strong consistency, meaning every read receives the most recent write.

Sloppy Quorum: This occurs when the write quorum ends up on different nodes than the read quorum, so there is no guaranteed overlap between them. Handling a sloppy quorum involves several steps to ensure data availability and eventual consistency. If some of the intended replicas are unavailable, write operations are temporarily stored on a node that is not part of the intended replica set; the node that receives the write is responsible for forwarding it to the correct node when that node becomes available again (hinted handoff). Vector clocks or timestamps are used to merge and resolve conflicts, and background processes periodically compare and synchronize data across nodes.
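
The hinted-handoff part of a sloppy quorum can be sketched roughly as follows (an illustrative toy; the Node, HintedWrite, and ForwardHints names are invented for this example): a write intended for an unavailable replica is parked on a fallback node together with a hint naming the real owner, and the hint is forwarded once that owner is reachable again.

package main

import "fmt"

// HintedWrite is a write held on behalf of an unavailable replica.
type HintedWrite struct {
	TargetNode string
	Key, Value string
}

// Node is a replica that may be temporarily down.
type Node struct {
	Name  string
	Up    bool
	Data  map[string]string
	Hints []HintedWrite // writes held for other nodes (hinted handoff)
}

func NewNode(name string) *Node {
	return &Node{Name: name, Up: true, Data: make(map[string]string)}
}

// Write stores the key-value pair on every intended replica; if a replica is
// down, the write is handed off to the fallback node together with a hint.
func Write(replicas []*Node, fallback *Node, key, value string) {
	for _, n := range replicas {
		if n.Up {
			n.Data[key] = value
			continue
		}
		fallback.Hints = append(fallback.Hints, HintedWrite{TargetNode: n.Name, Key: key, Value: value})
		fmt.Printf("%s is down, hint stored on %s\n", n.Name, fallback.Name)
	}
}

// ForwardHints delivers stored hints once their target node is reachable again.
func ForwardHints(fallback *Node, nodes map[string]*Node) {
	remaining := fallback.Hints[:0]
	for _, h := range fallback.Hints {
		target := nodes[h.TargetNode]
		if target != nil && target.Up {
			target.Data[h.Key] = h.Value
			fmt.Printf("hint forwarded to %s: %s=%s\n", target.Name, h.Key, h.Value)
		} else {
			remaining = append(remaining, h)
		}
	}
	fallback.Hints = remaining
}

func main() {
	a, b, c := NewNode("A"), NewNode("B"), NewNode("C")
	standby := NewNode("Standby")
	c.Up = false // C is unavailable during the write

	Write([]*Node{a, b, c}, standby, "user123", "Alice")

	c.Up = true // C recovers
	ForwardHints(standby, map[string]*Node{"A": a, "B": b, "C": c})
	fmt.Println("C now has:", c.Data["user123"])
}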

Some inconsistency issues you can encounter during updates across replicas:

Concurrent writes during reads (if a write happens concurrently with a read, the write may be reflected on only some replicas).

Partial write success (a write succeeds on some replicas but fails on others; if it succeeded on fewer than W nodes, it is reported as failed, yet it is not rolled back on the replicas where it did succeed).

Node failure and data recovery (if a node carrying the new value fails and its data is restored from a replica holding the old value, the number of replicas storing the new value may fall below W).

How to monitor whether the data returned is fresh or stale?

With leader-based replication, such as single-leader or multi-leader, monitoring replication lag is easier than in leaderless systems. Since writes are applied in the same order on the leader and the followers, each node has a specific position in the replication log. By comparing a follower's current position to the leader's, you can effectively measure the lag.

In leaderless replication, writes aren't applied in a fixed order, which makes it challenging to monitor staleness. There are several strategies to monitor and ensure data freshness:

  1. Attach a timestamp to data records
  2. Use caching mechanisms that automatically invalidate or refresh.
  3. Implement versioning.
  4. Use an event-driven architecture where the system notifies consumers of changes.
  5. Use E-tags (entity tags) for HTTP responses to ensure clients are working with fresh data.
  6. Periodic data consistency check.

To gauge data freshness in leaderless replication, use techniques like vector clocks, timestamps, read repair, anti-entropy protocols, and quorum reads/writes. These methods help maintain consistency and ensure the system resolves conflicts effectively.

Each data write is accompanied by a timestamp. The latest timestamp generally represents the freshest data. During read operations, nodes compare their data and synchronize to the most recent version. We can also have background processes that compare and synchronize data across nodes periodically.

Consistent Hashing

Consistent hashing is a technique used in distributed systems to evenly distribute data across a cluster of machines or nodes while minimizing the amount of data that needs to be moved when a node is added or removed. It’s particularly useful in scalable distributed systems like distributed databases, caches, and load balancers.

More information about consistent hashing can be found here.
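
As a rough illustration of the idea (a toy ring without virtual nodes or replication, written for this article), the sketch below hashes node names onto a ring and assigns each key to the first node clockwise from the key's hash position:

package main

import (
	"fmt"
	"hash/crc32"
	"sort"
)

// Ring is a minimal consistent-hash ring (no virtual nodes for brevity).
type Ring struct {
	hashes []uint32          // sorted hashes of the nodes
	nodes  map[uint32]string // hash -> node name
}

func NewRing(nodes ...string) *Ring {
	r := &Ring{nodes: make(map[uint32]string)}
	for _, n := range nodes {
		h := crc32.ChecksumIEEE([]byte(n))
		r.hashes = append(r.hashes, h)
		r.nodes[h] = n
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

// Locate returns the node responsible for the given key: the first node
// clockwise from the key's hash position on the ring.
func (r *Ring) Locate(key string) string {
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // wrap around the ring
	}
	return r.nodes[r.hashes[i]]
}

func main() {
	ring := NewRing("node-a", "node-b", "node-c")
	for _, key := range []string{"user123", "order42", "session9"} {
		fmt.Printf("%s -> %s\n", key, ring.Locate(key))
	}
}

Adding or removing a node only affects the keys that fall between that node and its predecessor on the ring, which is what keeps data movement small.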

Distributed File system

A distributed filesystem (DFS) is a file system that allows data to be stored and accessed across multiple servers or nodes in a network, providing the illusion of a single, unified filesystem. It is designed to manage the complexities of data distribution, redundancy, fault tolerance, and scalability, which are crucial in modern large-scale computing environments.

HDFS (Hadoop Distributed File System) is a widely used distributed file system that stores and manages large files across multiple nodes. More information about design patterns of distributed file systems can be found here.

Database Manipulation Pattern

The CAP theorem, also known as Brewer's theorem, is a fundamental principle in distributed architecture. It states that it is impossible for a distributed data store to simultaneously provide more than two of the following three guarantees: Consistency (every read receives the most recent write or an error), Availability (every request receives a response, even if it is not the most recent write), and Partition Tolerance (the system continues functioning even if communication between nodes breaks down).

The CAP theorem plays a crucial role in database selection for distributed systems, as it forces architects and developers to make trade-offs between Consistency, Availability, and Partition Tolerance based on the specific requirements of their application.

For banking and financial applications, consistency is the priority, so the preferred databases are relational databases like MySQL or PostgreSQL, or non-relational databases like MongoDB or HBase. E-commerce platforms need high availability with eventual consistency, so databases like Cassandra, Couchbase, and DynamoDB are selected. In systems like Amazon's DynamoDB, the application can tolerate some inconsistencies but must remain available during partitions, whereas HBase sacrifices availability under partition but maintains consistency. For social media, availability and partition tolerance (AP) are critical, whereas real-time analytics favors consistency and partition tolerance (CP).

Database manipulation patterns are design patterns used to efficiently handle data operations within databases. These patterns are critical for ensuring that database interactions maintain consistency, scalability, and maintainability. Here are some common database manipulation patterns.

Database Partitioning

Partitioning is about dividing the database to improve manageability and performance. Partitioning is an integral part of designing an efficient distributed system.

When we have a massive amount of data in a system, we need to decide which data goes to which node. The goal is simple: we want to spread the data and the workload evenly across all nodes. So partitioning can be done in key range. For that we need to devise the proper strategy.

Choose range boundaries wisely. For certain access patterns, range partitioning can lead to hot spots: if the key is a timestamp and we are always writing new data, all writes end up in the same partition, overloading it while leaving the others idle. A smarter solution is to partition by more than just time, for example by partitioning on a hash of the key of the data being stored.

After hashing, each partition gets a range of these hashes. Any key whose hash value falls within a partition’s range is stored in that partition.
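
As a small illustration (simplified on purpose: a plain modulo over the hash stands in for assigning explicit hash ranges to partitions), hashing the key spreads even strictly increasing timestamp keys across partitions:

package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes the key and maps the hash onto one of numPartitions
// partitions. A real system would assign each partition a range of the hash
// space instead of using a simple modulo.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % numPartitions
}

func main() {
	keys := []string{"2024-08-06T10:00:00Z", "2024-08-06T10:00:01Z", "2024-08-06T10:00:02Z"}
	for _, k := range keys {
		fmt.Printf("key %q -> partition %d\n", k, partitionFor(k, 4))
	}
}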

MongoDB handles partitioning using a technique called sharding, which distributes data across multiple servers (shards) to ensure horizontal scalability; shard distribution is based on a shard key. Cassandra performs partitioning using token-based partitioning: each node in the cluster is assigned a range of tokens, data is distributed across nodes based on the token range, and the partition key is hashed to determine the token that identifies the node responsible for storing the data. Cassandra also uses virtual nodes to distribute data efficiently across the cluster, ensuring scalability and fault tolerance, and supports a compound primary key with several columns: the first part of the key is hashed to determine the partition, while the other parts are used as an index for sorting within that partition.

Managing Partitions With SStable or LSM

Each partition can maintain a sorted order, using structures like SSTables or LSM trees, to make range queries more efficient.

SSTables, or Sorted String Tables, are persistent, ordered, immutable data structures used in NoSQL databases like Cassandra. An SSTable stores key-value pairs in sorted order, facilitating efficient read and write operations, and it is efficient for range queries thanks to its sorted nature.

package sstable

import (
	"bufio"
	"io"
	"os"
	"strings"
)

// Entry represents a key-value pair in the SSTable.
type Entry struct {
Key, Value string
}

// SSTable Structure
// SSTable File: A file storing key-value pairs sorted by keys.
// Index File: An index for quick lookups, pointing to data offsets in the SSTable file.
// SSTable represents the SSTable structure with an index.
type SSTable struct {
file *os.File
index map[string]int64
reader *bufio.Reader
}
// Create sstable file
func CreateSSTable(filename string, entries []Entry) (*SSTable, error) {
file, err := os.Create(filename)
if err != nil {
return nil, err
}

index := make(map[string]int64)
writer := bufio.NewWriter(file)
// Track the offset ourselves: the writer is buffered, so asking the file for
// its current position would not account for data still in the buffer.
var offset int64
for _, entry := range entries {
	index[entry.Key] = offset
	line := entry.Key + "\t" + entry.Value + "\n"
	if _, err := writer.WriteString(line); err != nil {
		return nil, err
	}
	offset += int64(len(line))
}
writer.Flush()
return &SSTable{file: file, index: index, reader: bufio.NewReader(file)}, nil
}

// Read from the sstable file
func (s *SSTable) Get(key string) (string, error) {
pos, ok := s.index[key]
if !ok {
return "", os.ErrNotExist
}

if _, err := s.file.Seek(pos, io.SeekStart); err != nil {
	return "", err
}
// Reset the buffered reader after seeking so it doesn't serve stale buffered data.
s.reader.Reset(s.file)
line, err := s.reader.ReadString('\n')
if err != nil {
return "", err
}
parts := strings.SplitN(line, "\t", 2)
if len(parts) < 2 {
return "", os.ErrNotExist
}
return strings.TrimSpace(parts[1]), nil
}

SSTables work well in conjunction with log-structured merge-trees (LSM-trees) to handle large-scale data efficiently.

Log-Structured Merge-Trees (LSM-Trees) are data structures designed to handle high write throughput efficiently. They are optimized for scenarios with frequent inserts, updates, and deletions.

Writes are batched and written to a write-ahead log (WAL) and an in-memory table (memtable). When the memtable is full, it is flushed to disk as an immutable SSTable. As part of compaction periodically, smaller SSTables are merged and compacted into larger ones, reducing the number of files and improving read efficiency.

Though reads may need to check multiple SSTables, they are optimized with Bloom filters and indexes to quickly locate data.

// Defining LSM data structure
package lsm

import (
	"bufio"
	"io"
	"os"
	"strings"
	"sync"
)

type MemTable struct {
sync.RWMutex
data map[string]string
}
type SSTable struct {
file *os.File
index map[string]int64
reader *bufio.Reader
}
func NewMemTable() *MemTable {
return &MemTable{
data: make(map[string]string),
}
}

// write to memtable and WAL

func (mt *MemTable) Set(key, value string) {
mt.Lock()
defer mt.Unlock()
mt.data[key] = value
// Simultaneously write to WAL (not implemented for brevity)
}

// Flush from memtable to sstable
func (mt *MemTable) FlushToSSTable(filename string) (*SSTable, error) {
mt.Lock()
defer mt.Unlock()

file, err := os.Create(filename)
if err != nil {
	return nil, err
}
// Do not close the file here: the returned SSTable keeps reading from it.
writer := bufio.NewWriter(file)
index := make(map[string]int64)
// Track offsets manually because the buffered writer hasn't flushed to the file yet.
// (A real SSTable would also write keys in sorted order; map iteration here is unordered.)
var offset int64
for key, value := range mt.data {
	index[key] = offset
	line := key + "\t" + value + "\n"
	if _, err := writer.WriteString(line); err != nil {
		return nil, err
	}
	offset += int64(len(line))
}
writer.Flush()
return &SSTable{file: file, index: index, reader: bufio.NewReader(file)}, nil
}

// Read from SStable

func (sst *SSTable) Get(key string) (string, error) {
pos, ok := sst.index[key]
if !ok {
return "", os.ErrNotExist
}

if _, err := sst.file.Seek(pos, io.SeekStart); err != nil {
	return "", err
}
// Reset the buffered reader after seeking so it doesn't serve stale buffered data.
sst.reader.Reset(sst.file)
line, err := sst.reader.ReadString('\n')
if err != nil {
return "", err
}
parts := strings.SplitN(line, "\t", 2)
if len(parts) < 2 {
return "", os.ErrNotExist
}
return strings.TrimSpace(parts[1]), nil
}

The querying process across multiple partitions is known as scatter and gather. This involves parallel queries to each partition to collect the needed data. Although parallelization can improve speed, scatter and gather can be resource-intensive, especially in large databases. The main costs are associated with querying each partition separately and then consolidating the results, which often requires significant computational and network resources.
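
A minimal scatter-gather sketch (my own toy example) might look like this: the query is sent to every partition in parallel, and the partial results are merged into a single response.

package main

import (
	"fmt"
	"sync"
)

// Partition holds a subset of the data.
type Partition struct {
	ID   int
	Rows []string
}

// query simulates filtering rows inside one partition.
func (p *Partition) query(match string) []string {
	var out []string
	for _, row := range p.Rows {
		if row == match {
			out = append(out, fmt.Sprintf("partition %d: %s", p.ID, row))
		}
	}
	return out
}

// ScatterGather queries every partition in parallel (scatter) and merges
// the partial results into one response (gather).
func ScatterGather(partitions []*Partition, match string) []string {
	var wg sync.WaitGroup
	results := make(chan []string, len(partitions))
	for _, p := range partitions {
		wg.Add(1)
		go func(p *Partition) {
			defer wg.Done()
			results <- p.query(match)
		}(p)
	}
	wg.Wait()
	close(results)

	var merged []string
	for r := range results {
		merged = append(merged, r...)
	}
	return merged
}

func main() {
	partitions := []*Partition{
		{ID: 1, Rows: []string{"apple", "banana"}},
		{ID: 2, Rows: []string{"apple", "cherry"}},
		{ID: 3, Rows: []string{"date"}},
	}
	fmt.Println(ScatterGather(partitions, "apple"))
}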

Change Data Capture (CDC)

Change Data Capture (CDC) is a process that tracks all data changes in a database and extracts them so they can be reflected in other systems, ensuring those systems have accurate and up-to-date copies. There are several approaches to capturing changes in a database, such as polling (periodically checking the database), triggers (using database triggers to log changes to a special table), and log-based streaming.

The following example shows a simple example of CDC using polling.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "github.com/lib/pq" // PostgreSQL driver (assumed)
)

func main() {
db, err := sql.Open("postgres", "user=postgres dbname=mydb sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()

lastTimestamp := time.Now().Add(-time.Hour)

for {
rows, err := db.Query("SELECT id, data, updated_at FROM my_table WHERE updated_at > $1", lastTimestamp)
if err != nil {
log.Fatal(err)
}
for rows.Next() {
var id int
var data string
var updatedAt time.Time
err := rows.Scan(&id, &data, &updatedAt)
if err != nil {
log.Fatal(err)
}
fmt.Printf("ID: %d, Data: %s, UpdatedAt: %v\n", id, data, updatedAt)
// Process the change
lastTimestamp = updatedAt
}
rows.Close()

time.Sleep(10 * time.Second)
}
}

Debezium (which uses log-based streaming) is an open-source platform for Change Data Capture (CDC) that is used with Kafka to capture changes from databases like MySQL or PostgreSQL. It monitors databases and captures row-level changes, emitting them as event streams.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama" // Kafka client library (assumed import path)
)

func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

partitionConsumer, err := consumer.ConsumePartition("my-topic", 0,
sarama.OffsetNewest)
if err != nil {
log.Fatal(err)
}
defer partitionConsumer.Close()

for message := range partitionConsumer.Messages() {
fmt.Printf("Message: %s\n", string(message.Value))
// Process the CDC event
}
}

Distributed Transactions

Distributed transactions are complex operations that involve multiple, separate resources (like databases or services) in a distributed system, ensuring all parts of the transaction either succeed or fail together. When a distributed transaction updates data on two or more distinct nodes of a distributed datastore, it ensures a consistent result across those nodes. This is the traditional approach to maintaining data consistency across multiple services, databases, or message brokers.

Almost every request handled by an enterprise application is executed within a database transaction. Distributed transactions are typically implemented with a two-phase commit (2PC), a three-phase commit (3PC), or the saga pattern.

Two-phase commit (2PC) ensures that all participants in a transaction either commit or roll back. A central coordinator manages the transaction: in phase 1 it sends a prepare message to all participants and waits for their votes; in phase 2, if all participant services agree, the coordinator instructs them to commit the changes atomically, and if any participant fails or votes no, the coordinator instructs all of them to roll back.

Here the coordinator is a single point of failure: if it fails, the entire transaction needs to be restarted. Participants also hold locks while waiting for the final outcome, so other transactions are blocked until the status is resolved. That is why, although 2PC provides atomicity, it isn't suitable for all scenarios due to its blocking nature. Three-phase commit (3PC) adds an extra phase to better handle coordinator or participant failure.
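
A toy sketch of the coordinator's 2PC logic (illustrative only, with in-memory participants standing in for real resource managers) could look like this:

package main

import "fmt"

// Participant is one resource manager taking part in the transaction.
type Participant struct {
	Name      string
	CanCommit bool // whether it will vote "yes" in the prepare phase
}

func (p *Participant) Prepare() bool {
	fmt.Printf("%s: prepare -> vote %v\n", p.Name, p.CanCommit)
	return p.CanCommit
}

func (p *Participant) Commit()   { fmt.Printf("%s: commit\n", p.Name) }
func (p *Participant) Rollback() { fmt.Printf("%s: rollback\n", p.Name) }

// TwoPhaseCommit runs the coordinator logic: phase 1 collects votes,
// phase 2 commits only if every participant voted yes, otherwise rolls back.
func TwoPhaseCommit(participants []*Participant) bool {
	// Phase 1: prepare / voting
	for _, p := range participants {
		if !p.Prepare() {
			// Abort path: roll everyone back
			for _, q := range participants {
				q.Rollback()
			}
			return false
		}
	}
	// Phase 2: commit path
	for _, p := range participants {
		p.Commit()
	}
	return true
}

func main() {
	ok := TwoPhaseCommit([]*Participant{
		{Name: "orders-db", CanCommit: true},
		{Name: "payments-db", CanCommit: true},
	})
	fmt.Println("transaction committed:", ok)

	ok = TwoPhaseCommit([]*Participant{
		{Name: "orders-db", CanCommit: true},
		{Name: "inventory-db", CanCommit: false}, // votes no
	})
	fmt.Println("transaction committed:", ok)
}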

SAGA pattern: The Saga pattern is a design pattern used to manage long-running transactions in a distributed system. It divides a transaction into smaller, isolated steps, each with its own compensating action (a rollback step) in case of failure. There are two main approaches to implementing the Saga pattern: choreography and orchestration. Choreography is implemented with an asynchronous messaging pattern: the client request publishes messages to a message queue, the messages are pushed to subscribers, each subscriber performs its task and pushes its response back to the queue, and failed steps can be retried. The Saga pattern decomposes a long-lived transaction into a sequence of smaller, loosely coupled transactions; each step in the sequence represents a microservice operation, and compensating transactions handle rollback in case of failures.

A central orchestrator manages the sequence of transactions. Orchestrator sends commands to each service to perform its part of the transaction. If a service fails, the orchestrator sends compensating commands to undo the completed steps. But the orchestrator can become a bottleneck or a single point of failure. Saga Pattern with Orchestrator is ideal for complex workflows, multi-step transactions with compensating logic, and when centralized control is necessary.

In a choreography-based saga, each service involved communicates directly with the others through events using a message broker, avoiding a central coordinator. Each service publishes events to the broker based on its state changes, and the other services subscribe to the relevant events and react accordingly. There is no single point of failure, but careful design is needed.
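
As a rough sketch of choreography (synchronous and in-memory for brevity; a real system would use an actual broker such as Kafka or RabbitMQ and asynchronous consumers, and all names here are invented for the example), services subscribe to event types and react by publishing follow-up or compensating events:

package main

import "fmt"

// Event is a message published to the broker (an event type and an order ID).
type Event struct {
	Type    string
	OrderID string
}

// Broker is a toy in-memory stand-in for a message broker: it fans events
// out to every handler subscribed to the event type.
type Broker struct {
	handlers map[string][]func(Event)
}

func NewBroker() *Broker { return &Broker{handlers: make(map[string][]func(Event))} }

func (b *Broker) Subscribe(eventType string, h func(Event)) {
	b.handlers[eventType] = append(b.handlers[eventType], h)
}

func (b *Broker) Publish(e Event) {
	for _, h := range b.handlers[e.Type] {
		h(e)
	}
}

func main() {
	broker := NewBroker()

	// Payment service reacts to OrderCreated and publishes the outcome.
	broker.Subscribe("OrderCreated", func(e Event) {
		fmt.Println("payment service: charging order", e.OrderID)
		broker.Publish(Event{Type: "PaymentFailed", OrderID: e.OrderID}) // simulate a failure
	})

	// Order service reacts to PaymentFailed with a compensating action.
	broker.Subscribe("PaymentFailed", func(e Event) {
		fmt.Println("order service: compensating, cancelling order", e.OrderID)
	})

	// Order service starts the saga by publishing the first event.
	broker.Publish(Event{Type: "OrderCreated", OrderID: "order-1"})
}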



// order processing system with 4 separate services: order service,
// payment service, inventory service, and shipping service.
package main

import (
	"fmt"
	"log"
)

type OrderSagaOrchestrator struct {
orderService *OrderService
paymentService *PaymentService
inventoryService *InventoryService
shippingService *ShippingService
}

func NewOrderSagaOrchestrator() *OrderSagaOrchestrator {
return &OrderSagaOrchestrator{
orderService: &OrderService{},
paymentService: &PaymentService{},
inventoryService: &InventoryService{},
shippingService: &ShippingService{},
}
}

// orchestrator coordinates the execution of the saga. It sends commands
// to the services and handles the compensating actions if any step fails.
func (o *OrderSagaOrchestrator) ProcessOrder(orderID string) {
fmt.Println("Starting Order Saga")

err := o.orderService.CreateOrder(orderID)
if err != nil {
log.Printf("Order creation failed: %v\n", err)
return
}

err = o.paymentService.ProcessPayment(orderID)
if err != nil {
log.Printf("Payment processing failed: %v\n", err)
o.orderService.CancelOrder(orderID)
return
}

err = o.inventoryService.ReserveInventory(orderID)
if err != nil {
log.Printf("Inventory reservation failed: %v\n", err)
o.paymentService.RefundPayment(orderID)
o.orderService.CancelOrder(orderID)
return
}

err = o.shippingService.ShipOrder(orderID)
if err != nil {
log.Printf("Shipping failed: %v\n", err)
o.inventoryService.ReleaseInventory(orderID)
o.paymentService.RefundPayment(orderID)
o.orderService.CancelOrder(orderID)
return
}

fmt.Println("Order processed successfully")
}

// Actions.
type OrderService struct{}

func (o *OrderService) CreateOrder(orderID string) error {
fmt.Printf("Order created: %s\n", orderID)
return nil
}

func (o *OrderService) CancelOrder(orderID string) error {
fmt.Printf("Order cancelled: %s\n", orderID)
return nil
}

type PaymentService struct{}

func (p *PaymentService) ProcessPayment(orderID string) error {
fmt.Printf("Payment processed for order: %s\n", orderID)
return nil
}

func (p *PaymentService) RefundPayment(orderID string) error {
fmt.Printf("Payment refunded for order: %s\n", orderID)
return nil
}

type InventoryService struct{}

func (i *InventoryService) ReserveInventory(orderID string) error {
fmt.Printf("Inventory reserved for order: %s\n", orderID)
return nil
}

func (i *InventoryService) ReleaseInventory(orderID string) error {
fmt.Printf("Inventory released for order: %s\n", orderID)
return nil
}

type ShippingService struct{}

func (s *ShippingService) ShipOrder(orderID string) error {
fmt.Printf("Order shipped: %s\n", orderID)
return nil
}

Each service will have its own database, so you need a mechanism to maintain data consistency across those databases. Data consistency across distributed databases requires careful consideration of the consistency model, the replication strategy, and conflict resolution techniques.

You have several consistency models, such as strong consistency and eventual consistency, together with the replication strategies already discussed. Finally, there is conflict resolution, which can be achieved using a consensus algorithm. Consensus algorithms ensure that a group of distributed nodes agrees on a single data value, maintaining consistency. These algorithms can be complex to implement and may introduce latency.

Distributed Events Patterns

Most distributed systems are based on an event-driven architecture, in which the components of the system communicate with each other by producing and consuming events. The flow of the program is determined by events: discrete occurrences or changes in state that are significant to the system. This decouples the communication between components, leading to a more flexible, scalable, and maintainable system.

Event driven architecture revolves around event (change in state), producer, consumer, event broker (with features of event routing, filtering, and persistence), event channel (medium through which events are transmitted from producers to consumers).

// defining the event
package main

import (
	"fmt"
	"time"
)

type OrderEvent struct {
OrderID string
CustomerID string
ItemID string
Quantity int
}

// defining event broker

func main() {
// Create a channel to act as the event broker
eventChannel := make(chan OrderEvent)

// Start the services that consume the events
go InventoryService(eventChannel)
go NotificationService(eventChannel)

// Simulate creating an order
orderService := OrderService{}
orderService.CreateOrder("Order123", "Customer456", "Item789", 2, eventChannel)

// Give the consumers a moment to process the event
time.Sleep(time.Second)
}

// Order service -- event producer


type OrderService struct{}

func (os *OrderService) CreateOrder(orderID, customerID, itemID string, quantity int, eventChannel chan OrderEvent) {
// Simulate order creation logic
fmt.Printf("Order %s created by customer %s for item %s (quantity: %d)\n", orderID, customerID, itemID, quantity)

// Create an OrderEvent
event := OrderEvent{
OrderID: orderID,
CustomerID: customerID,
ItemID: itemID,
Quantity: quantity,
}

// Send the event to the event broker
eventChannel <- event
}

// event consumers

func InventoryService(eventChannel chan OrderEvent) {
	for event := range eventChannel {
		// Simulate inventory checking and reservation logic
		fmt.Printf("Inventory reserved for Order %s: Item %s (quantity: %d)\n", event.OrderID, event.ItemID, event.Quantity)
	}
}

func NotificationService(eventChannel chan OrderEvent) {
	for event := range eventChannel {
		// Simulate notifying the customer about the order
		fmt.Printf("Notification sent to customer %s for Order %s\n", event.CustomerID, event.OrderID)
	}
}

Event-driven architecture is a powerful pattern for building scalable, flexible, and resilient systems, especially in scenarios requiring real-time processing, modularity, and loose coupling. While it introduces certain complexities, the benefits it provides in terms of scalability, resilience, and maintainability make it an excellent choice for many modern applications, particularly in distributed systems and microservices architectures.

Distributed Events with Webhooks

A webhook is a mechanism for one software system to notify another system of events. Webhooks are widely used in distributed systems for event-driven architectures. The backend server that receives event updates and processes them accordingly is called the webhook server. Operating engineers add, edit, or delete event configurations in the webhook system. Each event is defined as a pair of <event_id, operation_id>.

  1. External events are sent to our API endpoint
  2. After receiving the event, we match its event_id in our configuration and find the corresponding operation_id
  3. The webhook system executes the specified operation.
  4. The processing result is persisted to the DB.

Hard questions:

  • Given that any server can fail, after our receiving API has accepted the event data, how do we make sure the data won't be lost and will eventually be processed?
  • How do we scale for concurrent event triggering?

Client server calls the receiving API endpoint to send the event with event_id and event_data. The event receiver is a web server. It processes each trigger request by enqueueing the <trigger_id, event_id, event_data> tuple to a distributed message queue (MQ).

After the event receiver gets the enqueue confirmation from the MQ, it responds with a success status (code 200) to the client server. From then on, the event data lives in our system, and the client doesn't have to worry about the event being lost before it is processed by the defined routines. The MQ acts both as a buffer for ongoing event processing and as a first-class data persistence component. If enqueueing fails (the MQ is not reachable or returns a failure status), the event receiver returns an error to the client, and the client decides what to do.

The event processor is a service that consists of background workers. A worker fetches the next event data from the MQ and executes the defined routine based on its event_id and event_data. The MQ stores all event data while it is waiting to be processed; events cannot leave the queue until processing is finalized, because if they left the queue and the processing thread crashed, the data would be lost. Therefore, while an event is being processed by a worker, the rest of the data stays in the queue.

The event processor executes the webhook routine for the event data, possibly with retries. The execution only finishes when it succeeds or the retries are exhausted. Since this can be a slow process and the rest of the queue is blocked in the meantime, this design is unacceptable for speed-sensitive tasks.

The event processor stores the execution result in the DB. After the execution (success, or a failure deemed permanent), the worker stores the result in the DB, which allows the processing result to be queried later by request_id. Once no data loss and eventual processing are guaranteed, the event receiver can return success to the client, knowing the event will be processed eventually.

Eventual event data processing means the client can retry indefinitely until our API returns success, and the event processor worker can retry indefinitely until the event processing reaches a conclusion.

The MQ can be sharded based on request_id and machine id so that it can be load balanced between nodes. We should choose a shard number so that each shard can be stored on one machine and can be handled by one event processor worker.
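
A stripped-down sketch of the receiver -> MQ -> worker -> DB flow described above (a buffered channel stands in for one MQ shard and a sync.Map for the results DB; all names are invented for this illustration):

package main

import (
	"fmt"
	"sync"
)

// TriggerRequest is what the receiving API enqueues: the event plus its payload.
type TriggerRequest struct {
	TriggerID string
	EventID   string
	EventData string
}

// EventReceiver enqueues the request and only reports success to the client
// once the MQ has confirmed the enqueue.
func EventReceiver(mq chan<- TriggerRequest, req TriggerRequest) error {
	select {
	case mq <- req:
		fmt.Println("receiver: 200 OK for", req.TriggerID)
		return nil
	default:
		// MQ unreachable/full: report an error and let the client retry.
		return fmt.Errorf("enqueue failed for %s", req.TriggerID)
	}
}

// EventProcessor is a background worker: it fetches the next request from the
// MQ and executes the configured routine for its event_id.
func EventProcessor(mq <-chan TriggerRequest, results *sync.Map, wg *sync.WaitGroup) {
	defer wg.Done()
	for req := range mq {
		fmt.Printf("worker: executing routine for event %s (%s)\n", req.EventID, req.EventData)
		results.Store(req.TriggerID, "success") // persist the result (DB stand-in)
	}
}

func main() {
	mq := make(chan TriggerRequest, 100) // stand-in for a distributed MQ shard
	var results sync.Map                 // stand-in for the results DB
	var wg sync.WaitGroup

	wg.Add(1)
	go EventProcessor(mq, &results, &wg)

	_ = EventReceiver(mq, TriggerRequest{TriggerID: "t1", EventID: "evt-signup", EventData: "user123"})
	_ = EventReceiver(mq, TriggerRequest{TriggerID: "t2", EventID: "evt-payment", EventData: "order42"})

	close(mq) // no more events in this toy example
	wg.Wait()

	if v, ok := results.Load("t1"); ok {
		fmt.Println("result for t1:", v)
	}
}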

There is a problem when there are too many ongoing event-processing tasks in the queue: the backlog can be more than one machine can store and too slow for one worker to handle. To address this for time-sensitive events, we add a fast track in the event receiver: for each received event that opts in to fast processing, the receiver queries the DB to get the configuration, executes the corresponding processing directly, and then stores the result in the database.

Another issue is duplicate event processing, where the same event in the MQ is processed multiple times. Duplicate processing can happen when the event processor has finished the execution but failed to write to the DB, or when the DB update is finished but the processor crashed before dequeueing.

The design can have three types of guarantees.

  • At least once processing
  • At most once processing
  • Exactly once processing

At-least-once is already guaranteed by our design. This is ideal for idempotent Webhooks.

At-most-once protocol can be easily achieved by changing the event processor so that it dequeues the event task immediately after fetching it — before executing the processing routine and writing to DB.

A stronger guarantee is exactly once processing. It requires the event processing routine to support rollback.

Another way of processing events is to replace the MQ with the DB: store the incoming event trigger request in a DB and mark the request as created. Then a set of event processors periodically scans the DB to fetch requests in the created state; a processor marks the request as processing before executing the corresponding routine, and marks it as finished afterwards.

The downside is that the event processors need to continuously scan the DB. Notice that each scan is a read-then-write transaction, because it needs to mark created events as processing, so the load on the DB becomes quite high as we scale. And what happens if the scan process itself crashes? We need smart ways to recover from processor crashes.

Example:

package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

type Event struct {
ID int
Timestamp time.Time
Data interface{}
}

type EventStore struct {
mu sync.Mutex
events []Event
}

func (store *EventStore) AddEvent(event Event) {
store.mu.Lock()
defer store.mu.Unlock()
store.events = append(store.events, event)
}

func (store *EventStore) GetEvents() []Event {
store.mu.Lock()
defer store.mu.Unlock()
return store.events
}

type AccountCreated struct {
AccountID string
Owner string
}

type MoneyDeposited struct {
AccountID string
Amount float64
}

func main() {
store := &EventStore{}

// Create an account
event1 := Event{
ID: 1,
Timestamp: time.Now(),
Data: AccountCreated{
AccountID: "123",
Owner: "John Doe",
},
}
store.AddEvent(event1)

// Deposit money
event2 := Event{
ID: 2,
Timestamp: time.Now(),
Data: MoneyDeposited{
AccountID: "123",
Amount: 100.0,
},
}
store.AddEvent(event2)

// Retrieve and print events
events := store.GetEvents()
for _, event := range events {
jsonData, _ := json.Marshal(event)
fmt.Println(string(jsonData))
}
}

Event Sourcing pattern:

Instead of storing the current state of an entity, all changes (events) to the entity are stored sequentially. The current state can be derived by replaying these events.

Event sourcing captures all changes to application state as a sequence of events recorded in an append-only store. The event store publishes these events, so consumers can be notified and handle them if needed. Multithreaded applications and multiple application instances all append their events to the event store.

// Define events
package main

import (
	"fmt"
	"time"
)
// Event represents a state change
type Event struct {
ID string
Timestamp time.Time
Type string
Payload interface{}
}
// AccountCreated is an event payload for account creation
type AccountCreated struct {
AccountID string
Owner string
}
// MoneyDeposited is an event payload for depositing money
type MoneyDeposited struct {
AccountID string
Amount float64
}

// Store events
var eventStore = []Event{}
// SaveEvent saves an event to the event store
func SaveEvent(event Event) {
eventStore = append(eventStore, event)
fmt.Println("Event saved:", event)
}
func main() {
createdEvent := Event{
ID: "1",
Timestamp: time.Now(),
Type: "AccountCreated",
Payload: AccountCreated{AccountID: "123", Owner: "John Doe"},
}
SaveEvent(createdEvent)
depositEvent := Event{
ID: "2",
Timestamp: time.Now(),
Type: "MoneyDeposited",
Payload: MoneyDeposited{AccountID: "123", Amount: 100.0},
}
SaveEvent(depositEvent)

// Rebuild the current state by replaying the stored events
rebuildState()
}

// rebuild state from events
// Account represents an account with a balance
type Account struct {
ID string
Owner string
Balance float64
}
// ApplyEvent applies an event to the account
func (a *Account) ApplyEvent(event Event) {
switch e := event.Payload.(type) {
case AccountCreated:
a.ID = e.AccountID
a.Owner = e.Owner
case MoneyDeposited:
if a.ID == e.AccountID {
a.Balance += e.Amount
}
}
}
func rebuildState() {
account := Account{}
for _, event := range eventStore {
account.ApplyEvent(event)
}
fmt.Printf("Rebuilt account: %+v\n", account)
}

The core principles of Event Sourcing: capturing state changes as events, storing those events, and using them to rebuild the application state.

Consistency Pattern

Log replication

Log replication is a core concept in distributed systems, ensuring that data written to a primary node is reliably propagated to replica nodes. This concept is fundamental to building fault-tolerant and highly available systems. Implementing log replication in Go can be done using various approaches, but one common approach is using the Raft consensus algorithm, which is designed to handle log replication across multiple nodes.

  • Log Replication: In distributed systems, log replication ensures that all changes to the state (like database writes) are recorded in a log and replicated across multiple nodes. This ensures consistency and fault tolerance.
  • Primary-Replica Model: The primary node handles all writes and propagates these changes (logs) to replica nodes. The replicas apply these logs to maintain consistency with the primary.

Raft: Raft is a consensus algorithm used for managing replicated logs. It ensures that all nodes in a cluster agree on the same sequence of log entries.

Raft consensus involves leader election (selecting one node as the leader, responsible for managing the log), log replication (the leader appends new entries to its log and replicates them to follower nodes), and commit (once a log entry is replicated to a majority of the nodes, it is considered committed). Note: logs should be persisted to disk to recover from crashes.

package main

import (
"fmt"
"sync"
)

// LogEntry represents a single entry in the log
type LogEntry struct {
Term int
Command string
}

// Node represents a node in the cluster
type Node struct {
ID int
Log []LogEntry
CommitIdx int
mu sync.Mutex
}

func (n *Node) AppendEntries(entries []LogEntry) {
n.mu.Lock()
defer n.mu.Unlock()
n.Log = append(n.Log, entries...)
n.CommitIdx = len(n.Log) - 1
fmt.Printf("Node %d committed entry: %+v\n", n.ID, entries)
}

func (n *Node) PrintLog() {
n.mu.Lock()
defer n.mu.Unlock()
fmt.Printf("Node %d Log: %+v\n", n.ID, n.Log)
}

func main() {
node1 := &Node{ID: 1}
node2 := &Node{ID: 2}
node3 := &Node{ID: 3}

cluster := []*Node{node1, node2, node3}

// Leader appends an entry and replicates it
leader := node1
entry := LogEntry{Term: 1, Command: "set x=10"}

fmt.Println("Leader appending entry:", entry)
leader.AppendEntries([]LogEntry{entry})

// Replicate to followers
for _, node := range cluster {
if node.ID != leader.ID {
node.AppendEntries([]LogEntry{entry})
}
}

// Print logs of all nodes
for _, node := range cluster {
node.PrintLog()
}
}

To prevent logs from growing indefinitely, periodic snapshots of the state can be taken and the logs truncated. This also gives the system the flexibility to scale to more nodes and larger volumes of data. Log replication is a fundamental concept in distributed systems for ensuring data consistency and fault tolerance.
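
A minimal sketch of snapshotting and log truncation (my own simplification; real Raft implementations also persist the snapshot and handle followers that are behind it):

package main

import "fmt"

// LogEntry mirrors the entry type used above.
type LogEntry struct {
	Term    int
	Command string
}

// Snapshot captures the state machine up to a given log index, so earlier
// log entries can be discarded.
type Snapshot struct {
	LastIncludedIndex int
	State             map[string]string
}

// CompactLog takes a snapshot of the applied state and truncates the log,
// keeping only the entries after the snapshot point.
func CompactLog(log []LogEntry, applied map[string]string, lastApplied int) (Snapshot, []LogEntry) {
	snap := Snapshot{LastIncludedIndex: lastApplied, State: applied}
	remaining := append([]LogEntry(nil), log[lastApplied+1:]...)
	return snap, remaining
}

func main() {
	log := []LogEntry{
		{Term: 1, Command: "set x=10"},
		{Term: 1, Command: "set y=20"},
		{Term: 2, Command: "set x=30"},
	}
	// State after applying entries 0 and 1.
	applied := map[string]string{"x": "10", "y": "20"}

	snap, rest := CompactLog(log, applied, 1)
	fmt.Printf("snapshot up to index %d: %v\n", snap.LastIncludedIndex, snap.State)
	fmt.Printf("log after truncation: %v\n", rest)
}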

For a production-level implementation, use a consensus library like etcd's Raft library. Raft itself is defined in terms of log replication (logs are replicated across multiple servers; each log entry represents a state change, and all nodes agree on the order of these entries), leader election (an election process selects a single leader node, which is responsible for managing log replication and communicating with the other nodes, the followers), and safety and liveness (safety guarantees that only one leader can be elected at a time, and liveness guarantees that the system continues to function even if some nodes fail).


package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type State int

// roles in raft
const (
Follower State = iota
Candidate
Leader
)

type Raft struct {
mu sync.Mutex
state State
votedFor int
term int
heartbeat chan bool
}

func NewRaft() *Raft {
return &Raft{
state: Follower,
votedFor: -1,
term: 0,
heartbeat: make(chan bool),
}
}

func (r *Raft) StartElection() {
r.mu.Lock()
r.state = Candidate
r.term++
r.votedFor = rand.Int()
fmt.Printf("Node started election for term %d\n", r.term)
r.mu.Unlock()
// Simulate receiving votes
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
r.mu.Lock()
r.state = Leader
fmt.Printf("Node became leader for term %d\n", r.term)
r.mu.Unlock()
}

func (r *Raft) Run() {
for {
switch r.state {
case Follower:
select {
case <-r.heartbeat:
fmt.Println("Heartbeat received, staying as follower")
case <-time.After(time.Duration(rand.Intn(500)+500) * time.Millisecond):
fmt.Println("No heartbeat, starting election")
r.StartElection()
}
case Leader:
fmt.Println("Sending heartbeats...")
time.Sleep(200 * time.Millisecond)
}
}
}

func main() {
node := NewRaft()
go node.Run()
time.Sleep(5 * time.Second)
}

Interservice Communication Pattern

Consumer-producer model

The consumer-producer model is a communication pattern in which communication is established between multiple machines over the network. Distributed producers publish messages, events, or data to a central message broker or directly to distributed consumers. A message broker (like Apache Kafka, RabbitMQ, or AWS SQS) acts as the shared buffer: it handles the queuing of messages, provides durable storage, and ensures that producers and consumers can operate independently. A minimal sketch follows the list below.

  • Producers generate data and place it in a shared buffer (or queue). Consumers take data from the buffer to process it.
  • The buffer is typically a queue or message broker where data is stored temporarily.
  • Producers and consumers are often aware of each other indirectly through the shared buffer.
  • The model is focused on managing the rate at which producers generate data and consumers process it.
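Below is a minimal producer-consumer sketch that uses a buffered Go channel as the shared queue; in a real system the channel would be replaced by a broker such as Kafka, RabbitMQ, or SQS. The queue size and message format are arbitrary assumptions for illustration.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// The buffered channel stands in for the shared queue / message broker
	queue := make(chan string, 5)
	var wg sync.WaitGroup

	// Producer: generates messages and places them in the shared buffer
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 1; i <= 5; i++ {
			msg := fmt.Sprintf("task-%d", i)
			queue <- msg
			fmt.Println("produced:", msg)
		}
		close(queue) // signal that no more messages will be produced
	}()

	// Consumer: takes messages from the buffer and processes them
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range queue {
			fmt.Println("consumed:", msg)
		}
	}()

	wg.Wait()
}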

Pub-sub model

The Publish-Subscribe (Pub-Sub) model is a messaging pattern where senders (publishers) send messages without knowing who will receive them, and receivers (subscribers) receive messages without knowing who sent them. This decouples the producers and consumers of messages, allowing for more scalable and flexible systems.

For the pub-sub model, you can refer to the Kafka design covered here; for the producer-consumer model, RabbitMQ is a common choice.

The Producer-Consumer model and the Publish-Subscribe (Pub-Sub) model are both used in distributed systems to handle communication between components, but they serve different purposes and have distinct characteristics.

In the publish-subscribe model, the publisher has no knowledge of the subscribers and vice versa; communication is mediated through topics or channels, so publishers broadcast messages to a topic and all subscribers to that topic receive them. In the consumer-producer model, a shared buffer sits between the services, and a message is typically consumed by a single consumer and removed from the queue once processed; pub-sub does not have that limitation. Producer-consumer suits workflows with a clear handoff from producer to consumer, such as task queues, pipelines, and load distribution, whereas publish-subscribe suits event-driven systems, real-time notifications, and broadcasting messages to multiple recipients. A small sketch of the pub-sub pattern is shown below.
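Here is a minimal in-memory pub-sub sketch, assuming a simple Broker type with Subscribe and Publish methods (names chosen for illustration, not a real library); every subscriber of a topic receives its own copy of each message, in contrast to the single-consumer queue above.

package main

import (
	"fmt"
	"sync"
)

// Broker routes published messages to all subscribers of a topic
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new subscriber channel for a topic
func (b *Broker) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 1)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish delivers the message to every subscriber of the topic
func (b *Broker) Publish(topic, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}

func main() {
	broker := NewBroker()
	sub1 := broker.Subscribe("orders")
	sub2 := broker.Subscribe("orders")

	broker.Publish("orders", "order created")

	// Both subscribers receive the same message
	fmt.Println("sub1 received:", <-sub1)
	fmt.Println("sub2 received:", <-sub2)
}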

Distributed Locking

Locks are a synchronization mechanism for controlling access to a shared resource, such as a file, in a multi-threaded environment. How access to the shared resource is controlled, and how the lock is released, determines whether to use a mutex or a semaphore. With a mutex, only one thread has access to the shared resource at a time, and the thread that acquired the lock must be the one to release it. A semaphore allows multiple threads to access the same resource, up to a specified limit. The sketch below illustrates both.
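As a rough illustration of the difference, this sketch uses sync.Mutex for mutual exclusion and a buffered channel as a counting semaphore (a common Go idiom); the limit of two concurrent workers is an arbitrary assumption.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	// Mutex: only one goroutine updates the shared counter at a time
	var mu sync.Mutex
	counter := 0

	// Semaphore: a buffered channel that allows at most 2 goroutines
	// to work on the shared resource concurrently
	sem := make(chan struct{}, 2)

	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			sem <- struct{}{} // acquire a semaphore slot
			fmt.Printf("worker %d acquired semaphore\n", id)
			time.Sleep(100 * time.Millisecond) // simulate work

			mu.Lock() // exclusive access to the counter
			counter++
			mu.Unlock()

			<-sem // release the semaphore slot
		}(i)
	}

	wg.Wait()
	fmt.Println("final counter:", counter)
}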

In a distributed environment, multiple servers may try to access the same file; how do we ensure that the file is not accessed by multiple machines at once? One obvious solution is to record which file is locked by which server in a database, and have all servers query this database first to see whether the file is locked. The problem with this approach is that the database becomes a single point of failure; to avoid that we need multiple copies of the database, which makes syncing difficult. Using a database for such frequent reads and writes is also inefficient, and while a cache would be faster, the same question arises: what happens if the cache goes down?

Distributed lock manager: the lock manager interacts with Redis and returns a boolean value indicating whether a particular resource is locked. The distributed lock manager is based on a master-slave architecture, where one node is elected as the leader and performs read and write operations, while the remaining nodes serve only reads. A write is considered successful only once the data is synced to all nodes, so we trade some latency for high availability. ZooKeeper keeps track of which node is the leader and monitors node health; if the leader goes down, the remaining nodes participate in a leader election and one of them becomes the new leader. If an instance acquires the lock but goes down before releasing it, the result would be a deadlock. To solve this problem we attach a TTL to the lock in Redis (for example, five seconds) so that the lock expires and other instances can acquire it after expiry instead of being deadlocked.

The lock manager keeps a unique lockId for the instance that acquired the lock. When instance1 tries to modify the data in Redis with lockId1, the write is rejected if it does not match the current lockId2 held by another instance. Even then there is a possibility of collisions while generating lockIds, so each lock manager instance must generate unique lockIds, for example based on nanosecond or microsecond timestamps.
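The sketch below simulates this behavior with an in-memory store standing in for Redis (the LockManager, Acquire, and Write names are illustrative assumptions, not a real client library): a lock carries a TTL and a unique lockId, and a write is accepted only if the caller’s lockId matches the one currently held and unexpired.

package main

import (
	"fmt"
	"sync"
	"time"
)

// lockRecord stores who holds the lock and when it expires
type lockRecord struct {
	lockID string
	expiry time.Time
}

// LockManager is an in-memory stand-in for a Redis-backed lock store
type LockManager struct {
	mu    sync.Mutex
	locks map[string]lockRecord
}

func NewLockManager() *LockManager {
	return &LockManager{locks: make(map[string]lockRecord)}
}

// Acquire grants the lock if it is free or expired, returning a unique lockId
func (lm *LockManager) Acquire(resource string, ttl time.Duration) (string, bool) {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	rec, held := lm.locks[resource]
	if held && time.Now().Before(rec.expiry) {
		return "", false // still locked by another instance
	}
	lockID := fmt.Sprintf("%d", time.Now().UnixNano()) // timestamp-based lockId
	lm.locks[resource] = lockRecord{lockID: lockID, expiry: time.Now().Add(ttl)}
	return lockID, true
}

// Write succeeds only if the caller still holds the current, unexpired lock
func (lm *LockManager) Write(resource, lockID string) bool {
	lm.mu.Lock()
	defer lm.mu.Unlock()
	rec, held := lm.locks[resource]
	return held && rec.lockID == lockID && time.Now().Before(rec.expiry)
}

func main() {
	lm := NewLockManager()

	id1, ok := lm.Acquire("file.txt", 200*time.Millisecond)
	fmt.Println("instance1 acquired:", ok)

	_, ok = lm.Acquire("file.txt", 200*time.Millisecond)
	fmt.Println("instance2 acquired while locked:", ok) // false

	time.Sleep(300 * time.Millisecond) // instance1's lock expires (simulated crash)

	id2, _ := lm.Acquire("file.txt", 200*time.Millisecond)
	fmt.Println("stale lockId write accepted:", lm.Write("file.txt", id1))   // false
	fmt.Println("current lockId write accepted:", lm.Write("file.txt", id2)) // true
}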

Distributed clock

Some of the major challenges in distributed architectures are:

  1. Clock drift: Over time, individual clocks in distributed nodes may drift apart, leading to discrepancies in time readings.
  2. Latency: The time it takes for a message to travel between nodes can affect synchronization.

Timing synchronization in distributed systems is crucial for ensuring that multiple distributed components (e.g., servers, nodes) operate in a coordinated manner. Distributed clocks are essential for coordinating and synchronizing across multiple nodes or machines. Since there is no single global clock in a distributed system, each machine maintains its own local clock, which may drift from the others. Distributed clock mechanisms help synchronize these clocks to ensure consistency and coordination in time-dependent operations, such as logging, transaction ordering, and event sequencing.

For timing synchronization you can use physical clocks with protocols like NTP (Network Time Protocol) or PTP (Precision Time Protocol). In real-world scenarios, however, logical clocks are more commonly used, such as Lamport clocks (which use a counter that increments with each event) or vector clocks (which capture causality between events, offering a more precise ordering and identifying concurrency between events). Lamport clocks provide a way to order events in a distributed system: each process maintains a counter (the logical clock) that is incremented with each event.
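A minimal Lamport clock sketch is shown below; the Tick, Send, and Receive method names are illustrative assumptions. The key rule is that on receiving a message, a process sets its counter to the maximum of its own value and the received timestamp, plus one.

package main

import "fmt"

// LamportClock is a simple logical clock
type LamportClock struct {
	counter int
}

// Tick increments the clock for a local event
func (c *LamportClock) Tick() int {
	c.counter++
	return c.counter
}

// Send returns the timestamp to attach to an outgoing message
func (c *LamportClock) Send() int {
	return c.Tick()
}

// Receive merges a received timestamp: max(local, received) + 1
func (c *LamportClock) Receive(received int) int {
	if received > c.counter {
		c.counter = received
	}
	c.counter++
	return c.counter
}

func main() {
	a := &LamportClock{}
	b := &LamportClock{}

	a.Tick()       // local event on A -> A=1
	ts := a.Send() // A sends a message -> A=2
	b.Tick()       // local event on B -> B=1

	fmt.Println("B after receive:", b.Receive(ts)) // max(1,2)+1 = 3
	fmt.Println("A:", a.counter, "B:", b.counter)
}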

On the physical-clock side, synchronization protocols like NTP exchange timestamps between a client and a server and adjust the client’s clock to match the server’s time, while PTP works in a similar way but provides much finer precision, in the sub-microsecond to nanosecond range.

One example of a Lamport-style clock is a global transaction clock, distributed with every transaction and used in all communication to keep operations in sync and to provide fully replicated, ordered, and atomic transactions. The global transaction clock (GTX) makes each transaction unique and distinguishable. The GTX typically attaches a timestamp to each transaction to mark when operations (such as replication) were performed or whether they are still pending, so that during failure or error handling the system can decide whether a replication needs to be replayed.

Multi-Version Concurrency Control (MVCC)

Multi-Version Concurrency Control (MVCC) is a method used in database management systems to handle concurrent access to data without locking the entire database. It allows multiple versions of a piece of data to exist simultaneously, which helps in managing read and write operations concurrently in a way that ensures data consistency and improves performance, particularly in distributed systems.

With MVCC, as transactions read and write, the system keeps multiple versions of the data. Each transaction sees a consistent snapshot of the database as of the time the transaction started. MVCC uses timestamps to order transactions: each transaction gets a unique timestamp, which it uses to determine which version of the data it can see or modify.

When a transaction reads data, it accesses the most recent version of the data that was committed before the transaction started, so it reads a consistent snapshot of the database. When a transaction writes data, it creates a new version of the data with its own timestamp; this new version becomes visible to transactions that start after the current transaction commits. Once a transaction commits, its changes are visible to other transactions. In this way MVCC maintains consistency while reducing lock contention under concurrent access. A small sketch follows.
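Below is a minimal MVCC-style sketch of a versioned key-value store; the MVStore type, the use of an integer counter as the transaction timestamp, and the Begin/Write/ReadAt method names are illustrative assumptions. Each write appends a new version tagged with a timestamp, and a read at a given snapshot timestamp returns the latest version committed at or before that point.

package main

import (
	"fmt"
	"sync"
)

// version is one committed value of a key, tagged with its commit timestamp
type version struct {
	ts    int
	value string
}

// MVStore keeps every committed version of each key
type MVStore struct {
	mu   sync.Mutex
	now  int // logical timestamp counter
	data map[string][]version
}

func NewMVStore() *MVStore {
	return &MVStore{data: make(map[string][]version)}
}

// Begin returns a snapshot timestamp for a new transaction
func (s *MVStore) Begin() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.now
}

// Write commits a new version of the key at the next timestamp
func (s *MVStore) Write(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.now++
	s.data[key] = append(s.data[key], version{ts: s.now, value: value})
}

// ReadAt returns the newest version committed at or before the snapshot timestamp
func (s *MVStore) ReadAt(key string, snapshot int) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	versions := s.data[key]
	for i := len(versions) - 1; i >= 0; i-- {
		if versions[i].ts <= snapshot {
			return versions[i].value, true
		}
	}
	return "", false
}

func main() {
	store := NewMVStore()
	store.Write("x", "10") // committed at ts=1

	snap := store.Begin()  // transaction snapshot at ts=1
	store.Write("x", "20") // committed at ts=2, invisible to the snapshot

	v, _ := store.ReadAt("x", snap)
	fmt.Println("snapshot read of x:", v) // "10"

	v, _ = store.ReadAt("x", store.Begin())
	fmt.Println("latest read of x:", v) // "20"
}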

These are some of the commonly used patterns for distributed computing infrastructure. It is a long read, but I hope it helps in understanding distributed computing through concrete examples.
