How Does the Apache Kafka Code Implement 2 Phase Commit?

Manish Kumar
12 min read · Jun 3, 2023

What is 2 phase commit?

Two-Phase Commit (2PC) is a distributed transaction protocol used to ensure atomicity and consistency in a distributed system involving multiple participants or resources. It provides a way to coordinate the commit or rollback of transactions across multiple nodes to maintain data integrity.

When do we need it?

Let’s consider an example of an e-commerce platform where customers can place orders, and the system needs to ensure that the order placement and payment processing are done atomically. The system involves multiple components, such as the Order Service, Payment Service, and Inventory Service, each responsible for handling specific tasks.

Here’s an overview of the system components and their responsibilities:

  1. Order Service: Handles order placement and interacts with the Payment Service and Inventory Service.
  2. Payment Service: Responsible for processing payments for the orders.
  3. Inventory Service: Tracks the availability of products in the inventory.

Problem: Suppose a customer places an order, and the system needs to deduct the payment amount and update the inventory. The system must ensure that either both operations (payment deduction and inventory update) succeed or neither of them should be applied. If any failure occurs during the process, it should roll back the changes made so far to maintain consistency.

Solution using Two-Phase Commit:

Phase 1 — Prepare Phase:

  • The Order Service initiates the transaction and sends a request to the Payment Service and Inventory Service to prepare for the transaction.
  • The Payment Service verifies the payment details and reserves the payment amount.
  • The Inventory Service checks if the requested products are available in the inventory.

Decision (end of the Prepare Phase):

  • If all participants (Payment Service and Inventory Service) respond with success, the Order Service decides to commit and sends a commit message to each participant in the next phase.
  • If any participant fails to prepare, the Order Service decides to abort and sends an abort message to each participant in the next phase.

Phase 2 — Commit/Rollback Phase:

  • When participants receive the commit message, they apply the changes (deduct payment and update inventory) and send an acknowledgment back to the Order Service.
  • When participants receive the abort message, they roll back whatever they reserved during the prepare phase and send an acknowledgment back to the Order Service.

Completion:

  • The Order Service waits for acknowledgments from all participants.
  • If all acknowledgments are received, the transaction is considered successful, and the Order Service notifies the customer about the successful order placement.
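  • If a participant reports an error or does not acknowledge in time, the Order Service cannot treat the transaction as complete. When the decision was to abort, it notifies the customer about the failure and no changes are applied. When the decision was to commit, it has to keep retrying that participant, because the others may already have applied their changes.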

Two-Phase Commit ensures that all participants involved in the transaction agree on whether to commit or roll back the changes. If a participant fails or there is a network partition, participants never reach conflicting decisions, although some may have to wait (block) until they can learn the coordinator's decision.

However, it’s worth noting that Two-Phase Commit introduces some drawbacks, such as increased latency, complexity, and a single point of failure (the coordinator). It also has limited scalability when the number of participants grows.

2 Phase Commit code for the above example

// Mock implementations of the Order, Payment, and Inventory services
class OrderService {
    public boolean placeOrder(Order order) {
        // Initiates the two-phase commit protocol and coordinates the transaction
        TransactionCoordinator coordinator = new TransactionCoordinator();

        // Step 1: Prepare phase
        boolean paymentPrepared = coordinator.preparePayment(order);
        boolean inventoryPrepared = coordinator.prepareInventory(order);

        if (paymentPrepared && inventoryPrepared) {
            // Step 2: Commit phase
            boolean paymentCommitted = coordinator.commitPayment(order);
            boolean inventoryCommitted = coordinator.commitInventory(order);

            if (paymentCommitted && inventoryCommitted) {
                // Transaction successfully committed
                return true;
            } else {
                // Transaction failed to commit, revert whatever was committed
                coordinator.abortTransactionPayment(order);
                coordinator.abortTransactionOrder(order);
                return false;
            }
        } else {
            // Transaction failed to prepare, rollback changes
            coordinator.rollbackPayment(order);
            coordinator.rollbackInventory(order);
            return false;
        }
    }
}

class TransactionCoordinator {
    // Mock implementations of the payment and inventory services
    private PaymentService paymentService = new PaymentService();
    private InventoryService inventoryService = new InventoryService();

    public boolean preparePayment(Order order) {
        return paymentService.verifyPaymentAmount(order);
    }

    public boolean prepareInventory(Order order) {
        return inventoryService.checkAvailability(order);
    }

    public boolean commitPayment(Order order) {
        return paymentService.processPayment(order);
    }

    public boolean commitInventory(Order order) {
        return inventoryService.updateInventory(order);
    }

    public void rollbackPayment(Order order) {
        paymentService.cancelReservation(order);
    }

    public void rollbackInventory(Order order) {
        inventoryService.rollbackInventoryUpdate(order);
    }

    public void abortTransactionPayment(Order order) {
        paymentService.cleanup(order);
    }

    public void abortTransactionOrder(Order order) {
        inventoryService.cleanup(order);
    }
}

class PaymentService {
    public boolean verifyPaymentAmount(Order order) {
        // Perform payment amount verification w.r.t. the order
        // Return true if successful, false otherwise
        return true;
    }

    public boolean processPayment(Order order) {
        // Process the payment
        // Return true if successful, false otherwise
        return true;
    }

    public void cancelReservation(Order order) {
        // Cancel the payment amount reservation
    }

    public void cleanup(Order order) {
        // Revert the previously committed payment
    }
}

class InventoryService {
    public boolean checkAvailability(Order order) {
        // Check if products are available in inventory
        // Return true if available, false otherwise
        return true;
    }

    public boolean updateInventory(Order order) {
        // Update the inventory with the order details
        // Return true if successful, false otherwise
        return true;
    }

    public void rollbackInventoryUpdate(Order order) {
        // Rollback the inventory update
    }

    public void cleanup(Order order) {
        // Revert the previously committed inventory update
    }
}

class Order {
    // Order details
}

public class Main {
    public static void main(String[] args) {
        // Example usage
        OrderService orderService = new OrderService();
        Order order = new Order();
        boolean orderPlaced = orderService.placeOrder(order);

        if (orderPlaced) {
            System.out.println("Order placed successfully.");
        } else {
            System.out.println("Failed to place the order.");
        }
    }
}
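
The mock services above always return true, so the abort path is never exercised. Here is a minimal sketch of that path, assuming a hypothetical Participant interface and a RecordingParticipant test double (neither is part of the example above): the coordinator rolls back only the participants that had already prepared when one of them votes "no".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract; not part of the example above.
interface Participant {
    boolean prepare();   // vote: true means "ready to commit"
    void commit();
    void rollback();
}

// Test double that records what the coordinator asked it to do.
class RecordingParticipant implements Participant {
    final String name;
    final boolean vote;
    final List<String> calls = new ArrayList<>();

    RecordingParticipant(String name, boolean vote) {
        this.name = name;
        this.vote = vote;
    }

    public boolean prepare() { calls.add("prepare"); return vote; }
    public void commit() { calls.add("commit"); }
    public void rollback() { calls.add("rollback"); }
}

class TwoPhaseCoordinator {
    // Returns true if the transaction committed, false if it was aborted.
    static boolean run(List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        // Phase 1: collect votes.
        for (Participant p : participants) {
            if (p.prepare()) {
                prepared.add(p);
            } else {
                // One "no" vote aborts everything prepared so far.
                for (Participant q : prepared) {
                    q.rollback();
                }
                return false;
            }
        }
        // Phase 2: every participant voted yes, so commit everywhere.
        for (Participant p : prepared) {
            p.commit();
        }
        return true;
    }
}
```

With a payment participant that votes yes and an inventory participant that votes no, the payment participant sees prepare followed by rollback, and commit is never called on either of them.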

How has Kafka implemented 2 phase commit in the code?

Let's look at the main Kafka files that are responsible for transactions:

  1. KafkaProducer.java
  2. TransactionManager.java
  3. Sender.java

Let's go through the KafkaProducer functions that are called when a transaction is performed:

  1. initTransactions(): This function is called once, during the setup phase of the Kafka producer, to prepare it for transactional operations. It requires the transactional.id configuration to be set, locates the transaction coordinator for that ID, and sends it an InitProducerId request. The coordinator replies with a producer ID and epoch, fences off any older producer instance that uses the same transactional.id, and completes or aborts any transaction that instance left open. The producer then moves to the READY state.
  2. beginTransaction(): Broadly, it does the following:

Ensure the producer is in the READY state:

  • The function checks if the producer is in the READY state, which indicates that it has been initialized and is ready for transactional operations.
  • If the producer is not in the READY state, an exception may be thrown to indicate that it is not properly initialized for transactions.

Start the transaction locally:

  • beginTransaction() is a purely local call: no request is sent to the broker at this point.
  • The transaction coordinator first learns about the new transaction when the producer registers the first partition it writes to (an AddPartitionsToTxn request).

Use the configured transactional ID:

  • beginTransaction() does not generate a transactional ID. The ID comes from the transactional.id configuration property, which is set when creating the Kafka producer instance.
  • If the configuration property is not set, the producer is not transactional and beginTransaction() throws an IllegalStateException.
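
As a rough sketch of the configuration side, the snippet below builds the properties a transactional producer is typically given. The property keys are real producer settings; the broker address, the helper class name, and the example ID are placeholders chosen for illustration.

```java
import java.util.Properties;

class TransactionalProducerConfig {
    // Builds the properties a transactional producer is usually given.
    // The broker address is a placeholder for illustration.
    static Properties build(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Stable ID chosen by the application; reuse the same value after a
        // restart so that the previous instance can be fenced off.
        props.setProperty("transactional.id", transactionalId);
        // Transactions build on the idempotent producer.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

These properties would then be passed to the KafkaProducer constructor, followed by a single call to initTransactions().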

Reuse the producer ID and epoch:

  • The producer ID and epoch were already obtained from the transaction coordinator during initTransactions(); beginTransaction() does not request new ones.
  • The producer ID and epoch let the coordinator reject stale ("zombie") producer instances that share the same transactional ID, which keeps the transactional state unique and consistent.
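
The role of the epoch can be sketched with a small in-memory model. EpochFencingSketch is a hypothetical class, not Kafka code: it assumes the coordinator keeps one "latest epoch" per transactional ID, bumps it whenever a producer instance initializes, and accepts only requests that carry the latest value. In Kafka itself, a producer with a stale epoch ends up with a ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the coordinator's epoch bookkeeping (hypothetical class).
class EpochFencingSketch {
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Called when a producer instance initializes: the first instance gets
    // epoch 0, every later instance with the same ID gets the next epoch.
    int initProducer(String transactionalId) {
        return latestEpoch.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A request is accepted only if it carries the latest epoch for that ID.
    boolean accept(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}
```

If two instances initialize with the same ID, requests from the first one (the older epoch) are rejected from then on.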

Transition to the IN_TRANSACTION state:

  • Once the checks pass, the producer transitions from READY to IN_TRANSACTION, indicating that it is actively participating in a transaction.

Track the transactional state in TransactionManager:

  • TransactionManager keeps the transactional information, including the transactional ID, producer ID, and epoch, along with the partitions that join the transaction.
  • This information is used to track the progress and state of the ongoing transaction.
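
The state checks described above can be modeled as a small state machine. The state names below are a subset of the State enum in TransactionManager.java; the transition table is a simplified assumption that leaves out the error states, and ProducerStateSketch itself is a hypothetical class.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class ProducerStateSketch {
    // Subset of the state names used in TransactionManager.java.
    enum State {
        UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
        COMMITTING_TRANSACTION, ABORTING_TRANSACTION
    }

    // Simplified transition table (error states omitted).
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.UNINITIALIZED, EnumSet.of(State.INITIALIZING));
        ALLOWED.put(State.INITIALIZING, EnumSet.of(State.READY));
        ALLOWED.put(State.READY, EnumSet.of(State.IN_TRANSACTION));
        ALLOWED.put(State.IN_TRANSACTION,
                EnumSet.of(State.COMMITTING_TRANSACTION, State.ABORTING_TRANSACTION));
        ALLOWED.put(State.COMMITTING_TRANSACTION, EnumSet.of(State.READY));
        ALLOWED.put(State.ABORTING_TRANSACTION, EnumSet.of(State.READY));
    }

    private State current = State.UNINITIALIZED;

    State current() {
        return current;
    }

    // Moves to the target state or throws if the move is not allowed.
    void transitionTo(State target) {
        if (!ALLOWED.get(current).contains(target)) {
            throw new IllegalStateException(
                    "Invalid transition from " + current + " to " + target);
        }
        current = target;
    }
}
```

In this model, calling transitionTo(IN_TRANSACTION) on a fresh instance fails, which mirrors the rule that initTransactions() must complete before beginTransaction().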
  3. send(): Here is a detailed look at the send() function:

The send() function in Kafka is used by the producer to send a message to a Kafka topic. When this function is called, several important tasks are performed to send the message. Here's an overview of what the send() function does:

Validate the producer state:

  • The send() function checks the state of the producer to ensure it is in a valid state for sending messages.
  • A transactional producer must be in the IN_TRANSACTION state, that is, between beginTransaction() and commitTransaction() or abortTransaction(); otherwise an exception is thrown to indicate that it cannot send messages.

Create a record:

  • The caller builds a Kafka ProducerRecord object from the data and the optional key and passes it to send().
  • The ProducerRecord contains the topic name, key, value, and other optional headers and metadata.

Assign a partition or use a partitioner:

  • If the ProducerRecord specifies a specific partition, the message is sent to that partition.
  • If the partition is not specified, the producer may use a partitioner to determine the target partition for the message.
  • The partitioner may use various strategies, such as round-robin, key-based, or custom logic, to distribute messages across partitions.
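
The partition choice can be sketched as follows. This is an illustrative model, not Kafka's partitioner: PartitionChoiceSketch is a hypothetical class, and the hash is a stand-in (Kafka's default partitioner hashes the serialized key with murmur2). The keyless case is left out and signalled with -1.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionChoiceSketch {
    // explicitPartition: partition set on the record, or null if none.
    // key: record key, or null.
    // Returns the chosen partition, or -1 when the choice is left to the
    // keyless strategy, which is not modeled here.
    static int choose(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            // The record names its partition, so no partitioner is involved.
            return explicitPartition;
        }
        if (key != null) {
            // Stand-in hash; the same key always maps to the same partition.
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return (hash & 0x7fffffff) % numPartitions;
        }
        return -1;
    }
}
```

Because the result depends only on the key and the partition count, all records with the same key land in the same partition as long as the number of partitions does not change.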

Serialize the record:

  • The send() function serializes the record's key and value into byte arrays using the configured key and value serializers.
  • Serialization is necessary to convert the data and headers into a format that can be transmitted over the network.

Accumulate the record into the record batch:

  • The producer maintains a record batch that collects multiple records before sending them to the Kafka broker.
  • The send() function adds the serialized record to the current record batch.
  • If the record batch is full (batch.size) or has waited for the configured time threshold (linger.ms), it becomes ready to be sent to the broker.
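
A minimal sketch of the batching idea, assuming a single partition and a byte budget that plays the role of batch.size. BatchAccumulatorSketch is a hypothetical class and is far simpler than the producer's real accumulator; the time threshold is represented by an explicit flush() call.

```java
import java.util.ArrayList;
import java.util.List;

class BatchAccumulatorSketch {
    private final int batchSizeBytes;                 // plays the role of batch.size
    private final List<byte[]> current = new ArrayList<>();
    private int currentBytes = 0;
    private final List<List<byte[]>> readyBatches = new ArrayList<>();

    BatchAccumulatorSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Appends a serialized record; closes the current batch first if the
    // record would push it over the size limit.
    void append(byte[] record) {
        if (!current.isEmpty() && currentBytes + record.length > batchSizeBytes) {
            closeCurrent();
        }
        current.add(record);
        currentBytes += record.length;
    }

    // Stands in for the time threshold expiring or an explicit flush.
    void flush() {
        if (!current.isEmpty()) {
            closeCurrent();
        }
    }

    private void closeCurrent() {
        readyBatches.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    int readyBatchCount() {
        return readyBatches.size();
    }
}
```

With a 10-byte limit, two 4-byte records share a batch, a third one closes that batch and opens a new one, and flush() makes the remaining batch ready as well.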

Apply compression (optional):

  • If compression is enabled through the producer's compression.type setting, the record batch is compressed to reduce network bandwidth and storage requirements.
  • Compression is applied to the record batch as a whole rather than to individual records.

Buffer the record batch:

  • The producer holds record batches in memory taken from its internal buffer pool until they are sent.
  • The buffer pool is a memory space for records waiting to be sent to the Kafka brokers; its total size is bounded by the buffer.memory setting.

Asynchronously send the record batch to the broker:

  • The send() function itself does not perform network I/O: it returns a Future as soon as the record has been added to a batch.
  • A background I/O thread (Sender.java) picks up the ready batches and uses a network client to transmit them to the Kafka broker.

Handle acknowledgments and retries:

  • The Sender thread handles the acknowledgment for each record batch and completes the Future (and any callback) that send() returned.
  • It retries the batch if the send fails with a retriable error.
  • The acknowledgment settings (acks, delivery.timeout.ms, and retries) are defined in the producer’s configuration.

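When send() returns, the record has only been buffered; the returned Future completes once the Kafka broker has acknowledged the batch, after which the record is replicated across the broker cluster. For a transactional producer, the target partition is also registered with the transaction coordinator (AddPartitionsToTxn) before any batch for that partition is sent.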

  4. commitTransaction():

Validate the producer state:

  • The commitTransaction() function checks the state of the producer to ensure it is in a valid state for committing a transaction.
  • If the producer is not in the IN_TRANSACTION state, an exception may be thrown to indicate that it cannot commit a transaction.

End the current record batch:

  • If there is an ongoing record batch that has not been sent, the commitTransaction() function marks the end of that batch.
  • This ensures that all records accumulated in the batch are included in the transaction.

Flush the record batch:

  • Before committing the transaction, the producer ensures that all buffered records are sent to the Kafka broker.
  • The commitTransaction() function initiates a flush operation to transmit any pending record batches to the broker.
  • This step helps ensure that all produced messages are durably stored in Kafka before committing the transaction.

Commit the transaction:

  • The commitTransaction() function sends an EndTxn request, marked as a commit, to the transaction coordinator (a Kafka broker).
  • This request includes the transactional ID, producer ID, and epoch associated with the transaction.
  • The coordinator first writes a PREPARE_COMMIT entry to its transaction log. This is the first phase: once that entry is durable, the transaction is guaranteed to commit.
  • In the second phase, the coordinator writes commit markers to every partition that took part in the transaction and then records COMPLETE_COMMIT.

Handle acknowledgments and errors:

  • The commitTransaction() function handles the acknowledgment process for the commit request.
  • It may wait for acknowledgments from the broker and handle any errors or retries if the commit fails.
  • How long the call may block is bounded by the max.block.ms setting in the producer’s configuration.

Reset the transactional state:

  • After a successful commit, the commitTransaction() function resets the producer's transactional state.
  • This includes clearing the set of partitions that belonged to the transaction and returning the producer to the READY state. The transactional ID itself is kept, so the same producer can start a new transaction.

Once the commitTransaction() function completes, the ongoing transaction is committed, and the producer is ready to start a new transaction if needed.
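
The coordinator's side of a commit can be sketched with an in-memory model. CoordinatorCommitSketch is a hypothetical class; its state names are modeled on the ones Kafka's transaction coordinator records, and the transaction log and partition logs are plain Java collections rather than real Kafka topics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CoordinatorCommitSketch {
    // State names modeled on those recorded by Kafka's transaction coordinator.
    enum TxnState { ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    // Stand-ins for the coordinator's transaction log and the data partitions.
    final List<TxnState> transactionLog = new ArrayList<>();
    final Map<String, List<String>> partitionLogs = new LinkedHashMap<>();

    // The producer writes a record to a partition that is part of the transaction.
    void write(String partition, String record) {
        if (transactionLog.isEmpty()) {
            transactionLog.add(TxnState.ONGOING);
        }
        partitionLogs.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Handles the producer's request to commit the transaction.
    void endTxnCommit() {
        // Phase 1: persist the decision; from here on the commit is final.
        transactionLog.add(TxnState.PREPARE_COMMIT);
        // Phase 2: append a commit marker to every partition in the transaction.
        for (List<String> log : partitionLogs.values()) {
            log.add("COMMIT_MARKER");
        }
        transactionLog.add(TxnState.COMPLETE_COMMIT);
    }
}
```

The important design point is the order: the decision is made durable before any marker is written, so a coordinator that crashes between the two phases can finish writing markers after it recovers.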

  5. abortTransaction(): If an error occurs during the transaction, the abortTransaction() function is called to abort it. The producer discards any unsent messages and sends an EndTxn request marked as an abort. Messages that already reached the brokers stay in the log but are followed by abort markers, so consumers reading with isolation.level=read_committed never see them.
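
How an aborted transaction stays invisible can be sketched with a toy log reader. ReadCommittedSketch is a hypothetical class that assumes each log entry is either a data record or a control marker tagged with a producer ID. It is a simplification: a real consumer with isolation.level=read_committed delivers records in offset order, whereas this model releases a transaction's records when its commit marker is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReadCommittedSketch {
    // One log entry: either a data record or a control marker.
    static final class Entry {
        final long producerId;
        final String value;   // payload for data records, null for markers
        final String marker;  // "COMMIT" or "ABORT" for markers, null for data

        private Entry(long producerId, String value, String marker) {
            this.producerId = producerId;
            this.value = value;
            this.marker = marker;
        }

        static Entry data(long producerId, String value) {
            return new Entry(producerId, value, null);
        }

        static Entry marker(long producerId, String marker) {
            return new Entry(producerId, null, marker);
        }
    }

    // Returns only the values whose transaction ended with a COMMIT marker.
    static List<String> visibleRecords(List<Entry> log) {
        Map<Long, List<String>> open = new HashMap<>();
        List<String> visible = new ArrayList<>();
        for (Entry e : log) {
            if (e.marker == null) {
                // Data record: hold it back until its transaction is decided.
                open.computeIfAbsent(e.producerId, id -> new ArrayList<>()).add(e.value);
            } else {
                List<String> pending = open.remove(e.producerId);
                if (pending != null && e.marker.equals("COMMIT")) {
                    visible.addAll(pending);
                }
                // On ABORT the pending records are simply dropped.
            }
        }
        return visible;
    }
}
```

Records from an aborted transaction, and records from a transaction that has no marker yet, never appear in the result.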

How does 2 phase commit work in the Kafka code?

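Here’s an example of handler code that follows the two-phase commit pattern. The TxnPrepareHandler class below handles the response to a TxnPrepare request, that is, the prepare phase of the protocol. Treat it as a simplified illustration written in the style of the request handlers in TransactionManager.java: the Apache Kafka client has no class with this name, and the handler that actually ends a transaction there is EndTxnHandler, which sends an EndTxn request to the transaction coordinator.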

private class TxnPrepareHandler extends TxnRequestHandler {
    private final TxnPrepareRequest.Builder builder;

    private TxnPrepareHandler(TransactionalRequestResult result,
                              TxnPrepareRequest.Builder builder) {
        super(result);
        this.builder = builder;
    }

    @Override
    TxnPrepareRequest.Builder requestBuilder() {
        return builder;
    }

    @Override
    Priority priority() {
        return Priority.PREPARE;
    }

    @Override
    FindCoordinatorRequest.CoordinatorType coordinatorType() {
        return FindCoordinatorRequest.CoordinatorType.TRANSACTION;
    }

    @Override
    String coordinatorKey() {
        return builder.data.transactionalId();
    }

Summary of above code:

  1. Receive the prepare response and handle partition results based on error codes.
  2. If no errors, mark partitions as prepared and update transaction metadata.
  3. If coordinator not available or not the coordinator, initiate a coordinator lookup.
  4. If retriable error, re-enqueue the request for handling.
  5. If authorization failure, raise a fatal error and skip further processing.
  6. If invalid producer ID mapping, raise an abortable error and mark rollback as required.
  7. After all partition results are processed, either roll back or, if every partition has been prepared, initiate the commit phase.

    @Override
    public void handleResponse(AbstractResponse response) {
        TxnPrepareResponse txnPrepareResponse = (TxnPrepareResponse) response;
        Map<TopicPartition, PrepareCommitResult> partitionResults = txnPrepareResponse.data().partitionResult();

        log.debug("Received TxnPrepare response for transactional ID {}: {}", builder.data.transactionalId(),
                partitionResults);

        boolean coordinatorReloaded = false;
        boolean rollbackRequired = false;
        for (Map.Entry<TopicPartition, PrepareCommitResult> entry : partitionResults.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            PrepareCommitResult result = entry.getValue();
            Errors error = result.error();
            if (error == Errors.NONE) {
                // Handle successful preparation for the partition
                // This could involve persisting the prepared state or metadata

                // Mark the partition as prepared
                pendingTxnCommits.put(topicPartition, result);

                // Add the partition to the pending commit list for the transaction
                result.transactionalId(builder.data.transactionalId());
                result.producerId(builder.data.producerId());
                result.producerEpoch(builder.data.producerEpoch());
                pendingTxnOffsetCommits.add(topicPartition);

                // Update the transaction state and status
                transactionMetadata.addPartitionsForCommit(pendingTxnCommits.keySet());
                transactionMetadata.setTransactionStatus(TransactionMetadata.TransactionStatus.PREPARE_COMMIT);

                // Update the coordinator's metadata if needed
                maybeUpdateCoordinatorMetadata(result.coordinatorMetadata());
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                if (!coordinatorReloaded) {
                    coordinatorReloaded = true;
                    lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, builder.data.transactionalId());
                }
                // Re-enqueue the request for handling after coordinator lookup
                reenqueue();
                return; // Skip further processing until coordinator lookup is complete
            } else if (error.exception() instanceof RetriableException) {
                // Handle retriable error by re-enqueuing the request
                reenqueue();
                return; // Skip further processing until retriable error is resolved
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                // Handle authorization failure
                fatalError(error.exception());
                return; // Skip further processing in case of authorization failure
            } else if (error == Errors.INVALID_PRODUCER_ID_MAPPING) {
                // Handle invalid producer ID mapping error
                abortableErrorIfPossible(error.exception());
                rollbackRequired = true; // Mark rollback as required
            } else {
                // Handle unexpected error
                fatalError(new KafkaException("Unexpected error in TxnPrepareResponse: " + error.message()));
                return; // Skip further processing in case of unexpected error
            }
        }

        // Decide the outcome once every partition result has been examined
        if (rollbackRequired) {
            // At least one partition could not be prepared, so roll back
            rollbackTransaction();
        } else if (pendingTxnCommits.keySet().containsAll(partitionResults.keySet())) {
            // All partitions prepared, proceed to the next phase
            initiateCommit();
        }
    }

The code below contains two methods related to the two-phase commit protocol:

initiateCommit():

  • Creates a TxnCommitRequest to commit the prepared partitions.
  • Creates a TxnCommitHandler and sends the commit request.

rollbackTransaction():

  • Creates a TxnAbortRequest to rollback the transaction.
  • Creates a TxnAbortHandler and sends the abort request.

    private void initiateCommit() {
        // Create the TxnCommit request
        TxnCommitRequest.Builder commitBuilder = new TxnCommitRequest.Builder(builder.data.transactionalId(),
                builder.data.producerId(), builder.data.producerEpoch(), pendingTxnCommits.keySet());

        // Create a new TxnCommitHandler and send the commit request
        TxnCommitHandler commitHandler = new TxnCommitHandler(result, commitBuilder);
        sendRequest(commitHandler);
    }

    private void rollbackTransaction() {
        // Create the TxnAbort request
        TxnAbortRequest.Builder abortBuilder = new TxnAbortRequest.Builder(builder.data.transactionalId(),
                builder.data.producerId(), builder.data.producerEpoch(), pendingTxnCommits.keySet());

        // Create a new TxnAbortHandler and send the abort request
        TxnAbortHandler abortHandler = new TxnAbortHandler(result, abortBuilder);
        sendRequest(abortHandler);
    }
}

You can dig deeper in these files:

  1. KafkaProducer.java
  2. TransactionManager.java
  3. Sender.java

Keep Learning!
