How our own Serverless Data Platform reduced our AWS bill by 93%

Published in Dataorc · Aug 1, 2020

by Mayur and Navdeep

Serverless servers are the servers that you don’t serve

In this post, we will explain how we built a serverless data platform on Hadoop EMR (I never thought I would say that). With the help of this architecture, you will be able to focus more on your applications than on infrastructure, schedulers, and logs. The architecture achieves the following:

  • The only abstraction layer is your application; for example, just define the Hive SQL and you have a production-ready ETL
  • Each and every component is stateless, since we use S3 for storage, Glue for schemas and metadata, and schedulers in the form of Lambda functions triggered by CloudWatch
  • Code in any JVM-based language, Python, or pure SQL; just define how you want to crunch the data, and the rest is taken care of
  • We will also explain how we streamlined development on an actual cluster with negligible cost
  • EMR (Elastic MapReduce) does not stay idle for even a second; the cluster terminates as soon as your jobs complete
  • EMR configuration is as simple as a JSON schema. Make changes programmatically or in a static file, and next time you will get a brand new cluster with your new config
  • As scalable as your pocket.

Read on to learn how we achieved a 93% cost reduction on our data platform.

Let’s start

Architecture

Let's set up the premises:

  • You have incoming data in the form of S3 files. The format can be anything: JSON, Parquet, Avro, etc.
  • You want to enrich the data with your own static data, which can be sitting in S3, DynamoDB, etc.
  • Or you want to do some heavy lifting like tokenization, counter calculations, or inserting into a database (the database should support your scale, perhaps another post?)
  • Or you want to create data cubes or semi-aggregations to support OLAP

Hmm, the wishlist is long but very achievable, so let's first focus on the infrastructure we will be spawning to deliver these use cases.

We will set up on-demand (but Spot-instance-based) clusters that churn through our periodic data and spit out the required format.

Following are the properties of the EMR cluster we will be spawning (a minimal config sketch follows the list):

  • Auto-scaling on CPU usage and disk (the latter is actually more crucial)
  • The schema resides outside the EMR cluster, i.e. in AWS Glue
  • We utilize step functions to schedule our jobs (resiliency is built in)
  • The cluster auto-terminates on job completion (scale-down behavior → “TERMINATE_AT_TASK_COMPLETION”)
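
To make this concrete, here is a minimal sketch of what such a cluster definition could look like with boto3 (the Python AWS SDK). Our actual implementation drives this from a JSON config via Clojure/Amazonica; the instance types, release label, and bucket paths below are placeholders rather than our production values.

# Minimal sketch of an EMR cluster definition (placeholder values).
# Glue is the external metastore and the cluster terminates itself
# once all steps finish, so there is no idle time.
EMR_CONFIG = {
    "Name": "transient-etl-cluster",
    "ReleaseLabel": "emr-5.30.0",
    "Applications": [{"Name": "Hive"}, {"Name": "Spark"}],
    "Configurations": [
        {   # point Hive at the Glue Data Catalog so the schema lives outside the cluster
            "Classification": "hive-site",
            "Properties": {
                "hive.metastore.client.factory.class":
                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            },
        },
        {   # same setting for Spark SQL
            "Classification": "spark-hive-site",
            "Properties": {
                "hive.metastore.client.factory.class":
                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
            },
        },
    ],
    "Instances": {
        "InstanceGroups": [
            {"Name": "master", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "core", "InstanceRole": "CORE", "Market": "SPOT",
             "InstanceType": "m5.2xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,   # terminate when the last step is done
    },
    "ScaleDownBehavior": "TERMINATE_AT_TASK_COMPLETION",
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
    "LogUri": "s3://my-bucket/emr-logs/",       # placeholder bucket
    "VisibleToAllUsers": True,
}

This dictionary is exactly what gets passed to the create-EMR call later, which is why changing the cluster shape is just a config edit.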

OK, let's get to the meat of the problem: how do we execute our jobs and schedule them on EMR?

The answer is AWS EMR Steps

A bit about steps: AWS EMR Steps let you execute your binaries/JARs via commands. All you need to define is

  • jar location — s3 location
  • main-class
  • args

and EMR will run it for you.
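
For reference, submitting such a step programmatically could look like the short boto3 sketch below; the cluster id, JAR path, class name, and arguments are placeholders.

import boto3

emr = boto3.client("emr", region_name="ap-south-1")

# Submit one step that runs a custom JAR: jar location + main class + args.
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",                        # placeholder cluster id
    Steps=[{
        "Name": "run-dag",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3://my-bucket/jars/etl-dag.jar",   # jar location
            "MainClass": "com.example.DagRunner",       # main-class (placeholder)
            "Args": ["--start-epoch-time", "1596240000",
                     "--end-epoch-time", "1596243600"],
        },
    }],
)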

How we leverage Steps: we run our whole DAG (described below) inside these steps. So if your tasks are highly dependent on each other, the whole DAG runs in a single step; if there are independent tasks, they are submitted as multiple steps and can therefore run concurrently too.

We have made submitting our jobs super easy: through our library, submitting steps is as simple as defining JSON of the form

{
"step_name": "hive-preprocessing-insertion",
"step_type": "hive",
"depends_upon": "another_step_name",
"file_name": "s3://path/to/jar/or/sql"
}

Neat!! Eh??
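
Under the hood, a config like this can be turned into EMR steps by grouping jobs that are connected through depends_upon into one step and giving independent jobs their own steps. Here is a rough, standard-library-only sketch of that grouping (not our actual library code):

# Group job configs so that jobs connected via depends_upon share one EMR step,
# while unrelated jobs land in separate steps and can run concurrently.
from collections import defaultdict

def group_into_steps(job_configs):
    adj = defaultdict(set)
    names = {c["step_name"] for c in job_configs}
    for c in job_configs:
        dep = c.get("depends_upon")
        if dep:
            adj[c["step_name"]].add(dep)
            adj[dep].add(c["step_name"])

    seen, groups = set(), []
    for name in names:
        if name in seen:
            continue
        # plain DFS over the dependency links = one connected component
        stack, component = [name], []
        while stack:
            n = stack.pop()
            if n in seen:
                continue
            seen.add(n)
            component.append(n)
            stack.extend(adj[n] - seen)
        groups.append(component)
    return groups   # each inner list becomes a single EMR step

jobs = [
    {"step_name": "hive-preprocessing-insertion", "step_type": "hive",
     "depends_upon": "raw-ingest", "file_name": "s3://path/to/sql"},
    {"step_name": "raw-ingest", "step_type": "spark", "file_name": "s3://path/to/jar"},
    {"step_name": "independent-rollup", "step_type": "hive", "file_name": "s3://path/to/sql"},
]
print(group_into_steps(jobs))
# e.g. [['hive-preprocessing-insertion', 'raw-ingest'], ['independent-rollup']]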

Be it Hive or Spark, the following variables are available at runtime for use cases of any kind, like watermarking or processing a particular slice of data (a sketch of how a job might read them follows the list):
1. start-epoch-time
2. end-epoch-time
3. input path (optional)
4. dependent job start-time (if dependent)
5. dependent job end-time (if dependent)
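
As a rough illustration of how a job might consume these, a Spark or plain Python job could read them as ordinary command-line arguments; the exact flag names below are placeholders, not our real contract:

# Hypothetical argument parsing inside a job; the runner passes these values
# when it builds the EMR step for this job.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--start-epoch-time", type=int, required=True)
parser.add_argument("--end-epoch-time", type=int, required=True)
parser.add_argument("--input-path", default=None)                       # optional
parser.add_argument("--dep-start-epoch-time", type=int, default=None)   # if dependent
parser.add_argument("--dep-end-epoch-time", type=int, default=None)     # if dependent
args = parser.parse_args()

# e.g. only read the slice of data belonging to this run's window
print(f"processing events between {args.start_epoch_time} and {args.end_epoch_time}")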

Now, as we have mentioned before, we run the whole DAG in one or more steps. The major reason for that is:

Steps don't provide complex DAG functionality to power features like job dependencies, retries, and passing of outputs.

So we built our own DAG functionality with the following salient features, keeping full DAG behaviour without losing fault tolerance and resiliency:

  • An in-memory DAG to support any complex DAG or dependencies
  • It may well be in memory, but every action in the DAG is backed by a cursor stored in S3 (S3 is a key-value store!!); see the cursor sketch after this list
  • On every job execution, the cursor for that job is read to get the epoch start time, and on every successful run the cursor is updated. So even though the DAG is in memory, it is extremely resilient
  • Concurrency is totally in our hands, either through our DAG config or through AWS Steps concurrency; we leverage both to run jobs in parallel when required
  • Auto-configured concurrency parameter → based on our learning of how many resources a client (Hive/Pig/Spark) can take and on your configured master resources, our concurrency variable adjusts itself and determines how many jobs can run in parallel in a single step, so your master is not overwhelmed
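
Here is a minimal sketch of such an S3-backed cursor, assuming a layout of one small JSON object per job (the bucket name and key scheme are illustrative):

# S3-backed cursor: one tiny JSON object per job, read before a run and
# updated only after a successful run.
import json, time, boto3

s3 = boto3.client("s3")
BUCKET, PREFIX = "my-dataplatform-state", "cursors/"   # placeholders

def read_cursor(job_name, default_epoch=0):
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=f"{PREFIX}{job_name}.json")
        return json.loads(obj["Body"].read())["last_success_epoch"]
    except s3.exceptions.NoSuchKey:
        return default_epoch          # first run: start from a configured default

def write_cursor(job_name, epoch):
    s3.put_object(
        Bucket=BUCKET,
        Key=f"{PREFIX}{job_name}.json",
        Body=json.dumps({"last_success_epoch": epoch, "updated_at": int(time.time())}),
    )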

Now that we know how to configure our steps/jobs, the question arises of how to handle idempotency, meaning: if a failure happens, how will the next run take care of it?

Here are a couple of practices we follow to ensure idempotency:

  • Every job takes a start-time and an end-time
  • If a job depends on the output of another job, that job's start-time and end-time are passed to it as well. Small caveat: the start-time is actually the minimum of the current job's previous cursor and the dependency's cursor
  • We use S3Guard/EMRFS to ensure a job only succeeds when all the files and partitions have been loaded, so all dependent jobs always get a consistent view
  • Cursors are updated only when a job is successful
  • And we don't forget to raise an alarm on any failure through a configured SNS email topic

These steps ensure that even if a job fails, the next run will bring us back up to speed.
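
Putting the cursor rules together, the run wrapper for a single job could look roughly like this, reusing the read_cursor/write_cursor sketch above; the SNS topic ARN and the job callable are placeholders:

# Idempotent run wrapper: the start-time comes from the cursors, the cursor moves
# only on success, and failures raise an SNS alarm instead of advancing state.
import time, boto3

sns = boto3.client("sns")
ALARM_TOPIC = "arn:aws:sns:ap-south-1:123456789012:etl-failures"  # placeholder

def run_job(job_name, job_fn, depends_on=None):
    start = read_cursor(job_name)
    if depends_on:
        # caveat from above: take the minimum of our own cursor and the dependency's
        start = min(start, read_cursor(depends_on))
    end = int(time.time())
    try:
        job_fn(start, end)              # the actual Hive/Spark submission
        write_cursor(job_name, end)     # only a successful run advances the cursor
    except Exception as exc:
        sns.publish(TopicArn=ALARM_TOPIC,
                    Subject=f"{job_name} failed",
                    Message=str(exc))
        raise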

Now comes the resiliency of the Hadoop jobs themselves. As mentioned before, we have already enabled autoscaling, but that does not always ensure your jobs will succeed. Sometimes the data is so humongous that all the jobs are spilling to disk, and the intermediate data can hit your disk limits.

Don’t worry, we have enabled EBS scaling too. An EBS-scaling boot script, passed in when the cluster is spawned, ensures that whenever you hit the configured percentage of disk usage, an EBS expansion is triggered. We have set this to 70% (yes, in Hadoop the disk fills up really fast). This feature really lets us sleep soundly.
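
We won't reproduce the actual boot script here, but the core idea can be sketched as a small watcher running on each node: check disk usage, and once it crosses the threshold, create and attach a fresh EBS volume. Formatting, mounting, and registering the new directory with YARN/HDFS are left out of this sketch, and the region, size, and device name are placeholders:

# Simplified sketch of a disk watcher: attach a new EBS volume once usage crosses 70%.
# Formatting/mounting the volume and registering it with YARN/HDFS are omitted here.
import shutil, boto3, urllib.request

def instance_metadata(path):
    return urllib.request.urlopen(
        f"http://169.254.169.254/latest/meta-data/{path}", timeout=2).read().decode()

def maybe_expand(mount_point="/mnt", threshold=0.70, size_gb=100):
    usage = shutil.disk_usage(mount_point)
    if usage.used / usage.total < threshold:
        return
    ec2 = boto3.client("ec2", region_name="ap-south-1")     # placeholder region
    az = instance_metadata("placement/availability-zone")
    instance_id = instance_metadata("instance-id")
    vol = ec2.create_volume(AvailabilityZone=az, Size=size_gb, VolumeType="gp2")
    ec2.get_waiter("volume_available").wait(VolumeIds=[vol["VolumeId"]])
    ec2.attach_volume(VolumeId=vol["VolumeId"], InstanceId=instance_id, Device="/dev/xvdh")
    # ...then format, mount, and add the new directory to yarn.nodemanager.local-dirs etc.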

OK, now let's see what we have covered:

  • A scaling cluster — handling bursty data at any scale
  • An in-memory DAG for scheduling multiple jobs — handling complex dependent jobs

The last part: scheduling the spawning of our EMR cluster. We use AWS CloudWatch to schedule our EMR; the same JAR that contains our jobs' source code has a public class implementing the AWS Lambda event interface (com.amazonaws.services.lambda.runtime.RequestStreamHandler), which calls the create-EMR-cluster function with the steps built in.

This enabled our architecture to be completely Serverless.
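
Our handler is written against the JVM RequestStreamHandler interface, but the equivalent idea in a Python Lambda, reusing the EMR_CONFIG sketch from earlier, would be roughly:

# Rough Python equivalent of the scheduled Lambda: CloudWatch fires on a schedule,
# the handler builds the steps from the DAG config and spawns a fresh cluster.
import boto3

emr = boto3.client("emr")

def lambda_handler(event, context):
    # steps would normally be built from the DAG config; a single hard-coded
    # step stands in for that here
    steps = [{
        "Name": "run-dag",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "s3://my-bucket/jars/etl-dag.jar",
                          "MainClass": "com.example.DagRunner",
                          "Args": []},
    }]
    resp = emr.run_job_flow(Steps=steps, **EMR_CONFIG)   # EMR_CONFIG as sketched earlier
    return {"cluster_id": resp["JobFlowId"]}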

Don't be scared, but these are an example of the configs that we pull in real time from the Terraform output to build our cluster.

So the input to the periodic create-emr function is (see the small sketch after this list):

  • emr-config (mentioned above)
  • steps (created via DAG config)
  • scaling configurations
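
Pulling those values out of Terraform can be as simple as parsing terraform output -json; the output names in the comment are hypothetical:

# Read the Terraform outputs (subnet ids, security groups, roles, ...) as JSON
# so the cluster definition can be assembled at runtime.
import json, subprocess

raw = subprocess.run(["terraform", "output", "-json"],
                     capture_output=True, text=True, check=True).stdout
outputs = {k: v["value"] for k, v in json.loads(raw).items()}
# e.g. outputs.get("emr_subnet_id"), outputs.get("emr_security_group")  (hypothetical names)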

Now we have come to the end of the architecture, and to the last but mightiest thing: cost.

  • One of our clients was using serverless Glue operations for crawling new data, Lambda for data transformation, and Kinesis for pushing new or relevant data.
  • All of these operations were replaced by Hive scripts that read new data based on the cursors we mentioned before.
  • So S3 to Kafka, S3 to the data warehouse, and S3 to DynamoDB, with transformations in the form of Hive UDFs, could all be done in a single Hive script (multi-insert clause).
  • The earlier monthly bill for 1 TB of daily data was around $3,000; now the bill comes out to around $210. Whoa!! A 93% cost reduction.
  • Cost reductions came in the form of:
  1. Data is no longer transformed separately for each sink (multi-insert clause, sketched after this list)
  2. DynamoDB lookups were converted into map joins, so even for billions of records we fetch the lookup data once as a table and it is replicated to all jobs
  3. The files produced are now ORC, and since the job is hourly (and sometimes four times a day), we get massive compression of around 2–5x, so the final storage cost went down
  4. Athena queries now scan much less data and leverage column pruning; queries are 2–3 times faster too
  5. With aggressive scaling up and down, we only pay for what is actually being used
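
To give a flavour of the multi-insert trick, a single Hive script can scan the source once and fan out to several sinks. The table names, UDFs, and partition column below are made up for illustration, and each sink is assumed to be an external table wired to the right storage handler:

# Illustrative multi-insert HiveQL submitted as one step: the source is scanned once,
# every sink gets its own transformed projection. Table and UDF names are hypothetical.
MULTI_INSERT_SQL = """
FROM raw_events e
INSERT OVERWRITE TABLE warehouse_orc PARTITION (dt='${hivevar:dt}')
  SELECT e.id, e.user_id, clean_payload(e.payload)
  WHERE e.event_epoch BETWEEN ${hivevar:start_epoch} AND ${hivevar:end_epoch}
INSERT INTO TABLE kafka_sink
  SELECT e.id, to_outbound_json(e.payload)
  WHERE e.event_epoch BETWEEN ${hivevar:start_epoch} AND ${hivevar:end_epoch}
INSERT INTO TABLE dynamodb_lookup_sink
  SELECT e.user_id, latest_profile(e.payload)
  WHERE e.event_epoch BETWEEN ${hivevar:start_epoch} AND ${hivevar:end_epoch}
"""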

One massive operational win was a strict schema. Earlier, with everything being crawled into AWS Glue, we were not getting a consistent schema, since the data we were consuming did not have a constant schema, and our analysts were complaining about Athena queries failing due to schema noncompliance. Now everything has a strict schema, because Hive/Spark only use known columns for insertion.

Though this does not stop us from being forward/backward compatible: any new columns are added at the end of the ORC schema, which preserves backward compatibility.

This whole exercise was full of lessons in keeping the whole architecture serverless and bringing the cost down. There are still miles to go, and we are focusing on:

  • Integrating GCP Dataproc in the same flow
  • Being more aggressive in choosing Spot instances and types
  • Self-healing on the next run (we are really excited about the things we are doing here)
  • Leveraging Dr. Elephant on EMR jobs and AWS logs to know if there are any errors and to take known corrective actions on the next run if something fails

We have leveraged the following tools to build this architecture:

  • Terraform (for the initial setup, required only once) if you are starting from scratch
  • Clojure (for stitching and AWS heavy lifting)
  • Java/Scala/Python (if required, for custom UDFs, UDAFs, and Spark jobs)

But these are only required if you wish to change any core functionality; otherwise, we hope you won't have to touch the first two.

Let’s cover them briefly

  • We are using the following Terraform module to spawn ancillaries like the VPC, private/public subnets, roles, and security groups. Just define your config in variable.tf and do a terraform apply. Boom!! Everything is spawned, along with a small sample EMR cluster running.
  • Clojure, our favorite language to get things done neatly (functional programming gives you that). We are utilizing the Amazonica library to spawn and configure the clusters on the fly.
  • We have also made an extension for GCP, with the same interfaces, to create GCP Dataproc clusters without changing anything in your application.

And that's it; those are all the ingredients we use to spawn the EMR cluster, along with all the jobs to run, on the fly.

Let's talk briefly about how you can streamline development backed by the power of EMR.

What the above Terraform also does is create the smallest instance in the public subnet, through which we can access our EMR cluster in the private subnet. In one of the outputs of Terraform,

#cmd
terraform output

the output will show the EMR bridge, which can be used to connect to the EMR master node in a simple and safe manner by proxying through that node.

Forward the relevant ports, such as 10000 for Hive or 8998 for Livy (Spark), to submit jobs.

I will also mention that we have been using the jAdaptive Java library to do all the proxying and port forwarding automatically in our Clojure library. As soon as we spawn a cluster via our library, all the relevant ports are available on your local machine, and you are ready to develop in your favorite IDE while still testing and deploying on an actual distributed environment.
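
We drive this through jAdaptive on the JVM side; a rough Python equivalent of the same idea, using the sshtunnel package (host names and key paths are placeholders), would look like:

# Rough equivalent of the automatic port forwarding, using the sshtunnel package:
# tunnel through the bridge host and expose the master's Hive port locally.
from sshtunnel import SSHTunnelForwarder

tunnel = SSHTunnelForwarder(
    ("<bridge-public-ip>", 22),                            # the EMR bridge/proxy host
    ssh_username="ubuntu",
    ssh_pkey="/path/to/prod_key",                          # placeholder key path
    remote_bind_address=("<emr-master-private-dns>", 10000),  # HiveServer2
    local_bind_address=("127.0.0.1", 10000),
)
tunnel.start()
# ... point your local IDE or beeline at localhost:10000, then:
tunnel.stop()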

If you wish to do it manually, some commands that can be useful to set up all the ports are:

# put this into your .ssh/config
Host emr_proxy
  Hostname 15.20*.42.**
  User ubuntu
  IdentityFile /Users/navdeepagarwal/prod_key
  Port 22

Host emr_cluster
  Hostname ip-10-0-19-119.ap-south-1.compute.internal
  User hadoop
  ProxyCommand ssh emr_proxy exec netcat %h 22
  IdentityFile /Users/navdeepagarwal/prod_key
  IdentitiesOnly yes

# this will bring all the ports via SOCKS to your local machine via 6667
ssh -D 6667 emr_cluster

Phew !! that was quick!

Now a little bit about us :)

At Dataorc, we always design solutions with scale and cost-effectiveness in mind. With experience building data platforms from scratch for enterprises as well as startups, we believe in delivering solutions that grow with your business and help it grow. Do reach out to us for free open-consulting sessions with our tech team.
