Perform Realtime and Periodic ETL with WSO2 Stream Processor

Suhothayan Sriskandarajah
Published in Stream Processing
Jul 21, 2018

Data integration is an integral part of an integrated enterprise. A key aspect of data integration is the ability to load or move data from one location to another so that it can be processed or stored for future reference. This process is commonly referred to as extract, transform, and load (ETL), and it dates back to the time we first started using databases. In the early days, ETL jobs ran as batch processes that extracted, transformed, and loaded huge amounts of data at very sparse intervals for archiving or analysis. Today, however, enterprises demand results instantly, and this has propelled ETL functions to run in real time. In this blog we’ll look at the advantages of real-time ETL, how data can be extracted both in real time and periodically, how it can be transformed, and how it can be loaded into the final system, using WSO2 Stream Processor (WSO2 SP) as an example.

Instead of updating stakeholders once a day, as traditional ETL systems do, real-time ETL lets you keep your stakeholders and systems up to date by integrating data as and when it is ready for extraction. As soon as the data is available, it is extracted, transformed according to your requirements, and loaded into the system for processing. This works well when the data source can push data or notify other systems as and when data becomes available. Some data sources are passive, however, and have to be polled periodically to check for new data or updates, with the ETL functions running when something is found. Both approaches are commonly adopted in modern enterprises, and these ETL processes have become a central part of the information backbone of most organizations.

WSO2 Stream Processor (WSO2 SP)

WSO2 Stream Processor (WSO2 SP) is an open source, cloud-native product optimized for processing data in real time for agile digital businesses. It can process 100K events per second with just two nodes and scale further with its distributed deployment. As shown below, it can consume events from various data sources, process the data in real time, and produce instantaneous, actionable insights.

Figure 1: WSO2 Stream Processor Overview

In the following sections let’s see how we can achieve ETL functions using WSO2 Stream Processor.

Data Extraction

WSO2 SP has pre-built connectors to consume data from Kafka, JMS & MQTT Message Brokers, RDBMS databases, MongoDB, Solr, Cassandra, HBase, ElasticSearch, REST APIs, HTTP, TCP, Email, File, and many others.

WSO2 SP uses multiple techniques to extract data from these sources:

  • Subscription: Systems like Kafka, JMS & MQTT Message Brokers are push-based systems from which WSO2 SP consumes messages by subscribing to them.
  • Exposing Services: Some clients publish data using HTTP or TCP transports. In such cases WSO2 SP exposes a service to consume the messages.
  • Change data capture: For systems like RDBMS databases and files, DB logs and file system triggers are monitored to catch changes and extract the relevant data.
  • Polling: For other passive systems like MongoDB, Solr, Cassandra, HBase, ElasticSearch and REST APIs, periodic polling is triggered to extract data from the systems.

Data Transformation

Since organizational systems are heterogeneous, the data they produce also comes in multiple formats, so ETL systems need to be able to transform that data into a format they can use for storage or further processing. WSO2 SP has default support for transforming data that arrives in JSON, XML, Text, WSO2 Event, CSV, and key-value formats, and allows users to provide a mapping to convert the incoming data to the format they prefer. In WSO2 SP the collected data is mapped into one or more streams, each of which is a logical series of events sharing the same set of attributes, or schema. Once data is mapped into streams, WSO2 SP allows you to write Streaming SQL queries to manipulate it. These manipulations can be as simple as replacing null values with a sensible default, or as involved as running a machine learning or natural language processing operation.

WSO2 SP supports the following forms of data transformation:

  • Data filtering and cleansing
  • Splitting and concatenating attributes in a single event or record
  • Adding default values
  • Updating values upon a condition
  • Performing mathematical and logical functions based on the data
  • Enriching the data by correlating with other data streams
  • Enriching the data by correlating with stored data from RDBMS and NoSQL data sources
  • Enriching the data by calling external REST APIs
  • JSON transformations
  • Handling late and out-of-order data arrival
  • Correlating data based on arrival order using pattern and sequence queries
  • Aggregating events over time
  • Running machine learning models over the data
  • Performing Natural Language Processing on the data
  • Doing Geographical Processing with the data
  • … and many others

With the above-mentioned techniques, the incoming data can be transformed and, if needed, further processed according to the business use case. WSO2 SP provides a drag-and-drop graphical editor to build the transformation and processing pipeline. The same can also be done by scripting in the easy-to-use Siddhi Streaming SQL language. Since WSO2 SP uses streams as its intermediate data format, each of the above operations consumes events from one or more streams, performs the necessary transformation or processing, and produces new streams, which can then be consumed by another operator for processing or loaded into the end system.
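
To make the stream-in, stream-out idea concrete, here is a minimal Python sketch that chains two operators as generators. It is only an analogy for how operators consume one stream and produce another, not how WSO2 SP is implemented. The function names and the sample events are made up for illustration.

```python
from typing import Any, Dict, Iterable, Iterator

Event = Dict[str, Any]


def fill_default_location(events: Iterable[Event]) -> Iterator[Event]:
    """Operator 1: replace a missing location with a default value."""
    for event in events:
        cleaned = dict(event)  # copy, so the input stream is left untouched
        if cleaned.get("location") is None:
            cleaned["location"] = "UNKNOWN"
        yield cleaned


def keep_positive_amounts(events: Iterable[Event]) -> Iterator[Event]:
    """Operator 2: drop events whose amount is not positive."""
    for event in events:
        if event["transactionAmount"] > 0:
            yield event


# Hypothetical sample events standing in for an input stream.
input_stream = [
    {"userId": 1, "transactionAmount": 456.0, "location": "CA, USA"},
    {"userId": 2, "transactionAmount": 20.0, "location": None},
    {"userId": 3, "transactionAmount": -5.0, "location": "NY, USA"},
]

# Each operator consumes one stream and produces a new one,
# so operators can be chained in any order the use case needs.
output_stream = list(keep_positive_amounts(fill_default_location(input_stream)))
```

Because every operator has the same shape (events in, events out), adding another transformation step is a matter of wrapping the chain in one more function.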

Data Loading

When it comes to loading data into the end systems, data sinks, like data sources, require the data to be in a message format they prefer, and at times they only consume messages via a particular messaging protocol. WSO2 SP supports loading data to Kafka, JMS & MQTT Message Brokers, RDBMS databases, MongoDB, Solr, Cassandra, HBase, ElasticSearch, HTTP REST services, TCP endpoints, Email, file systems, and many others. By default it also supports pushing messages in JSON, XML, Text, WSO2 Event, CSV, and key-value formats, and users can customize the messages before they are loaded or sent to the end systems. To improve performance, Stream Processor can be configured to load data in batches and using multiple threads.
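
As a rough illustration of why batching helps, the Python sketch below groups events into fixed-size batches so that the sink is called once per batch rather than once per event. This is a generic sketch of the idea, not WSO2 SP's actual configuration or code. `in_batches` and `load_batch` are hypothetical names, and `load_batch` merely records what a bulk write would receive.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def in_batches(events: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group a stream of events into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    for event in events:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final, partially filled batch
        yield batch


loaded_batches: List[List[int]] = []


def load_batch(batch: List[int]) -> None:
    """Hypothetical stand-in for one bulk write to the end system."""
    loaded_batches.append(list(batch))


# Seven events with a batch size of three need three writes instead of seven.
for batch in in_batches(range(1, 8), batch_size=3):
    load_batch(batch)
```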

WSO2 SP uses the following techniques to load data into external systems:

  • Data publishers: To publish data to pub/sub systems like Kafka, JMS & MQTT Message Brokers
  • Database clients: To write to or modify RDBMS databases, MongoDB, Cassandra, and HBase data stores
  • HTTP and TCP clients: To send data to HTTP REST services, TCP endpoints, Solr, and systems like ElasticSearch
  • Email or VFS clients: To send email or write to file systems
  • … and more

Guaranteed ETL with Zero Data Loss or Downtime

When working with heterogeneous systems there is always the risk of endpoints becoming unreliable and nodes going down, which can cause data loss and system downtime. To mitigate endpoint unavailability, WSO2 SP automatically retries its endpoints with an exponentially decaying retry frequency, that is, with progressively longer waits between attempts, until the same or an alternative endpoint is available to load the data. It can also be deployed in a highly available manner such that an SP node failure will not incur any data loss or system downtime. WSO2 SP employs periodic incremental state persistence and an event syncing mechanism among its nodes, which allows it to achieve zero data loss and zero system downtime while processing up to 100K events per second with just two nodes, even when performing stateful data transformations.
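
The retry behaviour described above resembles what is commonly called exponential backoff. The Python sketch below shows the general technique under some assumptions of my own: the base delay, growth factor, cap, and the fixed number of attempts are illustrative values, not WSO2 SP defaults (the product is described as retrying until an endpoint becomes available). `flaky_endpoint` is a hypothetical endpoint that fails twice before accepting data.

```python
import time
from typing import Any, Callable, List


def backoff_delays(base_seconds: float, factor: float,
                   max_seconds: float, attempts: int) -> List[float]:
    """Waits that grow by `factor` each time, capped at max_seconds."""
    return [min(base_seconds * factor ** i, max_seconds) for i in range(attempts)]


def send_with_retry(send: Callable[[Any], None], payload: Any,
                    delays: List[float],
                    sleep: Callable[[float], None] = time.sleep) -> int:
    """Try once immediately, then once after each delay.

    Returns the number of attempts used, or re-raises the last
    ConnectionError if every attempt fails.
    """
    last_error = None
    for attempt, delay in enumerate([0.0] + list(delays), start=1):
        if delay:
            sleep(delay)
        try:
            send(payload)
            return attempt
        except ConnectionError as error:
            last_error = error
    raise last_error


# Hypothetical endpoint that is unavailable for the first two calls.
calls = {"count": 0}


def flaky_endpoint(payload: Any) -> None:
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ConnectionError("endpoint unavailable")


# Record the waits instead of actually sleeping, to keep the demo instant.
waited: List[float] = []
attempts_used = send_with_retry(
    flaky_endpoint, {"userId": 2345},
    backoff_delays(base_seconds=1.0, factor=2.0, max_seconds=60.0, attempts=5),
    sleep=waited.append,
)
```

Spacing the retries out this way avoids hammering an endpoint that is already struggling, while still recovering quickly from short outages.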

Simple ETL Examples with WSO2 SP

Let’s consider a couple of scenarios in which we retrieve transaction data that needs to be transformed and loaded into another system for processing.

Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.

In the first scenario, the data arrives on a JMS topic in JSON format. We need to clean the data by checking for null values and substituting defaults, enrich it by joining with an RDBMS table, and load the results into another RDBMS table.

Let’s assume the input data looks like this:

{"userId":2345, "transactionAmount":456.0, "location":"CA, USA"}

Let’s say we need to transform this data to have userId, userName, transactionAmount, and location fields. If the location is not provided in the incoming message, we have to use the word “UNKNOWN”.

Finally, this data should be stored in a MySQL table with the columns userId, userName, transactionAmount, and transactionLocation.

A sample Siddhi Application for this scenario is listed here:

@App:name("JMS-to-DB-ETL")
@App:description("Extract data from JMS, perform stateless transformation, and load to database")

-- Sample input message: {"userId":2345,
--                        "transactionAmount":456.0,
--                        "location":"CA, USA"}
@source(type = 'jms', destination = 'transactions',
        factory.initial = '...', provider.url = '...',
        @map(type = 'json',
             @attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long,
        transactionAmount double, location string);

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table UserTable (userId long, firstName string,
        lastName string);

-- Final table to load the data
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable (userId long, userName string,
        transactionAmount double, transactionLocation string);

-- Replace a missing location with the default "UNKNOWN"
@info(name = 'CleaningData')
from TransactionStream
select userId,
       transactionAmount,
       ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

-- Look up the user's name in UserTable and build userName
@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
    on c.userId == u.userId
select c.userId,
       str:concat(u.firstName, " ", u.lastName) as userName,
       c.transactionAmount,
       c.location as transactionLocation
insert into EnrichedTransactionStream;

-- Write the enriched events to the final table
@info(name = 'LoadData')
from EnrichedTransactionStream
insert into TransactionTable;

The Siddhi Application’s graphical design view for this scenario in the Stream Processor Studio is represented in Figure 2.

Figure 2: Design view of Scenario 1 in the WSO2 SP Studio

Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka.

Let’s say we poll the database every 5 minutes, aggregate the data, and load the result into Kafka. Assume the input table has the following columns: transactionId, userId, transactionAmount, and transactionLocation. We need to aggregate this data every 5 minutes to get the total and average transaction amount for each transactionLocation, and load the result to Kafka using the default XML format supported by WSO2 SP.

A sample Siddhi Application for this scenario is presented here:

@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to Kafka')

-- Fires an event every 5 minutes to start a polling cycle
define trigger TriggerStream at every 5 min;

-- Sink to load the data
@sink(type='kafka', bootstrap.servers='...',
      topic='...', is.binary.message='...',
      @map(type='xml'))
define stream TransformedDataStream (transactionLocation string,
        totalTransactionAmount double, avgTransactionAmount double,
        transactionId long);

-- Store table
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable (transactionId long, userId long,
        transactionAmount double, transactionLocation string);

-- In-memory table to keep the last processed transaction ID
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedId long);

-- Left outer join, so the trigger still emits an event
-- (with lastProcessedId = 0) while the state table is empty
@info(name = 'CollectLastProcessedId')
from TriggerStream as s left outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0L, t.lastProcessedId)
       as lastProcessedId
insert into DataRetrievalStream;

-- Fetch only the rows added since the last processed ID
@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
    on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

-- Remember the ID of the most recently extracted row
@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
    on TriggerStateTable.key == "lastProcessedId";

-- Total and average per location for each 5-minute batch
@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
       sum(transactionAmount) as totalTransactionAmount,
       avg(transactionAmount) as avgTransactionAmount,
       transactionId
group by transactionLocation
insert into TransformedDataStream;

The Siddhi Application’s graphical design view for this scenario in the Stream Processor Studio is represented in Figure 3.

Figure 3: Sample design view of Scenario 2 in the WSO2 SP Studio
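
For readers less familiar with Siddhi, the logic of Scenario 2 can be approximated in plain Python. The sketch below is an analogy only: it uses an in-memory list in place of the database table, leaves out the 5-minute trigger and the Kafka sink, and tracks the highest transaction ID seen rather than the last one processed. The function names and sample rows are invented for illustration.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def extract_new_rows(table: List[Row], last_processed_id: int) -> Tuple[List[Row], int]:
    """Return rows newer than last_processed_id and the updated ID."""
    new_rows = [row for row in table if row["transactionId"] > last_processed_id]
    if new_rows:
        last_processed_id = max(row["transactionId"] for row in new_rows)
    return new_rows, last_processed_id


def aggregate_by_location(rows: List[Row]) -> Dict[str, Dict[str, float]]:
    """Total and average transaction amount per location for one batch."""
    amounts: Dict[str, List[float]] = defaultdict(list)
    for row in rows:
        amounts[row["transactionLocation"]].append(row["transactionAmount"])
    return {
        location: {"total": sum(values), "average": sum(values) / len(values)}
        for location, values in amounts.items()
    }


# Hypothetical contents of the transaction table at the first poll.
table = [
    {"transactionId": 1, "userId": 10, "transactionAmount": 100.0, "transactionLocation": "CA, USA"},
    {"transactionId": 2, "userId": 11, "transactionAmount": 300.0, "transactionLocation": "CA, USA"},
    {"transactionId": 3, "userId": 12, "transactionAmount": 50.0, "transactionLocation": "NY, USA"},
]

# First polling cycle: nothing has been processed yet, so start from 0.
first_rows, last_id = extract_new_rows(table, last_processed_id=0)
first_summary = aggregate_by_location(first_rows)

# A new row arrives before the second polling cycle.
table.append(
    {"transactionId": 4, "userId": 10, "transactionAmount": 70.0, "transactionLocation": "NY, USA"}
)
second_rows, last_id = extract_new_rows(table, last_id)
second_summary = aggregate_by_location(second_rows)
```

Keeping the last processed ID between cycles is what prevents the second poll from reprocessing the first three rows.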

Enabling ETLs with WSO2 SP

As shown in the example scenarios above, WSO2 SP allows you to build ETL processes without writing a single line of Java code. All you need to do is open the Stream Processor Studio/Editor that comes with WSO2 SP and either type in the scripts using Streaming SQL or use the graphical editor to drag and drop components and build ETL flows. Once created, the Siddhi Applications can be deployed in WSO2 SP. The status of the ETL process and the performance of the WSO2 SP nodes can be monitored via its Status Dashboard to identify performance issues and bottlenecks, if there are any. Furthermore, such ETL processes can also be templated so that non-technical users can deploy them with the click of a button using WSO2 SP’s Business Rules feature.

In conclusion, real-time ETL has become essential for any organization, and with products like WSO2 Stream Processor, building such ETL processes has become a very simple task. Download and try out the latest stable version of WSO2 Stream Processor to implement these scenarios. You can get more details about the product by visiting its official website. For further help, refer to the documentation and the Siddhi Query Guide.

Suhothayan Sriskandarajah
Stream Processing

Director at WSO2, work on Big Data and Stream Processing solutions.