Sitemap
DKatalis

DKatalis is a highly adaptive tech company, driven to solve problems through tech and data.

Follow publication

Handling Asynchronous Processes in Synchronous Manner

9 min readDec 15, 2022

--

The Puzzle

Asynchronous Core-bank

Synchronous vs. Asynchronous Communication

Synchronous REST API (HTTP)

Asynchronous Data Streaming (Kafka)

Request-Response vs CQRS and Event Sourcing

CQRS (Command Query Responsibility Segregation)
Event Sourcing

So… Do not use Request-Response in Kafka if not needed

Back to our problem…

Subscribe to the core bank response Kafka topic

Problem with providing Request-response in async process with multiple pods

Repeatedly call core bank REST API to get the status of the transaction until it is completed (polling)

Polling

Solutions

Polling Database -> Polling Redis/in-memory

Redis pub/sub

Spring Kafka

Spring synchronous Kafka

Use Temporal Workflow Engine

Temporal as async to sync conversion orchestrator

Add dependencies

// Add temporal SDK as dependencies
implementation("io.temporal:temporal-kotlin:$temporalVersion")
implementation("io.temporal:temporal-sdk:$temporalVersion")

Register Transaction Processor as a Temporal Worker

@Context
class TemporalWorker(
private val transactionActivity: TransactionActivity,
private val serviceOptions: WorkflowServiceStubsOptions,
private val clientOptions: WorkflowClientOptions,
) {
private val logger = getLogger()

@PostConstruct
fun start() {
logger.info("Starting workflow service on ")
val workflowService = WorkflowServiceStubs.newServiceStubs(serviceOptions)
val workflowClient = WorkflowClient.newInstance(workflowService, clientOptions)
val workerFactory = WorkerFactory.newInstance(workflowClient)
val worker = workerFactory.newWorker("transaction-task-queue")
worker.run {
registerWorkflowImplementationTypes(TransactionWorkflowImpl::class.java)
registerActivitiesImplementations(transactionActivity)
}
workerFactory.start()
}
}

Add workflow activity

@ActivityInterface
interface TransactionActivity {
@ActivityMethod
fun transfer(idempotencyKey: String, transactionId: String)
}

@Singleton
class TransactionActivityImpl(private val coreBankClient: CoreBankClient): TransactionActivity {
override fun transfer(idempotencyKey: String, transactionId: String) {
this.coreBankClient.transfer(Transaction(idempotencyKey, transactionId))
}
}

Add workflow implementation

enum class TransactionStatus {
PENDING, FAILED, SUCCESS
}
@WorkflowInterface
interface TransactionWorkflow {
@WorkflowMethod
fun start(idempotencyKey: String, transactionId: String): TransactionStatus
@SignalMethod
fun updateTransactionStatus(status: TransactionStatus)
}
class TransactionWorkflowImpl: TransactionWorkflow {
private val logger = getWorkflowLogger()
private var status: TransactionStatus = TransactionStatus.PENDING
private val defaultActivityOptions = ActivityOptions {
setStartToCloseTimeout(Duration.ofSeconds(10))
setRetryOptions {
setInitialInterval(Duration.ofSeconds(1))
setMaximumInterval(Duration.ofSeconds(10))
setBackoffCoefficient(2.0)
}
}
private val transactionActivity: TransactionActivity = Workflow.newActivityStub(TransactionActivity::class.java, defaultActivityOptions)
override fun start(idempotencyKey: String, transactionId: String): TransactionStatus {
logger.info("Start workflow to process transaction $transactionId with idempotency key $idempotencyKey")
transactionActivity.transfer(idempotencyKey, transactionId)
logger.info("Transaction has been send to core bank, start to wait core bank status")

// wait until transaction status get updated
Workflow.await {
logger.info("Wait transaction status get updated,current status is $status")
status != TransactionStatus.PENDING
}
return status
}
override fun updateTransactionStatus(status: TransactionStatus) {
logger.info("Received transaction status update event, status is $status")
this.status = status
}
}

Update workflow status in Kafka consumer

@KafkaListener
class TransactionListener(private val workflowClient: WorkflowClient) {
private val logger = getLogger()

@Topic("integration.postings_api.card_transaction_manager_postings_client.response")
fun receive(@KafkaKey key: String, transactionEvent: TransactionEvent) {
logger.info("Received transaction event $transactionEvent with idempotency key $key")
val workflowStub: WorkflowStub = workflowClient.newUntypedWorkflowStub(key)
workflowStub.signal("updateTransactionStatus", transactionEvent.status)
}
}

Conclusion

--

--

DKatalis
DKatalis

Published in DKatalis

DKatalis is a highly adaptive tech company, driven to solve problems through tech and data.

No responses yet