Handling Asynchronous Processes in Synchronous Manner

4 possible solutions that might match your requirements.

Andre Laksmana
DKatalis


We recently ran into a challenging puzzle: how to handle asynchronous processes synchronously. After many consultations and experiments, we came up with a working solution that meets our technical requirements. We also describe other possible solutions that may fit different environments better.


Let’s get started!

The Puzzle

Our transactions, including transfers, pre-authorizations, and settlements, are processed by our next-generation core bank, which is designed to be asynchronous. The core bank accepts requests only via Kafka topics and returns responses, in batches, via other dedicated Kafka topics.

[Figure: Asynchronous core bank]

However, we work with a dominant third-party payment service provider that insisted (even after negotiations) that our service respond synchronously, using the Request-Response message exchange pattern, with either a SUCCESS or a FAILED status.

In this context, Request-Response is inefficient and, depending on the use case, can add significant latency; gRPC is better suited to some use cases. Request-Response is currently being replaced by the CQRS (Command Query Responsibility Segregation) pattern, with Kafka for streaming data.

Synchronous vs. Asynchronous Communication

Most architectures require Request-Response for point-to-point communication (for example, between a server and a mobile app) and data streaming for continuous data processing. Request-Response is usually implemented synchronously.

However, it can also be implemented asynchronously, with the response delivered later through a callback API. But not every partner is willing to provide a callback, and ours was not.

Let us compare the two styles of message exchange: REST and data streaming.

Synchronous REST API (HTTP)

Although WSDL and SOAP dominated in the past, REST is now the de facto standard for web services. Most of our partners, and we ourselves, provide REST APIs for integration.

Synchronous web service calls over HTTP hold a connection open and wait until the response is delivered or the timeout expires. This adds latency, especially when a TCP connection is set up and torn down for every request-response exchange, although for many use cases the latency is still acceptable. Another problem is head-of-line blocking: requests may wait until the request at the head of the queue has been fully processed, and circuit breakers may be needed on the server when there are too many outstanding requests.
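To make the blocking behavior concrete, here is a small sketch using only the JDK. A local `HttpServer` stands in for a slow partner or core bank endpoint (the `/status` path, the delay, and the function names are made up for illustration), and `java.net.http.HttpClient` makes a synchronous call with a per-request timeout.

```kotlin
import com.sun.net.httpserver.HttpServer
import java.net.InetSocketAddress
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.http.HttpTimeoutException
import java.time.Duration
import java.util.concurrent.Executors

// Hypothetical local stand-in for a partner or core bank REST endpoint.
// The handler sleeps to simulate slow processing on the server side.
fun startFakeServer(delayMillis: Long): HttpServer {
    val server = HttpServer.create(InetSocketAddress("127.0.0.1", 0), 0)
    server.createContext("/status") { exchange ->
        Thread.sleep(delayMillis)
        val body = "SUCCESS".toByteArray()
        exchange.sendResponseHeaders(200, body.size.toLong())
        exchange.responseBody.use { it.write(body) }
    }
    // Daemon worker threads so a still-sleeping handler never keeps the JVM alive.
    server.executor = Executors.newCachedThreadPool { r -> Thread(r).apply { isDaemon = true } }
    server.start()
    return server
}

// Synchronous call: the calling thread is parked inside send() until the
// response arrives or the per-request timeout fires.
fun callSynchronously(port: Int, timeout: Duration): String {
    val client = HttpClient.newHttpClient()
    val request = HttpRequest.newBuilder(URI.create("http://127.0.0.1:$port/status"))
        .timeout(timeout)
        .GET()
        .build()
    return try {
        client.send(request, HttpResponse.BodyHandlers.ofString()).body()
    } catch (e: HttpTimeoutException) {
        "TIMEOUT"
    }
}
```

Against a fast server the call returns `SUCCESS`; against a server that takes longer than the timeout, the caller gets nothing useful back and has still spent the whole timeout blocked.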

Asynchronous Data Streaming (Kafka)

Kafka's asynchronous paradigm resembles a message queue (MQ), with long-term storage of events and the ability to replay the event history. This results in true decoupling between producers and consumers. In most Kafka deployments, several producers and consumers with very different designs and latencies write and read events. Such data streams are typically processed incrementally with stream processing, without access to the full data set at once.

Apache Kafka does not provide a built-in Request-Response API, which is not necessarily a drawback: data streaming favors different design patterns. We need to weigh the trade-offs between the Request-Response pattern and these alternatives. That said, Request-Response is possible with Kafka too.

Request-Response vs CQRS and Event Sourcing

CQRS (Command Query Responsibility Segregation) requires that every method be either a command that performs an action or a query that returns data, but never both.

[Figure: CQRS (Command Query Responsibility Segregation)]
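A minimal sketch of the command/query split, using a hypothetical account model (all names are illustrative). For brevity both sides share one map; in a full CQRS setup the read side usually has its own store, updated from events.

```kotlin
// Write side: commands perform an action and return nothing.
class AccountCommands(private val balances: MutableMap<String, Long>) {
    fun deposit(accountId: String, amount: Long) {
        require(amount > 0) { "amount must be positive" }
        balances[accountId] = (balances[accountId] ?: 0L) + amount
    }
}

// Read side: queries return data and never change state.
class AccountQueries(private val balances: Map<String, Long>) {
    fun balanceOf(accountId: String): Long = balances[accountId] ?: 0L
}
```

Because `deposit` returns `Unit` and `balanceOf` has no side effects, each method falls cleanly on one side of the split.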

Event Sourcing is an architectural pattern in which entities do not track their internal state through direct serialization or object-relational mapping; instead, they rely on reading and committing events to an event store.

[Figure: Event Sourcing]
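As a rough illustration of event sourcing, the sketch below keeps an append-only, in-memory event log per account and derives the balance by replaying it. The event types and the store are hypothetical; a real event store would persist events durably.

```kotlin
// Hypothetical events for one account; the event log is the source of truth.
sealed interface AccountEvent
data class Deposited(val amount: Long) : AccountEvent
data class Withdrawn(val amount: Long) : AccountEvent

// Minimal in-memory, append-only event store.
class InMemoryEventStore {
    private val streams = mutableMapOf<String, MutableList<AccountEvent>>()

    fun append(streamId: String, event: AccountEvent) {
        streams.getOrPut(streamId) { mutableListOf() }.add(event)
    }

    fun read(streamId: String): List<AccountEvent> = streams[streamId].orEmpty().toList()
}

// State is never serialized directly; it is rebuilt by replaying events in order.
fun replayBalance(events: List<AccountEvent>): Long =
    events.fold(0L) { balance, event ->
        when (event) {
            is Deposited -> balance + event.amount
            is Withdrawn -> balance - event.amount
        }
    }
```

Appending `Deposited(100)`, `Withdrawn(30)`, and `Deposited(5)` and replaying yields a balance of 75.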

When Event Sourcing and CQRS are combined with domain-driven design, aggregate roots are responsible for validating and applying commands (their instance methods are invoked from a command handler) and then publishing events.
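Here is a sketch of that flow, with hypothetical command, event, and aggregate types. The aggregate validates the command and expresses the outcome as events. A command handler invokes it and hands the events to a `publish` function, which stands in for writing to an event store or Kafka topic.

```kotlin
// Hypothetical command and event types.
data class WithdrawMoney(val accountId: String, val amount: Long)
data class MoneyWithdrawn(val accountId: String, val amount: Long)

// Aggregate root: validates the command, then records the outcome as events.
class AccountAggregate(val accountId: String, private var balance: Long) {
    fun handle(command: WithdrawMoney): List<MoneyWithdrawn> {
        require(command.accountId == accountId) { "command targets another aggregate" }
        require(command.amount > 0) { "amount must be positive" }
        if (command.amount > balance) throw IllegalStateException("insufficient funds")
        val events = listOf(MoneyWithdrawn(accountId, command.amount))
        events.forEach { applyEvent(it) }
        return events
    }

    // Applying an event is the only way state changes, so replaying events reproduces it.
    fun applyEvent(event: MoneyWithdrawn) {
        balance -= event.amount
    }

    fun currentBalance(): Long = balance
}

// Hypothetical command handler: invokes the aggregate, then publishes its events.
fun handleWithdraw(
    aggregate: AccountAggregate,
    command: WithdrawMoney,
    publish: (MoneyWithdrawn) -> Unit,
) {
    aggregate.handle(command).forEach(publish)
}
```

A rejected command (for example, insufficient funds) throws before any event is produced, so nothing is published.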

With this approach, the state is updated on every relevant event message, so the current state is always known. Querying the state stored in a materialized view (e.g., a KTable in Kafka Streams) is efficient.

In contrast, with Request-Response, the server computes the state for every request. With CQRS, the state is computed once per event, regardless of how many times it is queried.
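The contrast can be sketched with a tiny, single-threaded materialized view. It is hypothetical and only loosely analogous to a KTable: it keeps the latest status per transaction and does its work once per event, so every query is just a map lookup.

```kotlin
// Hypothetical status event from a stream.
data class TransactionStatusEvent(val transactionId: String, val status: String)

// Materialized view: latest status per key, updated once per incoming event.
class TransactionStatusView {
    private val latest = HashMap<String, String>()

    var eventsApplied = 0
        private set

    fun onEvent(event: TransactionStatusEvent) {
        latest[event.transactionId] = event.status
        eventsApplied++
    }

    // Query: a lookup; nothing is recomputed, however often it is called.
    fun statusOf(transactionId: String): String? = latest[transactionId]
}
```

Three events followed by a thousand queries still cost exactly three updates; the queries never touch the event history.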

So… do not use Request-Response with Kafka unless you need it

A bank transaction is never truly synchronous; it is a complex business process with many independent events within and across organizations.

It is best to use the design patterns that come naturally to the technology. We are building a modern enterprise architecture using domain-driven design and Kafka.

So if your partner is open to collaboration, first ask whether they are willing to integrate asynchronously: via a callback, Kafka, or anything else. :)

Back to our problem…

In our next-gen core bank context, there are two ways to find out whether a transaction has completed (success/fail):

  • Subscribe to the core bank response Kafka topic
  • Repeatedly call core bank REST API to get the status of the transaction until it is completed (polling)

Subscribe to the core bank response Kafka topic

Our transaction microservice (ms-transfer) runs in a Kubernetes cluster with multiple instances (pods). With the Kafka topic subscription approach, the pod that receives a request from the third-party provider may not be the one that consumes the core bank response.

Below is an example with 4 pods:

[Figure: Problem with providing Request-Response for an async process with multiple pods]
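The mismatch can be reproduced with a deliberately simplified model of the 4-pod setup. It assumes the load balancer spreads incoming requests round-robin and that the response topic has one partition per pod. Kafka's default partitioner actually hashes keys with murmur2; `String.hashCode()` is used here only to keep the sketch dependency-free. All names are hypothetical.

```kotlin
// Simplified, hypothetical model: 4 pods, one response-topic partition per pod.
const val POD_COUNT = 4

// Pod that receives the incoming HTTP request (round-robin load balancing assumed).
fun podReceivingRequest(requestIndex: Int): Int = requestIndex % POD_COUNT

// Pod that consumes the core bank response, chosen by the record key's partition.
fun podConsumingResponse(transactionId: String): Int =
    Math.floorMod(transactionId.hashCode(), POD_COUNT)

// Counts requests whose response lands on a different pod than the one holding the request.
fun mismatchedRequests(requestCount: Int): Int =
    (0 until requestCount).count { i -> podReceivingRequest(i) != podConsumingResponse("tx-$i") }
```

Because the two routing decisions are independent, many responses land on a pod that has no waiting request thread to hand them to.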

Repeatedly call core bank REST API to get the status of the transaction until it is completed (polling)

To poll, the incoming request thread sleeps for a period and, upon waking, calls the core bank API to get the transaction status. This technique failed our performance test, with memory consumption and latency both increasing, and we could not reach our target transactions per second (TPS).

[Figure: Polling]
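A minimal sketch of that polling loop, assuming a hypothetical `fetchStatus` function in place of the core bank "get transaction status" call. It shows why the approach is costly: the request thread is occupied for the entire wait, even though it does no useful work while sleeping.

```kotlin
import java.time.Duration

enum class CoreBankStatus { PENDING, SUCCESS, FAILED }

// Polls until a final status arrives; returns null if the deadline passes first.
fun pollUntilComplete(
    fetchStatus: () -> CoreBankStatus,
    interval: Duration,
    timeout: Duration,
): CoreBankStatus? {
    val deadline = System.nanoTime() + timeout.toNanos()
    var status = fetchStatus()
    while (status == CoreBankStatus.PENDING) {
        // Give up if the next wake-up would land past the deadline.
        if (System.nanoTime() + interval.toNanos() > deadline) return null
        // The thread (and the caller's open HTTP request) is held while sleeping.
        Thread.sleep(interval.toMillis())
        status = fetchStatus()
    }
    return status
}
```

Every in-flight transaction pins one thread and issues one status call per interval, so load grows with both concurrency and wait time.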

Solutions

After consulting many friends, we identified several general solutions, which we dive into below:

  1. Polling Database -> Polling Redis/in-memory
  2. Redis pub/sub
  3. Spring Kafka
  4. Workflow Engine

Polling Database -> Polling Redis/in-memory

With this approach, whichever pod receives the core bank transaction event updates the transaction status in our database. The pod handling the request from the third-party provider then keeps polling the database until the status is available.

This is easy to implement and does not require much effort on our side, and there is no need to add external components to our stack. However, it puts extra load on our database, and we also need to implement idempotency checks, error handling, retries, and timeouts.

The horror story: during peak traffic hours, this approach added huge I/O latencies to transactions, causing them to time out. This led the team to replace database polling with polling an in-memory cache.
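One of those chores, the idempotent status write, can be sketched as follows. A `ConcurrentHashMap` stands in for the real database table or cache, and the class is hypothetical. Kafka may redeliver the same core bank event, and pods may race, so only the first final status is recorded and later writes are ignored (in SQL, a conditional insert plays the same role).

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical shared status store; a ConcurrentHashMap stands in for the DB/cache.
class TransactionStatusStore {
    private val finalStatuses = ConcurrentHashMap<String, String>()

    // Returns true if this call recorded the status, false if one was already recorded.
    fun recordFinalStatus(transactionId: String, status: String): Boolean =
        finalStatuses.putIfAbsent(transactionId, status) == null

    // What the polling pod reads on each poll; null means "not completed yet".
    fun statusOf(transactionId: String): String? = finalStatuses[transactionId]
}
```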

Redis pub/sub

The team found that polling Redis was still plagued by intermittent latencies and timeouts in many transactions during busy hours. So the team improved the solution further, using Redis pub/sub to notify the waiting thread of the transaction status. However, the in-memory solution comes at a cost: memory consumption grew quickly with the number of transactions.
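The pub/sub idea can be sketched without Redis. In the sketch, a hypothetical in-memory channel broadcasts every status message to every pod. Each pod keeps a future only for the requests it is serving, so only the pod holding the waiting thread reacts, and that thread is woken up instead of polling.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// In-memory stand-in for a Redis pub/sub channel: every subscriber gets every message.
class StatusChannel {
    private val subscribers = CopyOnWriteArrayList<(String, String) -> Unit>()

    fun subscribe(listener: (String, String) -> Unit) {
        subscribers += listener
    }

    fun publish(transactionId: String, status: String) {
        subscribers.forEach { it(transactionId, status) }
    }
}

// Hypothetical model of one pod's waiting request threads.
class PodWaiters {
    private val waiters = ConcurrentHashMap<String, CompletableFuture<String>>()

    // Register BEFORE sending the transfer, so an early status message is not missed.
    fun register(transactionId: String): CompletableFuture<String> =
        waiters.computeIfAbsent(transactionId) { CompletableFuture() }

    // Called for every broadcast; pods without a waiter for this ID ignore it.
    fun onMessage(transactionId: String, status: String) {
        waiters.remove(transactionId)?.complete(status)
    }

    // Blocks the request thread until the status arrives; returns null on timeout.
    fun await(transactionId: String, future: CompletableFuture<String>, timeoutMillis: Long): String? =
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } catch (e: TimeoutException) {
            waiters.remove(transactionId) // stop tracking so the entry does not leak
            null
        }
}
```

Wiring each pod with `channel.subscribe(pod::onMessage)` means any pod that consumes the core bank response can simply publish it. The trade-off the team hit remains visible here: every pending transaction occupies an entry (and a thread) until it completes or times out.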

Spring Kafka

An excellent DZone article shows how Spring Kafka supports synchronous request/reply out of the box using its Kafka templates (ReplyingKafkaTemplate).

[Figure: Spring synchronous Kafka]

Spring solves the problem by automatically setting a correlation ID on the producer record. On the consumer side, the listener annotated with @SendTo returns the reply with that correlation ID unchanged, so the template can match the reply to the waiting request.
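The correlation-ID mechanism itself can be sketched without Spring or Kafka. The sketch below is an illustration of the idea, not Spring's implementation. In-memory queues stand in for the request and reply topics, the requester parks a future per correlation ID, and a hypothetical responder plays the role of the `@SendTo` listener by copying the ID onto its reply.

```kotlin
import java.util.UUID
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

// A message with a correlation ID "header"; queues stand in for Kafka topics.
data class CorrelatedMessage(val correlationId: String, val payload: String)

fun startDaemon(body: () -> Unit) {
    val thread = Thread { body() }
    thread.isDaemon = true
    thread.start()
}

// Hypothetical requester: one reply listener completes whichever future matches the reply's ID.
class CorrelatingRequester(
    private val requestTopic: BlockingQueue<CorrelatedMessage>,
    replyTopic: BlockingQueue<CorrelatedMessage>,
) {
    private val pending = ConcurrentHashMap<String, CompletableFuture<String>>()

    init {
        startDaemon {
            while (true) {
                val reply = replyTopic.take()
                pending.remove(reply.correlationId)?.complete(reply.payload)
            }
        }
    }

    // Blocks until the matching reply arrives; throws TimeoutException otherwise.
    fun sendAndReceive(payload: String, timeoutMillis: Long): String {
        val correlationId = UUID.randomUUID().toString()
        val future = CompletableFuture<String>()
        pending[correlationId] = future // register before sending to avoid a race
        requestTopic.put(CorrelatedMessage(correlationId, payload))
        return try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS)
        } finally {
            pending.remove(correlationId)
        }
    }
}

// Hypothetical responder: echoes the correlation ID onto the reply unchanged.
fun startResponder(
    requestTopic: BlockingQueue<CorrelatedMessage>,
    replyTopic: BlockingQueue<CorrelatedMessage>,
) = startDaemon {
    while (true) {
        val request = requestTopic.take()
        replyTopic.put(CorrelatedMessage(request.correlationId, "processed:${request.payload}"))
    }
}
```

With several application instances, each instance still needs to receive the replies meant for it (for example, via a dedicated reply topic or partition), which is the same routing problem described earlier.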

The complete code example is available in that DZone article.

Using Spring, the request/reply pattern is fairly simple to implement with Kafka. In our case, however, the board rejected this solution: we had already standardized on the Micronaut framework and could not simply add another framework. We were told to find a solution within our existing stack, which led to the next and final solution.

Use Temporal Workflow Engine

Temporal is already widely used in our services, so we can use a Temporal workflow to orchestrate the flow. Temporal supports starting a workflow synchronously, which lets the calling thread wait until the workflow completes. Although this is more complicated than the previous approaches, Temporal provides idempotency checks, monitoring, retries, and timeouts out of the box, and there is no need for polling.

To understand how this method works, take a look at the flowchart below:

[Figure: Temporal as an async-to-sync conversion orchestrator]

The key is step 3 in the diagram above, where the request thread synchronously starts a workflow in Temporal and waits for it to complete with a result (steps 4 to 9).

If you’d like to go down this road, you can follow these steps:

Add dependencies

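```kotlin
// build.gradle.kts: add the Temporal SDK and its Kotlin extensions as dependencies
// (temporalVersion is assumed to be defined elsewhere, e.g. in gradle.properties)
implementation("io.temporal:temporal-kotlin:$temporalVersion")
implementation("io.temporal:temporal-sdk:$temporalVersion")
```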

Register Transaction Processor as a Temporal Worker

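```kotlin
import io.micronaut.context.annotation.Context
import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowClientOptions
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.WorkerFactory
import jakarta.annotation.PostConstruct // javax.annotation.* on older Micronaut versions
import jakarta.annotation.PreDestroy
import org.slf4j.LoggerFactory

// @Context makes Micronaut create this bean eagerly, so every pod starts
// polling the task queue as soon as it boots.
@Context
class TemporalWorker(
    private val transactionActivity: TransactionActivity,
    private val serviceOptions: WorkflowServiceStubsOptions,
    private val clientOptions: WorkflowClientOptions,
) {
    private val logger = LoggerFactory.getLogger(TemporalWorker::class.java)
    private lateinit var workerFactory: WorkerFactory

    @PostConstruct
    fun start() {
        logger.info("Starting Temporal worker on task queue transaction-task-queue")
        val workflowService = WorkflowServiceStubs.newServiceStubs(serviceOptions)
        val workflowClient = WorkflowClient.newInstance(workflowService, clientOptions)
        workerFactory = WorkerFactory.newInstance(workflowClient)
        val worker = workerFactory.newWorker("transaction-task-queue")
        // Workflows are registered by type (Temporal creates one instance per execution);
        // activities are registered as instances, so they can use injected beans.
        worker.registerWorkflowImplementationTypes(TransactionWorkflowImpl::class.java)
        worker.registerActivitiesImplementations(transactionActivity)
        workerFactory.start()
    }

    @PreDestroy
    fun stop() {
        workerFactory.shutdown()
    }
}
```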

Add workflow activity

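```kotlin
import io.temporal.activity.ActivityInterface
import io.temporal.activity.ActivityMethod
import jakarta.inject.Singleton

@ActivityInterface
interface TransactionActivity {
    // Sends the transfer request to the core bank; the result arrives later via Kafka.
    @ActivityMethod
    fun transfer(idempotencyKey: String, transactionId: String)
}

// CoreBankClient and Transaction are ms-transfer's own classes (not shown here).
@Singleton
class TransactionActivityImpl(private val coreBankClient: CoreBankClient) : TransactionActivity {
    override fun transfer(idempotencyKey: String, transactionId: String) {
        // Temporal may retry this activity, so the request carries the idempotency key.
        coreBankClient.transfer(Transaction(idempotencyKey, transactionId))
    }
}
```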

Add workflow implementation

```kotlin
import io.temporal.activity.ActivityOptions
import io.temporal.common.RetryOptions
import io.temporal.workflow.SignalMethod
import io.temporal.workflow.Workflow
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod
import java.time.Duration

enum class TransactionStatus {
    PENDING, FAILED, SUCCESS
}

@WorkflowInterface
interface TransactionWorkflow {
    @WorkflowMethod
    fun start(idempotencyKey: String, transactionId: String): TransactionStatus

    @SignalMethod
    fun updateTransactionStatus(status: TransactionStatus)
}

class TransactionWorkflowImpl : TransactionWorkflow {
    private val logger = Workflow.getLogger(TransactionWorkflowImpl::class.java)
    private var status: TransactionStatus = TransactionStatus.PENDING

    // Retry the core bank call with exponential backoff (1s, 2s, 4s, 8s, then 10s).
    // No maximum attempt count is set, so retries continue until the workflow times out.
    private val defaultActivityOptions: ActivityOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(10))
        .setRetryOptions(
            RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setMaximumInterval(Duration.ofSeconds(10))
                .setBackoffCoefficient(2.0)
                .build()
        )
        .build()

    private val transactionActivity: TransactionActivity =
        Workflow.newActivityStub(TransactionActivity::class.java, defaultActivityOptions)

    override fun start(idempotencyKey: String, transactionId: String): TransactionStatus {
        logger.info("Start workflow to process transaction {} with idempotency key {}", transactionId, idempotencyKey)
        transactionActivity.transfer(idempotencyKey, transactionId)
        logger.info("Transaction has been sent to core bank; waiting for core bank status")

        // Wait (durably) until the signal handler below sets a final status.
        // Temporal re-evaluates this condition, so it must not have side effects.
        Workflow.await { status != TransactionStatus.PENDING }
        return status
    }

    override fun updateTransactionStatus(status: TransactionStatus) {
        logger.info("Received transaction status update event, status is {}", status)
        this.status = status
    }
}
```

Update workflow status in Kafka consumer

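```kotlin
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.temporal.client.WorkflowClient
import io.temporal.client.WorkflowStub
import org.slf4j.LoggerFactory

// Runs on every pod: whichever pod consumes the response signals the workflow via Temporal.
@KafkaListener
class TransactionListener(private val workflowClient: WorkflowClient) {
    private val logger = LoggerFactory.getLogger(TransactionListener::class.java)

    @Topic("integration.postings_api.card_transaction_manager_postings_client.response")
    fun receive(@KafkaKey key: String, transactionEvent: TransactionEvent) {
        logger.info("Received transaction event {} with idempotency key {}", transactionEvent, key)
        // The record key is used as the workflow ID, so the request thread must start the workflow with
        // WorkflowOptions.newBuilder().setWorkflowId(idempotencyKey).setTaskQueue("transaction-task-queue")
        // and call start(...) on the typed stub, which blocks until the workflow returns.
        val workflowStub: WorkflowStub = workflowClient.newUntypedWorkflowStub(key)
        workflowStub.signal("updateTransactionStatus", transactionEvent.status)
    }
}
```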

Now you’re done!

This approach passed our performance test without significant CPU or memory consumption anomalies!

Conclusion

Most architectures need a Request-Response pattern for communication and data streaming for continuous data processing.

Synchronous Request-Response communication can also be implemented with Kafka. However, CQRS and event sourcing are the more natural fit for data streaming. Understanding the options and their trade-offs will help you choose the right tool for the job.

Does encountering challenging puzzles and crafting creative solutions within certain technical limits excite you? You should check DKatalis out!
