Dynamically trim incoming data if it exceeds what is specified in the schema

Filipa
4 min read · Feb 15, 2024


Copy S3 to Redshift — String length exceeds DDL length: the length of the data column ‘comment’ is longer than the length defined in the table. Table: 50, Data: 139.

Sucks, right? The easy solution is to simply increase the length of the offending column. But imagine that this table (say, all your leads) is created by a pipeline that runs at 1 AM and takes 3 hours to finish. On top of that, you have 5 more pipelines that are triggered only at the end of the leads pipeline. And you have yet other pipelines that generate important KPIs for your (now outdated) dashboards.

That sucks even more. Although the fix is easy (increase the column's VARCHAR length), you now have to wait 3+ hours for the fresh data to arrive in your customers' dashboards. In the meantime, it's the first of the month and the Sales team needed those updated numbers for the meeting at 10 AM (assuming you started working at 9:30). One point less for the Data team :(

We also don't want to set every column in Redshift to the maximum VARCHAR length (65535), as this incurs unnecessary database costs.

In my team, we struggled with the above for more than one year until we implemented a working fix.

Input data exceeds the acceptable range — photo by David Clode on Unsplash

The essential fix here is automation. But you don't want to automate DDL changes on your production tables. Why? If you have SCD tables in your database (or you're thinking about adding them), a non-human DDL change might become an expensive error.

The solution outlined here works together with strict table schemas defined in YAML. Let's assume we already have a customer_feedback table in the public schema, whose definition (kept in a YAML file) corresponds to the following DDL:

CREATE TABLE IF NOT EXISTS public.customer_feedback (
    customer_id VARCHAR(3),
    comment VARCHAR(50),
    created_at TIMESTAMP WITH TIME ZONE
);
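
The exact layout of the YAML file is up to you; purely as an illustration (the layout and field names below are an assumption, not a standard), it could be parsed with PyYAML into a mapping from VARCHAR columns to their declared lengths:

import yaml  # PyYAML

# hypothetical YAML layout; in practice this would be read from the schema file
schema_yaml = """
table: public.customer_feedback
columns:
  - {name: customer_id, type: VARCHAR, length: 3}
  - {name: comment, type: VARCHAR, length: 50}
  - {name: created_at, type: TIMESTAMPTZ}
"""
schema = yaml.safe_load(schema_yaml)

# map every VARCHAR column to its declared length
yaml_lengths = {c["name"]: c["length"]
                for c in schema["columns"]
                if c.get("type") == "VARCHAR"}

print(yaml_lengths)
# Output: {'customer_id': 3, 'comment': 50}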

Every day, data is dumped into s3://my-bucket/data/ and copied from S3 to Redshift in a Spark application (as part of our leads pipeline). In our case, the comment column comes from a free-text input field in the backend. On the data team side, we can't control how many characters a customer will type. But we can control how many are loaded into the Redshift table.

Below is the proposed solution, step by step, each step illustrated with a Spark or SQL snippet. I'll share the full solution repository at the end.

1. Try to COPY the data from S3 into the Redshift table

TRUNCATE public.customer_feedback;
COPY public.customer_feedback FROM 's3://my-bucket/data/filename.parquet'
IAM_ROLE 'my_iam_role' FORMAT AS PARQUET SERIALIZETOJSON;
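
The post runs this COPY from a Spark application, so exactly how the load error surfaces depends on your connector. Purely as a sketch, assuming the statements are issued through the redshift_connector driver (all connection details below are placeholders):

import redshift_connector

copy_sql = """
COPY public.customer_feedback FROM 's3://my-bucket/data/filename.parquet'
IAM_ROLE 'my_iam_role' FORMAT AS PARQUET SERIALIZETOJSON
"""

# placeholder connection details
conn = redshift_connector.connect(
    host="my-cluster.eu-west-1.redshift.amazonaws.com",
    database="dev",
    user="awsuser",
    password="...",
)
cursor = conn.cursor()
try:
    cursor.execute("TRUNCATE public.customer_feedback")
    cursor.execute(copy_sql)
    conn.commit()
except Exception as exc:
    conn.rollback()
    # depending on the driver, the length detail may only show up in stl_load_errors;
    # adapt this check to the error message you actually get
    if "String length exceeds DDL length" not in str(exc):
        raise
    # otherwise fall through to the recovery path in steps 2-10 below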

2. In case of a load error, read the incoming data from S3 and find the maximum length (max_len) of each string column in the Parquet files

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.read.parquet(s3_path, pathGlobFilter="*.parquet")

df.show()
# Output:
# +-----------+--------------------+--------------------+
# |customer_id|             comment|          created_at|
# +-----------+--------------------+--------------------+
# |        9A7|A great spot for ...|2024-02-15 08:24:...|
# |        7H2|Wholesome delicio...|2024-02-15 08:24:...|
# +-----------+--------------------+--------------------+

# select only string columns. For these columns, compute their lengths
df_strings = df.select([F.length(col.name).alias(col.name)
                        for col in df.schema.fields
                        if isinstance(col.dataType, StringType)])

df_strings.show(truncate=False)
# Output:
# +-----------+-------+
# |customer_id|comment|
# +-----------+-------+
# |3          |139    |
# |3          |46     |
# +-----------+-------+


# calculate the max length for each of the selected string columns
# collect the results of the max calculation as a dictionary,
# where keys are the column names, and values the corresponding max lengths
max_len = df_strings.groupby().max().na.fill(value=0).first().asDict()

print(max_len)
# Output: {'max(customer_id)': 3, 'max(comment)': 139}

3. For each string column, compare the maximum length found in S3 (len_df) with the VARCHAR length defined in the YAML schema (len_yaml). If len_df > len_yaml, define a new length for the column, new_len = 2 * len_df; the factor of 2 acts as a safety net for the incoming data

# len_yaml is the VARCHAR length declared for this column in the YAML schema
len_df = max_len[f'max({col.name})']
if len_df > len_yaml:
    new_len = str(len_df * 2)

4. Save the offending column(s), together with their current and incoming lengths, in a list of dictionaries.

off_cols = []
# append one entry per offending column (inside the loop over the string columns)
off_cols.append({"column_name": col.name, "current_length": len_yaml, "incoming_length": len_df})

print(off_cols)
# Output:
# [{'column_name': 'comment', 'current_length': 50, 'incoming_length': 139}]
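
Putting steps 3 and 4 together, the whole check could look roughly like this. yaml_lengths is the column-to-length mapping from the YAML schema (as in the earlier sketch), and new_lengths is just a convenient place to keep the widened lengths for step 6:

from pyspark.sql.types import StringType

off_cols = []      # offending columns, with their current and incoming lengths
new_lengths = {}   # column name -> widened VARCHAR length for the temporary table

for col in df.schema.fields:
    if not isinstance(col.dataType, StringType):
        continue
    len_yaml = yaml_lengths[col.name]        # length declared in the YAML schema
    len_df = max_len[f"max({col.name})"]     # max length observed in the Parquet files
    if len_df > len_yaml:
        new_lengths[col.name] = len_df * 2   # factor-2 safety net
        off_cols.append({"column_name": col.name,
                         "current_length": len_yaml,
                         "incoming_length": len_df})

print(new_lengths)
# Output: {'comment': 278}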

5. Create a temporary table from the schema of the production table.

CREATE TEMPORARY TABLE temp_customer_feedback AS SELECT * 
FROM public.customer_feedback LIMIT 0;

6. Alter the temporary table with the new length (new_len) to accommodate the incoming data

ALTER TABLE temp_customer_feedback ALTER COLUMN comment TYPE VARCHAR(<new_len>);

7. Copy the data from S3 into the temporary table

COPY temp_customer_feedback FROM 's3://my-bucket/data/filename.parquet' 
IAM_ROLE 'my_iam_role' FORMAT AS PARQUET SERIALIZETOJSON;

8. Trim (SUBSTRING) the data in the offending column. I like to apply a percentage of the schema length (0.9 * 50 = 45) to deal with special characters in the incoming data (multi-byte characters count as more than one byte towards the VARCHAR length)

UPDATE temp_customer_feedback SET comment = SUBSTRING(comment, 1, 45);
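
The hardcoded 45 is simply int(0.9 * 50), i.e. 90% of the length defined in the schema. Using off_cols from step 4, the UPDATE statements can be generated per offending column instead:

TRIM_FACTOR = 0.9  # leave headroom for multi-byte characters

for d in off_cols:
    # trim to 90% of the length defined in the schema, e.g. int(0.9 * 50) = 45
    trim_to = int(d["current_length"] * TRIM_FACTOR)
    print(f"UPDATE temp_customer_feedback "
          f"SET {d['column_name']} = SUBSTRING({d['column_name']}, 1, {trim_to});")
# Output: UPDATE temp_customer_feedback SET comment = SUBSTRING(comment, 1, 45);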

9. Finally, insert the data from the temporary table into the production table

TRUNCATE public.customer_feedback;
INSERT INTO public.customer_feedback SELECT * FROM temp_customer_feedback;

10. (Bonus) Notify your team (e.g. via Slack) that data was truncated in the production table.

which_cols = ""
for d in off_cols:
prettified = (f"Column name: `{d['column_name']}`\nCurrent data length: {d['current_length']}\n"
f"Incoming data length: {d['incoming_length']}\n\n")
which_cols = which_cols + prettified

text = (f"Hello team!\n"
f"\N{Heavy Exclamation Mark Symbol} Some column(s) were truncated in table `{db_table}`.\n"
f"Consider increasing the length in schema for below columns to disable this notification:\n\n"
f"{which_cols}"
)

print(text)
# Output:
#
# Hello team!
# ❗ Some column(s) were truncated in table `public.customer_feedback`.
# Consider increasing the length in the schema for the columns below to disable this notification:
#
# Column name: `comment`
# Current data length: 50
# Incoming data length: 139
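
For the Slack part, one simple option is an incoming webhook. A minimal sketch (the webhook URL is a placeholder):

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder

# post the notification text built above to the channel behind the webhook
response = requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=10)
response.raise_for_status()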

Find the full functionality here, wrapped in the method load_data_with_length_check, which you can use as a Python callable in your data pipelines.
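
If your pipelines run on Airflow, for instance, the method can be wired in as a PythonOperator task. This is only an illustration: the op_kwargs names below are my guesses, not the method's actual signature.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# load_data_with_length_check comes from the repository linked above
with DAG(
    dag_id="leads_pipeline",
    start_date=datetime(2024, 2, 15),
    schedule_interval="0 1 * * *",  # the 1 AM run mentioned earlier
    catchup=False,
) as dag:
    load_feedback = PythonOperator(
        task_id="load_customer_feedback",
        python_callable=load_data_with_length_check,
        op_kwargs={
            "s3_path": "s3://my-bucket/data/",
            "db_table": "public.customer_feedback",
        },
    )

Happy data loading!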


Filipa

Bite-sized posts documenting my journey as a Python Engineer in the startup realm.