Deploying Keras model in production with periodic training.

omert
patron-labs
Published in
7 min readDec 10, 2019

In this article, I will give you an approach to train keras models periodically. If you need a dynamic machine learning structure that should learn continuously from new data this article might help.

We will build a salary calculator based on people’s skills with neural networks.

This article covers the journey of code which starts from keras model untill gunicorn. It is recommended to have a nginx to reach higher capacity for production.

Before we start, here is my twitter (twitter/omertaban_en), I share well crafted stories from my product development journeys.

Let’s connect.

Code:

If you want to go to this article’s code directly, you can get the Github repo here.

Approach:

We have REST API built with flask, we have also a periodic trainer server which runs periodic model training task. Please check how data converts to service in the diagram below.

Rest API, Periodic Trainer and DB elements run in seperate docker containers, h5 (keras model weights) and json files are stored in a common folder. I will mention how to share files between containers in further readings.

The data comes from user to Rest API and Rest API pushes data to DB (We will use MongoDB ). Then Periodic Trainer gets data from DB and trains Keras model, returns model name with timestamp to DB and saves model weights to shared folder between Rest API and Periodic Trainer dockers. Notice that, h5 and json file’s name should be same with model name.

When user requests service(prediction or analyze), we get latest model name from database and read weights from common folder and provide prediction to the user.

Question : ML training cannot be done in Rest API server with an asynchronous task ?

Answer : Yes, it can be done but it will harm the Rest API processes. You can try but it is not good option for production.

Project Structure:

api : handles client requests.
trainer : runs periodic training
ml_models : common folder between api and trainer to share h5 and json files.
data : mongodb database files

Docker Containers:

docker-compose.yml

version: '3'services:
api:
container_name: api
restart: always
build: ./api
ports:
- "8000:8000"
volumes:
- ./ml_models:/home/ml_models
depends_on:
- mongodb
trainer:
container_name: trainer
restart: always
build: ./trainer
volumes:
- ./ml_models:/home/ml_models
depends_on:
- mongodb
mongodb:
image: mongo:latest
container_name: "mongodb"
volumes:
- ./data:/data/db
ports:
- 27017:27017

api’s dockerfile:

FROM python:3.6.7
RUN mkdir -p /home/api
RUN mkdir -p /home/ml_models
WORKDIR /home/api
COPY . /home/api/
RUN apt-get update && apt-get install -y libhdf5-dev
RUN pip install --no-cache-dir -r requirements.txt
CMD ["gunicorn","-t 100", "-w2", "-b 0.0.0.0:8000", "app:app"]

trainer’s Dockerfile

FROM python:3.6.7
RUN mkdir -p /home/trainer
RUN mkdir -p /home/ml_models
WORKDIR /home/trainer
COPY . /home/trainer/
RUN apt-get update && apt-get install -y libhdf5-dev
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python", "app.py"]

You can see how we connected api and trainer containers with common folder (/home/ml_models) in the docker-compose.yml. When trainer creates new model it will push model weights to this folder and api service will use it for prediction requests.

Let’s move to data structure :)

Data Structure :

We will make a salary calculator based on developer’s knowledge. ‘1’ represents individual has related skill, ‘0’ means vice versa.

Salary representations divided by 100 , 5.5 means 5500 dollars/month.

First 6 lines of database look likes image below. I have done salary distribution “randomly”. I hope php developers don’t get upset for this :).

Periodic Trainer :

Data is ready, docker files are okay, let’s dig in code.

I will use Advanced Python Scheduler for periodic tasks.You can use another scheduler that you are familiar with.

Celery would be a good option if you want to run training function with an external trigger (another web service or event).

Imports:

import os,glob
from datetime import datetime, timedelta
from flask import Flask
from pymongo import MongoClient
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
os.environ['KERAS_BACKEND'] = 'theano'from keras.callbacks import EarlyStopping
from keras.models import Sequential
from keras.layers import Dense

You might ask why I use Theano as Keras back-end since they stopped developing it. Well it still works fine for me. I especially like how lightweight it is and it really reduces the deploy time significantly. Also I didn’t face some problems that I faced with Tensorflow in deploying to production. (especially if you are doing multi-threading)

