Ballerina is a JVM-based programming language designed for developing integrations. Therefore, it provides built-in support for communicating over many protocols and for handling many data types as first-class constructs. In addition, a large number of connectors and modules are available in Ballerina Central to simplify integration and data processing tasks. This makes it suitable for developing ETL flows as an alternative to using ETL tools, as we discuss in this article.
ETL is the process of gathering data from various sources; performing the required cleansing, transformation, mapping, and enrichment operations; and loading the results into target data stores. Each step of ETL has certain challenges to address (this is discussed in detail here):
Extraction: ETL tools have to connect to various data sources such as RDBMSs, NoSQL data stores, file servers, messaging systems, COTS or in-house business applications, and SaaS systems to extract data. These systems may use different protocols (e.g., HTTP/S, FTP, AMQP), message formats (e.g., JSON, XML, CSV), and authentication mechanisms (e.g., basic auth, OIDC, SAML2). In addition, some source systems may use proprietary protocols, formats, and authentication mechanisms. Furthermore, ETL tools have to support batch or real-time extraction depending on the data source type and business requirements.
Now, let’s see how we can implement such data source integrations in Ballerina. First, we can connect to RDBMS sources using the Ballerina JDBC connector as follows:
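A minimal sketch of such a connection is shown below, assuming the ballerinax/java.jdbc module and a hypothetical MySQL database (the URL, credentials, and table name are placeholders):

```ballerina
import ballerina/io;
import ballerina/sql;
import ballerinax/java.jdbc;

// Row type matching the hypothetical orders table.
type Order record {|
    int id;
    string item_id;
    int quantity;
|};

public function main() returns error? {
    // Connection details are placeholders; replace with your own.
    jdbc:Client db = check new ("jdbc:mysql://localhost:3306/orders_db", "etl_user", "etl_password");
    // Fetch rows as a typed stream of Order records.
    stream<Order, sql:Error?> orders = db->query(`SELECT id, item_id, quantity FROM orders`);
    check from Order o in orders
        do {
            io:println(o);
        };
    check db.close();
}
```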
Similarly, if we want to fetch data from HTTP or FTP services, we can use connectors that are built into the language or available in Ballerina Central as follows:
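For example, the following sketch fetches a JSON payload over HTTP and a file over FTP, assuming the ballerina/http and ballerina/ftp modules (all endpoints, paths, and credentials are hypothetical):

```ballerina
import ballerina/ftp;
import ballerina/http;
import ballerina/io;

public function main() returns error? {
    // Fetch JSON data from a hypothetical HTTP service.
    http:Client httpClient = check new ("https://api.example.com");
    json orders = check httpClient->get("/orders");
    io:println(orders);

    // Fetch a file from a hypothetical FTP server.
    ftp:Client ftpClient = check new ({
        protocol: ftp:FTP,
        host: "ftp.example.com",
        auth: {credentials: {username: "user", password: "password"}}
    });
    stream<byte[] & readonly, io:Error?> fileStream = check ftpClient->get("/data/orders.csv");
    check fileStream.close();
}
```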
Furthermore, we can use the language’s built-in support for the CSV, JSON, and XML formats to read data in whichever format is required. For example, the code snippet below shows reading CSV data from a file:
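As a sketch, assuming a local file named orders.csv, the ballerina/io module can read the whole file into an array of rows:

```ballerina
import ballerina/io;

public function main() returns error? {
    // Each element of `rows` is one CSV record, split into its fields.
    string[][] rows = check io:fileReadCsv("./orders.csv");
    foreach string[] row in rows {
        io:println(row);
    }
}
```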
Finally, we can utilize built-in authentication mechanisms such as OIDC, JWT, and basic authentication to connect with data sources. An example of OIDC-based authentication is shown below:
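As a sketch, an HTTP client can be secured with an OAuth2 client credentials grant against an OIDC provider; the token endpoint and client credentials below are placeholders:

```ballerina
import ballerina/http;
import ballerina/io;

public function main() returns error? {
    // The client transparently obtains and refreshes tokens from the IdP.
    http:Client apiClient = check new ("https://api.example.com", {
        auth: {
            tokenUrl: "https://idp.example.com/oauth2/token",
            clientId: "<client-id>",
            clientSecret: "<client-secret>"
        }
    });
    json data = check apiClient->get("/orders");
    io:println(data);
}
```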
Transformation: ETL tools have to support data processing operations on multiple data formats such as JSON, XML and CSV. These operations could include:
- Filtering out incomplete records
- Combining or splitting fields
- Assigning default values to empty fields
- Validating data against a schema or validation rules
- Mapping different values used for a specific field to a standard value required by the target store
- Enriching data by fetching additional details from other data sources
ETL tools should provide simplified methods to implement such operations and an efficient runtime for working with large data sets. Furthermore, it may be necessary to store invalid records separately and send notifications (e.g., email) to relevant parties so that such data can be inspected manually.
As Ballerina is a programming language with the usual flow control constructs and built-in support for common data types, we can perform all data processing activities within the program flow. For example, we can access JSON data in a similar way to accessing object fields, as shown below:
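A small sketch of this field-style access (the payload is made up for illustration):

```ballerina
import ballerina/io;

public function main() returns error? {
    json orderJson = {id: 1, item: {code: "A_100", qty: 5}};
    // Field access on json values returns json|error, so `check`
    // handles missing fields and a cast narrows to the expected type.
    int qty = <int>check orderJson.item.qty;
    io:println(qty); // 5
}
```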
Similarly, XML data can be loaded and traversed using the syntax below:
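For example (the document contents are made up for illustration):

```ballerina
import ballerina/io;

public function main() {
    xml doc = xml `<orders><item><id>A_100</id><qty>5</qty></item></orders>`;
    // Step expressions (`/<name>`) navigate child elements;
    // `/*` selects an element's children, here the text content.
    xml ids = doc/<item>/<id>;
    io:println(ids);                  // <id>A_100</id>
    io:println((doc/<item>/<qty>)/*); // 5
}
```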
Load: Similar to the extraction phase, the load phase also has to connect with the relevant target data stores and load processed data using the required protocols, formats, and authentication mechanisms. Target data stores are usually RDBMSs or data warehouses. Therefore, the methods for connecting with external systems over multiple protocols that we discussed under the extract phase are applicable here as well.
In addition to the above ETL-specific tasks, ETL tools have to work within the enterprise application environment by integrating with logging, monitoring, and tracing systems; supporting deployment on various infrastructures (on-premise VMs, cloud VMs, Kubernetes, OpenShift, etc.); providing methods for automating deployments; integrating with CI/CD pipelines; and facilitating the maintenance of multiple pre-production and production environments as necessary.
We can enable observability for ETL flows developed in Ballerina via a configuration file for popular logging, monitoring, and tracing platforms, namely ELK (logging), Prometheus (monitoring), and Jaeger (tracing). Regarding deployment, Ballerina’s compilation output is a jar file, as Ballerina is a JVM-based language. Therefore, it is possible to deploy ETL flows developed in Ballerina on any infrastructure that supports a Java runtime. Furthermore, the Ballerina language provides annotations to automatically generate Dockerfiles and Kubernetes deployment artifacts, simplifying deployment on container-based infrastructure. For example, the annotation below can be used to generate a Kubernetes pod with a Ballerina ETL flow that executes every weekday at 10 PM:
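A sketch of such an annotation is shown below. This assumes the Kubernetes annotation module with a Job annotation that accepts a cron-style schedule; the exact module and field names may differ between Ballerina versions:

```ballerina
import ballerina/kubernetes;

// Hypothetical: generates a Kubernetes CronJob that runs the flow
// at 10 PM on weekdays (cron fields: minute hour day month day-of-week).
@kubernetes:Job {
    name: "etl-flow",
    schedule: "0 22 * * 1-5"
}
public function main() returns error? {
    // ETL flow logic goes here.
}
```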
Once we develop ETL flows in Ballerina, each ETL flow becomes an independent unit (a jar file, container, or pod). Therefore, it is possible to independently develop, deploy, and scale each ETL flow, allowing us to treat them as ETL microservices.
So far, we have explored the basics of ETL processes and some of the features of the Ballerina language that can be used for developing ETL flows. Now let’s consider an example ETL scenario and its implementation.
Let’s assume that a warehouse handles orders from multiple stores. Warehouse staff add the details of all orders handled within each hour to a CSV file and upload it to an Amazon S3 bucket. At the end of the day, the order data received within the day has to be processed and inserted into the organization’s database using an ETL flow. Below are the required processing steps:
- Ensure that each record has 5 fields
- Skip any record that starts with #
- Validate the format of each field (e.g., the item ID must start with an uppercase letter followed by an underscore and three digits, the quantity must be an integer, etc.)
- Postcodes of store locations are given in the CSV records. It is necessary to enrich each record by adding the corresponding city name. Data mapping postcodes to cities can be fetched from the organization’s database.
- For each invalid record, the reason for invalidation and the input file name need to be added to the record as additional fields.
- All valid records have to be inserted into the organization’s database.
- All invalid records have to be stored separately, so that they can be reviewed and corrected. For this example scenario, we are adding invalid records to a Google Spreadsheet.
- Once processing is complete, the CSV file has to be removed from the input S3 bucket and placed in a separate (backup) bucket.
Let’s consider the Ballerina program for implementing this ETL flow. First, we need to fetch CSV files from a given S3 bucket. For that, we can use the aws.s3 connector available in Ballerina Central. The S3 connector can be initialized as follows:
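A sketch of the initialization, assuming a recent version of the ballerinax/aws.s3 connector; the region is an assumption and the credentials are supplied via configurables:

```ballerina
import ballerinax/aws.s3;

configurable string accessKeyId = ?;
configurable string secretAccessKey = ?;

// Region is an assumption; set it to wherever the bucket lives.
final s3:Client amazonS3Client = check new ({
    accessKeyId,
    secretAccessKey,
    region: "us-east-1"
});
```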
Then we can iterate through all objects in the relevant S3 bucket using the code below:
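A sketch, assuming a hypothetical bucket name order-files and the listObjects/getObject remote methods of the aws.s3 connector; the snippet is assumed to run inside the flow’s main function:

```ballerina
// List the objects in the input bucket and fetch each one.
s3:S3Object[] objects = check amazonS3Client->listObjects("order-files");
foreach s3:S3Object s3Object in objects {
    string? objectName = s3Object.objectName;
    if objectName is string {
        stream<byte[], error?> content =
            check amazonS3Client->getObject("order-files", objectName);
        // ... parse the CSV content and process each record ...
    }
}
```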
We use the regex module available in Ballerina Central to validate fields against the required criteria. Further, the Ballerina JDBC connector that we discussed earlier is used to connect with the organization’s database, both to fetch postcode details and to insert validated records into the target table.
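For example, the item ID rule described earlier (an uppercase letter, an underscore, and three digits) can be checked as follows using the regex module:

```ballerina
import ballerina/regex;

// True for IDs such as "A_100"; false otherwise.
boolean isValidItemId = regex:matches("A_100", "^[A-Z]_[0-9]{3}$");
```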
Finally, we use the Google Spreadsheet connector to insert invalid records into a spreadsheet. Similar to the S3 connector, the spreadsheet connector has to be initialized as follows:
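A sketch of the initialization, assuming the ballerinax/googleapis.sheets connector with OAuth2 refresh-token based authentication; the credentials are placeholders supplied via configurables:

```ballerina
import ballerinax/googleapis.sheets;

configurable string clientId = ?;
configurable string clientSecret = ?;
configurable string refreshToken = ?;

final sheets:Client spreadsheetClient = check new ({
    auth: {
        clientId,
        clientSecret,
        refreshToken,
        refreshUrl: "https://oauth2.googleapis.com/token"
    }
});
```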
Then we can insert invalid records into the Google Spreadsheet using the code below:
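A sketch, assuming the connector’s appendRowToSheet remote method, a placeholder spreadsheet ID, and a hypothetical sheet named InvalidRecords:

```ballerina
// Append one invalid record, together with the source file name
// and the reason it was rejected, as a new row.
check spreadsheetClient->appendRowToSheet("<spreadsheet-id>", "InvalidRecords",
        ["A_10", "2", "0100", "data1.csv", "Invalid item ID"]);
```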
Now we need to run this ETL flow at the end of each day (let’s say at 11.30 PM). For this, we can add the following annotation to make it a Kubernetes job that runs every day at 11.30 PM:
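A sketch, again assuming a Job annotation that accepts a cron-style schedule (the exact annotation fields may differ between Ballerina versions):

```ballerina
import ballerina/kubernetes;

// Hypothetical: generates a Kubernetes CronJob that runs daily at 23:30.
@kubernetes:Job {
    name: "etl-orders-job",
    schedule: "30 23 * * *"
}
public function main() returns error? {
    // ETL flow logic goes here.
}
```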
The complete Ballerina code for this ETL flow is available here. We can try out this flow by adding the data1.csv and data2.csv files to an S3 bucket and running the generated jar file (after configuring the S3, Google Spreadsheet, and database connection details).
Once the ETL flow is executed, the data below will be inserted into the database:
Note that all records in the database conform to the validation rules and that an additional field containing the city name has been added. Further, the records below, describing the invalid data, will be added to the spreadsheet:
We can see that two additional fields have been added, containing the input file name and the reason for invalidation.