Data Lakehousing in AWS

George Pongracz
Published in SEEK blog · Sep 27, 2020

Update: Online Talk How SEEK “Lakehouses” in AWS at Data Engineering AU Meetup

I am a Senior Data Engineer in the Enterprise DataOps Team at SEEK in Melbourne, Australia. My colleagues and I develop for and maintain a Redshift data warehouse and an S3 data lake using Apache Spark.

Back in December of 2019, Databricks added manifest file generation to their open source (OSS) variant of Delta Lake. This made it possible to use OSS Delta Lake files in S3 with Amazon Redshift Spectrum or Amazon Athena.

Delta Lake is an open source storage layer built on the Parquet columnar file format. It provides ACID transactions and simplifies the development of incremental data pipelines over cloud object stores such as Amazon S3, going beyond what Parquet alone offers, whilst also providing schema evolution of tables.

At around the same time that Databricks was open-sourcing the manifest capability, we started the migration of our ETL logic from EMR to our new serverless data processing platform. This included reconfiguring our S3 data lake to enable incremental data processing using OSS Delta Lake.

We decided to use AWS Batch for our serverless data platform and Apache Airflow on Amazon Elastic Container Service (ECS) for its orchestration. AWS Batch is significantly more straightforward to set up and use than Kubernetes and is ideal for these types of workloads.

AWS Batch enables you to spin up a virtually unlimited number of simultaneous EC2 instances for ETL jobs to process data for the few minutes each job requires. It then automatically shuts them down once each job is completed, or recycles them for the next job.

We found start-up to take about one minute the first time an instance runs a job and then only a few seconds to recycle for subsequent jobs, as the Docker image is cached on the instances. This makes for very fast parallel ETL processing of jobs, each of which can span one or more machines.
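As a rough illustration of how a job lands on this platform, the sketch below submits a containerised ETL task to AWS Batch with boto3. The job name, queue, job definition, command and environment values are hypothetical placeholders, not our actual configuration.

# Minimal sketch: submitting a containerised ETL job to AWS Batch with boto3.
# All names, values and the region below are hypothetical placeholders.
import boto3

batch = boto3.client("batch", region_name="ap-southeast-2")

response = batch.submit_job(
    jobName="crm-accounts-delta-merge",    # hypothetical job name
    jobQueue="etl-job-queue",              # hypothetical job queue
    jobDefinition="pyspark-etl:1",         # hypothetical job definition
    containerOverrides={
        "command": ["python", "noloader.py", "--table", "accounts"],
        "environment": [
            {"name": "S3_DELTA_DESTINATION",
             "value": "s3://my-bucket/delta/accounts"},
        ],
    },
)
print(response["jobId"])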

In September 2020, Databricks published an excellent post on their blog titled Transform Your AWS Data Lake using Databricks Delta and the AWS Glue Data Catalog Service. Then, a few days later, on September 25, AWS announced Amazon Redshift Spectrum native integration with Delta Lake.
This has simplified the required integration method. The Amazon Redshift documentation describes this integration at Redshift Docs: External Tables

As part of our CRM platform enhancements, we took the opportunity to rethink our CRM pipeline to deliver the following outcomes to our customers:

  • Reduce the time required to deliver new features to production
    (thought to capability)
  • Increase the load frequency of CRM data to Redshift from overnight to hourly
  • Enable schema evolution of tables in Redshift
  • Continuously deploy the pipeline

As part of this development, we built a PySpark Redshift Spectrum NoLoader. This NoLoader enables us to incrementally load all 270+ CRM tables into Amazon Redshift, with each run completing in 5–10 minutes elapsed for all objects, whilst also delivering schema evolution with data strongly typed through the entirety of the pipeline.

PySpark Redshift NoLoader

Redshift Spectrum Delta Lake Logic

Prework

Make sure you have configured the Redshift Spectrum prerequisites: creating the AWS Glue Data Catalog, an external schema in Redshift, and the necessary rights in IAM.
Redshift Docs: Getting Started

To enable schema evolution whilst merging, set the Spark property:
spark.databricks.delta.schema.autoMerge.enabled = true
Delta Lake Docs: Automatic Schema Evolution
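For example, this property can be set when building the Spark session (a minimal sketch; the application name is arbitrary):

from pyspark.sql import SparkSession

# Enable automatic schema evolution for Delta Lake merge operations.
spark = SparkSession.builder \
    .appName("redshift-spectrum-noloader") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# Alternatively, on an existing session:
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")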

Then use the following logic:

The following Python code snippets and documentation correspond to the numbered points in blue in the diagram above:

1 Check if the Delta table exists
from delta.tables import DeltaTable

delta_exists = DeltaTable.isDeltaTable(spark, s3_delta_destination)

Delta Lake Docs: IsDeltaTable

2 Get the existing schema
delta_df = spark.read.format("delta") \
    .load(s3_delta_destination) \
    .limit(0)
schema_str = delta_df \
    .select(sorted(delta_df.columns)) \
    .schema.simpleString()

Spark PySpark Docs: simpleString
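We use this schema string to detect schema evolution between runs, which drives whether the Spectrum DDL needs to be regenerated later on. A minimal sketch of that comparison (latest_df is the incoming DataFrame for this run; the variable names are illustrative):

# Illustrative sketch: compare the existing Delta schema with the incoming batch.
latest_schema_str = latest_df \
    .select(sorted(latest_df.columns)) \
    .schema.simpleString()

schema_changed = latest_schema_str != schema_str
if schema_changed:
    print("Schema evolution detected; Spectrum table will be dropped and recreated")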

3 Merge
delta_table = DeltaTable.forPath(spark, s3_delta_destination)
delta_table.alias("existing") \
    .merge(latest_df.alias("updates"), join_sql) \
    .whenNotMatchedInsertAll() \
    .whenMatchedUpdateAll() \
    .execute()

Delta Lake Docs: Conditional update without overwrite
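The join_sql condition is simply an equality predicate over the table's key columns. A hypothetical helper to build it (the function and the key name are illustrative, not part of the Delta Lake API) could look like:

# Hypothetical helper: build the merge join condition from a list of key columns.
def build_join_sql(key_columns):
    return " AND ".join(
        f"existing.{col} = updates.{col}" for col in key_columns
    )

join_sql = build_join_sql(["account_id"])
# e.g. "existing.account_id = updates.account_id"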

4 Create Delta Lake table
latest_df.write.format("delta") \
    .mode("append") \
    .save(s3_delta_destination)

Delta Lake Docs: Create a Table

5 Drop if Exists
spectrum_delta_drop_ddl = f'DROP TABLE IF EXISTS {redshift_external_schema}.{redshift_external_table}'

6 Create External Table
CREATE EXTERNAL TABLE tbl_name (columns)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://s3-bucket/prefix/_symlink_format_manifest'

Redshift Docs: CREATE EXTERNAL TABLE
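To keep the data strongly typed through the pipeline, the column list in the external table DDL can be derived from the Spark schema. The sketch below shows one possible way to do that; the type mapping is a simplified assumption (decimals and complex types would need their own handling), not an exhaustive one.

# Sketch: derive the external table column list from a Spark schema.
# The type mapping is a simplified assumption, not an exhaustive one.
SPARK_TO_REDSHIFT = {
    "string": "varchar(65535)",
    "int": "integer",
    "bigint": "bigint",
    "double": "double precision",
    "boolean": "boolean",
    "timestamp": "timestamp",
    "date": "date",
}

def external_table_columns(schema):
    cols = []
    for field in schema.fields:
        spark_type = field.dataType.simpleString()
        redshift_type = SPARK_TO_REDSHIFT.get(spark_type, "varchar(65535)")
        cols.append(f"{field.name} {redshift_type}")
    return ",\n  ".join(cols)

columns_ddl = external_table_columns(latest_df.schema)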

7 Generate Manifest
delta_table = DeltaTable.forPath(spark, s3_delta_destination)
delta_table.generate("symlink_format_manifest")

Delta Lake Docs: Generate Manifest using Spark

The logic shown above will work for either Amazon Redshift Spectrum or Amazon Athena.

The DDL for steps 5 and 6 can be executed against Amazon Redshift using the Python library psycopg2, or against Amazon Athena using the Python library PyAthena.
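A minimal sketch of pushing that DDL into Redshift with psycopg2 (the connection details are placeholders, and spectrum_delta_create_ddl is assumed to hold the CREATE EXTERNAL TABLE statement from step 6):

import psycopg2

# Placeholders: supply your own cluster endpoint, credentials and database.
conn = psycopg2.connect(
    host="redshift-cluster.example.ap-southeast-2.redshift.amazonaws.com",
    port=5439,
    dbname="analytics",
    user="etl_user",
    password="...",
)
conn.autocommit = True  # external table DDL cannot run inside a transaction block

with conn.cursor() as cur:
    cur.execute(spectrum_delta_drop_ddl)    # step 5
    cur.execute(spectrum_delta_create_ddl)  # step 6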

Extending OSS Delta Lake with Redshift

The open source version of Delta Lake lacks some of the advanced features that are available in its commercial variant.

Caching & Data Layout Optimisation

The use of Amazon Redshift offers some additional capabilities beyond those of Amazon Athena, through the use of materialized views.

Materialized views can be leveraged to cache the Redshift Spectrum Delta tables and accelerate queries, performing at the same level as internal Redshift tables. Materialized views also refresh faster than CTAS or loads.
Redshift Docs: Create Materialized View
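As an illustration (the schema, view and table names below are hypothetical, not our actual objects), a materialized view over a spectrum Delta table is created once with explicit fields and then refreshed after each load, for example over the same psycopg2 connection used for the Spectrum DDL:

# Hypothetical names; conn is the psycopg2 connection from the earlier sketch.
create_mv_ddl = """
CREATE MATERIALIZED VIEW analytics.mv_crm_accounts AS
SELECT account_id, account_name, created_at, updated_at
FROM spectrum_schema.crm_accounts
"""

refresh_mv_ddl = "REFRESH MATERIALIZED VIEW analytics.mv_crm_accounts"

with conn.cursor() as cur:
    cur.execute(create_mv_ddl)   # run once, with fields listed explicitly
    cur.execute(refresh_mv_ddl)  # run after each hourly load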

Redshift sort keys can be used to similar effect as the Databricks Z-Order function.
Redshift Docs: Choosing Sort Keys

Redshift distribution styles can be used to optimise data layout. This technique allows you to manage a single Delta Lake dimension file but have multiple copies of it in Redshift using multiple materialized views, with distribution strategies tuned to the needs of the star schema each is associated with.
Redshift Docs: Choosing a Distribution Style
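For example (again with hypothetical names), the same spectrum Delta dimension might be materialised twice, each copy distributed to suit a different star schema:

# Hypothetical: two materialized views over the same Spectrum Delta dimension,
# each with a distribution strategy tuned to a different star schema.
mv_sales_star_ddl = """
CREATE MATERIALIZED VIEW analytics.mv_dim_account_sales
DISTSTYLE KEY DISTKEY (account_id)
AS SELECT account_id, account_name FROM spectrum_schema.crm_accounts
"""

mv_service_star_ddl = """
CREATE MATERIALIZED VIEW analytics.mv_dim_account_service
DISTSTYLE ALL
AS SELECT account_id, account_name FROM spectrum_schema.crm_accounts
"""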

Delta File Data Layout Optimisation

Delta Lake files will undergo fragmentation from insert, delete, update and merge (DML) actions. Just like plain Parquet, it is important that they be compacted on a regular basis to maintain query performance.

The open source version of Delta Lake currently lacks the OPTIMIZE command, but compaction can be achieved by repartitioning the table and rewriting it with the dataChange option set to false.

The one input it requires is the number of partitions. To derive this, we use the following aws cli command to return the total size of the Delta Lake files, e.g. something like:

aws s3 ls --summarize --recursive "s3://<<s3_delta_path>>" | grep "Total Size" | cut -b 16-

Spark works best with file part sizes of at least 128 MB and up to about 1 GB for splitting, so the target number of partitions for the repartition should be calculated based on the total size of the files referenced in the Delta Lake manifest (which excludes tombstoned files no longer in use).
Databricks Blog: Delta Lake Transaction Log

We found the compression rate of the default Snappy codec used in Delta Lake to be about 80% with our data, so we multiply the file sizes by 5 and then divide by 128 MB to get the number of partitions to specify for the compaction.
Delta Lake Documentation: Compaction
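Putting that together, a compaction pass under the assumptions above (roughly 80% compression and a 128 MB target part size) might look like the following sketch; get_total_bytes is a hypothetical helper that wraps the aws cli command shown earlier (or an equivalent boto3 listing).

import math

# Hypothetical helper returning the total compressed size of the Delta files in bytes.
total_compressed_bytes = get_total_bytes(s3_delta_destination)

# Assumptions from the text: ~80% compression, 128 MB target part size.
uncompressed_bytes = total_compressed_bytes * 5
num_partitions = max(1, math.ceil(uncompressed_bytes / (128 * 1024 * 1024)))

# Compact by rewriting the table into num_partitions files.
# dataChange=false marks the rewrite as a compaction so downstream
# readers do not treat the rewritten files as new data.
spark.read.format("delta") \
    .load(s3_delta_destination) \
    .repartition(num_partitions) \
    .write \
    .option("dataChange", "false") \
    .format("delta") \
    .mode("overwrite") \
    .save(s3_delta_destination)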

Once the compaction is complete, it is a good time to VACUUM the Delta Lake files, which by default will hard delete any tombstoned files that are over one week old.
Delta Lake Documentation: Vacuum
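A minimal example, using the same DeltaTable handle as earlier:

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, s3_delta_destination)
delta_table.vacuum()       # default retention: tombstoned files older than 7 days
# delta_table.vacuum(168)  # or pass an explicit retention period in hours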

Some Advice

Spectrum DDL

It is important to specify each field in the DDL for spectrum tables rather than using "SELECT *", which would introduce instabilities on schema evolution, as Delta Lake is a columnar data store. When schemas evolved, we found it better to drop and recreate the spectrum tables rather than altering them.

This is also important for any materialized views that sit over the spectrum tables: with the fields specified explicitly, the views remain stable even if the spectrum tables have not yet been updated to the new schema.

Materialized View DDL

As tempting as it is to use “SELECT *” in the DDL for materialized views over spectrum tables, it is better to specify the fields in the DDL. We found it much better to drop and recreate the materialized views if the schema evolved.

If the fields are specified in the DDL of the materialized view, it can continue to be refreshed, albeit without any schema evolution.

This is, however, preferable to the situation whereby the materialized view fails on refresh when schemas evolve.

Thank you

Databricks

I would like to thank Databricks for open-sourcing Delta Lake and the rich documentation and support for the open-source community.

AWS

I would like to thank the AWS Redshift Team for their help in delivering materialized view capability for Redshift Spectrum and native integration for Delta Lake.

I would also like to call out Mary Law, Proactive Specialist, Analytics, AWS for her help and support and her deep insights and suggestions with Redshift.

SEEK Enterprise DataOps Team

I would like to thank my fellow Senior Data Engineer Doug Ivey for his partnership in the development of our AWS Batch Serverless Data Processing Platform.

I would also like to call out our team lead, Shane Williams, for creating a team and an environment where achieving flow has been possible even during these testing times, and my colleagues Santo Vasile and Jane Crofts for their support.

SEEK is a great place to work ❤️


Not affiliated with any vendor nor influenced by any commercial relationships, I write about what I develop and live with in production as an AWS Data Engineer.