Machine Learning Powered Data Pipelines

Kai Sahling
Published in The Startup
5 min read · Jul 13, 2020

Over the last few years, interest in Data Science and Machine Learning has grown steadily. Thanks to libraries like Scikit-Learn, it is getting easier and easier to apply machine learning algorithms. However, in my experience, one thing is often overlooked: how to successfully bring such models into production.

While the interest in Machine Learning (yellow) and Data Science (blue) is growing steadily, Data Engineering (red), although equally important, remains relatively unnoticed.

Bringing Machine Learning models into production requires data pipelines that transport the data to the model and deliver the results to where they are needed and can add value. Accordingly, every data scientist also needs data engineering skills to extract value from a model. In the following, I would therefore like to present a simple pipeline that enables exactly this: bringing models into production.

The pipeline consists of four individual parts: the input, the model, the output and a trigger. After the trigger starts the pipeline, the input data required for the model is loaded. The trigger can transmit information, for example a user id, so that only the data relevant for this user is loaded. The model is applied to this data and a result is calculated. The type of model does not play a role here. You can use any of the common machine learning libraries or even completely different methods to take advantage of the data. The result is finally stored in the output database. Now let’s jump into the actual implementation. If interested, you can clone the code from my GitHub.

Once triggered, the service gets some input data. Next, the model is applied to the data, and finally the result is stored in the output database.
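The four parts can be sketched without any framework. This is a minimal illustration, not the code from the repository: the function names, the in-memory INPUT_DATA and OUTPUT_DATA dictionaries and the toy model rule are all made up for this example.

```python
# Hypothetical in-memory stand-ins for the input and output databases.
INPUT_DATA = {"user_1": [0.2, 0.9], "user_2": [0.1, 0.4]}
OUTPUT_DATA = {}


def get_input(user_id):
    """Input: load only the data that belongs to the given user."""
    return INPUT_DATA[user_id]


def apply_model(data):
    """Model: a toy rule standing in for any trained model."""
    return int(max(data) > 0.5)


def write_output(user_id, result):
    """Output: store the result where other services can pick it up."""
    OUTPUT_DATA[user_id] = result


def run_pipeline(user_id):
    """Trigger: called with a user id, runs input -> model -> output."""
    data = get_input(user_id)
    result = apply_model(data)
    write_output(user_id, result)
    return result


run_pipeline("user_1")
run_pipeline("user_2")
print(OUTPUT_DATA)
```

Each step only knows about its own task, which is what later makes it possible to swap one database or model for another.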

For the microservice I use Flask. This makes it possible to create an API that triggers the pipeline. I also make use of dependency injection (DI). Although DI is not as common in Python as in other languages, it is a very useful concept and a great help, especially when testing. In case you are not yet familiar with DI, this article provides a great explanation. The two databases I’m going to use for this post are BigQuery and a PostgreSQL database hosted on AWS. This might be an unlikely set-up, but it certainly helps to demonstrate the flexibility of the data pipeline I propose.

The Flask application is generated in app.py. Here a route called /model is created. If this endpoint is called and a user id is passed, the pipeline starts. The endpoint acts as the trigger. The method underlying the route has three injections, indicated by the injection decorator. These three classes represent the input, the model and the output. The actual implementation of the classes is not important as long as the retriever class has a get method, the model class has an apply method and the writer class has a write method.

The create_app method creates the Flask app. The keyword arguments set the specific classes that are injected by default. When it comes to testing the pipeline, you can easily change these by passing other (mock) classes to create_app. I cover the testing here.
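The idea of default classes that can be swapped for mocks can be illustrated without Flask. The sketch below is an assumption-laden stand-in, not the actual create_app: create_pipeline, the placeholder classes and the handle function are hypothetical names chosen for this example.

```python
from unittest.mock import Mock


class Retriever:
    def get(self, user):
        raise RuntimeError("would query the real input database")


class Model:
    def apply(self, data):
        raise RuntimeError("would run the real model")


class Writer:
    def write(self, data):
        raise RuntimeError("would write to the real output database")


def create_pipeline(retriever=Retriever, model=Model, writer=Writer):
    """Return a handler that uses the given classes (hypothetical helper).

    The keyword arguments default to the production classes, so tests can
    pass replacements without touching the handler itself.
    """
    retriever_obj, model_obj, writer_obj = retriever(), model(), writer()

    def handle(user):
        data = retriever_obj.get(user)
        result = model_obj.apply(data)
        writer_obj.write(result)
        return result

    return handle


# In a test, pass factories that return mocks instead of the real classes.
mock_retriever = Mock()
mock_retriever.get.return_value = [1, 2, 3]
mock_model = Mock()
mock_model.apply.return_value = {"user_1": 0}
mock_writer = Mock()

handle = create_pipeline(
    retriever=lambda: mock_retriever,
    model=lambda: mock_model,
    writer=lambda: mock_writer,
)
result = handle("user_1")
mock_writer.write.assert_called_once_with({"user_1": 0})
```

No database or model file is touched here, yet the test still checks that the three steps are wired together in the right order.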

The configure method coming from dependencies is passed to the Flask injector. This finalises which classes are injected when /model is called.

Let’s follow the data and have a look at the retriever class in retriever.py, which is responsible for getting the data. As mentioned before, this class must have a get method. This is ensured by the abstract class RetrieverAbs. Furthermore, inheriting from injector.Module guarantees that this class can be injected later on. Anything beyond that can be designed the way you prefer.
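How an abstract class enforces the get method can be shown with the standard library alone. This is a minimal sketch that assumes RetrieverAbs is built on Python's abc module; the injector.Module part is left out, and InMemoryRetriever and BrokenRetriever are hypothetical classes for illustration.

```python
from abc import ABC, abstractmethod


class RetrieverAbs(ABC):
    """Contract for every retriever: it must provide a get method."""

    @abstractmethod
    def get(self, user):
        """Return the input data for the given user."""


class InMemoryRetriever(RetrieverAbs):
    """Hypothetical retriever that reads from a dictionary."""

    def __init__(self, rows):
        self.rows = rows

    def get(self, user):
        return self.rows[user]


class BrokenRetriever(RetrieverAbs):
    """Forgets to implement get, so it cannot be instantiated."""


retriever = InMemoryRetriever({"user_1": [0.2, 0.9]})
print(retriever.get("user_1"))

try:
    BrokenRetriever()
except TypeError as error:
    print(f"Rejected: {error}")
```

The missing method is reported as soon as the class is instantiated, not later when the pipeline is already running.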

As mentioned previously, my input database is BigQuery. In __init__, we set up the authentication, the dataset and the table name. Make sure that you also have an environment variable called GOOGLE_APPLICATION_CREDENTIALS, containing the path to the credentials file that you can generate in the IAM section of the Google Cloud Platform. Finally, an engine is created using the dataset you want to address.
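A missing environment variable tends to surface as a confusing authentication error. One possible guard, sketched here under the assumption that you want to fail early, is a small check at start-up. The credentials_path helper is hypothetical and not part of the original code.

```python
import os


def credentials_path(environ=os.environ):
    """Return the credentials file path or fail with a clear message."""
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError(
            "GOOGLE_APPLICATION_CREDENTIALS is not set; point it to the "
            "credentials file before starting the service."
        )
    if not os.path.isfile(path):
        raise RuntimeError(f"Credentials file not found: {path}")
    return path


# Passing an empty mapping simulates a machine without the variable.
try:
    credentials_path({})
except RuntimeError as error:
    print(error)
```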

The get method ultimately gets the data from the target database. It establishes a connection, creates a SQL query and executes the query on the targeted table via the pandas read_sql_query method. In this example, I only query the column data, where the input data for the model resides. Since I want to apply the model to single users, I also only get the data for that very user. The user id is passed to the get method via the user argument. I use SQLAlchemy to set up the query. The actual query would look like this: SELECT data FROM `my_project.my_dataset.my_table` WHERE user = 'user'. Finally, the queried data is put into a DataFrame and returned.
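The shape of such a get method can be sketched with Python's built-in sqlite3 module. Note the swap: SQLite stands in for BigQuery, and a plain list is returned instead of a DataFrame, so that the example runs without credentials or extra packages. The table contents are invented.

```python
import sqlite3


def get(connection, user):
    """Return the data column for a single user as a list of values."""
    # The ? placeholder lets the driver bind the user id safely instead of
    # pasting it into the SQL string.
    query = 'SELECT data FROM my_table WHERE "user" = ?'
    rows = connection.execute(query, (user,)).fetchall()
    return [row[0] for row in rows]


# Hypothetical in-memory table with the two columns used in this post.
connection = sqlite3.connect(":memory:")
connection.execute('CREATE TABLE my_table ("user" TEXT, data REAL)')
connection.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [("user_1", 0.2), ("user_1", 0.9), ("user_2", 0.4)],
)

print(get(connection, "user_1"))
```

Whatever database you use, binding the user id as a parameter rather than formatting it into the query string avoids SQL injection through the trigger.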

Next, the input data is transferred to the model. Since the example should be as simple as possible, I use the Dummy Classifier from Scikit-Learn. In general, every classifier or regressor would be possible. You could easily think of a use case, where you get some user properties in order to decide if a user is about to churn. If there is a high chance of churn, you can use this information and offer your customer a voucher for their next purchase. Another service could send an email with the voucher based on the churn probability stored in the output database. Or you could also easily skip the output database and make a call to the email service if the churn probability is greater than some threshold.
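The variant that skips the output database could look roughly like the following. This is only a sketch: handle_churn, the send_voucher callable and the default threshold of 0.8 are assumptions made for this example, and a real email service would replace the list used here.

```python
def handle_churn(user, churn_probability, send_voucher, threshold=0.8):
    """Call the email service directly when churn is likely.

    send_voucher is any callable that takes a user id; threshold is an
    assumed cut-off and would need tuning for a real use case.
    """
    if churn_probability > threshold:
        send_voucher(user)
        return True
    return False


# A list's append method stands in for the call to the email service.
sent = []
handle_churn("user_1", 0.93, sent.append)
handle_churn("user_2", 0.10, sent.append)
print(sent)
```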

Again, there is an abstract class making sure that the actual class has an apply method and providing the injector.Module to make it injectable. The __init__ loads the model, which is stored in a .joblib file. In order to make loading the model work on every OS, I use os.path to target the model. The apply method gets some input data and runs the model. The result is finally returned.
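The pattern of loading the model once in __init__ from an OS-independent path can be sketched as follows. To keep the example free of extra packages, pickle stands in for joblib (with joblib, the load call would be joblib.load), and a dictionary holding a constant stands in for the trained classifier. The file name and the shape of the input data are assumptions.

```python
import os
import pickle
import tempfile


class Model:
    """Loads a stored model once and applies it to input data."""

    def __init__(self, model_dir, file_name="model.pkl"):
        # os.path.join picks the right path separator for the current OS.
        model_path = os.path.join(model_dir, file_name)
        with open(model_path, "rb") as file:
            self.model = pickle.load(file)

    def apply(self, data):
        # Toy "model": always predicts the stored constant, similar in
        # spirit to a dummy classifier that ignores its input.
        return {user: self.model["constant"] for user in data}


# Store a toy model in a temporary directory, then load it again.
with tempfile.TemporaryDirectory() as model_dir:
    with open(os.path.join(model_dir, "model.pkl"), "wb") as file:
        pickle.dump({"constant": 0}, file)
    model = Model(model_dir)

print(model.apply({"user_1": [0.2, 0.9]}))
```

Loading in __init__ means the file is read once when the service starts, not on every request.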

After the result has been calculated, it is stored in the output database. The class taking care of storing the result looks similar to the input class. As mentioned at the beginning, I use a PostgreSQL database hosted on AWS RDS.

Again, the abstract class guarantees that the actual writer class has a write method and enables the injection. In the __init__, all input parameters required to connect to the database are set. The write method moves the result to the database. The data argument is the dictionary that has been created previously in the apply method. In order to use the pandas to_sql method, the dictionary is first transformed into a two-column DataFrame, listing the user and the user’s result. Next, a connection to the database is established and the DataFrame is stored. Since if_exists is set to append, the single row is added to the existing table. Setting method to multi is not necessary in this case, but keep it in mind to speed up the upload if needed. Once the upload is successful, the connection is closed again.
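The append behaviour can be sketched with the built-in sqlite3 module. As before, this is a stand-in and not the original writer: SQLite replaces PostgreSQL, plain tuples replace the DataFrame, and the table name results is an assumption.

```python
import sqlite3


def write(connection, data):
    """Append one row per user to the results table."""
    # Turn {"user_1": 0} into rows like [("user_1", 0)].
    rows = list(data.items())
    with connection:  # commits on success, rolls back on error
        connection.executemany(
            'INSERT INTO results ("user", result) VALUES (?, ?)', rows
        )


connection = sqlite3.connect(":memory:")
connection.execute('CREATE TABLE results ("user" TEXT, result INTEGER)')

write(connection, {"user_1": 0})
write(connection, {"user_2": 1})  # appended, existing rows stay in place

stored = connection.execute(
    'SELECT "user", result FROM results ORDER BY "user"'
).fetchall()
print(stored)
connection.close()
```

Every call adds rows and never replaces the table, which mirrors what if_exists set to append does in to_sql.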

I hope this approach helps you put your own models into a data pipeline and get the most out of your data. As mentioned previously, I explain how to test the data pipeline here. In a later post I will also show how to dockerize the data pipeline to deploy it. Thanks for reading!
