Published in Geek Culture


Data Engineering Tricks: How To Get Dirty Data Cleaned through VDK

Implement a custom plugin for data cleaning in Versatile Data Kit (VDK)

Photo by Towfiqu barbhuiya on Unsplash

Data engineering is the process of organizing and transforming data so that you can use it for analytics and decision-making. Data cleaning is a crucial part of data engineering because it removes inaccuracies and inconsistencies from data sets.

You can use different techniques for data cleaning, and the most appropriate method will vary depending on the specific data set. One of the most popular techniques is handling missing values.

In this article, I’ll show you how to handle missing values using the Versatile Data Kit (VDK), a framework that automates building, running, and managing your data pipelines with Python or SQL on any cloud.

The article is organized as follows:

  • Basic concepts related to data cleaning
  • Missing values in VDK: extending the IIngesterPlugin interface
  • Missing values in VDK: using VDK commands

Basic concepts related to data cleaning

Data cleaning is the process of detecting and fixing, or removing, inaccurate, incomplete, or inconsistent records in a data set. There are different techniques for cleaning data. In this article, I’ll focus on handling missing values.

A missing value is a value that is absent from a record, for example a NULL in a database column or an empty field in a CSV file. A missing value can affect a single column of a row or an entire row.
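To make the definition concrete, here is a minimal sketch in plain Python that flags missing values in a list of row dictionaries. It assumes that a value counts as missing when it is None, an empty or whitespace-only string, or a float NaN. The helper names is_missing and missing_report are hypothetical and are not part of VDK.

```python
import math
from typing import Any, Dict, List


def is_missing(value: Any) -> bool:
    """Assumed rule: None, empty/whitespace-only strings, and NaN are missing."""
    if value is None:
        return True
    if isinstance(value, str) and value.strip() == "":
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return False


def missing_report(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count missing cells per column and list the indexes of fully empty rows."""
    columns = sorted({key for row in rows for key in row})
    per_column = {
        col: sum(1 for row in rows if is_missing(row.get(col))) for col in columns
    }
    empty_rows = [
        i for i, row in enumerate(rows)
        if all(is_missing(row.get(col)) for col in columns)
    ]
    return {"per_column": per_column, "empty_rows": empty_rows}


rows = [
    {"city": "Pisa", "country": None},       # a single missing cell
    {"city": "", "country": ""},             # an entirely missing row
    {"city": "Paris", "country": "France"},
]
print(missing_report(rows))
# {'per_column': {'city': 1, 'country': 2}, 'empty_rows': [1]}
```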

There are a few different ways to handle missing values in data engineering:

  • Ignore missing values and hope they don’t affect the results too much.
  • Impute missing values, which means filling in the missing values with some estimate. One popular imputation method is called mean imputation, where the missing values are replaced with the mean of the non-missing values. This can be effective if the data is not too heavily skewed and there are not too many missing values.
  • Multiple imputation, which produces several estimates for each missing value and then combines them. This can be more accurate than mean imputation, but it is also more computationally expensive.
  • Delete any rows or columns that have missing values. This can be effective if there are only a few missing values and they are concentrated in a few rows or columns rather than spread throughout the data.
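Two of these strategies are easy to sketch in plain Python for a list of records: mean imputation for one numeric column, and deletion of every incomplete row. The helper names mean_impute and drop_incomplete are hypothetical. Multiple imputation is left out because it needs a statistical model.

```python
from statistics import mean
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]


def mean_impute(rows: List[Row], column: str) -> List[Row]:
    """Replace None in `column` with the mean of the non-missing values."""
    observed = [row[column] for row in rows if row[column] is not None]
    fill = mean(observed)
    # Build new dicts so the input rows are left unchanged
    return [
        {**row, column: fill if row[column] is None else row[column]} for row in rows
    ]


def drop_incomplete(rows: List[Row]) -> List[Row]:
    """Delete every row that has at least one missing value."""
    return [row for row in rows if all(v is not None for v in row.values())]


readings = [
    {"temp": 20.0, "humidity": 0.5},
    {"temp": None, "humidity": 0.6},
    {"temp": 26.0, "humidity": None},
]

print(mean_impute(readings, "temp"))
# [{'temp': 20.0, 'humidity': 0.5}, {'temp': 23.0, 'humidity': 0.6}, {'temp': 26.0, 'humidity': None}]
print(drop_incomplete(readings))
# [{'temp': 20.0, 'humidity': 0.5}]
```

Note the trade-off. Imputation keeps all three rows but invents a value. Deletion keeps only real values but, in this small example, throws away two thirds of the data.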

Missing values in VDK: extending the IIngesterPlugin interface

To perform data cleaning, I’ll implement a custom plugin in VDK. Specifically, I’ll implement a plugin that extends the IIngesterPlugin interface provided by VDK.
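Conceptually, a pre-ingest hook sits between the job that sends data and the target that stores it. It receives the payload, may change it, and passes it on. The sketch below models that idea in plain Python so it can run without VDK. PreIngestStep, send_to_target, and the two example steps are hypothetical names, and this is not how VDK's ingestion pipeline is actually implemented.

```python
from typing import Any, Dict, List, Optional, Protocol, Sequence, Tuple

Payload = List[Dict[str, Any]]
Result = Tuple[Payload, Optional[dict]]


class PreIngestStep(Protocol):
    """Hypothetical stand-in for the pre-ingest hook of an ingester plugin."""

    def pre_ingest_process(self, payload: Payload) -> Result: ...


class DropRowsWithNone:
    """Cleaning step: keep only rows without None values."""

    def pre_ingest_process(self, payload: Payload) -> Result:
        return [row for row in payload if None not in row.values()], None


class UppercaseCountry:
    """Formatting step: upper-case the 'country' field."""

    def pre_ingest_process(self, payload: Payload) -> Result:
        return [{**row, "country": row["country"].upper()} for row in payload], None


def send_to_target(payload: Payload, steps: Sequence[PreIngestStep]) -> Payload:
    """Apply each pre-ingest step in order and return what would be written."""
    for step in steps:
        payload, _metadata = step.pre_ingest_process(payload)
    return payload


rows = [{"city": "Pisa", "country": None}, {"city": "Paris", "country": "France"}]
print(send_to_target(rows, [DropRowsWithNone(), UppercaseCountry()]))
# [{'city': 'Paris', 'country': 'FRANCE'}]
```

Order matters in this model. Running UppercaseCountry first would fail on the None value, which is one reason to clean data before formatting it.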

I’ll perform the following operations:

  1. Creating the skeleton
  2. Configuring the plugin
  3. Implementing the VDK plugin
  4. Writing a test
  5. Running a test

The following figure shows how the implemented components are related to each other:

Image by Author

1. Creating the skeleton and 2. Configuring the plugin

First, you must create the skeleton, which contains the directory tree of your plugin. Refer to my previous article for more details on how to create a VDK plugin skeleton and configure the plugin.

3. Implementing the VDK plugin

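The plugin implements the IIngesterPlugin interface and overrides its pre_ingest_process() method. This method receives the payload before it is ingested and returns the payload to ingest, together with its metadata. For more details about the IIngesterPlugin interface, refer to the VDK GitHub repository.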

import logging

import pandas as pd
from vdk.api.plugin.plugin_input import IIngesterPlugin
from vdk.internal.builtin_plugins.run.job_context import JobContext

_log = logging.getLogger(__name__)


class IngestWithoutMissingValues(IIngesterPlugin):
    """
    Create a new ingestion mechanism for ingesting without missing values
    """

    def __init__(self, context: JobContext):
        self._context = context

    def pre_ingest_process(self, payload):
        # This plugin adds no extra ingestion metadata
        metadata = IIngesterPlugin.IngestionMetadata({})
        df = pd.DataFrame(payload)

        # Drop every row that contains at least one missing value (None or NaN)
        df = df.dropna()

        return df.to_dict(orient="records"), metadata

I save the script as a file named
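To check the cleaning rule without installing VDK or pandas, the hypothetical helper drop_missing_records below mirrors what DataFrame.dropna() does to a list of records. A row is dropped if any of its columns, or only the columns listed in subset, holds None or NaN. Like dropna(), the helper keeps empty strings. It is a sketch for experimenting with the rule, not part of the plugin.

```python
import math
from typing import Any, Dict, Iterable, List, Optional


def _missing(value: Any) -> bool:
    # dropna() treats both None and NaN as missing; empty strings are kept
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_missing_records(
    payload: List[Dict[str, Any]], subset: Optional[Iterable[str]] = None
) -> List[Dict[str, Any]]:
    """Keep only records with no missing value in `subset` (default: every column)."""
    if subset is not None:
        columns = list(subset)
    else:
        columns = sorted({key for record in payload for key in record})
    # An absent key counts as missing, as pandas would fill it with NaN
    return [r for r in payload if not any(_missing(r.get(c)) for c in columns)]


payload = [
    {"city": "Pisa", "country": None},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]
print(drop_missing_records(payload))
# [{'city': 'Milano', 'country': 'Italia'}, {'city': 'Paris', 'country': 'France'}]
```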

Now, I edit the script by adding a call to my plugin:

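from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.run.job_context import JobContext

# IngestWithoutMissingValues must also be imported from the plugin module


@hookimpl
def initialize_job(context: JobContext) -> None:
    # Register a factory that builds the plugin for the "MISSINGVALUES" ingestion method
    context.ingester.add_ingester_factory_method(
        "MISSINGVALUES", lambda: IngestWithoutMissingValues(context)
    )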

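The lambda in initialize_job means that the plugin is not created when it is registered. Only a factory is stored under the name "MISSINGVALUES", and the factory captures context so that the plugin receives it when it is eventually built. The hypothetical IngesterRegistry below illustrates this lazy factory pattern. It is a simplified model, not VDK's own registration code.

```python
from typing import Callable, Dict


class IngesterRegistry:
    """Hypothetical registry mapping an ingestion method name to a factory."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, method: str, factory: Callable[[], object]) -> None:
        # Only the factory is stored; nothing is constructed yet
        self._factories[method] = factory

    def create(self, method: str) -> object:
        # The ingester is built only when the method is actually requested
        return self._factories[method]()


class DummyIngester:
    created = 0

    def __init__(self, context: object) -> None:
        DummyIngester.created += 1
        self.context = context


context = {"job": "example"}  # placeholder for a job context
registry = IngesterRegistry()
registry.register("MISSINGVALUES", lambda: DummyIngester(context))
print(DummyIngester.created)  # 0
ingester = registry.create("MISSINGVALUES")
print(DummyIngester.created)  # 1
```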
4. Writing a test

As a next step, I write a test. I start by creating a table named test_table, which contains two columns: city and country.

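import os
from unittest import mock

from vdk.plugin.sqlite import sqlite_plugin
from vdk.plugin.sqlite.ingest_to_sqlite import IngestToSQLite
from vdk.plugin.sqlite.sqlite_configuration import SQLiteConfiguration
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner

# IngestWithoutMissingValues must also be imported from the plugin module


def test_missing_values(tmpdir):
    # Point the SQLite plugin at a database file inside the pytest temp directory
    db_dir = os.path.join(str(tmpdir), "vdk-sqlite.db")
    with mock.patch.dict(os.environ, {"VDK_SQLITE_FILE": db_dir}):
        runner = CliEntryBasedTestRunner(sqlite_plugin)
        runner.invoke(
            ["sqlite-query", "--query", "CREATE TABLE test_table (city TEXT, country TEXT)"]
        )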

Then I define test data with three rows, one of which contains a missing value:

def test_missing_values(tmpdir):
    # ... (continuing inside the mock.patch.dict block above)
        payload = [
            {"city": "Pisa", "country": None},
            {"city": "Milano", "country": "Italia"},
            {"city": "Paris", "country": "France"},
        ]

I invoke my plugin to drop missing values, and then I ingest the cleaned payload into test_table:

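def test_missing_values(tmpdir):
    # ... (continuing inside the mock.patch.dict block above)
        # The plugin only stores its context, so the test runner can stand in for it
        pre_ingest = IngestWithoutMissingValues(runner)
        cleaned_payload, metadata = pre_ingest.pre_ingest_process(payload)

        mock_sqlite_conf = mock.MagicMock(SQLiteConfiguration)
        sqlite_ingest = IngestToSQLite(mock_sqlite_conf)
        sqlite_ingest.ingest_payload(
            payload=cleaned_payload, destination_table="test_table", target=db_dir
        )
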
Then I query the table and compare the result produced by my plugin with the expected results:

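def test_missing_values(tmpdir):
    # ... (continuing inside the mock.patch.dict block above)
        result = runner.invoke(["sqlite-query", "--query", "SELECT country FROM test_table"])
        output = result.stdout

        # The Pisa row had no country, so only the other two rows should be stored
        assert len(cleaned_payload) == 2
        assert "Italia" in output and "France" in output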

Finally, I install the new plugin and run the tests:

pip install -e /path/to/plugin
cd /path/to/plugin
pytest

Missing values in VDK: using VDK commands

Alternatively, I can implement the missing value plugin as a VDK command. I have already described how to implement a plugin using VDK commands in my previous article, so in this article I’ll focus only on the differences.

First, I define the step that drops missing values:

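from vdk.api.job_input import IJobInput


class MissingValuesDropper:
    def __init__(self, job_input: IJobInput):
        self.__job_input = job_input

    def drop_missing_values(self, table: str, column: str):
        # Table and column names cannot be bound as query parameters,
        # so they are interpolated directly: pass only trusted identifiers
        query = f"DELETE FROM {table} WHERE {column} IS NULL OR {column} = '';"
        self.__job_input.execute_query(query)


def run(job_input: IJobInput) -> None:
    # Read the table and the column to clean from the job arguments
    table = job_input.get_arguments().get("table")
    column = job_input.get_arguments().get("column")

    dropper = MissingValuesDropper(job_input)
    dropper.drop_missing_values(table, column)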

The step deletes every row in which the given column is NULL or an empty string. Both the table and the column are provided as input arguments.
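To see what the query does without a VDK data job, you can run it against an in-memory SQLite database with Python's built-in sqlite3 module. The identifier check in the hypothetical helper drop_missing_values_sql is an extra safeguard that the step above does not include. Table and column names are interpolated into the SQL string and cannot be passed as bound parameters, so validating them protects against injection.

```python
import re
import sqlite3

# A conservative whitelist for unquoted SQL identifiers (letters, digits, underscore)
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def drop_missing_values_sql(table: str, column: str) -> str:
    """Build the DELETE statement used by the step after validating the identifiers."""
    for name in (table, column):
        # Identifiers cannot be bound as parameters, so reject anything unexpected
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"invalid SQL identifier: {name!r}")
    return f"DELETE FROM {table} WHERE {column} IS NULL OR {column} = '';"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_table (city TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO test_table VALUES (?, ?)",
    [("Pisa", None), ("Roma", ""), ("Milano", "Italia"), ("Paris", "France")],
)
conn.execute(drop_missing_values_sql("test_table", "country"))
print(conn.execute("SELECT city, country FROM test_table ORDER BY city").fetchall())
# [('Milano', 'Italia'), ('Paris', 'France')]
```

Both the NULL row (Pisa) and the empty-string row (Roma) are removed. This is the difference from dropna(), which would keep the empty string.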

Next, I add the drop_missing_values() function to

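import json
import os

import click
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.run.cli_run import run


# Drop missing values
@click.command(
    help="Execute a SQL query against a configured database and drop missing values related to a column."
)
@click.option("--table", required=True, help="The name of the table.")
@click.option("--column", required=True, help="The name of the column to clean.")
@click.pass_context
def drop_missing_values(ctx: click.Context, table: str, column: str):
    # Hypothetical module name: the file that contains the run() step above
    from my_plugin import drop_missing_values_step

    args = dict(table=table, column=column)
    # Run the step's directory as a data job, passing table and column as job arguments
    ctx.invoke(
        run,
        data_job_directory=os.path.dirname(drop_missing_values_step.__file__),
        arguments=json.dumps(args),
    )


@hookimpl
def vdk_command_line(root_command: click.Group) -> None:
    # Register the new command with the vdk CLI
    root_command.add_command(drop_missing_values)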

The function receives the table name and the column to clean as input.

Finally, I write a test similar to the one described for the data formatting plugin.


Congratulations! You have just learned two ways to implement a data cleaning plugin in VDK: the first implements the IIngesterPlugin interface, and the second implements the plugin as a VDK command.

You can find more information about VDK plugins in the VDK official repository. You can also use the plugins already available in VDK!

The code for the example described in this post is in my GitHub repository.

Angelica Lo Duca

Book Author

Researcher | +50k monthly views | I write on Data Science, Python, Tutorials, and, occasionally, Web Applications | Book Author of Comet for Data Science