Python Data Pipeline: From IBM DB2 to Google BigQuery

Alessandro Pizzini
8 min read · Oct 19, 2023

The mission? Seamless, automatic data transfer from IBM DB2 FlashCopy straight into Google BigQuery using only Python.

The aim was straightforward but profound — ensuring that our GCP BigQuery and IBM DB2 FlashCopy were in perfect harmony, synchronized to dance to the same beat of time.

In this article, we’ll journey through the process of crafting an efficient data pipeline, transitioning data from DB2 to Google BigQuery.

  • DB2 FlashCopy Extraction — Utilizing Python to extract significant data snapshots from IBM DB2 FlashCopy, guided by specific criteria.
  • BigQuery Data Transformation — Systematically adapting data through anonymization and type conversion to ensure compatibility.
  • Data Movement in BigQuery — Managing the data’s journey as it enters BigQuery and subsequently progresses through the DWH layers.

Python Environment Setup

For the success of this project, it’s paramount to initialize a Python 3 environment, specifically Python 3.11 or newer. The core modules instrumental for the pipeline and their respective uses are outlined below:

  • ibm_db: Establishes a secure connection to DB2 and executes the SQL queries needed for data extraction.
  • google-cloud-bigquery: Loads our transformed data into BigQuery, ensuring it seamlessly enters the Google ecosystem.
  • pandas: Handles the manipulation and transformation of data once it’s pulled from the IBM DB2 database.

pandas — Python Data Analysis Library (pydata.org)

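To make the setup concrete, here’s a minimal sketch of how the environment comes together. The install command and client initialization are illustrative, and authentication is assumed to come from your usual Google credential setup (for example, GOOGLE_APPLICATION_CREDENTIALS).

# Minimal environment sketch (assumes Python 3.11+):
#   pip install ibm_db google-cloud-bigquery pandas

import ibm_db                      # DB2 connectivity and SQL execution
import pandas as pd                # in-memory data handling and transformation
from google.cloud import bigquery  # loading data into BigQuery

# A BigQuery client; credentials are assumed to come from Application Default Credentials.
bq_client = bigquery.Client()
print(bq_client.project)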

Blueprints and Building Blocks — Data Pipeline Architecture

In building our data pipeline, we’ve set up a clear system architecture. Here’s how the components fit together to move and process data from IBM DB2 to Google Cloud BigQuery:

  1. IBM DB2 DB — Serves as the primary source of data.
  2. Google Cloud BigQuery — Serves as the target database for our data.
  3. ETL Python Script DB2 to BigQuery — The lifeblood of our data transition process. This script ensures that data from the Flash Copy gracefully cascades into both the staging and landing datasets of BigQuery.
Data Flow Diagram

Our Project Methodology Unfolded — From Extraction to BigQuery Datasets Propagation

Charting our data journey isn’t simple, but we’ve found our way. Guided by IBM DB2 DB to access Flash Copy tables, targeting Google BQ for storage, and aided by Pandas for data wrangling, we’ve set a clear course.

  1. Snap and Grab from IBM DB2 FlashCopy: Think of this as our data gathering stage. Using a Python script, we peek into the schema of interest, grabbing snapshots of the data we need. Whether we take it all or just bits of it depends on what the mission demands (we collected data in DELTA or FULL mode).
  2. BigQuery’s Transformation Dance: Now, the gathered data gets a makeover. We ensure it plays nice with BigQuery’s standards by anonymizing it and converting its data types. Once it’s dressed to impress, up it goes into Google BigQuery, ready for its analytical debut.
  3. Data’s Journey Across the Datalake: The story doesn’t end with BigQuery. Our data continues its journey, moving gracefully through the different layers of our Data Lake. Here, it stands ready, waiting to offer insights to every corner of our business.

There you have it — the method behind our data magic. With each step, we ensure that every piece of data finds its rightful place, serving our analytics needs just right.

#1 Snap and Grab from IBM DB2 FlashCopy

Let’s dive into the heart of the operation: retrieving data from IBM DB2 FlashCopy.

With our configuration in hand, we move to establish a connection to our IBM DB2 FlashCopy. Using the ibm_db library and the details from our configuration file, we can seamlessly bridge our Python environment to the DB2 instance. This connection allows us to send queries, retrieve data, and manage our interactions with the database.
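For reference, the db_config.json the script reads might look like the following; the values here are placeholders, not real connection details.

import json

# Placeholder values only; the keys mirror the ones read by the connection code below.
sample_config = {
    "dsn_db": "BLUDB",
    "dsn_hostname": "db2.example.com",
    "dsn_port": "50000",
    "dsn_uid": "db2user",
    "dsn_pwd": "********",
}

with open("db_config.json", "w") as f:
    json.dump(sample_config, f, indent=2)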

import json
import sys

import ibm_db

# Read the config file
with open("db_config.json", "r") as f:
    config = json.load(f)

dsn = (
    f"DRIVER={{IBM DB2 ODBC DRIVER}};"
    f"DATABASE={config['dsn_db']};"
    f"HOSTNAME={config['dsn_hostname']};"
    f"PORT={config['dsn_port']};"
    f"PROTOCOL=TCPIP;"
    f"UID={config['dsn_uid']};"
    f"PWD={config['dsn_pwd']};"
)

# Establish the connection
try:
    db2_conn = ibm_db.connect(dsn, "", "")
    print("Connected to the database!")
except Exception as e:
    print(f"Failed to connect to the database: {e}")
    sys.exit(1)

Working with large datasets often means high memory usage, especially with in-memory tools like Pandas. For instance, a 200MB raw data file can consume over 800MB in memory when loaded as a dataframe due to Pandas’ internal operations.

The Chunking Approach: To mitigate this, we employ ‘chunking’, breaking the dataset into smaller sets for more efficient processing. The fetch_data_chunk function supports this strategy, fetching specified data chunks from the source database. This ensures the system memory remains stable, even with vast tables.

def fetch_data_chunk(offset, chunk_size, table_name, where_condition, columns_table):
    """
    Fetches a chunk of data from the database table.

    Parameters:
    offset (int): The offset from where to start fetching rows.
    chunk_size (int): The number of rows to fetch.
    table_name (str): The name of the table from which to fetch.
    where_condition (str): The WHERE condition for the SQL query.
    columns_table (list): The column names to select from the table.

    Returns:
    list: A list of dictionaries, each representing a row.
    """

    # db2_schema_name and db2_conn are defined at module level
    query = f"""
        SELECT {', '.join(columns_table)}
        FROM {db2_schema_name}.{table_name}
        {where_condition}
        OFFSET {offset} ROWS
        FETCH NEXT {chunk_size} ROWS ONLY
    """
    stmt = ibm_db.exec_immediate(db2_conn, query)
    rows = []
    row = ibm_db.fetch_assoc(stmt)
    while row:
        rows.append(row)
        row = ibm_db.fetch_assoc(stmt)

    return rows

Next, we’ll dive into the fetching_table function and its role in data extraction. The data is fetched from the source database table by table, and each table is queried repeatedly, advancing the offset by one chunk each time, until no rows remain.

def fetching_table(table_name, chunk_size, conn, db2_schema_name):
    """
    Fetches data from a specified IBM DB2 source table in chunks and writes each chunk to GCP BigQuery.

    Parameters:
    table_name (str): The name of the source table to fetch.
    chunk_size (int): The number of rows to fetch in each chunk.
    conn (object): The database connection object.
    db2_schema_name (str): The schema name in the DB2 database.
    """

    try:
        # The WHERE condition and column list do not change between chunks
        where_condition = generate_where_condition(table_name)
        schema_name = get_table_schema_from_db2(table_name)
        columns_table = [column[0] for column in schema_name]

        offset = 0
        while True:
            rows = fetch_data_chunk(offset, chunk_size, table_name, where_condition, columns_table)
            if not rows:
                print(f"There are no records to fetch for table {table_name}")
                break

            df_name = f"{table_name}_{offset // chunk_size}"
            df = pd.DataFrame(rows)

            memory_usage = df.memory_usage(deep=True).sum() / (1024 ** 2)
            print(f"Fetched {len(df)} rows from {table_name}, offset {offset}, Memory Usage: from {df_name} : {memory_usage:.2f} MB")

            df = apply_anonymization(df, table_name)
            df = convert_datetime_columns_to_utc(df)

            # On the first chunk, start from an empty target table
            if offset == 0:
                truncate_bigquery_table(table_name)
            write_to_bigquery(df, table_name)

            # Advance to the next chunk
            offset += chunk_size

    except Exception as e:
        print(f"Skipping table {table_name}. Could not fetch data due to error: {e}")

#2 BigQuery’s Transformation Dance

Data transformation is an intricate dance, choreographed to the rhythm of BigQuery’s expectations. As data moves from source to destination, it’s crucial to ensure it harmonizes with the structure, datatype, and standards of the target environment. Let’s step into this dance and explore how we’ve tailored our data for BigQuery’s dancefloor.

The check_schemas_mismatching function is the vigilant stage manager, quickly spotting any schema mismatches between DB2 and BQ. If DB2 is flaunting additional columns, this function seamlessly adds them to the BigQuery table, ensuring the show goes on without a hitch.

def check_schemas_mismatching(table_name):
    """
    Checks for schema mismatches between a DB2 table and a BigQuery table, and handles missing columns.

    Parameters:
    table_name (str): The name of the table to compare and synchronize between DB2 and BigQuery.
    """

    db2 = get_table_schema_from_db2(table_name)
    columns_db2 = set([col[0] for col in db2])

    load_process = get_process_mode(table_name)
    dataset_name, target_table_name = get_target_table_and_dataset(load_process, organization, table_name)

    bq = get_table_schema_from_bq(dataset_name, target_table_name)
    columns_bq = set([col.name for col in bq])

    # Columns present in DB2 but missing from BigQuery
    missing_columns = columns_db2 - columns_bq

    if missing_columns:  # Check if the set is not empty
        converted_datatypes = {}
        for col, data_type in db2:
            if col in missing_columns:
                new_col, new_data_type = transpile_datatype(col, data_type)
                converted_datatypes[new_col] = new_data_type

        print("Converted data types of missing columns: ", converted_datatypes)

        for column_name, column_type in converted_datatypes.items():
            alter_table_add_column(dataset_name, target_table_name, column_name, column_type)
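The transpile_datatype and alter_table_add_column helpers are referenced above but not shown in this article; here is a minimal sketch of what they might look like, with the type mapping deliberately trimmed to a few common DB2 types. The real implementations may differ.

# Illustrative sketches of the helpers used above; not the article's actual code.

DB2_TO_BQ_TYPES = {
    "VARCHAR": "STRING",
    "CHARACTER": "STRING",
    "INTEGER": "INT64",
    "BIGINT": "INT64",
    "DECIMAL": "NUMERIC",
    "TIMESTAMP": "TIMESTAMP",
    "DATE": "DATE",
}

def transpile_datatype(col, data_type):
    """Map a DB2 column type to its BigQuery equivalent (defaulting to STRING)."""
    return col, DB2_TO_BQ_TYPES.get(data_type.upper(), "STRING")

def alter_table_add_column(dataset_name, target_table_name, column_name, column_type):
    """Add a missing column to the BigQuery table via a DDL statement."""
    ddl = (
        f"ALTER TABLE `{bq_client.project}.{dataset_name}.{target_table_name}` "
        f"ADD COLUMN {column_name} {column_type}"
    )
    bq_client.query(ddl).result()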

Before our data even begins its cloud performance, we pull back the curtains and get it prepped, ensuring it’s anonymized. This is where the anonymization_table in BigQuery steps in, like a choreographer guiding each data point to maintain its essence while veiling its identity.

def apply_anonymization(df, table_name):
    """
    Applies data anonymization rules to specific columns of a DataFrame based on a configuration.

    Parameters:
    df (pd.DataFrame): The DataFrame containing the data to be anonymized.
    table_name (str): The name of the table for which anonymization rules are applied.

    Returns:
    pd.DataFrame: The DataFrame with selected columns anonymized according to the specified rules.
    """

    # organization and query_configuration_results are defined at module level
    full_table_name = f'CCMS_{organization}_{table_name}'
    condition_df = query_configuration_results['MASKING_TABLE']

    # Find matching rows in the condition DataFrame
    matching_rows = condition_df[condition_df['TABLE_LND_NAME'] == full_table_name]

    for index, row in matching_rows.iterrows():
        field_datatype = row['FIELD_DATATYPE']
        field_name = row['FIELD_NAME']
        logic = row['LOGIC']

        # Apply the masking rule if the column exists in the DataFrame
        if field_name in df.columns:
            print(f"Anonymize {field_name}")
            if field_datatype == 'STRING':
                # Replace the whole column with the configured masking value
                df[field_name] = logic
            elif field_datatype == 'DATE':
                if 'YEAR' in logic.upper():
                    # Keep only the year and set month and day to '01'
                    df[field_name] = df[field_name].astype(str).str.extract(r'(\d{4})')[0] + '-01-01'

    clear_memory(matching_rows, f"matching_rows_{table_name}")
    return df
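To make the masking rules tangible, here is what the two transformations above do to a toy DataFrame; the column names and masking value are purely illustrative.

import pandas as pd

# Toy data to illustrate the two masking rules applied above.
df = pd.DataFrame({
    "CUSTOMER_NAME": ["Alice", "Bob"],
    "BIRTH_DATE": ["1985-06-12", "1990-11-30"],
})

# STRING rule: replace the whole column with the configured masking value.
df["CUSTOMER_NAME"] = "***MASKED***"

# DATE rule with 'YEAR' logic: keep only the year, set month and day to '01'.
df["BIRTH_DATE"] = df["BIRTH_DATE"].astype(str).str.extract(r"(\d{4})")[0] + "-01-01"

print(df)
#   CUSTOMER_NAME  BIRTH_DATE
# 0  ***MASKED***  1985-01-01
# 1  ***MASKED***  1990-01-01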

Enter our write_to_bigquery function, the unsung hero that takes our prepped data and ensures it's comfortably nested within BigQuery.

Thanks to BigQuery’s LoadJobConfig, most of the heavy lifting, in terms of data type definitions and transformations, is handled effortlessly. The code checks if the table already contains records, and depending on its state, it appends or creates new entries. This is possible because our target tables in BigQuery are pre-defined, allowing LoadJobConfig to intelligently adapt the data, if transformations are needed at all.

def write_to_bigquery(df, table_name):
    """
    Writes a DataFrame to a Google BigQuery table.

    Parameters:
    df (DataFrame): The DataFrame to write to BigQuery.
    table_name (str): The BigQuery table name.
    """

    load_process = get_process_mode(table_name)
    dataset_name, target_table_name = get_target_table_and_dataset(load_process, organization, table_name)

    # Specify the BigQuery Dataset and Table
    table_id = f"{bq_client.project}.{dataset_name}.{target_table_name}"

    # Check if there are existing records in the table
    existing_records = int(bq_client.query(f"SELECT COUNT(*) FROM `{table_id}`").result().to_dataframe().iloc[0, 0])
    write_disposition = 'WRITE_EMPTY' if existing_records == 0 else 'WRITE_APPEND'

    # Write the DataFrame to BigQuery
    job_config = bigquery.LoadJobConfig(
        schema=get_table_schema_from_bq(dataset_name, target_table_name),
        autodetect=False,  # schema is inherited from the pre-defined target table
        source_format=bigquery.SourceFormat.CSV,
        # skip_leading_rows=1,     # Skip the header row.
        # allow_jagged_rows=True,  # Allow missing values in delimited files
        allow_quoted_newlines=True,
        quote_character='"',
        write_disposition=write_disposition,
    )
    job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete

    print(f"Written {len(df)} rows to table {table_id}")

#3 Data’s Journey Across the Datalake

When it comes to the data warehouse, one size doesn’t fit all. The setup of layers (datasets) can vary based on structure and requirements. While we won’t delve deep into the intricate details here, it’s worth noting that our data’s orchestration across layers is driven by whether we’re reading in full or delta mode.
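As a rough illustration of what that orchestration can look like, a full load might simply rebuild the downstream table, while a delta load might merge new and changed rows on a key. The dataset, table, column, and key names below are placeholders, not our actual layer names.

# Placeholder dataset, table, and key names for illustration only.
load_process = get_process_mode("CUSTOMERS")

full_reload = """
CREATE OR REPLACE TABLE `my-project.staging.CUSTOMERS` AS
SELECT * FROM `my-project.landing.CUSTOMERS`
"""

delta_merge = """
MERGE `my-project.staging.CUSTOMERS` T
USING `my-project.landing.CUSTOMERS` S
ON T.CUSTOMER_ID = S.CUSTOMER_ID
WHEN MATCHED THEN UPDATE SET T.STATUS = S.STATUS, T.UPDATED_AT = S.UPDATED_AT
WHEN NOT MATCHED THEN INSERT ROW
"""

statement = delta_merge if load_process == "DELTA" else full_reload
bq_client.query(statement).result()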

Conclusion

In wrapping up, we’ve detailed our data pipeline’s journey, from IBM DB2 FlashCopy extraction to its storage in Google BigQuery. Opting for a pipeline built in pure Python has granted us remarkable flexibility and advantages, allowing for more tailored adjustments and optimizations.

Dive Deeper

Intrigued by our data dance? Dive deeper into the depths of our analysis in my Google Colab Notebook, where I’ve gathered the full code behind this pipeline.


If you have any thoughts, questions, or have embarked on a similar journey, feel free to connect or drop a comment below. The discourse around data pipelines and cloud migration is as enriching as the data itself, and I’m eager to learn from and engage with this community. Let’s continue to unravel the stories our data shares!

Link for full code → Google Colab Notebook
