A generic pipeline to make offline inferences
Data Science has been at the core of Doctrine.fr since the beginning. As such, a lot of effort has been put into developing data science models and industrializing them using pipelines tailored to provide quality answers to business questions. This article focuses on the industrialization part, as we detail how we are doing offline inferences using our models. For insights about our online inference system, you can refer to our article about a dedicated API for machine learning.
More precisely, this article describes the generic logic we have developed to make the inference step straightforward, so that we only have to focus on the processing specific to a given task.
Technical stack
Before going into the details of our generic pipeline, here’s some context with a small summary of the technical stack used in our data science projects.
- These projects are coded in Python.
- Data is mainly queried from a PostgreSQL database (we also use AWS S3, but less frequently).
- Different versions of a model are stored on AWS S3.
- Offline tasks are scheduled and run using Airflow.
Global overview of the generic pipeline
Let’s say we have a production-ready model trained to predict the structure of a decision (it predicts a label for each paragraph of the decision) and we want to apply it to our corpus of decisions. If you want to know how we trained this model, you can read our article about structuring legal documents through deep learning.
We are using the example of predicting classes on decisions, but it could be the inference of any type of model on any type of content, which emphasizes the need for a truly generic data pipeline. This is how the generic logic works for this specific use case (steps 3 to 5 are parallelizable):
1. Query the decisions to be processed through the unique ID we have for each decision at Doctrine
2. Split those IDs into batches of N decisions for multiprocessing purposes
3. Fetch the data needed for those IDs (e.g. the contents of the decisions)
4. Process each decision (its contents) and apply the model
5. Insert the inferred results into the database
Every single one of the steps laid out above is implemented in a generic Python class we have called `Project`.
This class also includes other components, like connectors to the database and to Elasticsearch, loaders for models stored on AWS S3, and management of asynchronous operations. Those technical components are of tremendous value and help us avoid fragmentation and duplication, but they fall outside the scope of this article, so we will not go into further detail about them.
This is a simplified implementation of the `Project` class:
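(The original code is not reproduced here; the following is a minimal sketch of what such a class could look like, assuming batch-level parallelism with Python's `multiprocessing`. Attribute names such as `batch_size` and `n_workers` are assumptions made for the example.)

```python
# Minimal sketch of the generic Project class (illustrative, not the actual implementation).
from multiprocessing import Pool


class Project:
    """Generic offline-inference pipeline: fetch IDs, batch them, process, insert."""

    def __init__(self, batch_size=1000, n_workers=4):
        self.batch_size = batch_size  # number of documents per batch
        self.n_workers = n_workers    # number of parallel worker processes

    # --- Generic ID selection (simplified; the real method takes table names, stock, ...) ---
    def get_ids(self, stock=False):
        """Return the identifiers of the documents to process."""
        raise NotImplementedError

    # --- Methods overloaded by each data science project ---
    def get_rows(self, ids):
        """Fetch the data needed for a batch of IDs (e.g. decision contents)."""
        raise NotImplementedError

    def process_element(self, row):
        """Apply the task-specific logic (e.g. model inference) to one document."""
        raise NotImplementedError

    def insert_results(self, results):
        """Persist the predictions and update the operation state table."""
        raise NotImplementedError

    # --- Generic orchestration ---
    def _run_batch(self, ids):
        # In practice, database connections and models are set up inside each worker process.
        rows = self.get_rows(ids)
        results = [self.process_element(row) for row in rows]
        self.insert_results(results)

    def run(self, stock=False):
        ids = self.get_ids(stock=stock)
        batches = [ids[i:i + self.batch_size] for i in range(0, len(ids), self.batch_size)]
        with Pool(self.n_workers) as pool:
            pool.map(self._run_batch, batches)
```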
The different steps of the pipeline are launched in the `run` method of `Project`. All new data science projects thus inherit from the `Project` class and overload its `get_rows`, `process_element` and `insert_results` methods. In the next section, we will also go into more detail about the `get_ids` method, which is used to get the identifiers of the documents to process, in this use case decisions. So, if we apply this to our task of structure prediction on decisions, our data script looks like this:
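(A sketch of what it could look like; the class name and method bodies are illustrative, and each method is detailed in the following sections.)

```python
# Illustrative sketch of a project dedicated to decision structure prediction.
class DecisionStructurePrediction(Project):

    def get_rows(self, ids):
        # Query the contents of the selected decisions (see "Query data to be processed").
        ...

    def process_element(self, row):
        # Preprocess the decision, split it into paragraphs and apply the model
        # (see "Where science happens").
        ...

    def insert_results(self, results):
        # Write the predictions and update operation_states.decisions_classified_at
        # (see "Insert results").
        ...


if __name__ == "__main__":
    DecisionStructurePrediction().run()
```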
Let’s now dig a bit more into what each step of the pipeline is doing.
Identify elements to process
The first step of our pipeline is to query the identifiers of the documents we want to process, which is done with the `get_ids` method. In our example we're dealing with court decisions, so let's suppose we have the identifiers stored in a PostgreSQL table called `decisions`. This table also contains data like the content of the decision, on which we want to apply our classification model.
The `decisions` table is defined with:
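(A simplified, illustrative definition; the column names other than the ID are assumptions.)

```sql
-- Illustrative, simplified definition of the decisions table.
CREATE TABLE decisions (
    id         BIGINT PRIMARY KEY,      -- unique Doctrine identifier of the decision
    content    TEXT,                    -- raw contents the model is applied to
    created_at TIMESTAMP DEFAULT now()  -- date the decision was loaded
);
```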
This first step highlights a natural need: we have to store information about which decisions have been processed and when they were processed.
Storing this information has several purposes:
- It is used for debugging: knowing when a model gave a label for a decision can be very helpful if we detect errors in predictions
- It is used to only process new decisions and not those already processed
- If the prediction script fails for any reason, we can rerun it from where it stopped
How do we store this information?
We simply use a table in our PostgreSQL database to do it. We have a schema named `operation_states` to store tables of this type. And this is how the structure of the table is defined:
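(Something along these lines; the column names are assumptions based on the description below.)

```sql
-- Illustrative structure of the operation state table.
CREATE TABLE operation_states.decisions_classified_at (
    decision_id         BIGINT PRIMARY KEY REFERENCES decisions (id),
    first_classified_at TIMESTAMP NOT NULL,  -- first time the model made a prediction on this decision
    last_classified_at  TIMESTAMP NOT NULL   -- most recent prediction
);
```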
We store the ID of the decision along with the dates of the first and last time the model made a prediction on it.
How do we select the IDs for reprocessing?
We need the first and last prediction dates for a given decision because we can make predictions several times in the lifetime of a decision. There are two main reasons why we would make several structure predictions for the same decision:
- Since a given decision is fetched from multiple external sources at Doctrine, its metadata or actual contents sometimes get updated several times during its lifetime. When that happens, the model has to make a new prediction.
- A new, better model has been trained and we want to apply it again on decisions.
The first, naive solution we found to automatically reprocess decisions was to simply process a fraction of them every day. But with this solution, in most cases, we were reprocessing decisions that had not changed since the last computation: a waste of time and resources.
Hence we came up with a more efficient solution: focus on modified content only. To do so, we need to store information about when a decision was modified. For this purpose, we again use a PostgreSQL table, which stores the last modification date of the decision (date of the content change, date of the metadata change, etc.). Tables of this kind are stored in a schema called `modification_states`. The structure of the table looks like this:
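(An illustrative version; only the `updated_at` field is named in the text, the rest is assumed.)

```sql
-- Illustrative structure of the modification state table.
CREATE TABLE modification_states.decisions_modified_at (
    decision_id BIGINT PRIMARY KEY REFERENCES decisions (id),
    updated_at  TIMESTAMP NOT NULL  -- last time the content or metadata of the decision changed
);
```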
How is the modification information updated?
The modification information is obtained from the decision loading scripts. In those scripts, the content or metadata of a decision is compared to the existing version using a hash function. If there is a difference, we have rules to determine which content to keep, and if the kept content is new, the script updates the `updated_at` field of the `modification_states.decisions_modified_at` table for this decision.
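As an illustration, the hash comparison could look like this (a minimal sketch, assuming MD5 hashes of the raw content):

```python
import hashlib


def has_changed(new_content: str, existing_content: str) -> bool:
    """Compare two versions of a decision through their MD5 hashes (illustrative sketch)."""
    new_hash = hashlib.md5(new_content.encode("utf-8")).hexdigest()
    old_hash = hashlib.md5(existing_content.encode("utf-8")).hexdigest()
    return new_hash != old_hash
```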
Finally, the `get_ids` method takes the `decisions`, `operation_states.decisions_classified_at` and `modification_states.decisions_modified_at` tables as input in order to select the decisions to be processed.
For example, this is what the SQL query in `get_ids` looks like when we want to get the new decisions and the decisions which have been modified:
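(An illustrative reconstruction, using the column names assumed in the table sketches above.)

```sql
-- Illustrative query: new decisions plus decisions modified since their last classification.
SELECT d.id
FROM decisions d
LEFT JOIN operation_states.decisions_classified_at c ON c.decision_id = d.id
LEFT JOIN modification_states.decisions_modified_at m ON m.decision_id = d.id
WHERE c.decision_id IS NULL                  -- never processed
   OR m.updated_at > c.last_classified_at;   -- modified since the last prediction
```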
Actually, in our `Project` class, we have implemented a generic `get_ids` function which takes the input table names as arguments, among others. An interesting argument is `stock`: when it is `True`, the entire list of decisions is selected, in case we want to apply a new model to the whole corpus of decisions.
In order to use these IDs, we split them into batches of N elements to be processed, because the subsequent steps are fully parallelizable. The parallelization is done using Python's `multiprocessing` package inside the `Project.run` method.
Query data to be processed
Having a batch of decisions to process, identified by their IDs, we now have to fetch the useful information about these decisions. Since we want to predict a class for each paragraph of a decision, we need its contents.
The `get_rows` method simply queries this information for the decisions chosen in the previous step. It actually translates into a simple SQL query:
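(A sketch, using the illustrative `content` column and a psycopg2-style parameter for the batch of IDs.)

```sql
-- Illustrative query run by get_rows for one batch of decision IDs.
SELECT id, content
FROM decisions
WHERE id = ANY(%(ids)s);  -- the batch of IDs produced by the previous step
```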
Where science happens
We have queried the information required for the batch of decisions; we now need to apply the logic specific to the task of predicting a class for each paragraph of a decision. This logic is implemented in the `process_element` method, which takes one decision as input and includes preprocessing of the raw contents of the decision (lowercasing, stemming, etc.), separation of the content into paragraphs and application of the model to make a prediction for each of them. Finally, the method returns the predictions as output.
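As an illustration, `process_element` could look like this for our task, filling in the earlier sketch (the preprocessing helpers and the model interface are assumptions):

```python
def process_element(self, row):
    """Predict a class for each paragraph of one decision (illustrative sketch)."""
    # Preprocessing on the raw contents (lowercasing, stemming, etc.).
    cleaned = preprocess(row["content"])          # hypothetical helper
    # Separation of the content into paragraphs.
    paragraphs = split_into_paragraphs(cleaned)   # hypothetical helper
    # One prediction per paragraph, using the model loaded from AWS S3.
    labels = self.model.predict(paragraphs)       # assumed model interface
    return {"decision_id": row["id"], "labels": labels}
```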
In addition to parallelizing across batches of decisions, we can also parallelize inside a single batch by performing asynchronous operations when we request different services that can be queried simultaneously (database, Elasticsearch index, etc.). This is not shown in our simplified implementation of the `Project` class, but in practice it is based on the `asyncio` package. In our example, we could take advantage of asynchronous processing because the limiting resource could be the database or the available CPUs; in practice, however, the task runs in a reasonable time, so we did not need to leverage the asynchronous part.
Insert results
Finally, having our predictions, the last step of our pipeline is to insert the data in our database using the `insert_results` method.
At this step, we are inserting several pieces of data:
- The results of the predictions, in a dedicated table
- The processed IDs, in the `operation_states.decisions_classified_at` table, to store the information about which decisions have been processed and when
The `insert_results` method can also deal with data comparison before insertion. Typically, if we make a new prediction for a decision because its content has changed, we compare the prediction against the previous one and only update the data if the prediction has changed. This optimizes resource consumption, as a `SELECT` is more efficient than a `DELETE` followed by an `INSERT` in PostgreSQL. The comparison can be done using hash functions (PostgreSQL has an implementation of the MD5 algorithm).
Conclusion
Using this generic pipeline, we have removed a lot of the redundant code involved in the productionization of a model, so that we only have to focus on a few task-specific aspects. We took the example of inferring classes for decisions using a model, but this pipeline can deal with any kind of content and any kind of processing (inference using models, text processing, etc.).
Thanks to Pauline Chavallard, Bertrand Chardon and Nicolas Fiorini for their valuable feedback.