
Dynamic data transfer between Teradata and BigQuery

Written by Costin Croitoru, Dan Berbece & Marius Eftene

In this article, we’ll share a migration solution that takes data from Teradata Databases, transfers it to BigQuery and creates a Data Lake on Google Cloud Platform.

We were at the beginning of switching from on-premises solutions to Google Cloud Platform and needed a method for transferring data from Teradata to GCP BigQuery.

After careful consideration, the solution we came up with is to export data from Teradata databases to Google Cloud Storage using Teradata Parallel Transporter (TPT) and then load the data into BigQuery using Google Cloud Composer.

This is accomplished in three stages. The first stage is to create a list of source tables to pull and mark each of them as a full or an incremental load. For incremental loads it is also necessary to define which columns are used as the incremental key.

Once we have the list of tables, the next stage is to move the data from Teradata to GCS. The final stage moves the data from GCS to Google BigQuery.

Teradata TPT and Google gsutil are used to run the migration pipelines that move data from Teradata to GCS. This process exports the tables from the configured list and stores the data as CSV files in a GCS bucket. A second process group, GCS2BigQuery, moves the data from GCS to Google BigQuery.

The following sections describe the logic in our data migration pipeline from Teradata to Google BigQuery.

Part 1: Teradata to Google Cloud Storage

Define the list of tables that need to be exported

This process involves the following: determine whether the table will be transferred in full or incrementally.

If the table is transferred incrementally, it is necessary to define the incremental columns that will be used (at most two per table: insert_id and update_id) and a partition column at day level.

The incremental mode is based on insert_id/update_id, applied at day level.

The list of tables to be exported is stored in a config table, illustrated by the sketch below.
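A hypothetical illustration of such config entries, written as a Python structure; the field names are assumptions and not the exact Teradata schema we use:

# Illustrative export configuration; field names are assumptions.
EXPORT_CONFIG = [
    {
        "databasename": "sales_db",
        "tablename": "store_master",
        "load_type": "full",                 # exported in full on every run
        "incremental_columns": [],
        "partition_column": None,
    },
    {
        "databasename": "sales_db",
        "tablename": "sales_movements",
        "load_type": "incremental",          # delta export at day level
        "incremental_columns": ["insert_id", "update_id"],  # max two per table
        "partition_column": "sales_date",
    },
]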

Master data tables use a simpler export:

· No backward and forward compatibility.

· Tables are exported from Teradata in full each time.

· In BigQuery data is loaded with write_truncate.

· Export from Teradata is done using TPT export.

Movement tables are exported with backward and forward compatibility.

This is done by using a DDL history table. If a table has not been exported before, an initial export is done and a DDL image is saved as the starting point.

The initial-load export is also used for master data tables and tables exported in full mode, but the structure of those tables is overwritten on each export.

The script below is used to populate the ddl_hist table:

SELECT DISTINCT 
LOWER(TRIM(c.databasename) ) databasename,
LOWER(TRIM(c.TABLENAME) )TABLENAME,
LOWER(TRIM(c.COLUMNNAME) )COLUMNNAME,
CASE
WHEN TRIM(UPPER(c.COLUMNTYPE)) = 'BO' THEN 'Record'
.....
WHEN TRIM(UPPER(c.COLUMNTYPE)) = '.... ' THEN 'BQ datatype equivalent'
END DataType,
CASE WHEN TRIM(UPPER(c.COLUMNTYPE)) IN ('BV','CF','CV') and c.chartype=1 THEN '('||TRIM(c.COLUMNLENGTH)||') '
.....
/* calculate column length based on COLUMNTYPE, COLUMNLENGTH, DECIMALTOTALDIGITS and DECIMALFRACTIONALDIGITS */
ELSE ''
END DataSize ,
CASE
WHEN TRIM(UPPER(c.NULLABLE))='N' THEN 'NOT NULL'
ELSE ''
END AS NullInd
, c.defaultvalue ,
dense_rank() OVER (PARTITION BY 1 order by c.columnid asc ) columnid ,
c.chartype,
c.COLUMNLENGTH,
c.DECIMALTOTALDIGITS,
c.DECIMALFRACTIONALDIGITS ,
0 dropp,
1,
${last_export_timestamp_id} as last_export_timestamp_id
FROM DBC.COLUMNSv c
INNER JOIN DBC.TABLESv t
ON c.TABLENAME = t.TABLENAME
AND c.DATABASENAME = t.DATABASENAME
join (
select TABLENAME,DATABASENAME, max(columnid) max_column_id, min(columnid) min_column_id
from DBC.COLUMNSv
where LOWER(TRIM(TABLENAME) ) = 'table_name'
AND LOWER(databasename) ='database_name'
group by DATABASENAME, TABLENAME
) c2
ON c2.TABLENAME = t.TABLENAME
AND c2.DATABASENAME = t.DATABASENAME
where LOWER(TRIM(t.TABLENAME) ) = 'table_name'
AND LOWER(t.databasename) ='database_name'
AND TRIM(UPPER(t.TABLEKIND)) in ('T' ,'O')
and not exists (
select 1
from teradata_db.ddl_hist_table c3
where c3.TABLENAME = t.TABLENAME
AND c3.DATABASENAME = t.DATABASENAME
)
;

If the exported table was already inserted in the DDL history table, we check whether there are DDL changes. For columns that were removed from Teradata, dropped_column_ind is set to 1. Such a column is still exported, but with the default value set in the "DefaultValue" column.

If a dropped column is added again to the Teradata table, it is exported in the same position with its new values (not with the default anymore) and dropped_column_ind is set back to 0.

If new columns are added, they are always appended at the end of the export file. This allows us to update the BQ table structure and import the newly added columns without manual intervention.

We use the same script as in the initial load, with the following changes:

· columnid is calculated as follows:
max_column_id + dense_rank() OVER (PARTITION BY 1 order by c.columnid asc) columnid,
· tablename and columnname should be in:
and (c.TABLENAME, c.COLUMNNAME) in (
select TABLENAME, COLUMNNAME from DBC.COLUMNSv t
where LOWER(TRIM(t.TABLENAME) ) = 'table_name'
AND LOWER(t.databasename) ='database_name'
minus
select TABLENAME, COLUMNNAME from teradata_db.ddl_hist_table t
where LOWER(TRIM(t.TABLENAME) ) = 'table_name'
AND LOWER(t.databasename) ='database_name'
)
· initial load_ind will be set to 0

Delta algorithm

We use an export_history_table that keeps the information needed to export in incremental mode; it supports at most two incremental columns per table. A sketch of how the delta predicate can be assembled from it follows the column list below.

This table contains the following information for every historical export executed from Teradata:

· table name

· partition_date

· insert_id

· update_id

· last_export_timestamp_id — export job unique ID
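A minimal sketch (in Python, with illustrative names) of how the delta predicate could be assembled from this information; the source table and column names are assumptions:

# Hypothetical sketch: build the day-level delta predicate for one table.
def build_delta_predicate(partition_column, last_insert_id, last_update_id):
    """Select every day-level partition that received new or changed rows
    since the previous export (insert_id / update_id based, at day level)."""
    return (
        f"WHERE {partition_column} IN ("
        f" SELECT DISTINCT {partition_column} FROM source_table"
        f" WHERE insert_id > {last_insert_id} OR update_id > {last_update_id})"
    )

# Example: export all partitions touched since the last recorded export
print(build_delta_predicate("sales_date", 1042, 1042))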

Export cross check info

This step is executed only once per table per export. The crosscheck step is split in two:

· delta export: the crosscheck file contains the number of rows for each exported partition;

· full data export: one row per table, containing the number of rows in Teradata.

The crosscheck info is used later to ensure that the same number of rows exported from Teradata were loaded into BigQuery, along the lines of the sketch below.
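A simplified sketch of that comparison, assuming the crosscheck file is a CSV of partition/row-count pairs; the file layout and helper names are assumptions:

import csv

def load_teradata_counts(crosscheck_file):
    # crosscheck file assumed to contain "partition_date,row_count" lines
    with open(crosscheck_file) as f:
        return {row[0]: int(row[1]) for row in csv.reader(f)}

def find_mismatched_partitions(teradata_counts, bigquery_counts):
    """Return the partitions whose exported and loaded row counts differ."""
    return {
        partition: (td_count, bigquery_counts.get(partition, 0))
        for partition, td_count in teradata_counts.items()
        if td_count != bigquery_counts.get(partition, 0)
    }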

Export and execute TPT file

It is necessary to build a dynamic TPT script based on the information from the DDL history table.

The script also makes a conversion to the corresponding BQ data type.

The TPT script generated here is executed automatically:

select 
case when columnid=min_column_id then 'select cast(oreplace(regexp_replace(regexp_replace(regexp_replace(otranslate('|| fast_exp_string ||'||''''#^#''''||'
when columnid=max_column_id then tpt_exp_string || ','''' ~&\\*/\"'''','''' ______''''),''''\\r\\n'''','''' ''''),''''\\n'''','''' ''''),''''\\r'''','''' '''') ,''''#^#'''',''''~'''') as varchar('||(SUM(size_col) OVER (PARTITION BY 1 order by columnid asc rows unbounded preceding ) )||')) '||'string'||' from '|| databasename||'.'||TABLENAME||' a '||';'
else tpt_exp_string ||'||''''#^#''''||'
end tpt_exp_string,
columnid
from (
select
/* convert to varchar and treat NULL values for all datatypes */
case when dropped_column_ind =1 then ''''''''''
when DataType in ('float','integer' ) and coalesce(DECIMALFRACTIONALDIGITS,-1) =0 then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(oreplace(trim(a.'||COLUMNNAME||'),''''.'''','''''''') as varchar('|| trim( cast(cast( trim(DECIMALTOTALDIGITS) as Integer)+cast( trim(DECIMALFRACTIONALDIGITS) as Integer) +2 as varchar(50))) ||')) end '
when DataType in ('float','integer' ) and coalesce(DECIMALFRACTIONALDIGITS,-1) >0 then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(a.'||COLUMNNAME||' as varchar('|| trim( cast(cast( trim(DECIMALTOTALDIGITS) as Integer)+cast( trim(DECIMALFRACTIONALDIGITS) as Integer) +2 as varchar(50))) ||')) end '
when DataType in ('date') then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(cast(a.'||COLUMNNAME||' as date format''''YYYY-MM-DD'''') as varchar(10)) end '
when DataType in ('integer') then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(oreplace(trim(a.'||COLUMNNAME||') ,''''.'''','''''''') as varchar(50)) end '
when DataType in ('time') then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(cast( a.'||COLUMNNAME||' as time(0) format ''''HH:MI:SS'''') as varchar(50) ) end'
when DataType in ('timestamp') then 'case when a.'||COLUMNNAME||' is null then '''''''' else cast(cast(a.'||COLUMNNAME||' as format ''''YYYY-MM-DD HH:MI:SS'''') as varchar(50) ) end '
when DataType not in ('float','integer' ,'timestamp','date', 'time') then 'case when length(trim( a.'||COLUMNNAME||'))=0 then ''''na'''' when length(REGEXP_REPLACE(a.'||COLUMNNAME||', ''''[\s]'''', '''''''', 1, 0, ''''c''''))=0 then ''''na'''' else cast(trim(REGEXP_REPLACE(a.'||COLUMNNAME||', ''''[\s\t\r\n\v\f\x]+'''', '''' '''' )) as varchar('||cast( TRIM(COLUMNLENGTH) as integer)||') ) END '
else 'case when a.'||COLUMNNAME||' is null or length(trim( a.'||COLUMNNAME||'))=0 or length(REGEXP_REPLACE(a.'||COLUMNNAME||', ''''[\s]'''', '''''''', 1, 0, ''''c''''))=0 then '''''''' else cast(trim(REGEXP_REPLACE(a.'||COLUMNNAME||', ''''[\s\t\r\n\v\f\x]+'''', '''' '''' )) as varchar('||cast( TRIM(COLUMNLENGTH) as integer)||') ) END '
end fast_exp_string,
DataType,
NullInd,
DataSize,
CASE WHEN TRIM((DataType)) IN ('string') and chartype=1 THEN cast( TRIM(COLUMNLENGTH) as integer)
....similar for remaining datatypes
END size_col,
columnid,
max_column_id,
min_column_id,
databasename,
TABLENAME,
dropped_column_ind
from (
select distinct t.*, c2.max_column_id,c2.min_column_id from teradata_db.ddl_hist_table t
join (
select TABLENAME,DATABASENAME, max(columnid) max_column_id, min(columnid) min_column_id
from teradata_db.ddl_hist_table group by DATABASENAME, TABLENAME
)c2
ON c2.TABLENAME = t.TABLENAME
AND c2.DATABASENAME = t.DATABASENAME
where LOWER(TRIM(t.TABLENAME) ) = 'table_name'
AND LOWER(t.databasename) ='database_name'
)as a
) as b
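Once generated, the TPT script is executed and the resulting CSV is copied to Cloud Storage with gsutil. A minimal, hypothetical sketch of that step, assuming tbuild and gsutil are available on the export host (job name, paths and bucket are placeholders):

import subprocess

def export_and_upload(tpt_script_path, local_csv_path, gcs_uri):
    # Run the dynamically generated TPT export job (tbuild is the TPT launcher)
    subprocess.run(["tbuild", "-f", tpt_script_path, "-j", "td_export_job"], check=True)
    # Push the exported CSV file to the GCS bucket used by the Composer flow
    subprocess.run(["gsutil", "cp", local_csv_path, gcs_uri], check=True)

export_and_upload(
    "/tmp/table_name_export.tpt",
    "/tmp/table_name.csv",
    "gs://example-bucket/exports/table_name.csv",
)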

Export BQ json schema

The JSON schema is generated dynamically based on the DDL history table and has backward and forward compatibility.

select    
case
when columnid = min_column_id then '['||json_schema||','
when columnid = max_column_id then json_schema||']'
else json_schema||','
end (TITLE '')
from (
SELECT DISTINCT
'{"description": "'||LOWER(TRIM(c.COLUMNNAME) ) ||'","mode": "'||
case
when c.NullInd='NOT NULL' then 'NULLABLE'
else 'NULLABLE'
end||'","name": "'
||LOWER(TRIM(c.COLUMNNAME) )||'","type": "'||
DataType|| '"}' json_schema,
columnid columnid,
c2.max_column_id ,
c2.min_column_id
from teradata_db.ddl_hist_table c
join (
select TABLENAME,DATABASENAME, max(columnid) max_column_id, min(columnid) min_column_id
from teradata_db.ddl_hist_table
group by DATABASENAME, TABLENAME
)c2
ON c2.TABLENAME = c.TABLENAME
AND c2.DATABASENAME = c.DATABASENAME
where LOWER(TRIM(c.TABLENAME) ) = 'table_name'
AND LOWER(c.databasename) ='database_name'
) as c
order by columnid asc
;
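For illustration, the generated schema file might look like the JSON below (field values are made up), and it can be turned into BigQuery SchemaField objects on the loading side:

import json
from google.cloud import bigquery

# Hypothetical output of the schema-generation query above
generated_schema = """[
  {"description": "store_id", "mode": "NULLABLE", "name": "store_id", "type": "INTEGER"},
  {"description": "store_name", "mode": "NULLABLE", "name": "store_name", "type": "STRING"}
]"""

schema = [
    bigquery.SchemaField(field["name"], field["type"],
                         mode=field["mode"], description=field["description"])
    for field in json.loads(generated_schema)
]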

Part 2: Google Cloud Storage to BigQuery

We now have both incremental and full export files in Google Cloud Storage. We also have a data quality mechanism in the form of Teradata counts per primary key, per table exported.

The challenge is to create a process that:

· starts as soon as new data becomes available;

· is reliable and requires as little human interaction as possible;

· runs quickly enough to have everything loaded within the hour;

· does data quality checks and handles errors gracefully.

All these requirements can be achieved by using two Google Cloud Platform services:

· Cloud Composer, a managed workflow orchestration service;

· Kubernetes Engine, a managed environment for running containerized applications.

Cloud Composer is Google’s integration of Apache Airflow in the Cloud Platform. It provides a stable scheduling and orchestration environment, with minimal initial configuration required.

More on Cloud Composer can be read here: https://cloud.google.com/composer/.

Developing and deploying applications in a modern cloud ecosystem practically mandates containerization, a lightweight alternative to a virtual machine that packages an application together with its dependencies and runtime environment into a container. This is where Kubernetes Engine steps in.

More on software packaging: https://www.linuxnix.com/what-is-containerization-in-devops/

More on Google Kubernetes engine: https://cloud.google.com/kubernetes-engine/

In order to achieve the best stability and performance with Composer, there are two simple rules that must be followed:

· use appropriate hardware resources for your project. Light workloads can run fine on a three-node n1-standard-1 Airflow cluster. Anything more complex should scale vertically, by using a better machine type, and horizontally, by increasing the number of nodes;

· design simple DAGs, with as few operators as possible. The DAG itself should handle scheduling, infrastructure orchestration and simple tasks, like checking for trigger files or reading authentication credentials. Airflow tasks are resource-hungry, have considerable spin-up times and limited interop capability. These considerations make implementing anything but the simplest logic directly in Composer unfeasible.

Speed, data quality checks and graceful error handling are achieved by using Python to implement our logic. The Python code is packaged in a Docker image that is deployed in Google Kubernetes Engine. Cloud Composer and Kubernetes Engine are integrated using Airflow’s custom Kubernetes operators.

More on using Airflow operators to integrate with Google Kubernetes Engine can be found here: https://cloud.google.com/composer/docs/how-to/using/using-kubernetes-pod-operator.

To summarize, we use Google Cloud Composer for scheduling and infrastructure orchestration, Python to develop code that implements our complex logic and Google Kubernetes engine to run it in the cloud.

Composer data flow

Data in BQ is loaded in the following way:

· For incremental tables, all partitions that were exported are dropped from BQ and reloaded. This is an easy way to avoid complex logic in BQ and Teradata.

· Full tables are dropped and recreated at every load.

We've split the BQ load logic into two flows, one for incremental and one for full loads; a simplified sketch follows.
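A minimal sketch of the two flows using the BigQuery Python client; the helper names and write dispositions are assumptions based on the behaviour described above:

from google.cloud import bigquery

def load_full_table(bq_client, dataset_id, table_id, gcs_uris, schema):
    """Full tables are rewritten completely on every load."""
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = "WRITE_TRUNCATE"
    table_ref = bq_client.dataset(dataset_id).table(table_id)
    bq_client.load_table_from_uri(gcs_uris, table_ref, job_config=job_config).result()

def load_incremental_table(bq_client, dataset_id, table_id, partitions, gcs_uris, schema):
    """Exported partitions are dropped and reloaded from the export files."""
    for partition in partitions:  # e.g. '20190722'
        # drop the partition, mirroring the delete_table call shown later on
        bq_client.delete_table(bq_client.dataset(dataset_id).table(f"{table_id}${partition}"))
    job_config = bigquery.LoadJobConfig()
    job_config.schema = schema
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.write_disposition = "WRITE_APPEND"
    table_ref = bq_client.dataset(dataset_id).table(table_id)
    bq_client.load_table_from_uri(gcs_uris, table_ref, job_config=job_config).result()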

GCP Cloud Deployment

The above logic is implemented as a Python app and packaged into a Docker container. The container runs in a Kubernetes cluster managed from a Cloud Composer DAG, scheduled to run every X minutes.

The DAG logic is:

· Check that files exist for processing, exit otherwise;

· Spin up a Kubernetes cluster and configure it depending on the expected processing volume. As a general rule we provision one CPU per five tables, as we'll spawn one process for each table;

· Run the Python app on the Kubernetes cluster and load all available tables;

· If additional files were created during runtime, run the importing logic again;

· Once all available files were imported, free up resources by deleting the Kubernetes cluster.

The process is optimized for a large weekly import. If we need to import files continuously, we can opt for an always-on Kubernetes cluster that runs the import logic in a loop, as sketched below.
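A sketch of that always-on variant; the two helpers are placeholders for the scanning and import logic described in this article:

import time

POLL_INTERVAL_SECONDS = 300

def list_new_export_files():
    """Placeholder: scan the GCS landing bucket for unprocessed export files."""
    return []

def import_tables(files):
    """Placeholder: run the per-table load logic for the given files."""
    pass

def run_forever():
    while True:
        files = list_new_export_files()
        if files:
            import_tables(files)
        time.sleep(POLL_INTERVAL_SECONDS)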

Create and upload a Docker image

In order to run Python code in Kubernetes we need to create a Docker container for it. The image will be created locally, uploaded to Google Container Registry and deployed to a Kubernetes cluster.

The prerequisites are:

· Google Cloud SDK needs to be installed locally, in order to have access to the gcloud command line tool;

· Docker Community Edition needs to be available locally, to get access to the docker command line tool.

· Enable the Kubernetes API for your project in the Kubernetes Engine page;

· Ensure that billing is enabled for the Google Cloud project;

· Make sure that virtualization is enabled on your local machine.

To speed things up, set some default values for the gcloud tool:

· gcloud config set project <project_id>;

· gcloud config set compute/zone <zone>.

The following commands create a Docker image and upload it to GCR:

· gcloud auth configure-docker;

· docker build -t <image_name> </path/to/image>;

· docker tag <image_name> gcr.io/<project_id>/<image_name>:<image_tag>;

· docker push gcr.io/<project_id>/<image_name>:<image_tag>.

Deploy the image to a Kubernetes cluster

The Teradata export model implies regular, scheduled exports that require a short peak of processing power. In this case it makes sense to spin up a custom Kubernetes cluster on demand, with enough resources to handle the incoming data, and delete it once the import is done.

This logic is implemented in Cloud Composer, using Kubernetes operators:

· GKEClusterCreateOperator to spin up the cluster;

· GKEPodOperator to run our Docker image;

· GKEClusterDeleteOperator to free resources after the processing is done.

GKE Operators

CREATE_GKE_CLUSTER = GKEClusterCreateOperator(
    task_id=<task_id>,
    project_id=<project_id>,
    location=<zone>,
    body=CLUSTER_DEF,
    gcp_conn_id=<gcp_connection>,
    api_version='v2',
    dag=<dag>)

The gcp_conn_id needs to be defined in the Airflow environment. It requires a service account key file.

The CLUSTER_DEF argument should look like this:

CLUSTER_DEF = {
    "name": <cluster_name>,
    "initial_node_count": <node_count>,
    "network": <project_network>,
    "subnetwork": <project_subnetwork>,
    "node_config": {
        "oauth_scopes": [
            <list_of_oauth_scopes>
        ]
    }
}

The node count should be determined by the number of files we need to process; more tables to load require more processing power. A rough sizing sketch follows.
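As an illustration of the "one CPU per five tables" rule of thumb mentioned earlier, a hypothetical sizing helper could look like this (the CPUs-per-node value is an assumption):

import math

def initial_node_count(table_count, cpus_per_node=4, tables_per_cpu=5):
    cpus_needed = math.ceil(table_count / tables_per_cpu)
    return max(1, math.ceil(cpus_needed / cpus_per_node))

# e.g. 60 tables -> 12 CPUs -> 3 nodes of 4 vCPUs each
print(initial_node_count(60))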

RUN_PODS = GKEPodOperator(
    task_id=<task_id>,
    project_id=<project_id>,
    location=<zone>,
    cluster_name=<cluster_name>,
    name='bdp-cluster',
    namespace='default',
    image='gcr.io/<project_id>/<image_name>:<image_tag>',
    image_pull_policy='Always',
    cmds=['python'],
    arguments=[<main_executable_path>],
    gcp_conn_id=<gcp_connection>,
    api_version='v2',
    dag=<dag>)

The image path, gcr.io/<project_id>/<image_name>:<image_tag>, should be the same as the one defined above, in the Docker section.

DELETE_GKE_CLUSTER = GKEClusterDeleteOperator(
    task_id=<task_id>,
    project_id=<project_id>,
    name=<cluster_name>,
    location=<location>,
    gcp_conn_id=<gcp_connection>,
    api_version='v2',
    trigger_rule='all_done',
    dag=<dag>)

This step should always run, regardless of the result of the import process. Once the processing finishes there is no need to keep the cluster up. This is accomplished using trigger_rule='all_done'. A minimal sketch of how the tasks are wired together follows.
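The sketch below assumes the CREATE_GKE_CLUSTER, RUN_PODS and DELETE_GKE_CLUSTER operators from the snippets above are defined in the same DAG file; the DAG arguments and the file-check task are illustrative:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import ShortCircuitOperator

dag = DAG(
    dag_id="teradata_gcs_to_bq",
    schedule_interval="*/30 * * * *",   # "every X minutes", as described above
    start_date=datetime(2019, 7, 1),
    catchup=False)

def export_files_exist():
    """Placeholder: return True only when unprocessed export files exist in GCS."""
    return True

# Skip the whole run when there is nothing to process
CHECK_FILES = ShortCircuitOperator(
    task_id="check_export_files",
    python_callable=export_files_exist,
    dag=dag)

# DELETE_GKE_CLUSTER still runs after failures thanks to trigger_rule='all_done'
CHECK_FILES >> CREATE_GKE_CLUSTER >> RUN_PODS >> DELETE_GKE_CLUSTER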

Documentation links:

· Airflow GKE Operators;

· Airflow scheduling and triggers.

Client libraries, parallel processing, logging and error handling

Our Python app interacts with the following GCP services:

· Google Cloud Storage;

· Google BigQuery.

Our environment needs the following libraries:

· Google Cloud Storage Client Libs;

· Google BigQuery Client Libs.

The most common operations are:

· Cloud Storage: list, read, upload and delete files in a GCS bucket:

# import GCS lib
from google.cloud import storage
cred_file = "/path/to/cred/file"
gcs_bucket_name = 'example_bucket'
destination_file_name = "/remote/path/to/file"
source_file_name = "/local/file/name"
file_to_delete = "/remote/path/to/delete/file"
# Connect to GCS using a Service Account
gcs_client = storage.Client.from_service_account_json(cred_file)
# Connect to a GCS bucket
gcs_bucket = gcs_client.get_bucket(gcs_bucket_name)
# upload a file to GCS from local
blob = gcs_bucket.blob(destination_file_name)
blob.upload_from_filename(source_file_name)
# List all files in a GCS bucket, get their names and content
for blob in gcs_bucket.list_blobs():
    file_name = blob.name
    file_contents = blob.download_as_string()
# Delete a file from GCS
blob = gcs_bucket.blob(file_to_delete)
blob.delete()

· BigQuery: load a file, drop a partition, run a query:

 
from google.cloud import bigquery

path_to_service_account = "/path/to/sa/json"
dataset_id = "dataset_id"
project_id = "project_id"
table_partition_id = "partition_id"
table_schema = [bigquery.SchemaField("ExampleColumnName", "STRING")]
csv_delimiter = ","
partition_field_name = "example_partition_field_name"
csv_file_1 = "gs://path/to/file1"
csv_file_2 = "gs://path/to/file2"

# Authenticate to BigQuery using a Service Account
bq_client = bigquery.Client.from_service_account_json(path_to_service_account)
table_ref = bq_client.dataset(dataset_id, project_id).table(table_partition_id)
dataset_ref = bq_client.dataset(dataset_id, project_id)

# Load a GCS CSV file to a time partitioned table
job_config = bigquery.LoadJobConfig()
job_config.schema = table_schema # a list of SchemaField objects, generated like: bigquery.SchemaField(<name>, <type>)
job_config.skip_leading_rows = 0
job_config.source_format = bigquery.SourceFormat.CSV
job_config.field_delimiter = csv_delimiter
job_config.create_disposition = 'CREATE_IF_NEEDED'
job_config.write_disposition = 'WRITE_APPEND'
job_config._properties['load']['schemaUpdateOptions'] = ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']
job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': partition_field_name}
load_job = bq_client.load_table_from_uri(
[csv_file_1, csv_file_2],
table_ref,
job_config=job_config)

# Delete a table partition
bq_client.delete_table(table_ref)

# Run a query using Legacy SQL and print results
bq_client.project = project_id
job_config = bigquery.QueryJobConfig()
job_config.use_legacy_sql = True
query_job = bq_client.query("example sql string", job_config)
results = list(query_job.result())
for result in results:
    print(result)

In order to fully leverage the power of Kubernetes Engine, our code runs one process for each table we have to import. If Teradata load files exist, the following steps are required:

· Drop partitions from BigQuery only for delta tables;

· Load the new data from files;

· Load the crosscheck files;

· Run consistency checks;

· Move files to a processed directory on success, or to an errors directory on failure.

Each table load operation is independent and idempotent, so we can safely run them in parallel. For parallel processing we use multiprocessing.Pool(). Logs are written directly to Stackdriver, and individual worker processes are identified using multiprocessing.current_process(), so the logs generated while processing each table can be traced individually. To access the logs in Stackdriver, go to the Google Cloud Platform console, select GKE Container (for app logs) or GKE Cluster Operations for our cluster name, defined above as <cluster_name>, and filter by text and timestamp.

Any error thrown during processing is logged, the current table's process halts, and the incoming files are moved to the error directory for later analysis, along the lines of the sketch below.
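A simplified sketch of the per-table parallelism and error handling described above; the commented-out helpers stand in for our actual load, check and file-move logic:

import logging
import multiprocessing

def process_table(table_name):
    worker = multiprocessing.current_process().name
    try:
        logging.info("[%s] importing %s", worker, table_name)
        # placeholder for: drop partitions, load files, run consistency checks
        # move_files(table_name, "processed")
    except Exception:
        logging.exception("[%s] import failed for %s", worker, table_name)
        # move_files(table_name, "errors")

if __name__ == "__main__":
    tables = ["sales_movements", "store_master"]   # illustrative table list
    with multiprocessing.Pool(processes=len(tables)) as pool:
        pool.map(process_table, tables)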

Writing logs to Stackdriver can be done using the Python client library. Step by step documentation can be found here: Stackdriver Logging with Python.

A quick example:

Tie Stackdriver logging to Python’s standard logging library:


import google.cloud.logging

client = google.cloud.logging.Client()
client.setup_logging()

Import the Python logging lib and call it. The results are written to Stackdriver:

import logging

text = 'Example log entry'
logging.info(text)

Monitoring and alerting

We need to know when the import process encounters an error so we can address it as soon as possible. This means taking the following steps:

1. All errors are logged with full details and stacktrace in Stackdriver. When an error is handled, we need to write a line to Stackdriver containing an easily recognizable string.

For example:

import logging
error_str = "example error"
logging.info("error_encountered_terradata_import : {}".format(error_str))

2. Create a log-based metric in Stackdriver based on the above tag. Documentation: Log Based Metrics;

3. Create a metrics-based alert. Documentation: Stackdriver Alerts;

4. Configure a notification channel of choice for when the above alert triggers. The simplest one is email, which needs no extra configuration.

In order to write to Stackdriver, our service account needs:

· Read-only logging and monitoring: logging.viewer and monitoring.viewer;

· Read-only logging including private logs: logging.privateLogViewer;

· Read-write monitoring: monitoring.editor;

· Logs Configuration Writer: roles/logging.configWriter;

· Log Writer: roles/logging.logWriter.

A detailed overview for GCP roles can be seen here:

· Logging Access Control.

Sending sensitive information by email is not recommended. Detailed information should be limited to Stackdriver. Email notifications should contain a generic message.

Conclusion

The Teradata to BigQuery data transfer proved to be a successful and scalable solution while keeping cloud costs to a minimum.
Copying new tables is as simple as adding a configuration entry. So far we've transferred more than 30 TB of data and we maintain more than 1,500 tables on a weekly basis.
At Metro Systems, this solution has become a key workflow that supports the Data Lake foundation and has also increased the adoption of the new analytics platform on GCP.