You can use Tensorflow instead as well.

Let’s initialize the MongoDB client and create a background scheduler which imported from apscheduler.

client = MongoClient('mongodb://mongodb:27017/test')
db = client.test
app = Flask(__name__)
scheduler = BackgroundScheduler()

We have initialize_db function to create some data at the beginning if there is None.

def initialize_db():
people = db.people.find({},{'_id':False})
people = list(people)
if len(people) == 0:
db.people.insert({"salary":5.0,"python":0,"java":0,"c++":0,"javascript":0,"csharp":0,"rust":0,"go":0,"php":1})
db.people.insert({"salary": 5.5, "python": 0, "java": 0, "c++": 0, "javascript": 1, "csharp": 0, "rust": 0, "go": 0, "php": 1})
db.people.insert({"salary": 6, "python": 0, "java": 0, "c++": 0, "javascript": 1, "csharp": 1, "rust": 1, "go": 1, "php": 0})
db.people.insert({"salary": 6.5, "python": 0, "java": 1, "c++": 0, "javascript": 1, "csharp": 1, "rust": 0, "go": 0, "php": 0})
db.people.insert({"salary": 7, "python": 1, "java": 0, "c++": 1, "javascript": 1, "csharp": 0, "rust": 1, "go": 0, "php": 0})
db.people.insert({"salary": 7.5, "python": 1, "java": 1, "c++": 1, "javascript": 0, "csharp": 1, "rust": 0, "go": 0, "php": 0})
return True

Here is the our training function which will create a new Keras model every 5 minutes.

def train_task():
# Data preparation
people = db.people.find({},{'_id':False})
people = list(people)
data = []
for person in people:
train_list = [0,0,0,0,0,0,0,0,0]
train_list[0] = person.get('salary', 0)
train_list[1] = person.get('python', 0)
train_list[2] = person.get('java', 0)
train_list[3] = person.get('c++', 0)
train_list[4] = person.get('javascript', 0)
train_list[5] = person.get('c#', 0)
train_list[6] = person.get('rust', 0)
train_list[7] = person.get('go', 0)
train_list[8] = person.get('php', 0)
data.append(train_list)
# %%
train_df = pd.DataFrame(data)
train_df.columns = ['salary','python','java','c++','javascript','csharp','rust','go','php']
train_X = train_df.drop(columns=['salary'])
# check that the target variable has been removed
# train_X.head()
# create a dataframe with only the target column
train_y = train_df[['salary']]
# %%
model = Sequential()
# get number of columns in training data
n_cols = train_X.shape[1]
# add model layers
model.add(Dense(8, activation='relu', input_shape=(n_cols,)))
model.add(Dense(4, activation='relu'))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error') early_stopping_monitor = EarlyStopping(patience=3) # train model
model.fit(train_X, train_y, validation_split=0.2, epochs=500, callbacks=[early_stopping_monitor])
model_json = model.to_json()
# %%
now = datetime.now()
one_day_ago = now - timedelta(days=1)
one_day_ago = one_day_ago.isoformat()
now = now.isoformat()
db.ml_models.insert({"date":now})
file_path = "../ml_models/"
file_path = os.path.join(os.path.dirname(__file__), file_path)
file_name = file_path + str(now)
with open(file_name + ".json", "w") as json_file:
json_file.write(model_json)
# serialize weights to HDF5
model.save_weights(file_name + ".h5")
for filename in glob.glob(file_path + one_day_ago + "*"):
os.remove(filename)
return True

Here is the main function which calls initialize_db function to push some data in db if there is none, also it adds a job for background scheduler . Finally we run the scheduler and the flask app.

if __name__ == '__main__':
initialize_db()
train_task()
scheduler.add_job(func=train_task, trigger="interval", seconds=300)
scheduler.start()
app.run(debug=True,port=5000)

Scheduler app is ready, we will move to REST API code to prepare prediction service for our clients. Rest API service has 2 roles, 2 more endpoints. One role is pushing new data to database and second role is some data in db and get list of data for any reviewing purpose.

