Data Ingestion into s3 using Python boto3
Process JSON data and ingest it into AWS s3 using Python Pandas and boto3. We will break large files into smaller ones and use Python multiprocessing to upload the data efficiently into AWS s3, leveraging multiple processors.
This article is part of Data Engineering on Cloud Medium Publication co-managed by ITVersity Inc (Training and Staffing) and Analytiqs Inc (Data Services Boutique)
Here are the details of the components used to take care of Data Ingestion into AWS s3 using Python boto3.
- Programming Language: Python 3.7 or later
- Data Source: Files downloaded locally from Kaggle
- Data Target: AWS s3 (a fully managed AWS Data Storage Service)
- Data Processing: Python Pandas — a Python library to take care of processing the data.
- Integration with AWS: Python boto3 — A Python SDK to manage services and their components in AWS.
- Multi Processing: Python multiprocessing — a Python core library which provides multiprocessing capabilities.
Pre-requisites for Data Ingestion into s3
Here are the pre-requisites for Data Ingestion into s3.
- Setup AWS CLI for the ability to run AWS commands. We should be able to validate files in s3 using AWS CLI. We can also manage the services using AWS CLI.
- Setup Python Virtual Environment and Install required dependencies. We will be able to isolate all the dependencies required for this effort such as boto3 at one place.
- Optionally, you can set up a Jupyter-based environment to streamline the learning process, as covered in the Setup Python Virtual Environment and Install required dependencies article. A quick sketch to validate the setup from Python follows below.
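To confirm the AWS setup works from Python before ingesting anything, here is a minimal sketch, assuming your credentials are already configured via aws configure. It validates the credentials by fetching the caller identity using boto3.

import boto3

# Validate that the configured credentials are usable
sts_client = boto3.client('sts')
print(sts_client.get_caller_identity()['Account'])

If this prints your AWS account id, both the CLI configuration and boto3 are ready to go.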
Setup Yelp Datasets using JSON Format
Let us go ahead and set up the Yelp Datasets from Kaggle. As we are going to download more than 4 GB of files from Kaggle, it will take time based on your internet speed.
- Sign up for Kaggle
- Go to Yelp Dataset Page on Kaggle.
- Click on Download and wait until the download completes. A zip file named archive.zip will be downloaded to your downloads folder.
- Create a folder named data under your Project Working Directory. Copy the files into a separate folder for each data set.
Instead of creating folders and copying files manually, we can use the following piece of code, which will move the files from the archive folder to the data folder under the project working directory. The logic places each file in its designated folder.
import os
import glob

os.makedirs('../data/yelp-dataset-json', exist_ok=True)

base_dir = '/Users/itversity/Downloads/archive'
for file in glob.glob(f'{base_dir}/*.json'):
    file_name = file.split('/')[-1]
    print(f'Moving {file} from downloads to ../data/yelp-dataset-json/{file_name.split(".")[0]}')
    data_dir = f'../data/yelp-dataset-json/{file_name.split(".")[0]}'
    os.makedirs(data_dir, exist_ok=True)
    os.rename(file, f'{data_dir}/{file_name}')
Source Location to which files are downloaded (archive folder under Downloads)
Target Location to which files are moved (after this archive folder will be empty)
Overview of Python Pandas
Let us get an overview of Python Pandas. It will be used to process the data in chunks and write the data into smaller and compressed JSON files.
- Python Pandas is the most popular and standard library extensively used for Data Processing.
- It has a robust set of APIs to read data from standard sources such as files and database tables into Dataframes, process the data, and write data from Dataframes back to standard targets such as files and database tables.
- It supports most of the standard file formats, such as delimited text files (popularly known as csv), text files with JSON data, Parquet, Excel, etc., for both reading and writing.
- Once data is read into Dataframes, data can be processed using standard methods/functions as well as SQL leveraging Pandas extensions such as Pandasql.
- We can also use Pandas extensions such as Pandas Profiling to quickly analyze the data.
Overview of Python Boto3 to manage s3
Python Boto3 is a Python-based SDK to work with AWS services. Let us go through some of the APIs that can be leveraged to manage s3.
Here are the common tasks related to s3.
- Upload Files into s3
- Download Files from s3
- List the buckets in s3
- List the objects in s3 using a prefix
- Paginate the objects if there are too many objects in s3 to manage them. A sketch of these tasks using boto3 follows below.
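Here is a minimal sketch of these tasks, assuming a bucket named itvyelpws (replace with your own bucket); the file names are illustrative.

import boto3

s3_client = boto3.client('s3')

# List the buckets in s3
for bucket in s3_client.list_buckets()['Buckets']:
    print(bucket['Name'])

# Upload a local file into s3 (paths are illustrative)
s3_client.upload_file('../data/sample.json', 'itvyelpws', 'yelp-dataset-json/sample.json')

# Download a file from s3
s3_client.download_file('itvyelpws', 'yelp-dataset-json/sample.json', '/tmp/sample.json')

# List the objects in s3 using a prefix (returns at most 1000 objects per call)
response = s3_client.list_objects_v2(Bucket='itvyelpws', Prefix='yelp-dataset-json/')
for obj in response.get('Contents', []):
    print(obj['Key'])

# Paginate when there are more than 1000 objects under the prefix
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='itvyelpws', Prefix='yelp-dataset-json/'):
    for obj in page.get('Contents', []):
        print(obj['Key'])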
Analyze Yelp JSON Data using Pandas
Let us understand how we can analyze Yelp Data which is in JSON format using Pandas.
Here are the typical steps one needs to follow while using Pandas to analyze JSON data.
- Understand the characteristics of the data — data can be represented in multiple ways using the JSON format. With respect to the Yelp Datasets, each line in the file is a well-formed JSON document.
- Understand the size of the data — Number of records and columns using shape.
- Understand different attributes and their data types.
- Describe data to understand the number of records in each data set.
Read Business Review Data into Dataframe
Here is the logic to read the Yelp Review Data into Pandas Dataframe.
- We would like to consider each line in the file as one record and hence we have used `lines=True`.
- Also, the data set is quite big; to understand the data, we would like to access just the first 100 records. To get only the first 100 records from the file into the Dataframe, we have used `nrows=100`.
import pandas as pd

file_path = '../data/yelp-dataset-json/yelp_academic_dataset_review/yelp_academic_dataset_review.json'
df = pd.read_json(file_path, lines=True, nrows=100)
Preview details about Yelp Review Data
We will look at the shape and dtypes, and also invoke count to get details about the Yelp Review Data in the Pandas Dataframe.
df.shape
df.dtypes
df.count()
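If you also want summary statistics for the numeric fields (the describe step mentioned above), Pandas provides describe; a quick example follows.

# Summary statistics (count, mean, min, max, etc.) for numeric columns
df.describe()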
Read Files to Pandas Dataframe in Chunks
As the files are quite large, it is not practical to read and process the entire data set in a file using Pandas Dataframes. Let us understand how we can read the data from files into Pandas Dataframes in chunks.
- Most of the read APIs, including `read_json`, have a keyword argument called `chunksize`. If we use `chunksize`, a `JsonReader` object will be created. The `JsonReader` object facilitates creating a Dataframe for each specified chunk.
- Here is the logic to create the `JsonReader` object.
json_reader = pd.read_json(
'../data/yelp-dataset-json/yelp_academic_dataset_user/yelp_academic_dataset_user.json',
lines=True,
chunksize=10000
)
- Now we can iterate through `json_reader`, creating a Dataframe for each chunk and processing the data in a streaming fashion. Each Dataframe typically contains a number of records equivalent to the chunksize (except for the last one). In our case the chunksize is 10,000.
- Here is the logic to get the first 5 chunks into Dataframes. We will print the type of each object.
for chunk_id, df in enumerate(json_reader):
    if chunk_id == 5:
        break
    print(type(df))
- Now let’s get the number of records in each of the Dataframes. Note that `json_reader` is an iterator and gets exhausted as we read from it, so recreate it with `pd.read_json` before iterating again.
for chunk_id, df in enumerate(json_reader):
    if chunk_id == 5:
        break
    print(f'Number of records in chunk {chunk_id} is {len(df)}')
- Get the total number of chunks associated with `json_reader`.
%%time
max_chunk_id = 0
for chunk_id, df in enumerate(json_reader):
    max_chunk_id = chunk_id

max_chunk_id
Writing Pandas Dataframe to JSON Files
As we have understood how to read the JSON data from files, now let us go through the details of writing a Pandas Dataframe to JSON files.
- Pandas Dataframe objects have several methods to write data to different targets. `df.to_json` can be leveraged to write data in JSON format.
- In our case we are supposed to write the data in JSON format following the same structure as our original files (one well-formed JSON document per line).
- We need to use the keyword arguments `lines=True` and `orient='records'` to achieve the same behavior as in our source files.
- Based on the extension, the compression can be inferred. In our case we will use `gz` as the extension and the files will be automatically compressed using gzip.
- Here is the sample logic to write the data in the Dataframe using compressed JSON format.
import uuid

# Writing directly to an s3:// path with to_json requires the s3fs package
file_name_suffix = str(uuid.uuid1())
dir_path = 'dummy'
df.to_json(
    f's3://itvyelpws/yelp-dataset-json/{dir_path}/part-{file_name_suffix}.gz',
    orient='records',
    lines=True
)
Basic logic to ingest data into s3 using Boto3
First let us review the logic to ingest data into s3 using Boto3, which is available as part of Data Engineering of Yelp Data Set using AWS Analytics.
- Here is the function which will write a Dataframe for a subset of the data from a json file to s3. The data will be compressed using gzip.
def write_to_s3(file, df, s3_client):
    dir_path = file.split('/')[-2]
    file_name_suffix = str(uuid.uuid1())
    # to_json writes to the s3:// path via s3fs; s3_client is accepted for interface consistency
    df.to_json(
        f's3://itvyelpws/yelp-dataset-json/{dir_path}/part-{file_name_suffix}.gz',
        orient='records',
        lines=True
    )
- Here is the logic which will read all the json files from a given folder using Pandas. As the files are quite big, we will read 100,000 records at a time and write them to s3 in the form of JSON. For each chunk of 100,000 records we will invoke the write_to_s3 function.
import glob
import uuid
import boto3
import pandas as pd

s3_client = boto3.client('s3')

files = glob.glob('../data/yelp-dataset-json/*/*.json', recursive=True)

for file in files:
    json_reader = pd.read_json(file, lines=True, chunksize=100000)
    for chunk_id, df in enumerate(json_reader):
        print(f'Processing chunk {chunk_id} in {file}')
        write_to_s3(file, df, s3_client)
Characteristics of the single threaded approach
The logic in the previous topic divides larger files into smaller, manageable files before uploading them into s3.
- Uploading the original files into s3 as is is not very practical. It takes a lot of time and also quite a lot of storage.
- We have already broken up the larger files into small files so that the copy is manageable.
- The logic also compresses the files using gzip. The compression rate is more than 50%.
Even though the files are compressed and their sizes are manageable, the upload will still take time as the files are uploaded using only a single thread.
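If you want to verify the compression rate yourself, here is a quick sketch; the source path is hypothetical, so point it at any local JSON file you have.

import os
import gzip

# Hypothetical path: replace with any local JSON file
src = '../data/yelp-dataset-json-splitted/yelp_academic_dataset_review/part-sample.json'

with open(src, 'rb') as f_in, gzip.open(f'{src}.gz', 'wb') as f_out:
    f_out.write(f_in.read())

original_size = os.path.getsize(src)
compressed_size = os.path.getsize(f'{src}.gz')
print(f'Compression saved {100 * (1 - compressed_size / original_size):.1f}% of storage')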
Overview of Python Multiprocessing
Let us get an overview of Python Multiprocessing, which can be used to ingest data into s3 using multiple processes. We will pick the compressed small files to ingest the data into s3 using Python Multiprocessing.
- Python has a module called `multiprocessing` which can be leveraged to use multiple processes to process the data.
- Depending upon the desired parallelism, we can use `Pool` to create an object which can drive processing using multiple processors.
Here are the steps which you can follow to experiment with multiprocessing.
- Create a function which needs to be invoked for multiprocessing.
import multiprocessing as mp
import time
import random

def mp_demo(i):
    rand = random.randint(1, 5)
    time.sleep(rand)
    print(f'Sleeping for {rand} in process {i}\n')
- Create a list with the data which can be passed as arguments. The list l will contain 10 elements (1 to 10).
l = list(range(1, 11))
- We need to create a `Pool` object and invoke the `map` function with the function and the list as arguments.
with mp.Pool(4) as p:
    p.map(mp_demo, l)
`mp_demo` will be invoked 10 times using 4 parallel processes.
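The above works as is in a Jupyter notebook on Linux, where workers are forked. If you run it as a standalone script, especially on Windows or macOS where the default start method is spawn, the Pool creation should be guarded; here is a minimal sketch.

import multiprocessing as mp
import random
import time

def mp_demo(i):
    rand = random.randint(1, 5)
    time.sleep(rand)
    print(f'Sleeping for {rand} in process {i}')

if __name__ == '__main__':
    # The guard prevents worker processes from re-executing the Pool setup on spawn
    with mp.Pool(4) as p:
        p.map(mp_demo, list(range(1, 11)))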
Data Ingestion to s3 leveraging Multiprocessing
Now that we have an overview of multiprocessing as well as other important libraries such as Pandas and boto3, let us take care of data ingestion into s3 leveraging multiprocessing.
- Get the list of files using `glob`.
import glob

files = glob.glob('../data/yelp-dataset-json/*/*.json')
files
- Create a new folder to save the data as smaller files.
import os
os.makedirs('../data/yelp-dataset-json-splitted')
- We will be using `uuid` to generate unique ids for the file names.
import uuid

uid = uuid.uuid1()
str(uid)
- Here is the function to split the large files into small files. The function will be invoked using Dataframes generated from the Pandas JSON Reader object.
import uuid
import subprocess

def write_to_local(file, df, target_dir):
    dir_path = file.split('/')[-2]
    file_name_suffix = str(uuid.uuid1())
    subprocess.check_call(f'mkdir -p {target_dir}/{dir_path}', shell=True)
    df.to_json(
        f'{target_dir}/{dir_path}/part-{file_name_suffix}.json',
        orient='records',
        lines=True
    )
- Here is the Pandas based logic to split the files. Once the files are split, we will use multiprocessing to compress them and reduce the size of the files to be transferred.
%%time
import glob
import pandas as pd

# Chunking large files into small files and writing to the local file system
files = glob.glob('../data/yelp-dataset-json/*/*.json', recursive=True)

for file in files:
    json_reader = pd.read_json(file, lines=True, chunksize=100000)
    for chunk_id, df in enumerate(json_reader):
        print(f'Processing chunk {chunk_id} in {file}')
        write_to_local(file, df, '../data/yelp-dataset-json-splitted')
- Here is the function to compress the split JSON files. We will add it to `mp_util.py`.
import os
import gzip

def compress_file(file):
    print(f'Compressing file {file}')
    with gzip.open(f'{file}.gz', 'wt') as f:
        f.write(open(file).read())
    os.remove(file)
- Here is the logic to compress the files using multiprocessing. As compression is CPU intensive, I am using 4 processes.
%%time
import glob
from multiprocessing import Pool

from mp_util import compress_file

files = glob.glob('../data/yelp-dataset-json-splitted/*/*.json', recursive=True)
len(files)

with Pool(4) as p:
    p.map(compress_file, files)
- Here is the function to upload the split files to s3. This function will be called via an object of type `Pool` using the `map` function.
def upload_to_s3(file):
    print(f'Uploading {file}')
    file_parent_dir = file.split('/')[-2]
    file_name = file.split('/')[-1]
    # s3_bucket and s3_target_prefix are defined in the driver logic below
    subprocess.check_call(
        f'/home/itversity/.local/bin/aws s3 cp {file} s3://{s3_bucket}/{s3_target_prefix}/{file_parent_dir}/{file_name}',
        shell=True
    )
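Shelling out to the AWS CLI works, but since boto3 is already a dependency, a boto3-based worker is an alternative. Here is a minimal sketch; the function name is mine, and the client is created inside the worker because boto3 clients should not be shared across processes.

import boto3

def upload_to_s3_boto3(file):
    # Create the client inside the worker; boto3 clients should not be shared across processes
    s3_client = boto3.client('s3')
    file_parent_dir = file.split('/')[-2]
    file_name = file.split('/')[-1]
    s3_client.upload_file(file, s3_bucket, f'{s3_target_prefix}/{file_parent_dir}/{file_name}')

This also avoids spawning one aws process per file, which adds overhead when there are many small files.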
- Here is the logic to upload the files to s3 using parallel processes.
%%time
import glob
import subprocess
from multiprocessing import Pool

files = glob.glob('../data/yelp-dataset-json-splitted/*/*.json.gz', recursive=True)
s3_bucket = 'itvyelpws'
s3_target_prefix = 'yelp-dataset-json'

with Pool(8) as p:
    p.map(upload_to_s3, files)
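Once the upload finishes, you may want to validate the ingestion. Here is a minimal sketch that counts the uploaded objects under the prefix using the boto3 paginator, assuming the same bucket and prefix as above.

import boto3

s3_client = boto3.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')

# Count all objects under the target prefix, paginating past the 1000-object limit
object_count = 0
for page in paginator.paginate(Bucket='itvyelpws', Prefix='yelp-dataset-json/'):
    object_count += len(page.get('Contents', []))

print(f'Number of objects under the prefix: {object_count}')

The count should match the number of split and compressed files on the local file system.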