In today’s digital world, the amount of data produced every day keeps growing, and organizations need appropriate ways to handle this continuous stream. This applies to financial systems too. As the number of customers and the operations they perform grow rapidly, financial organizations need to change their systems to handle this volume of data. These systems also need to be highly available, respond quickly and ensure transaction atomicity. One way to build a fast, available and atomic system, which we also used at VakıfBank, is to put an event-driven approach at the core of applications. In this article, I will focus on how we, the VakıfBank Infrastructure Team, built an event-driven infrastructure.
First, if you are not familiar with the event-driven architecture (EDA) concept, here are some key principles behind it:
· A software architecture pattern that promotes the production, detection and consumption of events, and the reaction to them.
· Asynchronous “push”-based messaging pattern: unlike the traditional synchronous request/response model, events are produced asynchronously.
· Autonomous messages: events carry messages that include the required information about the performed operation.
· Higher decoupling of distributed systems: event producers are completely independent of event consumers, apart from the event message format.
EDA Implementation Architecture
The diagram above illustrates the EDA architecture at VakıfBank. To understand it better, let’s follow the scenario below:
· After a customer withdraws money, he or she needs to be notified of that action by SMS.
· If that process were synchronous and there were a problem in the notification system, the customer would not be able to withdraw money, which would be a critical failure.
· This creates tight coupling between the withdrawal and notification systems. To prevent that, the system needs to be designed with an event-based approach.
· After the business logic of the money withdrawal operation is executed, a MONEY_WITHDRAWAL event is raised at the end of the operation. The Event Management module at the ESB layer enqueues that message to the queues subscribed to it, in our case a notification queue.
· Messages waiting in the queues are checked periodically, and the operations defined for each queue are executed for each message. In our case, the consumer application triggers the send-SMS operation after detecting a waiting event in the notification queue. In the end, the customer is notified of the withdrawal asynchronously.
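The scenario above can be sketched in a few lines. This is a simplified in-memory illustration, not our production code: the `EventManager` class, the `NOTIFICATION_QUEUE` name and the `withdraw` and `send_sms` helpers are all hypothetical, and Python is used only because it runs without a broker or a database.

```python
from collections import defaultdict, deque
import uuid


class EventManager:
    """Toy stand-in for the Event Management module: routes events to subscriber queues."""

    def __init__(self):
        self.subscriptions = defaultdict(list)  # event type -> subscribed queue names
        self.queues = defaultdict(deque)        # queue name -> waiting events

    def subscribe(self, event_type, queue_name):
        self.subscriptions[event_type].append(queue_name)

    def raise_event(self, event_type, payload):
        event = {"id": str(uuid.uuid4()), "type": event_type, "payload": payload}
        for queue_name in self.subscriptions[event_type]:
            self.queues[queue_name].append(event)
        return event["id"]

    def poll(self, queue_name, handler):
        """Run handler for every event waiting in the queue; return how many were handled."""
        handled = 0
        waiting = self.queues[queue_name]
        while waiting:
            handler(waiting.popleft())
            handled += 1
        return handled


def withdraw(manager, balances, customer, amount):
    if balances[customer] < amount:
        raise ValueError("insufficient funds")
    balances[customer] -= amount
    # The event is raised at the end of the operation; no SMS is sent here.
    manager.raise_event("MONEY_WITHDRAWAL", {"customer": customer, "amount": amount})


def send_sms(event):
    payload = event["payload"]
    sent_sms.append(f"{payload['customer']}: {payload['amount']} withdrawn")


manager = EventManager()
manager.subscribe("MONEY_WITHDRAWAL", "NOTIFICATION_QUEUE")
balances = {"alice": 100}
sent_sms = []

# The withdrawal completes on its own; the notification is still waiting in the queue.
withdraw(manager, balances, "alice", 30)
waiting_before_poll = len(manager.queues["NOTIFICATION_QUEUE"])

# Later, the consumer application checks the queue and sends the SMS.
handled = manager.poll("NOTIFICATION_QUEUE", send_sms)
```

The withdrawal never calls the notification code directly, so a failure in `send_sms` cannot make the withdrawal fail.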
In this architecture, only event object IDs (GUIDs) are placed on the queues. The event message payload (JSON) is stored in a database table. On the consumer side, an event message can be executed multiple times: if a queue operation fails on any attempt, the event is re-enqueued to be executed again. Once the maximum number of allowed retries is reached, the message is removed from the queue permanently. We developed this retry mechanism because failures are often temporary, for example connection pool problems at the database level or timeouts in external systems that the operation depends on.
Event producers enqueue events to a queueing system. At VakıfBank, we currently have two queue implementations: Oracle Advanced Queuing and RabbitMQ.
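Here is a minimal sketch of the retry idea, under some loud assumptions: a dictionary stands in for the database table, a `deque` stands in for the queue, and `MAX_RETRIES`, `enqueue` and `process_queue` are hypothetical names. The retry limit of 3 is chosen only to keep the example short.

```python
from collections import deque
import uuid

MAX_RETRIES = 3    # hypothetical limit for this sketch

event_table = {}   # stands in for the database table: event id -> payload
event_queue = deque()  # holds only (event id, attempts so far)


def enqueue(payload):
    event_id = uuid.uuid4()
    event_table[event_id] = payload      # payload goes to the "table"
    event_queue.append((event_id, 0))    # only the id goes to the queue
    return event_id


def process_queue(handler):
    """Run handler for each queued event; re-enqueue failures until MAX_RETRIES is reached."""
    dropped = []
    while event_queue:
        event_id, attempts = event_queue.popleft()
        try:
            handler(event_table[event_id])
        except Exception:
            attempts += 1
            if attempts < MAX_RETRIES:
                event_queue.append((event_id, attempts))  # try again later
            else:
                dropped.append(event_id)                  # give up permanently
    return dropped


calls = {"count": 0}
delivered = []


def flaky_sms(payload):
    # Fails twice with a temporary error, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("notification system timed out")
    delivered.append(payload["customer"])


def broken_sms(payload):
    raise TimeoutError("notification system is down")


enqueue({"customer": "alice", "amount": 30})
dropped_after_flaky = process_queue(flaky_sms)

bad_id = enqueue({"customer": "bob", "amount": 50})
dropped_after_broken = process_queue(broken_sms)
```

A temporary failure is absorbed by the retries, while a message that keeps failing is dropped after the limit instead of blocking the queue forever.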
Oracle Advanced Queuing
At VakıfBank, Oracle Database is our database management system. It provides an enterprise messaging infrastructure through Oracle Advanced Queuing, a database-integrated message queuing functionality for distributed applications. One advantage of Oracle Advanced Queuing is that it ensures transaction atomicity. If there is an error in the operation that raises the event, or if the enqueue fails, the whole transaction is rolled back. Therefore, neither the operation logic nor the raised event is committed to the database.
In our Oracle Advanced Queuing implementation, each queue has its own database source, and each queue’s messages reside in a separate source. The queues are fast and scalable, and they give us a decoupled environment to some extent. Currently, we have approximately 300 queues in the production environment. Recently, about 70 million events have been raised per day, and 10 million per hour at peak times. Under heavy load, the average dequeue time is approximately 0.02 seconds and the average enqueue time is approximately 0.025 seconds.
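To make the atomicity point concrete, here is a small sketch that uses SQLite from the Python standard library as a stand-in for Oracle. It is not Oracle Advanced Queuing: the `accounts` and `event_queue` tables and the `withdraw` function are hypothetical, and the "queue" is just a table in the same database. The point it illustrates is that the business update and the enqueue share one transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (customer TEXT PRIMARY KEY, "
    "balance INTEGER CHECK (balance >= 0))"
)
conn.execute(
    "CREATE TABLE event_queue (id INTEGER PRIMARY KEY, "
    "event_type TEXT NOT NULL, customer TEXT)"
)
conn.execute("INSERT INTO accounts VALUES ('alice', 100)")
conn.commit()


def withdraw(customer, amount, event_type="MONEY_WITHDRAWAL"):
    """Update the balance and enqueue the event in ONE transaction."""
    try:
        with conn:  # commits if the block succeeds, rolls back if it raises
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE customer = ?",
                (amount, customer),
            )
            conn.execute(
                "INSERT INTO event_queue (event_type, customer) VALUES (?, ?)",
                (event_type, customer),
            )
        return True
    except sqlite3.IntegrityError:
        return False


def balance(customer):
    row = conn.execute(
        "SELECT balance FROM accounts WHERE customer = ?", (customer,)
    ).fetchone()
    return row[0]


def queued_events():
    return conn.execute("SELECT COUNT(*) FROM event_queue").fetchone()[0]


ok = withdraw("alice", 30)                               # both writes commit
enqueue_failed = withdraw("alice", 30, event_type=None)  # enqueue fails, update is undone
overdraft = withdraw("alice", 500)                       # business rule fails, no event
```

After the three calls, the balance reflects only the first withdrawal and the queue table holds exactly one event.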
ORACLE QUEUE DECLARATION
DECLARE
  v_QUEUE_MESSAGE_TYPE VARCHAR2(200) := 'VBQUEUEMSGTYPE'; -- payload type
  v_QUEUE_NAME         VARCHAR2(40)  := 'VBQUEUE';        -- queue code
  v_QUEUE_TABLE_NAME   VARCHAR2(40)  := 'VBQUEUESOURCE';  -- queue source table
BEGIN
  -- Create the queue table
  DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => v_QUEUE_TABLE_NAME,
    queue_payload_type => v_QUEUE_MESSAGE_TYPE, multiple_consumers => TRUE);
  -- Create the queue
  DBMS_AQADM.CREATE_QUEUE(
    queue_name     => v_QUEUE_NAME,       -- name of the queue
    queue_table    => v_QUEUE_TABLE_NAME, -- name of the queue source table
    max_retries    => 10, -- limits how many times a dequeue in REMOVE mode can be attempted on a message; beyond the limit the message is sent to the exception queue
    retry_delay    => 0,  -- delay, in seconds, before the message is scheduled for processing again after an application rollback
    retention_time => 0); -- number of seconds a message is retained in the queue table after being dequeued
  -- Start the queue
  DBMS_AQADM.START_QUEUE(queue_name => v_QUEUE_NAME);
END;
Since these queues are database-integrated, they are not a lightweight solution. They depend on database transactions and are in fact part of the RDBMS. As the VakıfBank system continues to grow with more customers, data and resources, we need a lightweight and scalable message broker infrastructure.
To migrate from Oracle queues, we looked for alternatives that maintain transaction atomicity and offer features similar to our current system, such as enqueue/dequeue commit, a re-enqueue mechanism and dynamic queue creation. After this research, we chose RabbitMQ as our new queue management system.
RabbitMQ is a lightweight open source message broker. It uses AMQP (Advanced Message Queuing Protocol), which standardizes messaging around producers, consumers and exchanges. To create a reliable environment with RabbitMQ, we use mirrored queues on a three-node cluster and tune queue and message parameters. Queues are marked as durable, which means queue metadata is stored on disk and recovered on node restart. Also, since our queues hold a large number of messages, especially when the system is under heavy load or when scheduler operations that produce many events are triggered, we use lazy queues. These queues move their contents to disk as early as practically possible and only load them into RAM when requested by consumers.
Dictionary<String,Object> args = new Dictionary<String,Object>();
args.Add("x-queue-mode", "lazy"); // queue argument that declares the queue as lazy
Moreover, to guarantee message durability, we use persistent messages, which are stored on disk.
var properties = model.CreateBasicProperties();
properties.Persistent = true;
RabbitMQ provides two ways to consume messages. The first is fetch/pull-based consuming, in which consumers pull messages from the queues on demand. The second is push-based consuming, in which consumers subscribe to a queue and get notified when there is a new message on that queue. At VakıfBank we use pull-based consuming, since we have a Windows service that is responsible for reading the queues continuously and allocating resources so that events are processed within time and thread limits.
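A rough sketch of pull-based consuming with bounded resources might look like the following. It does not use the RabbitMQ client: a standard-library `queue.Queue` stands in for the broker, and `pull_batch`, `run_once`, `max_threads` and `max_messages` are hypothetical names. Only the thread limit and a per-round message limit are modelled here, not the time limit.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def pull_batch(source, max_messages):
    """Pull up to max_messages from the queue on demand, without blocking."""
    batch = []
    while len(batch) < max_messages:
        try:
            batch.append(source.get_nowait())
        except queue.Empty:
            break  # nothing left to pull in this round
    return batch


def run_once(source, handler, max_threads=4, max_messages=10):
    """One polling round: pull a bounded batch and process it on a bounded thread pool."""
    batch = pull_batch(source, max_messages)
    if not batch:
        return []
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        return list(pool.map(handler, batch))


notification_queue = queue.Queue()
for event_number in range(25):
    notification_queue.put(event_number)

# The service decides when to read and how much to take in each round.
rounds = []
while True:
    results = run_once(notification_queue, lambda event_id: f"handled {event_id}")
    if not results:
        break
    rounds.append(len(results))
```

Because the consumer asks for messages instead of being handed them, it never takes on more work per round than its limits allow.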
In our RabbitMQ implementation, we publish only the event object ID (a 16-element byte array) to RabbitMQ. The message details for that event (JSON data) are stored in the Oracle database. In the event handler, the message is dequeued from RabbitMQ and its corresponding message detail is read from Oracle. In short, using RabbitMQ in conjunction with Oracle creates a distributed environment. During our RabbitMQ tests, one of the problems we faced was ensuring transactional behaviour between these two systems. To mitigate it, we rely on transaction completed events. On the producer side, we attach a handler to the transaction completed event of the current transaction and save the produced messages in a list. In the transaction completed event, if the current transaction was committed, this list is checked and the corresponding events are published to a RabbitMQ exchange.
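The producer-side idea can be sketched as follows. The `Transaction` class here is a hypothetical toy with a completion callback, and `BufferedProducer` and its `publish` function are illustrative names; the real implementation hooks into the database transaction and publishes to a RabbitMQ exchange.

```python
class Transaction:
    """Toy transaction that notifies listeners when it completes."""

    def __init__(self):
        self._listeners = []

    def on_completed(self, listener):
        self._listeners.append(listener)

    def commit(self):
        self._complete(committed=True)

    def rollback(self):
        self._complete(committed=False)

    def _complete(self, committed):
        for listener in self._listeners:
            listener(committed)


class BufferedProducer:
    """Buffers event ids per transaction and publishes them only after a commit."""

    def __init__(self, publish):
        self._publish = publish  # hypothetical function that sends to the broker

    def enlist(self, transaction):
        pending = []

        def flush(committed):
            if committed:
                for event_id in pending:
                    self._publish(event_id)
            pending.clear()  # on rollback the buffered events are discarded

        transaction.on_completed(flush)
        return pending


exchange = []  # stands in for the broker exchange
producer = BufferedProducer(exchange.append)

tx1 = Transaction()
pending1 = producer.enlist(tx1)
pending1.append("event-1")
published_before_commit = list(exchange)  # nothing is published yet
tx1.commit()

tx2 = Transaction()
pending2 = producer.enlist(tx2)
pending2.append("event-2")
tx2.rollback()
```

Only `event-1` reaches the exchange, because the second transaction was rolled back before anything was published.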
On the consumer side, the delivery tags of messages dequeued from RabbitMQ are saved in a list, and consumer acknowledgements are used rather than transactions, which are 16 times slower. In the transaction completed event, if the transaction was committed, an ack is sent to permanently delete the message. Otherwise, if the transaction was rolled back, a nack is sent to re-enqueue the message. Since RabbitMQ operations are not part of the database transaction, the heavy transactional cost is eliminated, which makes enqueue/dequeue operations lighter.
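The consumer side can be sketched in the same spirit. `FakeChannel` is an in-memory stand-in that only mimics the ack/nack behaviour described above; it is not the RabbitMQ client API, and `consume_in_transaction` is a hypothetical helper in which an exception plays the role of a rolled-back transaction.

```python
class FakeChannel:
    """In-memory stand-in for a broker channel with delivery tags."""

    def __init__(self, messages):
        self.ready = list(messages)  # messages waiting in the queue
        self.unacked = {}            # delivery tag -> message awaiting ack/nack
        self._next_tag = 1

    def get(self):
        if not self.ready:
            return None
        body = self.ready.pop(0)
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]             # message is deleted permanently

    def nack(self, tag, requeue=True):
        body = self.unacked.pop(tag)
        if requeue:
            self.ready.append(body)       # message goes back to the queue


def consume_in_transaction(channel, handler):
    """Handle all waiting messages, then ack or nack them based on the outcome."""
    delivery_tags = []
    committed = True
    try:
        while True:
            item = channel.get()
            if item is None:
                break
            tag, body = item
            delivery_tags.append(tag)
            handler(body)
    except Exception:
        committed = False  # treated as a rolled-back transaction
    for tag in delivery_tags:
        if committed:
            channel.ack(tag)
        else:
            channel.nack(tag, requeue=True)
    return committed


def fail_on_second(body):
    if body == "event-2":
        raise RuntimeError("database connection lost")


good_channel = FakeChannel(["event-1", "event-2"])
good_result = consume_in_transaction(good_channel, lambda body: None)

bad_channel = FakeChannel(["event-1", "event-2"])
bad_result = consume_in_transaction(bad_channel, fail_on_second)
```

In the failing case both messages return to the queue, so they can be processed again in a later attempt.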
One drawback of transaction completed events is that they may not be triggered after an IIS recycle. We faced that problem in our RabbitMQ implementation too. Although the event details were written to the Oracle database successfully, the events might not be published to the RabbitMQ broker, so some events were never processed. To identify these events and retrigger them, we developed a scheduler job. This job gets the maximum date of the events processed for each queue and checks whether any event produced earlier than that latest processed event is still waiting. If so, it re-enqueues that event to RabbitMQ. Thanks to this scheduler job, transaction atomicity is maintained in our system.
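The check performed by the scheduler job could look roughly like this. The record layout (`id`, `queue`, `created_at`, `processed`) and the `find_missed_events` function are assumptions made for this sketch, and the sample rows are made up; the real job reads the event table in Oracle.

```python
from datetime import datetime


def find_missed_events(events):
    """Return ids of unprocessed events older than the latest processed event of their queue."""
    latest_processed = {}  # queue name -> creation time of the latest processed event
    for event in events:
        if event["processed"]:
            current = latest_processed.get(event["queue"])
            if current is None or event["created_at"] > current:
                latest_processed[event["queue"]] = event["created_at"]

    return [
        event["id"]
        for event in events
        if not event["processed"]
        and event["queue"] in latest_processed
        and event["created_at"] < latest_processed[event["queue"]]
    ]


# Made-up sample rows for illustration only.
events = [
    {"id": 1, "queue": "NOTIFICATION", "created_at": datetime(2021, 1, 1, 10, 0), "processed": True},
    {"id": 2, "queue": "NOTIFICATION", "created_at": datetime(2021, 1, 1, 10, 1), "processed": False},
    {"id": 3, "queue": "NOTIFICATION", "created_at": datetime(2021, 1, 1, 10, 2), "processed": True},
    {"id": 4, "queue": "NOTIFICATION", "created_at": datetime(2021, 1, 1, 10, 3), "processed": False},
]

missed = find_missed_events(events)  # these ids would be re-enqueued to the broker
```

Event 2 is flagged because a later event on the same queue has already been processed, while event 4 is left alone because it may simply still be waiting in the queue.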
Another drawback of transaction completed events is managing the relation between threads and transactions, and multiple nested transactions, in complex systems. In complex transactional systems, it is hard to be sure which transaction completed event is triggered for the transactions opened in one thread. To eliminate this complexity, we decided to change our infrastructure: a Debezium listener will be integrated with the Oracle database table in which event details are stored.
In my upcoming post, I will explain the steps we followed to integrate Debezium, SMT and Kafka into our current event infrastructure. Stay tuned!