Rest API Service:

Rest Service imports are same as Periodic Trainer. Only we need 2 global variable to store model name and model itself.

Global variable will have latest model assigned in database and if any new mode is written on database it will read weights of new model from ml_models folder.

This approach has a bottleneck which is these 2 services should run on same server since they have a shared folder.

Here is the prediction service which will tell us salary prediction for any random data we will ask for.

globals()['ml_model_name'] = None
globals()['ml_model'] = None
def get_prediction(file_name, prediction_df):
file_path = "../ml_models/"
file_path = os.path.join(os.path.dirname(__file__), file_path)
file_path = file_path + file_name
json_file = open(file_path + ".json", 'r')
loaded_model_json = json_file.read()
json_file.close()
if globals()['ml_model_name'] != file_name:
print("new model: "+ file_name)
globals()['ml_model'] = model_from_json(loaded_model_json)
# load weights into new model
globals()['ml_model'].load_weights("../ml_models/"+file_name + ".h5")
globals()['ml_model_name'] = file_name
return globals()['ml_model'].predict(prediction_df)

I have written also 3 endpoints which will allow us to push some new data to database, get list of data and get prediction service.

This endpoint reads the json from your request and inserts data to the MongoDB. I will share insomnia document, although you can check below what kind of JSON object to be sent.

@app.route('/data', methods=['POST'])
def add_data():
people = db.people
request_body = json.loads(request.data)
data = request_body.get('data')
salary = data.get('salary', 0)
python = data.get('python', 0)
java = data.get('java', 0)
cplus = data.get('c++', 0)
js = data.get('javascript', 0)
csharp = data.get('csharp', 0)
rust = data.get('rust', 0)
go = data.get('go', 0)
php = data.get('php', 0)
people.insert({'salary':salary,'python':python,'java':java,'c++':cplus,'javascript':js,'csharp':csharp, 'rust':rust,
'go':go,'php':php})
return jsonify({'result' : 'success'})

If you want to review the data you’ve sent , you can use this endpoint.

@app.route('/list', methods=['GET'])
def get_list():
people = db.people
list_of_people = list(people.find({},{'_id': False}))
return jsonify({'total':len(list_of_people),'items' : list_of_people})

And here is the juicy part. Getting a result-service from all of this effort. The endpoint below reads json data you’ve sent and returns the salary prediction.

@app.route('/service', methods=['POST'])
def get_service():
data = json.loads(request.data)
prediction_list = [0, 0, 0, 0, 0, 0, 0, 0]
prediction_list[0] = data.get('python', 0)
prediction_list[1] = data.get('java', 0)
prediction_list[2] = data.get('c++', 0)
prediction_list[3] = data.get('javascript', 0)
prediction_list[4] = data.get('csharp', 0)
prediction_list[5] = data.get('rust', 0)
prediction_list[6] = data.get('go', 0)
prediction_list[7] = data.get('php', 0)
predict_df = pd.DataFrame(prediction_list).transpose()
# Get latest ml-model in the database
ml_models = db.ml_models.find({}, {'_id': False}).sort('date', pymongo.DESCENDING)
ml_models = list(ml_models)
if len(ml_models) == 0:
return jsonify({'error':'no ml model found yet.'}), 200
ml_model = ml_models[0]
result = get_prediction(file_name=ml_model['date'], prediction_df=predict_df) return jsonify(({'salary':int(result[0][0])}))

Github & Run the Code:

git clone https://github.com/patron-labs/periodic_keras_model_training_examplecd periodic_keras_model_training_example
mkdir data
mkdir ml_models
docker-compose up --build -d

Further Developments:

If you have a test data-set it will be good to test trained model performance after periodic task.

Another point is trainer server will create new model every 5 minutes, storing some of these models can be useful, but don’t forget that it might ruin your disk up. I recommend to add archiving or removing function for some model weights which are expired for your case.

Please comment below if you have ideas regarding to another approach.

Best Luck !
omert
twitter/omertaban_en

--

--

omert
patron-labs

I write about my journeys in programming and entrepreneurship.