Designing a Data Pipeline in a Multi-Tenant Workload

Vishnu Viswanath
Capillary Technologies
5 min readOct 4, 2022

Callout: Article is for people with some basic knowledge of big data technologies or ETL pipeline

Introduction:

Data workflows have become an indispensable part of any organization. The computational needs for these workflows are varied and depend largely on the trends observed in the data.

The Data Pipeline at Capillary was built around two major design decisions:

  1. Simplified Interface of data access for all use cases across products
  2. The pipeline is agnostic to the data ingestion flows dictated by each tenant

Spark SQL was chosen as the simplified interface shared by both the ETL pipeline and also all the Business Intelligence tools accessing the data warehouse. Hence 90% of tasks in the ETL pipeline are represented by a Spark SQL query task.

Also, the ETL does computations that frequently need historical data for reference. Every tenant in the system has its own data ingestion mechanisms which would dictate the complexity of these computations. Tenants who have a major chunk of their customer base as repeat visitors would show higher latency for the same computations when compared with tenants with low repeat visitor rank.

With the advent of Spark on EMR and data bricks solutions, spark as a ‘computational resource abstraction’ is available for end-users to build custom data workflows.

This abstraction helps users design and build data workflows independent of the platform where the workflows execute. This abstraction also brings some overhead ie. the cluster launch/terminate time, general upkeep, metrics collection, and health monitoring, which need to be factored into our system design.

The question of how well your computation resource is being utilized is important to optimize costs.

Imagine your workflows are quite dynamic in nature with respect to the computing resource needed to successfully run the jobs. For instance say we start with a cluster with a finite amount of RAM and CPU resources. And your workflows are in such a fashion that some jobs do not need many resources, while others might need the entire cluster to complete within a reasonable SLA.

At Capillary, we build an ETL pipeline that operates in a multi-tenant fashion and executes this DAG of workflows on an EMR Spark cluster. Due to the multi-tenant nature of the tasks (proportional to the dataset corresponding to each tenant in the system), we observe some workflows need very less compute resources as compared to other tenants needing 100x larger.

Below is a sample query from our Dag, where we are joining two tables.

For a Sample Query1 (Tenant 1):

TableA size:3.4GB (approx 340 Million rows), TableB size: 3.2GB (approx 320 Million rows)

For same Query1 (Tenant 2):

TableA size:43.9MB (approx 439 K rows), TableB size: 18MB (approx 180 K rows)

For classification purposes ‘S’ tenants would show a trend of 5000–10000 line items coming in on a daily basis whereas ‘M’ tenants would bring in 35k-100k line items.

A tenant of size ‘L’ would be 100k+-1Million line items on a daily basis. We also have an XL categorization of tenants who show data trends of more than 3M line items.

The Problem Statement:

Having to operate with 200+ datasets with just 200 tenants and 10 levels of transformation the number of tasks that build up the pipeline stack up to 400k tasks. Even with sufficient parallelism of execution, submitting these queries over native spark-submit would drastically increase the overhead of launching the spark contexts.

Launching a large common spark context that is to be shared by all resources can lead to idling and/or starvation scenarios for execution.

A Rest API-based web server that can manage the spark sessions could act as a load balancer for the ETL pipeline tasks.

Enter Apache Livy.

Apache Livy: A Recap

Apache Livy is a service that enables users to interact with the Spark cluster using REST APIs. Now through Livy, you can fine-tune the session size required and how many parallel queries can be executed, and so on.

https://livy.incubator.apache.org/docs/latest/programmatic-api.html

Livy Profile-Based Scaling

When we are profiling tenants what that translates to is a set of spark properties that are passed to spark session via livy. This can be imagined as opening a tenant-specific pipeline with relevant configurations through which all the subsequent queries will flow through.

One such spark configuration is “shuffle partitions”. The data ingestion trends mentioned earlier for a tenant with high daily data volume could lead to highly unoptimized execution in the same pipeline. Hence data partitioning based on data ingestion trends is vital. These trends dictate the exact property to attach to the session.

Essentially what we are doing here is looking at the trends in data and classifying tenants based on them.

Let’s say a tenant in the Food and Beverages business vertical would have higher daily data volume compared to a tenant in the Jewelry vertical.

Queries of orgs with small data trends get submitted with a small data partitioning factor and vice versa. Most of the tenants would fall under the data partitioning classification of load balancing.

Some outlier cases exist with a larger data volume than all the other tenants combined. These can be pushed exclusively into a session size with larger memory configurations and also its own unique data partitioning parameters.

For example, let’s take 4GB mem/core cluster resources. We have three resource profiles: Small, Medium, and Large. The shuffle partition of Small would be 64, for medium would be 256 and for Large would be 512. This ensures that the workloads are split into sizable chunks which can be computed and worked upon.

Custom load balancer works on the premise that every task in the workflow sets certain tags and these tags enable us to identify the profiling configuration of the task and route it to the appropriate session built for that tenant.

Conclusion:

It is imperative that some amount of domain knowledge goes into building something generic. The domain knowledge can be extracted from data trends. Also observing the data trends can help us decide on the data partitioning properties (shuffle partitions) that would optimize the jobs. There are outlier cases where we had to do ‘XL’ profiling of tenants where the current shared cluster mode does not scale. They are routed to a dedicated cluster entirely for the tenant.

We hope these learnings provide an introduction to Apache Livy and it helps you make better design decisions when building and scaling complex data workflows.

Reviewed by : Abhilash Lakkur, Mani sankar J, Satish Tvv and Bhanu Bobbali.

--

--