Using Flight to access databases within IBM Cloud Pak for Data

Jianbin Tang
4 min read · Jun 24, 2023


In IBM Cloud Pak for Data, you can use the Flight Service and the Apache Arrow Flight protocol to read data from and write data to data assets in a project or space. These data assets can be files in the storage associated with your current project or space, or data accessed through a database connection.

Once you follow the video at https://www.ibm.com/docs/en/cloud-paks/cp-data/4.6.x?topic=project-adding-connections-projects and set up your connection to a data source, you can use Flight to access different data sources through exactly the same coding interface, without needing to figure out how to connect to Db2, SQL Server, Oracle, Teradata, and so on.
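Under the hood, every Flight read boils down to a small request dict that names the connection and the SQL to run, so switching databases only means switching the connection name. A minimal sketch of that uniform payload (the connection names below are hypothetical placeholders, not real connections):

```python
import json

def make_read_request(conn_name, sql_statement):
    """Build the nb_data_request dict that the Flight service consumes for reads."""
    return {
        'connection_name': conn_name,
        'select_statement': sql_statement,
    }

# The same query payload shape works for any configured connection --
# only the connection name changes.
for conn in ['My Db2 connection', 'My SQL Server connection']:
    req = make_read_request(conn, 'select * from dbo.test_write_table')
    print(json.dumps(req))
```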

Step 1: Create a file named api_access_db_with_flight.py with the following contents:

# https://www.ibm.com/docs/en/cloud-paks/cp-data/4.6.x?topic=notebook-using-flight-service-in-python-notebooks
# Author: Jianbin Tang, jbtang@au1.ibm.com
# License: Only approved to use within IBM Cloud Pak for Data Platform

import itc_utils.flight_service as itcfs
import time

def read_db_with_flight(conn_name, sql_statement):
    """
    Read a pandas data frame from a database.
    :param conn_name: connection name in the CPD project
    :param sql_statement: SQL statement to execute the query,
        e.g. "select TOP 10 * from dbo.test_write_table"
    :return: pandas.DataFrame
    """
    print('\n-- Reading data and save as dataframe from ', conn_name)
    start_t = time.time()

    readClient = itcfs.get_flight_client()

    data_request = {
        'connection_name': conn_name,
        'select_statement': sql_statement
    }

    flightInfo = itcfs.get_flight_info(readClient, nb_data_request=data_request)

    # read the Flight endpoints and concatenate them into one dataframe
    data_df = itcfs.read_pandas_and_concat(readClient, flightInfo)

    print('Read data size = ', data_df.shape)
    print('Time to get data with flight: ', time.time() - start_t, 'seconds')
    return data_df


def write_df_2_db_with_flight(conn_name, df, schema_name, table_name, if_exists):
    """
    Push a pandas dataframe to a database table. If the table doesn't exist,
    a new table is created.
    :param conn_name: connection name in the CPD project
    :param df: pandas.DataFrame to write
    :param schema_name: target schema name
    :param table_name: target table name
    :param if_exists: 'replace' to replace the table, 'append' to append to
        the end of the existing table.
    :return: None
    """
    print(f"\n-- Writing dataframe to {conn_name}, {schema_name}.{table_name}")
    start_t = time.time()

    nb_data_request = {
        'connection_name': conn_name,  # name of the connection
        'commit_frequency': 5000,      # optional: trigger a commit every n records
        'interaction_properties': {
            'schema_name': schema_name,
            'table_name': table_name,
            'table_action': if_exists  # replace | truncate | append
        },
    }

    flight_request = itcfs.get_data_request(nb_data_request=nb_data_request)
    itcfs.write_dataframe(df, data_request=flight_request)

    print('Time to write data with flight: ', time.time() - start_t, 'seconds')


def update_record_2_db(conn_name, update_sql_statement):
    """
    Update a specific record in a database.
    :param conn_name: connection name in the CPD project
    :param update_sql_statement: SQL statement to execute the update,
        e.g. "Update dbo.test_write_table set A = 43 where A = 99999"
    :return: None
    """
    import json
    from pyarrow import flight

    flight_client = itcfs.get_flight_client()

    nb_data_request = {
        "connection_name": conn_name,
        "interaction_properties": {
            "static_statement": update_sql_statement,
            "write_mode": "static_statement"
        },
    }
    flight_request = itcfs.get_data_request(nb_data_request=nb_data_request)
    flight_request['context'] = 'target'
    flight_cmd = json.dumps(flight_request)  # flight_cmd is a str

    action = flight.Action('setup_phase', flight_cmd.encode('utf-8'))
    try:
        # do_action returns an iterator of Result values; consuming it runs the SQL
        gen = flight_client.do_action(action)
        result = [res.body.to_pybytes().decode('utf-8') for res in gen]
        # SQL execution is assumed successful if no exception is raised
    except Exception:
        import traceback
        print("action failed")
        traceback.print_exc()

Step 2: Use the functions as shown in the examples below.

Import the API

conn_name must match the connection name in your project. As long as the connection works, the code below works unchanged across data sources.

from api_access_db_with_flight import *
conn_name = "MS SQL server SYD Default"

Generate a random dataframe for writing

import pandas as pd
import numpy as np
my_df = pd.DataFrame(np.random.randint(0, 100, size=(5, 4)), columns=list('ABCD'))
my_df.at[0, 'A'] = 99999  # use 99999 as a key value for the later update
my_df.head(5)

A B C D
0 99999 20 97 81
1 6 30 79 77
2 91 58 1 38
3 67 11 67 20
4 35 42 71 62

schema_name = 'dbo'
table_name = 'test_write_table'
if_exists = 'replace'
write_df_2_db_with_flight(conn_name,my_df, schema_name, table_name, if_exists)

-- Writing dataframe to MS SQL server SYD Default, dbo.test_write_table
Time to write data with flight: 0.2649970054626465 seconds

Confirm the data has been written successfully. This also shows how to read data from the database with any query:

query = "select TOP 10 * from dbo.test_write_table"
df = read_db_with_flight(conn_name, query)
# save dataframe as CSV
df.to_csv('folder/subfolder/out.csv')
df.head(5)

-- Reading data and save as dataframe from  MS SQL server SYD Default
Read data size = (5, 4)
Time to get data with flight: 0.256054162979126 seconds

A B C D
0 99999 20 97 81
1 6 30 79 77
2 91 58 1 38
3 67 11 67 20
4 35 42 71 62
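One caveat on the df.to_csv('folder/subfolder/out.csv') line above: pandas does not create intermediate directories, so the path must already exist or the save fails. A small sketch that creates the directory first (the dataframe here is a stand-in for the one read via Flight):

```python
import os
import pandas as pd

df = pd.DataFrame({'A': [43, 6], 'B': [20, 30]})  # stand-in for the Flight result

out_path = 'folder/subfolder/out.csv'
os.makedirs(os.path.dirname(out_path), exist_ok=True)  # create folders if missing
df.to_csv(out_path, index=False)  # index=False avoids writing an extra index column

print(os.path.exists(out_path))
```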

Update specific record

update_sql_statement = "Update dbo.test_write_table set A = 43 where A = 99999"
update_record_2_db(conn_name, update_sql_statement)
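Under the hood, update_record_2_db serializes the request into a JSON command string before sending it in a 'setup_phase' Flight action. A sketch of just that payload construction, runnable without the platform (note that itcfs.get_data_request may add further fields; this shows only the fields set explicitly in the code above):

```python
import json

def build_update_command(conn_name, update_sql_statement):
    """Build the JSON command string carried by the 'setup_phase' Flight action."""
    request = {
        'connection_name': conn_name,
        'interaction_properties': {
            'static_statement': update_sql_statement,
            'write_mode': 'static_statement',
        },
        'context': 'target',  # mark the connection as the write target
    }
    return json.dumps(request)

cmd = build_update_command('MS SQL server SYD Default',
                           'Update dbo.test_write_table set A = 43 where A = 99999')
print(cmd)
```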

Confirm record has been updated

query = "select TOP 10 * from dbo.test_write_table"
df = read_db_with_flight(conn_name,query)
df.head(5)

-- Reading data and save as dataframe from  MS SQL server SYD Default
Read data size =  (5, 4)
Time to get data with flight: 0.3321051597595215 seconds

A B C D
0 43 20 97 81
1 6 30 79 77
2 91 58 1 38
3 67 11 67 20
4 35 42 71 62



Jianbin Tang

Customer Success Manager — Architect; Expert Technical Specialist; AI Technical Leader