Update your Glue Catalogue in real time without using Crawlers!

Abhisek Roy
Credit Saison (India)
5 min read · Jun 1, 2023
Fig: Time is of the essence when updating a data lake

Time is of the essence when creating/updating a data lake. It’s even more critical when you need to update your schema periodically. For companies that have thousands of tables in the data lake, it may be difficult to keep track of all the schema changes (such as column additions or new table creations) that are happening every day.

When schema changes don’t get updated in time (using a crawler or manually), those using the data downstream may get affected. We may catch schema changes after days or even weeks. Also, having to update the tables by running the crawler comes with its own overhead–

a. The crawler takes time to run. This time only grows as you keep adding more tables/schemas. So, even if you add a single new table, the crawler may take an hour to run and update all the existing tables.

b. The crawler will generate tables in a certain format. If you want to use advanced properties like partition projection, you will need to update the already created tables.

c. The crawler is an extra piece of infrastructure that needs to be created and maintained.

Thankfully, AWS has come up with AWS Wrangler (the awswrangler package, now also known as the AWS SDK for pandas), an open-source Python library built on top of popular libraries like Pandas, Apache Arrow and Boto3. It allows easier interactions with AWS resources such as S3, Redshift, MySQL, PostgreSQL, EMR and more.
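
If you want to follow along, the library installs straight from PyPI. A minimal check that it is available:

# pip install awswrangler
import awswrangler as wr

print(wr.__version__)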

Changing how you save data in AWS S3

Whether you ask ChatGPT or StackOverflow, the most common way of writing to an S3 bucket is using boto3, by creating either a resource or a client. Both are pretty similar, as is clear from the example below–

import boto3

binary_data_a = b'Some data'
binary_data_b = b'Some more data'

# Option 1: object.put()
s3 = boto3.resource('s3')
obj = s3.Object('bucket-name', 'object/key/with/filename.extension')
obj.put(Body=binary_data_a)

# Option 2: client.put_object()
client = boto3.client('s3')
client.put_object(Body=binary_data_b, Bucket='bucket-name', Key='object/key/with/filename.extension')

There’s not much difference between the two options shown above and, irrespective of the one you choose, you will need to run the Glue Crawler in case there is a schema change.
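
For context, here is roughly what the crawler-based path looks like with plain boto3: every schema change means triggering a crawler (the name 'my-crawler' below is just a placeholder) and waiting for it to finish before downstream queries see the new columns.

import boto3

glue = boto3.client('glue')

# Kick off the crawler after landing new data, then poll until it finishes
glue.start_crawler(Name='my-crawler')
state = glue.get_crawler(Name='my-crawler')['Crawler']['State']  # e.g. RUNNING / READY
print(state)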

The code that you can use instead of the above to create and update your Glue tables automatically is:


import awswrangler as wr

wr.s3.to_parquet(
    df=df,
    path='s3://' + BUCKET_NAME + '/' + file_key + '/',
    compression=None,
    dataset=True,
    partition_cols=['file_date'],
    mode='append',
    catalog_versioning=False,
    schema_evolution=True,
    database=GLUE_DATABASE_NAME,
    table=GLUE_TABLE_NAME,
    filename_prefix=FILE_PREFIX,
    regular_partitions=False,
    projection_enabled=True,
    use_threads=True,
    projection_types={'file_date': 'date'},
    projection_ranges={'file_date': '2022-05-21,NOW'},
    projection_intervals={'file_date': '1'},
    projection_formats={'file_date': 'yyyy-MM-dd'},
)

Let’s understand how this works parameter by parameter. The first parameter is df, the data frame. So irrespective of your data source, whether a JSON, a CSV, or an XML, you will need to convert the data to a data frame first.
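
As a quick sketch (the file name and the records here are made up), getting from a raw CSV or JSON payload to a data frame is usually a one-liner with pandas:

import pandas as pd

# CSV on disk (or already downloaded from S3) -> data frame
df = pd.read_csv('payments.csv')

# List of JSON records -> data frame, flattening any nested fields
records = [{'user_id': 1, 'amount': 10.5, 'file_date': '2023-05-25'}]
df = pd.json_normalize(records)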

Next is the path. You will need to give the path to the table name folder and not the partition folder. Let’s understand this with an example. If you want the file to be saved with a file key such as–

s3://bucket_name/payment_data/file_date=2023-05-25/payment_data8015a566b3570.parquet

You just need to pass s3://bucket_name/payment_data/ in the path parameter.

For compression, you can choose one of None, ‘snappy’, ‘gzip’ or ‘zstd’. Next, dataset needs to be set to True, since we are saving data that we want to query later on and not just random files. Once this is set to True, you will be able to set values of other parameters like partition_cols, schema_evolution, the projection_* parameters and more.

For partition_cols, you need to pass a list of the columns to be used for partitioning. Ensure that these columns actually exist in your data frame: if you want to use ‘time_stamp’ as a partition, your data must have a ‘time_stamp’ column to begin with. When ‘time_stamp’ is used as a partition column, the data stored in the S3 bucket goes into folders of the format time_stamp=yyyy-mm-dd, and the column itself is removed from the parquet files, as shown in the sketch below.
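
Here is a minimal sketch of that behaviour; the bucket, prefix and column names are placeholders, and only the arguments this paragraph talks about are shown.

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({
    'user_id': [1, 2],
    'amount': [10.5, 20.0],
    'time_stamp': ['2023-05-25', '2023-05-25'],  # partition column must exist in the frame
})

wr.s3.to_parquet(
    df=df,
    path='s3://my-bucket/payments/',
    dataset=True,
    partition_cols=['time_stamp'],
    mode='append',
)
# Files land under s3://my-bucket/payments/time_stamp=2023-05-25/...
# and the time_stamp column itself is dropped from the parquet files.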

For mode, the most common option is ‘append’, which adds new data as and when it comes. ‘overwrite’ deletes everything in the target directory before adding new files, while ‘overwrite_partitions’ only deletes the files inside the partitions being written to before writing the new ones.

If catalog_versioning is set to True and mode is set to ‘overwrite’, a version of the table catalog is archived before a new one is created. The value of schema_evolution is True by default and ensures that your table catalog is updated in scenarios such as the addition of a new column.
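
To make the schema-evolution bit concrete, here is a hedged sketch: the next batch arrives with an extra currency column (all names here are hypothetical), and the same append call adds that column to the Glue table, with no crawler involved.

import pandas as pd
import awswrangler as wr

df_new = pd.DataFrame({
    'user_id': [3],
    'amount': [15.0],
    'currency': ['INR'],            # column that did not exist in earlier batches
    'time_stamp': ['2023-05-26'],
})

wr.s3.to_parquet(
    df=df_new,
    path='s3://my-bucket/user_data/',
    dataset=True,
    partition_cols=['time_stamp'],
    mode='append',
    schema_evolution=True,          # default, shown here for emphasis
    database='my_database',
    table='user_data',
)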

For database and table, you will need to pass the Glue database and Glue table names that you want your data to be saved under. To have each of your parquet files begin with a particular keyword, you can pass a value in filename_prefix. We usually set it to the table name.

Set use_threads to True if you want awswrangler to use os.cpu_count() to find the maximum number of threads available and use all of them for writing your parquet files. You can also set it to False to disable multi-threading, or to a specific integer to use a fixed number of threads at all times.

In case you want to use partition projections, you will need to set regular_partitions as False and projection_enabled as True. Their default values are just the reverse.

You will also need to specify the parameters related to partition projection. First you need to specify the projection_types for all the partitions, then the projection_ranges, the projection_intervals and the projection_formats. Based on the type of partition, you may also need to set projection_values and projection_digits (a small sketch follows below). The rules that are followed here are the same as when you define the partitions for Glue Tables in AWS. Hence, the official documentation still holds good.
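
As one hedged example of those extra parameters (keeping the same parameter style as the call above): if you also partition by a low-cardinality string column, say a made-up region column, its projection type would be ‘enum’ and projection_values would carry the comma-separated list of allowed values, mirroring what you would otherwise set in the table’s projection properties.

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({
    'amount': [10.5],
    'region': ['in'],
    'file_date': ['2023-05-25'],
})

wr.s3.to_parquet(
    df=df,
    path='s3://my-bucket/payment_data/',
    dataset=True,
    partition_cols=['region', 'file_date'],
    mode='append',
    regular_partitions=False,
    projection_enabled=True,
    database='my_database',
    table='payment_data',
    projection_types={'region': 'enum', 'file_date': 'date'},
    projection_values={'region': 'in,us,eu'},
    projection_ranges={'file_date': '2022-05-21,NOW'},
    projection_intervals={'file_date': '1'},
    projection_formats={'file_date': 'yyyy-MM-dd'},
)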

A Working Example

Grasping a problem statement is always simpler when you have a working example. For our use case, we wanted to save data in a bucket called ‘my-bucket’, in a table/folder called ‘user_data’. We wanted to partition it by a column called time_stamp. The time_stamp column would be a date type column starting from the date 2022-05-21 and must be in YYYY-MM-DD format. We will be creating/updating the table in a database called my_database.

wr.s3.to_parquet(
    df=df,
    path='s3://my-bucket/user_data/',
    compression=None,
    dataset=True,
    partition_cols=['time_stamp'],
    mode='append',
    catalog_versioning=False,
    schema_evolution=True,
    database='my_database',
    table='user_data',
    filename_prefix='user_data',
    regular_partitions=False,
    projection_enabled=True,
    use_threads=True,
    projection_types={'time_stamp': 'date'},
    projection_ranges={'time_stamp': '2022-05-21,NOW'},
    projection_intervals={'time_stamp': '1'},
    projection_formats={'time_stamp': 'yyyy-MM-dd'},
)

When the code runs, it saves my file in locations like this–

s3://my-bucket/user_data/time_stamp=2023-05-25/user_data8015a566b3570.parquet

The path up to user_data remains the same, the time_stamp folder is set based on the value of the column, and the file name (user_data8015a566b3570.parquet) combines the filename_prefix we passed with an auto-generated suffix.

Every time my code runs, it matches the data frame that I am trying to save with the data catalog that I have in Glue and makes updates as and when required. This enables queries running via Athena to always fetch the latest schema.
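
A quick way to sanity-check this (a hedged sketch, reusing the database and table names from the example above) is to query the table through awswrangler’s Athena helper right after a write and look at the columns that come back:

import awswrangler as wr

# Any newly added columns should show up here without a crawler ever running
df_check = wr.athena.read_sql_query(
    sql='SELECT * FROM user_data LIMIT 10',
    database='my_database',
)
print(df_check.dtypes)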

So go on, automate more parts of your data lake and enable your team to democratize data across your organization!
