Incremental Loading into Redshift from S3 (Using Matillion ETL)

Daniel Voyce
Jul 27, 2017 · 8 min read
Matillion ETL is a fantastically intuitive ETL tool

This article is the first of a planned series documenting some annoyances / revelations with Big Data, Data Warehousing and OLAP databases

Let's face it: unless it is a simple re-mapping of columns, ETL sucks. Having spent a couple of weeks trying to DIY our ETL process through scripts, AWS Data Pipelines (yuk) and various other "FREE"* methods, I had to bite the bullet and sign up for an ETL platform. The one that looked the most intuitive to me was Matillion, and so far it has been a complete godsend: I have achieved more in a single day than I did in an entire week with other tools.

* I say FREE as in these things don't cost much other than your time, but that is actually a false economy: I guarantee you will spend more time managing and debugging your pipelines if you try to do it manually than it would cost in man-hours to use a tool like this.

One thing I found frustrating with every ETL tool I evaluated was that none of them seems to have an easy way to incrementally load data from S3 into Redshift. Even Matillion, with its Table Iterator functionality (which works so well when incrementally loading from RDS), doesn't have a native component to do this, and the Table Iterator can't be used when querying Redshift itself.

Normally the recommended method for handling this is to "simply re-load everything". This makes sense, as Redshift is super fast at importing multiple files in parallel from S3; however, when you are talking about billions of rows, that import comes in at over two hours and quickly becomes expensive in data transfer and Matillion hours.

Fortunately Matillion provides some tools that will help smooth this process out, and with a bit of Python-Fu you can have a robust and idempotent method to load data incrementally into Redshift.

This article makes a few assumptions:

  1. It assumes your data gets written into a new file / files for a specific time frame (e.g. our system dumps its data every 24 hours into a set of files — stored in a folder)
  2. It assumes that you aren’t having data inserted into files that have previously been inserted (e.g. you aren’t trying to add data to a file from 3 weeks ago that was already imported)

TL;DR

See the diagram at the top. Basically this process uses the Python script component Matillion offers: it looks up the Redshift table you are loading into to get the "High Tide" mark, gets a list of files that are newer than that timestamp, stages them to another S3 location, loads them and then cleans up. Simple, right? It sounds it, and for the most part it is, but there are a few things to be aware of.

The full process:

Python does most of the heavy lifting here. Don't worry if you aren't a Python developer; it is a very easy and readable language, but like all others it has a few gotchas, especially around date parsing and date-type juggling (which we will rely on pretty heavily here).
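As a small illustration of the kind of date-type juggling involved (the values here are made up for the example, not taken from the Matillion job): Redshift hands the timestamp back in string form, while boto3 returns timezone-aware datetimes, and Python refuses to compare the two until they are normalised.

```python
from datetime import datetime, timezone

# a "high tide" timestamp as it might come back from Redshift, in string form
high_tide = datetime.strptime("2017-07-26 23:59:59.000000",
                              "%Y-%m-%d %H:%M:%S.%f")

# an S3 LastModified value as boto3 returns it: timezone-aware
last_modified = datetime(2017, 7, 27, 4, 30, 0, tzinfo=timezone.utc)

# comparing aware and naive datetimes raises TypeError,
# so strip the tzinfo first (as the scripts below do)
naive_modified = last_modified.replace(tzinfo=None)

# now a plain date comparison works: this file is newer than the high tide
print(naive_modified.date() > high_tide.date())  # True
```

This is exactly why the scripts below call `.replace(tzinfo=None)` on every `LastModified` value before comparing it with the database timestamp.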

Variables in Matillion

Matillion relies heavily on environment variables to keep a global scope outside of each component. Not all components can set variables, and there are cases where you can't easily do what you want, which is why I chose Python for this. In the Python component, Matillion automatically scopes and defines your variables, so you can use them by simply referencing the variable name.

The First Python Component:

This component does the majority of the heavy lifting: it goes off to Redshift, gets the latest timestamp and populates an environment variable with it.

It then filters the files in your input S3 bucket to those with a modified date AFTER this latest timestamp and copies them to a staging folder.

In order for this to work you will need to set up a few environment variables before you can use the script. Add these in Project > Manage Environment Variables and set your defaults for the S3 buckets. Leave BulkLastTimestamp empty: Matillion will try to validate whatever you put in there, and Python gets pissy if you try to convert and compare between date, datetime and timestamp types.

  • main_s3_bucket — the bucket name of your data store
  • main_s3_prefix — the path within the above bucket where your files are held
  • staging_bucket — the bucket you want to stage the files to for later loading

The Python Script itself — the comments inline should help explain what is happening here:

import boto3
from datetime import datetime

# get the "high tide" mark from the database
cursor = context.cursor()
cursor.execute('select max(utc_timestamp) from stg_poi')

# should only ever be one result, get first item in the tuple
result = cursor.fetchone()[0]

# set global variable BulkLastTimestamp in Matillion
context.updateVariable("BulkLastTimestamp", result)

# set up S3 access
s3 = boto3.resource('s3')
client = boto3.client('s3')

# create a collection of all of the objects in the bucket
objects = client.list_objects(Bucket=main_s3_bucket, Prefix=main_s3_prefix)

# define the format the date is in in the database so it can be parsed
date_format = "%Y-%m-%d %H:%M:%S.%f"

# generate a timestamp from the date coming from the database
max_timestamp = datetime.strptime(str(BulkLastTimestamp), date_format)

# we aren't interested in the time part at present
# as our data goes from 00:00:00 to 23:59:59
just_date = max_timestamp.date()

# set the bucket we are going to stage this new data in
staging = s3.Bucket(staging_bucket)

# iterate through the object list and cherry-pick what we need
for object in objects['Contents']:

    # get the modified date of the S3 object (strip the timezone
    # so it can be compared with the naive database timestamp)
    file_modified = object['LastModified'].replace(tzinfo=None)

    # we only want files that were modified after the last date
    # in the database - cast as date so we can compare
    if file_modified.date() > just_date:

        # set up the new destination
        copy_source = {
            'Bucket': main_s3_bucket,
            'Key': object['Key']
        }

        # copy the file to the staging bucket
        staging.copy(copy_source, 'staging/' + str(just_date) + '/' + object['Key'])

Update: June 2018

The above script is fine for a bucket with a small number of keys. However, we recently had to completely re-run our import process across a lot of new data, and it turns out the script above hits a hard limit of 1,000 keys: list_objects returns at most 1,000 results per call, so if you have more than that you need to paginate through the results:

import boto3
from datetime import datetime

# set up S3 access
s3 = boto3.resource('s3')
client = boto3.client('s3')

def get_all_s3_keys(bucket, prefix):
    """Get a list of all keys in an S3 bucket, paginating past the 1,000-key limit."""
    keys = []
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        resp = client.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            keys.append(obj)
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break
    return keys

# get the "high tide" mark from the database
cursor = context.cursor()
cursor.execute('select max(utc_timestamp) from stg_poi')

# should only ever be one result, get first item in the tuple
result = cursor.fetchone()[0]

# define the format the date is in in the database
date_format = "%Y-%m-%d %H:%M:%S.%f"

# the default variable value will handle a null result
if result is not None:
    # set global variable BulkLastTimestamp in Matillion
    context.updateVariable("BulkLastTimestamp", result)

objects = get_all_s3_keys(main_s3_bucket, main_s3_prefix)

# generate a timestamp from the date coming from the database
max_timestamp = datetime.strptime(str(BulkLastTimestamp), date_format)

# we aren't interested in the time part at present
# as our data goes from 00:00:00 to 23:59:59
just_date = max_timestamp.date()

# set the bucket we are going to stage this new data in
staging = s3.Bucket(staging_bucket)

# iterate through the object list and cherry-pick what we need
for object in objects:
    file_modified = object['LastModified'].replace(tzinfo=None)

    # we only want files that were modified after the last date in the database
    if file_modified.date() > just_date:

        # set up the new destination
        copy_source = {
            'Bucket': main_s3_bucket,
            'Key': object['Key']
        }

        # copy the file to the staging bucket
        staging.copy(copy_source, 'staging/' + str(just_date) + '/' + object['Key'])

Loading this data into Redshift

Using an S3 Load component, point it at the folder you have just staged these files in. The script above has 'staging' hard-coded, but you can change it to whatever you want, or have it handled by a variable.

Ensure you set the same options that you would normally use to load this data.

I usually go for pretty open load options; our data sometimes changes, and there is nothing worse than having to re-create components each time this happens.
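For reference, the S3 Load component ultimately issues a Redshift COPY behind the scenes. A rough hand-rolled equivalent might look like the sketch below; the bucket name, IAM role and load options here are illustrative assumptions, not what Matillion actually generates, so mirror your own settings.

```python
# illustrative values - in the Matillion job these come from environment variables
staging_bucket = "my-staging-bucket"                       # assumed bucket name
just_date = "2017-07-26"
iam_role = "arn:aws:iam::123456789012:role/redshift-load"  # assumed IAM role

# a COPY roughly equivalent to what the S3 Load component runs;
# the options should mirror whatever you normally load with
copy_sql = (
    "COPY stg_poi "
    "FROM 's3://{bucket}/staging/{date}/' "
    "IAM_ROLE '{role}' "
    "FORMAT AS CSV "
    "TIMEFORMAT 'auto' "
    "TRUNCATECOLUMNS"
).format(bucket=staging_bucket, date=just_date, role=iam_role)

print(copy_sql)
```

Pointing COPY at the staging prefix (rather than the main bucket) is what keeps the load incremental: only the cherry-picked files are visible to it.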

Finally — Do some cleaning up

Everyone loves a clean and tidy workspace, so unless you have a specific reason not to, delete the files in your new staging area; the originals are preserved in your main data bucket in case you want to manually re-run some of them.

Should the previous processes fail for whatever reason, this step will not run, leaving you a snapshot of the point where it went wrong.

The clean-up script:

import boto3
from datetime import datetime

# set up S3 access
s3 = boto3.resource('s3')
client = boto3.client('s3')

# define the format the date is in in the database
date_format = "%Y-%m-%d %H:%M:%S.%f"

# generate a timestamp from the date coming from the database
max_timestamp = datetime.strptime(str(BulkLastTimestamp), date_format)

# we aren't interested in the time part at present
just_date = max_timestamp.date()

# use the environment variable, not a literal string, for the bucket name
staging = s3.Bucket(staging_bucket)

# iterate through and delete
for obj in staging.objects.filter(Prefix='staging/' + str(just_date)):
    s3.Object(staging.name, obj.key).delete()

And that is it. Each time the job runs it updates BulkLastTimestamp so that only files newer than that are fetched; if you want to re-load a bulk of data, you can simply delete that date range in Redshift and it will re-load only the dates following that.
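As a sketch of that re-load scenario (the table and column names come from the scripts above; the cut-off date is illustrative): deleting a date range lowers max(utc_timestamp), which resets the high tide mark, so the next run stages and re-imports the matching files automatically.

```python
# illustrative cut-off - everything from this date onwards will be re-loaded
reload_from = "2017-07-20"

# deleting this range lowers max(utc_timestamp) in stg_poi, so the next
# run's "high tide" lookup will pick the matching S3 files up again
delete_sql = "DELETE FROM stg_poi WHERE utc_timestamp >= '{0}'".format(reload_from)

print(delete_sql)
```

Run that against Redshift (e.g. via the same context.cursor() used above) before kicking the job off again.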

I hope this has helped someone out. It's easy to get so landlocked in a tool like this that you expect there to be a component that will magically do this for you; unfortunately, sometimes you need to apply a bit of manual grunt to get stuff done.

Daniel Voyce

Written by

CTO of LOCALLY, General Nerd. I love new technology, Data geek, Blockchain Hero, Linux Buff and all round evangelist for Open Source
