Building Scalable Analytics with AWS — Part I
In this post I'm gonna be talking about my personal experience on a medium size big data project and the road to put an analytics environment that can persist in time in production.
How to start with a project involving big data at your enterprise?
In my experience at least:
- There needs to be a specific use case, such as someone needing to take operational decisions from the data per se, not by mere intuition or by reports taken from a data warehouse.
- A clear stakeholder defending the value that a solution like this offers in time, remembering that data loads vary in a volatile fashion.
- There needs to be a team or a set of specialists available to start democratizing data access and self-service to the tools offered.
So on my particular use case, I needed to refresh a live dashboard every 15 minutes or so with aggregated data comparing two worlds:
- The transactional world: in which there is information about stores in the company and what should be the reality going on. In this data base, we have their schedules, addresses, and all the data corresponding to these particular entities. This was presented in PostgreSQL.
- The online world: this is what is being shown to the users on the application. This is another data base presented as an ElasticSearch index.
The objective here is to merge both worlds in order to achieve a metric of the online availability of the stores according to their schedule.
This should be presented in a dashboard such as the one shown below
Ok, let's head on to the architecture!
The requirements to achieve the dashboard above as I remember are:
- There is a need to process every 15 minutes in a batch way all the stores together with their schedules against the live view the user is being shown
- I need some kind of storage to keep the aggregated data I calculate for this live view against what should be happening (Postgres vs ElasticSearch)
- There needs to be way to understand what's the schema of the data in that storage
- I need tools to process that stored data.
- I need a tool to present that data, and it has to meet some flexible requirements, such as running every some amount of time I determine, be shared only with the users the business wants, and it needs to share resources efficiently between people wanting to explore data.
- As we manage many countries at my company, we need to replicate the batch ETL architecture to each country, for it to have limited access to it's own data stores.
- I need to automate the whole process (phew)
I came up with something like this:
Let's go from left to right:
- There are some data sources available for me that I need to merge in a tool to process ETLs every 15 minutes or so
- An admin role is set up to monitor what is going on should anything fail (which obviously will fail). This part is performed with EMR, together with CloudWatch alarms plus a specific IAM role, setting up cron every the amount of minutes needed
- There is a unified source (RDS) from which we can have access to the metadata, such as table schemas and partitioning strategies, in the tables we store our data from the batch processing. This is a very common pattern called a Hive Metastore
- There is a centralized storage where data is kept as the metastore presents. This is done with the S3 service.
- There are analytics users who access another interface, the Zeppelin interface, where they can present the data obtained from the aggregated tables in the storage proposed. They have a contained environment to write notebooks, cron them, share them, and everything securely backed up in S3.
- All the countries push data towards the metastore, having a centralized source of information, to make analytics against any country.
There is a unified way of accessing the data. This is a pattern that Netflix follows to have secure access to data.
Analysts have a unique interface to share their notebooks only for making analytics, and they have a set of utils to access any raw database, taking into account that Zeppelin is a reduced resource application, meant to explore data and present it in an aggregated fashion.
EMRs have their responsibilities reduced to the data processed in a country, so if a country fails the others continue to process the data eitherways.
Should any more requirements arise, more ETLs can be added to this architecture easily, with correct orchestration, and the clusters scale on their own with AutoScaling policies.
If the metastore becomes unavailable, no one can issue their queries against the data stored in S3.
When managing different accounts, we need to enable VPC peering in order to get to resources. If this peering fails, we're gonna have a bottleneck of data wanting to be inserted and discarded, since we won't be getting access to the metastore resource.
So how exactly was the process of getting this to production? In the next section I'm gonna go over it, and in the second part of this article I'm gonna be providing the WHOLE Zeppelin configuration that worked for me for now.
Tech Bits and Pieces
In this part I want to tackle the parts of the architecture in middle level. What are the parts of code/applications involved to get this to production?
- Application running in ETLs : this is a Spark application running on Scala. Spark is a robust distributed processing framework with support to join many data sources — as I'm running on EMR all the Spark dependencies should go
With the correct tuning, this application runs through the entire store data base, with more than 10k, with all their schedules and associated data, against the elasticsearch production index below a minute.
A good advice when querying against ElasticSearch with Spark is to use the
pushdown option to transform all the DataFrame operations to ElasticSearch queries. Be aware that when going through a cloud installation of ES, you need to enable the
For this, I created a simple helper, like this:
One other lesson I learned when creating this app, is that in order to reuse heavy objects, such as a
CloseableHttpClient to make API calls, it's cool to use the
mapPartitions transformation against a partition of your data in order to spend as few resources as possible per partition. If you use a
map transformation with a client, you would be wasting resources, either by creating a new client per element, or throttling it and sending it over the net each time you process a record.
An example is as follows:
Be aware that the logger I'm passing there is a configured
log4j Logger. If you want to use it during the job and workers, you should instantiate it as
Lastly, in order to push the data updating the table partitions it has, you should use the `DataFrameWriter` 's
insertInto method (if we want to keep the history per run, we need to
Append the data). I'm using the parquet format to write the data in a compressed columnar fashion.
- Application Utils that Automate cluster deployment: we made with the team another set of tools in Python to deploy and automatize via Jenkins cluster deployment, cluster termination, deployment of the Spark Application and CloudWatch and SNS event subscriptions via email.
In order to access the AWS API's, we decided to use the
boto3 python sdk. One cool thing in order to avoid passing credentials explicitly to the commands is to use a
Session object that automatically searches for credentials with a given
region , as in:
Then via the EMR BOTO3 api we configured the clusters with the
Ok, so this has been the first part of the architecture. The second part will focus on the Zeppelin bootstrap.
This post has been a little bit technical. Please feel free to ask any questions on the comments section should you feel I haven’t been clear on something.
- Creating a Data-Driven Enterprise with DataOps https://testbacblog.files.wordpress.com/2017/12/click-here-to-access-the-ebook-data-driven-organizations.pdf
- Apache Parquet https://parquet.apache.org/
- Boto3 EMR API https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html
- ElasticSearch Spark https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html