As future data requirements cannot always be planned much ahead of time, Data Warehousing effort is generally subdued by first creating a Data Lake which is a pool of centralized data ready to be transformed based on use cases. A means for accessing and analyzing this data makes it easier for knowledge workers to make good informed decisions.
Before creation of the Data Lake, redBus had several challenges with data requirements which can be summarized as below
- Unorganized large sets of Data could not be utilized for analytics
- There was no single location to access data from multiple sources efficiently
- Query times often led to timeout exceptions due to growing data volumes and as RDBMS is not scalable for analytics
- Data is normalized which is also not suitable for analytics
Data in silos can be very difficult to unify for analytics. The first task to perform any analytics is to unify raw data into a centralized store.
redBus Data Platform is a collection of services to create a Data Lake, perform data transformations by server-less ETL jobs, Sync data from different cloud providers and offer a query interface for analytics
With redBus maintaining multiple AWS accounts for various business units and different types of data stores, we decided to work on a generic solution which would first bring raw data into a common format at a centralized store. Data is then denormalized and stitched across business units to provide a flat structure
Following technologies have been used to build the data Lake
- Apache Parquet format when well partitioned offers read performance making it suitable for analytics. The format is built to support very efficient compression and encoding schemes thereby offering a significant cost reduction on server-less query engines like Athena compared to other formats. The cost and storage size can be up to 10x lesser than csv
- AWS S3 was selected as a centralized store due to cost for long term storage. Using S3 also comes with another advantage as many services/tools connect seamlessly to S3 like Apache Drill, Spark, AWS Glue, Rclone etc.
- Apache Spark is the technology that runs on most Big Data platforms currently and the reason being spark is 100x faster compared to Hadoop Map-Reduce
- AWS Glue is used for Batch processing as cross-account-access enables us to run ETL Jobs on multiple data sources in different accounts. Glue also provides the necessary security as scripts are encrypted and stored in s3. Glue jobs are easily scalable by providing the DPU configuration before every job run
- AWS Athena connects to the Glue data catalog and has accesses to the data stored in S3. Athena is billed based on the data size ($5.00 per TB of data scanned). Since Parquet file size is about 90% lesser than CSV, it is efficient to use Athena and be billed on demand.
- Apache Drill offers the flexibility of running a single SQL query on multiple data sources (mongo, kafka, s3 etc) at once. Also with Athena’s limitation of query timeout of 30 mins, Drill is suited to run long running jobs and highly frequent jobs like alerts etc.
- Rclone is an open source cloud-cloud sync tool. We use it to sync two buckets between GCS and S3. rClone is easy to setup and configure. It also supports encryption of configuration file.
Data Platform Architecture
Glue is a fully managed server-less ETL service. The advantages are schema inference enabled by crawlers , synchronization of jobs by triggers, integration of data catalog with Athena, support for scala and python, scalability via DPU configurations.
A single Data Processing Unit (DPU) provides 4 vCPU and 16 GB of memory, which is equivalent to a m4.xLarge EC2 instance. It can be set at job parameters (optional) of a Glue job.
An AWS Glue job of type Apache Spark requires a minimum of 2 DPUs. By default, AWS Glue allocates 10 DPUs to each Apache Spark job. Every DPU hosts 2 executors. Out of the total 20 executors, 1 executor is reserved for Driver program and 1 DPU for the application master. The actual workload is carried out by 2 * 10–2 (Master) — 1(Driver) = 17 executors. To increase the memory assigned to an executor, in the job parameters, we provide additional parameters to the job like — conf spark.yarn.executor.memoryOverhead =1024
Glue also provides metrics (enabled in advanced properties) to monitor resource utilization of executors. Better performance can be achieved by parallelism of executors.
These are some of the metrics for one of our ETL jobs with 10 DPUs. Each (colored) line denotes an executor. Glue offers Data, memory, cpu and executor profiles along with logs (info, exceptions) on cloudwatch. This Job took about 14 mins to process data of size 25 GB
Making it run faster — AWS Glue metrics provides a means for determining if the DPUs are under-provisioned or over-provisioned. In the graph blow notice the Number of Maximum Needed Executors (green) starts at 122 and falls to a constant 15 thereafter. Active Executors (blue) shows the number of executors currently performing the workload and the fixed horizontal red line Maximum Allocated Executors denotes the 17 executors (based on our 10 DPU allocation). The ratio between the maximum needed executors and maximum allocated executors (adding 1 to both for the Spark driver) gives us the under-provisioning factor: 123/18 = ~7x. We can provision 7*9 + 1 DPUs = 64 DPUs to scale out the job to run it with maximum parallelism and finish faster.
On pricing, consider our ETL job that runs for 14 minutes and consumes 10 DPUs. The price of 1 DPU-Hour is $0.44. Since the job ran for 2/6th of an hour (14 rounded to 20 as the billing is every 10 mins) and consumed 10 DPUs, we are billed 10 DPUs * 2/6 hour at $0.44 per DPU-Hour or (20/6 * 0.44) = $1.46
At redBus there are various systems that produce transactions data, application logs, Google analytics data, ELB logs, streams etc. Spark is the ideal choice for both ingress operations for the data lake as well as the transformation required for data warehousing.
Spark uses RDD (resilient distributed datasets) for holding data in memory during the transformations. This is in contrast to Hadoop map-reduce where the disk or file system is used in between every map or reduce action. Datasets and dataframes are specialized RDDs with APIs to perform transformations.
Read data in parallel
Databases can be very large and improperly indexed. To load the dataframe can be very time consuming even in batches specified by date etc.
To execute Jobs faster, we use spark read option numPartitions to read data from database in parallel which means several connections to the database is opened at once, each reading a record size calculated by (upperbound-lowerbound/numParitions). This data is stored in the dataframe partitioned by the column specified in the paritionColumn option.
Types of Jobs
For daily ingress jobs, we use two types of data synchronizations.
- A full synchronization or snapshot is a total copy of a table which is usually done for Master data (changes are infrequent)
- Incremental synchronization which is for Transactions Data (changes very often).
Deduplication on data ingress
Consider reviews generated by users which can sometimes be modified by a back-end process. The challenge involving an Incremental synchronization is handling duplicates as a transaction table design usually consists of 3 important columns (Primary Key, Created Date & Modified Date)
We first load all new records based on the created date as today in a data frame (ReviewsNewDF). Another data frame holds all modifications that occurred today (ReviewsUpdatesDF)
It is possible that records were inserted and modified on the same day, this will lead to duplicates among the two frames. After a successful deduplication we get rid of the duplicates and write to the data lake
This is achieved using spark in two simple steps.
- Using a special join called “left_anti” which can provide unique rows only on the left frame.
- Apply a union with the resulting dataframe and ReviewsNewDF.
Drill is designed to be a distributed SQL query engine with capabilities like combining tables/topics/collections from multiple sources in a single SQL query and has more SQL support as compared to Spark-SQL.
Unlike Athena, Drill is based on a fixed cost model (ec2). We use Drill to run high frequency jobs which would otherwise be costly on Athena like alerting for specific business use cases, periodically generating reports etc.
This displays all the data that is on a Kafka topic (streams) joined with Mongo (application logs) and our data lake. Current day’s stats can be compared with historic data and extra info added from application logs if required.
Drill is a schema-free query engine. There is no need to educate drill about the underlying storage schema as Drill features a JSON data model that enables queries on complex/nested data as well.
Stores can be easily configured using the Storage plugin option.
With the CTAS (Create Table As Select) construct, we also run some ETL jobs for reports using Drill’s REST API. The above image shows Drill querying Kafka to write parquet files in S3. This process can be automated by a simple script and cron. It is more efficient to run Short one-time ETL tasks on Drill as Glue jobs are billed per DPU-hour every 10 mins.
Drill query profile indicating the maximum time spent was on kafka scan.
Drill is also an alternative for AWS Athena. Athena has a soft query timeout of 30 mins and Drill can be used for large query operations.
Glue crawlers provides the ability to infer schema directly from the data source and create a table definition on Athena. This is possible as Athena can access the Data Catalog of Glue. For our data lake, we run crawlers daily to discover new partitions created by glue jobs that can be queried from Athena.
Since ELB logs are already stored on S3. It becomes easier to combine them with transactions data. Using the prestodb’s split_to_map() function, we convert the query params of the URL to a key-value pair.
Deduplication on Athena
Once data moves from an OLTP to OLAP store (objects are immutable in S3), it is stored in time series or there are more than one record per transaction (due to updates or modifications).
When we require only the latest update, we apply a deduplication process before querying from Athena by partitioning the data and assigning a row number. Row numbers are assigned after sorting based on the modified date field, cancellationdatetime in this case.
In this blog, we discussed about how we use different services to build a data lake and why it is important to first build a data lake before warehousing the data. Some tips on how to increase performance of jobs and deduplication process.
Impact of Data Lake
Since the creation of Data Lake,
- We have unified data across multiple transaction systems, elb logs, google analytics data etc. This makes it easier for data scientists to build models
- Knowledge workers now have a single source to access all relevant information to make informed decisions which would steer the organization ahead.
- Applications no longer face SQL timeout exceptions as read or query times have reduced drastically.
- Saved on cost as historic data which was stored in aws rds is now compressed and migrated to data lake.
- Data warehousing becomes easier since data is flattened and stored in a common format at the lake.