Custom Keras generator fetching images from S3 to train a neural network

William
Published in Analytics Vidhya
Jan 11, 2020
Photo by Héctor J. Rivas on Unsplash

Training models on AWS can be done in many different ways, using any number of its services. Today I’d like to focus on one part of the machine learning pipeline: the step where you either train your model or predict with it.

I had been developing a proof-of-concept model, and once I was satisfied with the initial results, the next step was to scale it up in the cloud and build a larger model. I had used transfer learning with models from the Keras framework and had been training an image-based model. A key Keras function was predict_generator, which takes a generator as input.

Long story short, I had been using flow_from_directory to read the images, and I thought it might be possible to read the images live from S3 instead. One way to do this is to write a custom Keras generator that reads the images and does the preprocessing.

I should probably point out, before I waste any more of your time, that the latency of fetching from S3 is high. Even for small files you are looking at up to 0.5 seconds per file, which becomes a bottleneck at scale and perhaps renders the solution rather useless. In some cases you might be able to live with this latency, though. Alternatively, zipping or bundling the files is a solution.
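The bundling idea mentioned above can be sketched as follows. This is only a minimal illustration, assuming many small images have been packed into one zip archive so that a single S3 request replaces many; the helper names `iter_zip_members` and `build_demo_archive` are hypothetical, and the in-memory archive stands in for bytes that would in practice come from one download.

```python
import io
import zipfile


def iter_zip_members(archive_bytes):
    """Yield (name, raw_bytes) for every file stored in a zip archive held in memory."""
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        for name in archive.namelist():
            if name.endswith('/'):  # skip directory entries
                continue
            yield name, archive.read(name)


def build_demo_archive(files):
    """Build a small zip in memory (hypothetical stand-in for an archive fetched from S3)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode='w') as archive:
        for name, payload in files.items():
            archive.writestr(name, payload)
    return buffer.getvalue()


demo_bytes = build_demo_archive({'a.jpg': b'first', 'b.jpg': b'second'})
members = dict(iter_zip_members(demo_bytes))
```

Each `raw_bytes` value could then be wrapped in `io.BytesIO` and handed to an image loader, in the same way the single-object fetch further down does.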

Here are some snippets for a generator. Let’s start with the first function, which actually fetches the data from S3.

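from keras.preprocessing.image import load_img
import io

def fectch_input(path, s3):
    # bucket_name is a module-level variable, set in the listing snippet further down
    obj = s3.Object(bucket_name, path)
    # Read the object's bytes into memory and decode them as an image
    img = load_img(io.BytesIO(obj.get()['Body'].read()))
    return img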

The S3 resource is passed along as an input. I’m using the Keras loading module, but the common PIL package can probably be used as well. Next is an example of some simple resizing, again using Keras preprocessing:

from keras.preprocessing.image import img_to_array

def preprocess_input(img):
    # Resize to the input size the model expects
    image = img.resize((128, 128))
    # Convert the PIL image to a NumPy array
    array = img_to_array(image)
    return array

One thing to note here is the format of the image tensor: make sure the shape, channel order, and value range are what your model expects. For this project I was doing transfer learning, where it is of course critical to match the input format of the pretrained model.
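One lightweight way to guard against a format mismatch is to check each array's shape before it reaches the model. The sketch below assumes a channels-last layout of 128 × 128 RGB images, which matches the resizing above; the helper `check_image_shape` is hypothetical, and the expected shape should be taken from your own pretrained model.

```python
def check_image_shape(shape, expected=(128, 128, 3)):
    """Raise ValueError if an image array's shape differs from what the model expects."""
    shape = tuple(shape)
    if shape != tuple(expected):
        raise ValueError(f"expected image shape {tuple(expected)}, got {shape}")
    return shape


ok_shape = check_image_shape((128, 128, 3))

# A channels-first array is rejected
try:
    check_image_shape((3, 128, 128))
    rejected = False
except ValueError:
    rejected = True
```

Inside the preprocessing function this could be called as `check_image_shape(array.shape)` right before returning the array.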

import boto3
import numpy as np

def s3_image_generator(files, batch_size=16):
    s3 = boto3.resource('s3')
    while True:
        # For training, sample a random batch instead:
        # batch_paths = np.random.choice(a=files,
        #                                size=batch_size)
        batch_paths = np.array(files)
        batch_input = []
        # Dummy labels; they are not used for prediction
        batch_output = [0] * len(files)
        for input_path in batch_paths:
            img = fectch_input(input_path, s3)
            array = preprocess_input(img)
            batch_input.append(array)
        batch_x = np.array(batch_input)
        batch_y = np.array(batch_output)
        yield (batch_x, batch_y)

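I was planning to do unsupervised learning, so I didn’t need the labels, hence the 0s as the response. Note that, as written, the generator yields all of the files it is given as a single batch; batch_size is only used by the commented-out lines. If you plan to use this for training, you can use np.random.choice for random sampling.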
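For training, the paths would normally be split into batches of `batch_size` rather than yielded all at once. The following is a sketch of just that batching logic, kept free of S3 and Keras so it can be run anywhere. `iter_path_batches` and its `seed` parameter are hypothetical names, and shuffling without replacement is swapped in here for the `np.random.choice` sampling mentioned above.

```python
import random


def iter_path_batches(paths, batch_size=16, shuffle=True, seed=None):
    """Yield lists of at most batch_size paths, covering every path once per pass."""
    paths = list(paths)
    if shuffle:
        random.Random(seed).shuffle(paths)
    for start in range(0, len(paths), batch_size):
        yield paths[start:start + batch_size]


demo_paths = [f"images/{i}.jpg" for i in range(40)]
batches = list(iter_path_batches(demo_paths, batch_size=16, seed=0))
batch_sizes = [len(b) for b in batches]
```

Inside the generator's `while True:` loop, each of these batches would be fetched and preprocessed in turn, with `steps` set to the number of batches per pass.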

So that is basically it! Below is a snippet for actually getting your S3 paths to pass along, and using the generator to predict.

import pandas as pd
import boto3
from time import time

bucket_name = ''  # name of your S3 bucket
client = boto3.client('s3')

# Create a reusable Paginator
paginator = client.get_paginator('list_objects_v2')
# Create a PageIterator from the Paginator
page_iterator = paginator.paginate(Bucket=bucket_name, Prefix='')

keys = []
a = time()
for page in page_iterator:
    # 'Contents' is absent when a page holds no objects
    keys.extend(d['Key'] for d in page.get('Contents', []))
    print(len(keys))
    if len(keys) >= 10000:
        break
print(time() - a)
g = pd.Series(keys)

# model_updated is the trained Keras model.
# Newer Keras versions accept the generator directly in model.predict.
preds = model_updated.predict_generator(s3_image_generator(g[0:512]), steps=1, verbose=1)
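The key-collection step can also be isolated into a small function, which makes it easy to try out without AWS credentials. This sketch assumes each page is a dict shaped like a `list_objects_v2` response, with an optional `Contents` list whose entries carry a `Key`; `collect_keys` and the fake pages are hypothetical.

```python
def collect_keys(pages, limit=10000):
    """Gather object keys from paginated listing results, stopping at limit."""
    keys = []
    for page in pages:
        # A page with no matching objects has no 'Contents' entry
        for obj in page.get('Contents', []):
            keys.append(obj['Key'])
            if len(keys) >= limit:
                return keys
    return keys


fake_pages = [
    {'Contents': [{'Key': 'img/0.jpg'}, {'Key': 'img/1.jpg'}]},
    {},  # empty page
    {'Contents': [{'Key': 'img/2.jpg'}]},
]
all_keys = collect_keys(fake_pages)
first_two = collect_keys(fake_pages, limit=2)
```

With a real page iterator from the paginator, the call would look like `collect_keys(page_iterator, limit=10000)`.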

A final note: this flow is a bit of an overcomplication if you only want to do predictions. An alternative is to simply load the images and predict in a plain loop, without the generator:

import numpy as np

a = time()
s3 = boto3.resource('s3')
all_preds = []
for path in g[0:1000]:
    obj = s3.Object(bucket_name, path)
    img = load_img(io.BytesIO(obj.get()['Body'].read()))
    image = img.resize((128, 128))
    array = img_to_array(image)
    # predict_on_batch expects a leading batch dimension
    preds = model.predict_on_batch(np.expand_dims(array, axis=0))
    all_preds.append(preds)
# Join the per-image predictions once, instead of appending on every iteration
predsA = np.concatenate(all_preds, axis=0)
print(time() - a)

Thanks for reading.
