Data Ingestion

Dhruv Bakshi
Feb 17, 2023


What is data ingestion? How is it done using Python scripts?

According to the Cambridge Dictionary, data means “information, especially facts or numbers, collected to be examined and considered and used to help decision-making, or information in an electronic form that can be stored and used by a computer.”

Ingestion means “the process of absorbing nutrients or medications into the body by eating or drinking them.”

Hence data ingestion means bringing data from one or more sources and processing it into formats suitable for the working environment. It is generally the first stage of the data pipeline in a machine learning, deep learning, or data analytics project.

Data may be available in datasets hosted by sources like World Bank Open Data, the UCI Machine Learning Repository, Kaggle, etc.

The data ingestion process may involve different steps depending on the available data source and the requirements of the working environment. Some of these may include:

  • API authentication
  • Downloading raw files in .zip, .tgz, or any other format
  • Unzipping raw files
  • Extracting headers and main data from unzipped files
  • Converting data into a desired format like .csv, .tsv, etc.
  • Splitting into train and test datasets (optional step)

Generally, if the amount of raw data is small, data ingestion can be done in real time. If the raw data is huge, the data from the various sources is stored first and the whole batch is processed later, i.e. data ingestion is done batch-wise.

Note: Though there are many data ingestion tools available in the market, for the sake of simplicity we’ll limit the process to Python scripts only.

I’ll cover two scenarios of a machine learning project:

1. Data Ingestion through Kaggle API

2. Data Ingestion through a UCI Machine Learning Repository URL

Data Ingestion through Kaggle API

Step 1: After signing in to your Kaggle account, go to https://www.kaggle.com/<username>/account and click on Create New API Token.

Step 2: A kaggle.json file will be downloaded. In the working directory, create a folder named .kaggle and paste kaggle.json into it.
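The downloaded kaggle.json holds your API credentials as a small JSON object, roughly of this shape (the values below are placeholders, not real credentials):

{"username": "your_kaggle_username", "key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}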

Step 3: Install the Kaggle library in your working environment: pip install kaggle

Step 4: Go to the particular Kaggle data file you want to download, copy its URL, and paste it into the code. In my code I have taken it as kaggle_url = "https://www.kaggle.com/datasets/uciml/pima-indians-diabetes-database?select=diabetes.csv"

Step 5: Get the filename and dataset name from kaggle_url.

I’ll first split kaggle_url using a regex split with multiple delimiters (/, ? or =) to get a list of strings.

To get the filename, take the last element of this list. To get the dataset name, join the fourth-from-last and third-from-last elements of the list with / in between.

import re

def dataset_and_file_name(kaggle_url: str):
    # Split the URL on the /, ? and = delimiters
    kaggle_url = re.split(r'[/?=]', kaggle_url)
    file = kaggle_url[-1]
    dataset = "/".join(kaggle_url[-4:-2])
    return dataset, file
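For the example kaggle_url above, the split should give 'uciml/pima-indians-diabetes-database' as the dataset and 'diabetes.csv' as the file:

dataset, file = dataset_and_file_name(kaggle_url)
print(dataset)   # uciml/pima-indians-diabetes-database
print(file)      # diabetes.csv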

Step 6: Write a function to fetch the Kaggle username and key from the path where kaggle.json is stored.

import json

def get_kaggle_username_key(kaggle_json_path: str):
    # Read the Kaggle username and API key from kaggle.json
    with open(kaggle_json_path) as user:
        user_info = json.load(user)
    return user_info['username'], user_info['key']

Step 7: Authenticate the Kaggle API and download the dataset to the designated path.

import os
from kaggle.api.kaggle_api_extended import KaggleApi

def download_kaggle_dataset(path: str):
    # Set the Kaggle credentials as environment variables before authenticating
    username, key = get_kaggle_username_key(".kaggle/kaggle.json")
    os.environ['KAGGLE_USERNAME'] = username
    os.environ['KAGGLE_KEY'] = key

    api = KaggleApi()
    api.authenticate()
    dataset, file = dataset_and_file_name(kaggle_url)
    api.dataset_download_file(dataset, file, path)

download_kaggle_dataset('data_ingestion/kaggle_artifacts')

The complete code is available at the Github link. Running this code will create a diabetes.csv file in the kaggle_artifacts folder.

Data Ingestion through a UCI Machine Learning Repository URL

Step 1: Getting the URL of the file

Go to https://archive.ics.uci.edu/ml/machine-learning-databases/spambase. Right-click on spambase.zip, copy the link address, and paste this URL into the code.

uci_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.zip"

Step 2: Download raw file

The raw file may be present in .zip, .tgz, or any other format. For this particular scenario the raw file is in .zip format.

The urlretrieve function from the urllib.request library (imported here as request) is used to download the zip file. The function below takes path (where the .zip file will be downloaded) as an argument. It first creates the path directory if it doesn’t exist, then builds filepath by joining data.zip to the path directory, downloads spambase.zip and saves it as data.zip at filepath, and finally returns filepath.

import os
import urllib.request as request

def download_rawfile(path: str):
    # Create the target directory if it doesn't exist
    if not os.path.exists(path):
        os.makedirs(path)
    filepath = os.path.join(path, "data.zip").replace("\\", "/")
    # Download spambase.zip and save it locally as data.zip
    request.urlretrieve(uci_url, filepath)
    return filepath

Step 3: Unzip raw file contents

The zipfile library is used and a simple unzip function is created which takes path (where the .zip file is present) and path_to_unzip (where the unzipped files will be stored) as arguments. We call the above function download_rawfile to get filepath, open this file in read mode, and use extractall to fetch the contents of the .zip file and save them to path_to_unzip.

from zipfile import ZipFile

def unzip_raw(path: str, path_to_unzip: str):
    # Download the raw .zip file and extract all its contents
    filename = download_rawfile(path)
    with ZipFile(file=filename, mode="r") as zf:
        zf.extractall(path_to_unzip)

Step 4: Getting headers

After unzipping, one has to take a quick look at the various files in the unzip folder to find the required data. In this scenario the main data is present in the spambase.data file. This file has no headers. The file named spambase.names has the headers in lines 34 to 90. Also, the target column header is missing.

So we’ll be creating two custom functions. The first function takes path_to_unzip (the path where the unzipped file contents are present) as an argument and returns a list of filepaths present in path_to_unzip.

def list_unzip_files(path_to_unzip: str):
    # Collect the full paths of all files in the unzip folder
    file_list = []
    for f in os.listdir(path_to_unzip):
        file_list.append(os.path.join(path_to_unzip, f).replace("\\", "/"))
    return file_list

The second function takes path_to_header (the path of the file which contains the headers) as an argument and returns a list of headers.

This function opens the file in read mode and reads every line using the readlines() function. A list called header is created. A for loop iterates through lines 34 to 90. In each iteration it strips the newline character, splits the row with : as a delimiter, and appends the first part of the split row to the header list. After this we append the target variable header (“Spam/No_Spam”) to the header list and return it.

def get_headers(path_to_header: str):
    with open(path_to_header, 'r') as file1:
        Lines = file1.readlines()
    header = []
    # Strip the newline character, split on the ':' character and keep the first part
    for i in range(34, 90):
        header.append(Lines[i].strip().split(':')[0])
    # Add the missing target column header
    header.append("Spam/No_Spam")
    return header
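As a rough illustration (the exact spacing in spambase.names may differ), each header line looks something like the snippet below, so strip() and split(':') recover just the attribute name:

# Hypothetical example of one line from spambase.names
line = "word_freq_make:        continuous.\n"
print(line.strip().split(':')[0])   # -> word_freq_make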

Step 5: Converting and saving data into the desired format

In this step the headers and main data are combined, converted, and saved into the desired format (.csv in our case).

The function takes filelist (the list of unzipped filepaths) and path_to_csv (the path where the csv will be stored) as arguments. A call to the above function get_headers with the last element of filelist (which contains the file path of the header file) is made to get the header list. A pandas dataframe named clean_df is created using pd.read_csv().

pd.read_csv() takes the first element of filelist (which contains the file path of the main data) as the file path and the header list as the names argument (the names argument of pd.read_csv() takes the column names as a list). The path for the csv file, csvfilepath, is created by joining data.csv to path_to_csv. .to_csv is used to save the dataframe as a csv file at csvfilepath. The csvfilepath is returned.

import pandas as pd

def saving_data(filelist: list, path_to_csv: str):
    # The last file in the list holds the headers, the first holds the main data
    header = get_headers(filelist[-1])
    clean_df = pd.read_csv(filelist[0], sep=",", names=header)
    if not os.path.exists(path_to_csv):
        os.makedirs(path_to_csv)
    csvfilepath = os.path.join(path_to_csv, "data.csv").replace("\\", "/")
    clean_df.to_csv(csvfilepath, index=False)
    return csvfilepath

Step 6: Split data into train and test datasets

This step is optional and can be done at later stages of the data pipeline as well. The dataset could be split using various sklearn.model_selection utilities like train_test_split, StratifiedShuffleSplit, etc., but for simplicity of code I am taking the first 4000 records as the train dataset and the remaining 601 records as the test dataset. A minimal sketch of the sklearn alternative is shown right below.
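This sketch is only for reference and assumes the cleaned data.csv produced in Step 5; the path and the 80/20 split ratio are illustrative, not part of the pipeline built in this post.

import pandas as pd
from sklearn.model_selection import train_test_split

# Illustrative path; adjust to wherever saving_data() wrote the csv
clean_dataframe = pd.read_csv("uci_artifacts/clean_data/data.csv")
# Random 80/20 split instead of the fixed first-4000-rows split used below
train_dataframe, test_dataframe = train_test_split(
    clean_dataframe, test_size=0.2, random_state=42)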

The function takes csvfilepath (the path where the .csv data is present) and path_to_train_test (the directory path where the train and test files will be stored) as arguments. The function first creates clean_dataframe with pd.read_csv(). Then train_dataframe and test_dataframe are created by taking the first 4000 records as train and the remaining records as test.

train_filepath and test_filepath are created by joining train.csv and test.csv respectively to path_to_train_test.

Then train_dataframe and test_dataframe are stored in .csv format at train_filepath and test_filepath respectively using the .to_csv function of pandas.

def train_test(csvfilepath: str, path_to_train_test: str):
    clean_dataframe = pd.read_csv(csvfilepath)
    if not os.path.exists(path_to_train_test):
        os.makedirs(path_to_train_test)

    # First 4000 records become the train set, the rest the test set
    train_dataframe, test_dataframe = clean_dataframe[:4000], clean_dataframe[4000:]
    train_filepath = os.path.join(path_to_train_test, "train.csv").replace("\\", "/")
    test_filepath = os.path.join(path_to_train_test, "test.csv").replace("\\", "/")

    train_dataframe.to_csv(train_filepath, index=False)
    test_dataframe.to_csv(test_filepath, index=False)
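Tying the steps together, a minimal driver could look like the sketch below. The folder names simply mirror the output tree shown next; note that saving_data assumes spambase.data comes first and spambase.names comes last in the unzipped file listing, as in my run.

# Illustrative end-to-end run of the UCI ingestion functions
unzip_raw("uci_artifacts/raw_data", "uci_artifacts/unzip_data")
filelist = list_unzip_files("uci_artifacts/unzip_data")
csvfilepath = saving_data(filelist, "uci_artifacts/clean_data")
train_test(csvfilepath, "uci_artifacts/clean_data/train_test")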

The complete code is available at the Github link. Running this code will create a uci_artifacts folder similar to the one below.



└───uci_artifacts
    ├───clean_data
    │   │   data.csv
    │   │
    │   └───train_test
    │           test.csv
    │           train.csv
    │
    ├───raw_data
    │       data.zip
    │
    └───unzip_data
            spambase.data
            spambase.DOCUMENTATION
            spambase.names

Thanks for reading.
