The problem: We were looking for a way to make our data analysts’ work more efficient, as Redshift wasn’t holding up under the pressure (we store a total of ~8B records), and we wanted to cut the cost of running a gigantic Redshift cluster (currently 32 nodes).
Redshift inefficiency: Each node in Redshift holds both processing (CPU/IO) and disk space, and the two are coupled together. In other words: if you want more disk capacity, you have to increase the size of your cluster (more nodes). It is costing us a small fortune. There has to be something better.
The Idea (Presto)
This is a Datalake, so we want to keep all of our data. But we don’t actually need expensive Redshift nodes running all the time. This led us to Presto. Presto is a distributed SQL query engine, designed for analytic queries. Presto decouples the data from its processing: no data is stored in Presto itself, so it reads the data from elsewhere, e.g. S3. And since S3 storage is really cheap, it makes a lot of sense to use it as the storage for your Datalake.
Presto supports standard SQL syntax which makes it easier for use by our data analysts. Do note however that there are some differences in the SQL dialect from e.g. Redshift’s SQL.
AWS Athena (“managed presto”)
Presto exists as a managed service in AWS, called Athena. You send a query to Athena, which uses Presto as its query engine to query the data that you store in S3. The cool thing is that you pay per query ($5 per TB scanned).
You don’t have to deal with setting up the cluster, maintaining it, or running those machines. You only pay to run the query (the results of your queries are cached in S3, by default for 45 days).
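To make this concrete, here’s a minimal sketch of submitting a query to Athena programmatically. The parameter names follow boto3’s `athena.start_query_execution()`; the database, table, and bucket names are made up for illustration:

```python
def build_athena_request(sql, database, output_s3):
    # Parameter names match boto3's athena.start_query_execution().
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_s3},  # where results get cached
    }

# With AWS credentials configured, the actual call would look like:
# import boto3
# athena = boto3.client("athena")
# resp = athena.start_query_execution(**build_athena_request(
#     "select count(*) from events", "analytics_db", "s3://my-results-bucket/athena/"))
```

Athena runs the query asynchronously; you poll for completion and read the results from the output location in S3.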
But the data in S3 is just data; we need somewhere to store metadata describing what is contained in those S3 locations. The “Glue Catalog Metastore” comes to the rescue.
AWS Glue Catalog Metastore (AKA Hive metadata store)
This is the metadata that enables Athena to query your data. Think about it: without this metadata, your S3 bucket is just a collection of JSON/CSV/… files; there is no metadata about column types.
While you can certainly create this metadata in the catalog by hand, you can also use an AWS Glue Crawler to do it for you. You create a table in the catalog pointing at your S3 bucket (containing the output from previous steps), and set a crawler to run on that table.
Crawler Type: If your data is static (same columns) you can create an ‘on demand’ crawler and run it once. But if it may change (e.g. new column added), you can set the crawler to run on a schedule, and have it update your table metadata automatically.
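As a sketch, creating such a crawler with boto3 might look like this. The parameter shape follows `glue.create_crawler`; the role and names here are hypothetical:

```python
def build_crawler_config(name, database, s3_path, schedule=None):
    # Parameter names follow boto3's glue.create_crawler();
    # "GlueCrawlerRole" is an assumed IAM role name.
    cfg = {
        "Name": name,
        "Role": "GlueCrawlerRole",
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }
    if schedule:
        # e.g. "cron(0/30 * * * ? *)" to re-crawl every 30 minutes;
        # leave it out entirely for an 'on demand' crawler.
        cfg["Schedule"] = schedule
    return cfg

# glue = boto3.client("glue")
# glue.create_crawler(**build_crawler_config(
#     "events-crawler", "analytics_db", "s3://my-datalake/events/"))
```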
We have now completed the most basic use case of Athena: we have our data in an S3 bucket, and we have a table in the catalog describing that data, so we can now use Athena to query it. But it will not be efficient in any way. Let’s continue the journey.
AWS Athena Optimizations
These steps are important for Athena to run your queries fast and cut costs. I describe them here in detail, but since they are quite common, they are actually very easy to implement; in our case they amount to 2 lines of code (specifying various options).
While you can point Athena at an S3 bucket containing your raw data (be it CSV, JSON, or other formats), it will not be optimal for querying in either performance (time) or cost (data scanned). Example: say you store your raw data in JSON format. Even a simple query such as count(*) will need to load the entire contents of that bucket into memory, i.e. what conventional databases call a full table scan. We can do better:
- Columnar storage: CSV and JSON aren’t optimal; they are row-based, so you can’t seek into just the columns you are interested in. What is optimal? A file format called Apache Parquet (ORC is an alternative). It stores data in a columnar format, so Presto doesn’t need to read the entirety of the data, only the columns actually used in the query (you aren’t still using select *, are you? :). Columnar layout also makes the data more compact and easier to compress, since the values within a single column are more uniform. It is the recommended format on this platform.
- Partitioning: In traditional DBs you have indexes, but they are not suitable for BigData, since those indexes would be BigData themselves :) Partitions to the rescue. Example: if you see that most of your queries filter by a specific column, you should partition by that column, e.g. by year. After partitioning, you’ll have multiple folders in your S3 bucket (2017, 2018, 2019), and you can then sub-partition by month, etc. When you run a query, only data from the relevant folders will be loaded, which drastically cuts processing time and cost.
- Compression: With BigData we are mostly IO-bound, so let’s reduce file sizes by compressing them. We’re using Snappy, as it seems to be very popular and fast. It doesn’t compress as much as other formats, but we don’t care too much about that since S3 storage is very cheap. Perhaps further research is required here; feel free to experiment with the other supported formats: Zlib, LZO, GZIP.
- File sizes: Athena (Presto) likes big files. Recommended minimum is 128MB per file.
How to do these transformations
The tool for the job is Spark. Spark is a general purpose distributed data processing engine. Or in other words: load big data, do computations on it in a distributed way, and then store it. Spark supports all the previously mentioned optimizations (columnar file format — parquet, partitioning, snappy compression, setting size).
Spark supports both Scala & Python. Even though Spark itself is written in Scala, I recommend you to use Python for your Spark jobs, as it is vastly more popular among Spark users (many data scientists use Python). It will be far easier for you to find documentation and get your question answered online.
How to run Spark
- Spin up your own cluster or use Amazon EMR. But this requires more involvement, and you pay for the cluster even when you’re not using it.
- AWS Glue. Glue is a completely managed service to run your ETL jobs. It spins a Spark cluster ad-hoc to run your job. You don’t pay for this spin-up time. You write a job (Python/Scala) that does ETL, set a scheduling trigger with it (e.g. every half hour) and that’s it.
In our case: load CSVs from S3, repartition, compress, and store back to S3 as Parquet. You pay only for the execution time of your job (10-minute minimum).
Processing only new data (AWS Glue Bookmarks)
In our architecture, we have our applications streaming data to Firehose which writes to S3 (once per minute). We then take this raw data, and transform it using a Glue job, every 30 minutes. But how should we mark the data we’ve already processed?
First option: move current batch of files to an intermediary folder in S3 (“in-process”). After processing, move to an archive directory in order to avoid re-processing of same data.
Second option: we chose to use an AWS Glue feature called bookmarks. Once a job completes successfully, it sets an invisible bookmark. Any change you make to that bucket after the bookmark has been set will be processed by the next execution of the job (behind the scenes, the bookmark examines file modification dates).
Note: you may only add new files, or append to existing files. Overwriting data in existing files is not supported (nor should you want to do it. In fact, in the new world of event sourcing, overwriting means hiding a crime)
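Bookmarks are switched on per job via a job argument. A sketch of the relevant job definition (the shape follows boto3’s `glue.create_job`; the job name and role are made up):

```python
def build_job_config(name, script_s3_path):
    # "--job-bookmark-option: job-bookmark-enable" is the documented Glue
    # default argument that turns bookmarks on for this job.
    return {
        "Name": name,
        "Role": "GlueJobRole",  # assumed IAM role name
        "Command": {"Name": "glueetl", "ScriptLocation": script_s3_path},
        "DefaultArguments": {"--job-bookmark-option": "job-bookmark-enable"},
    }

# glue = boto3.client("glue")
# glue.create_job(**build_job_config("csv-to-parquet", "s3://my-scripts/etl.py"))
```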
Making new data available for Athena
Remember the crawler from before, where we said that if the metadata of your data doesn’t change you can run it only once? That’s not entirely true if you are using partitioning.
The crawler has another important mission: watching for new partitions created in your bucket. Luckily, you can run it on a schedule and it will automatically recognize the partitions, updating the metadata stored in the catalog and making it available to query by Athena. Just make sure you store them in the default layout (each partition in a separate folder, e.g. s3://bucketName/flow_id=flow1/entity_id=entity1)
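That layout is simple enough to generate yourself when writing the data; a tiny helper (the bucket and partition names are just for illustration):

```python
def partition_path(bucket, **partitions):
    # Hive-style layout: one key=value folder per partition column, e.g.
    # s3://bucketName/flow_id=flow1/entity_id=entity1/
    folders = "/".join(f"{key}={value}" for key, value in partitions.items())
    return f"s3://{bucket}/{folders}/"
```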
Our Use case: Querying in Athena (Presto)
We only needed to make minor changes to get our original Redshift SQL query to run on Athena (Presto).
The query took 16–17 minutes to run in Redshift on a 20GB data sample, but after optimization (see below) it now runs in 11 seconds (!) at a cost of 10 cents.
Optimizations we did to the SQL query
- Rewrite with partitions in mind: whenever you can filter (‘where’) on a column that you’ve partitioned by, do it
- Remove columns from subqueries which aren’t actually used in the final result.
- Use an approximate distinct count whenever you don’t need an exact count: replace count(distinct x) with approx_distinct(x). This is a big one (an exact distinct count forces your query to run on a single Presto node). approx_distinct has a standard error of 2.3%, and you can actually tune this with approx_distinct(x, e)
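Concretely, the rewrite looks like this (the table and column names are invented for illustration):

```python
# Exact count: forces the final aggregation onto a single Presto node.
exact = "select count(distinct user_id) from events"

# Approximate count: runs fully distributed, ~2.3% standard error by default.
approx = "select approx_distinct(user_id) from events"

# The error bound is tunable, e.g. 1%:
approx_tuned = "select approx_distinct(user_id, 0.01) from events"
```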
Athena doesn’t support DML (update/delete) statements. It also doesn’t support stored procedures or materialized views. In other words, you should think of Athena as a query engine that can run only one SQL statement at a time.
In summary: We talked about what is called a Datalake. The data sits idle in the lake, available for you to query. But we also want a dashboard that stays up to date with system metrics, and re-querying the entire Datalake for that doesn’t make much sense.
What’s next? We’ll be moving to ‘Data rivers’, a newly emerging term. Instead of Firehose we plan to stream data using Kafka, which will feed the Data lake described above (via Spark’s structured streaming), but will also flow into a DB called Apache Druid (incubating) in order to power an interactive realtime dashboard.