Data Engineering Tricks: How To Get Dirty Data Cleaned through VDK
Implement a custom plugin for data cleaning in Versatile Data Kit (VDK)
Data engineering is the process of organizing and transforming data so that you can use it for analytics and decision-making. Data cleaning is a crucial part of data engineering because it removes inaccuracies and inconsistencies from data sets.
There are many data-cleaning techniques, and the most appropriate one depends on the specific data set. One of the most common tasks is handling missing values.
In this article, I’ll show you how to handle missing values using the Versatile Data Kit (VDK), a framework that makes it easy to build, run, and manage data pipelines with Python or SQL on any cloud.
The article is organized as follows:
- Basic concepts related to data cleaning
- Missing values in VDK: extending the IIngesterPlugin interface
- Missing values in VDK: using VDK commands
Basic Concepts related to data cleaning
Data cleaning is the process of detecting and correcting (or removing) inaccurate, incomplete, or inconsistent data. There are many techniques for cleaning data; in this article, I’ll focus on handling missing values.
A missing value is a value that was never recorded, for example a NULL in a database table or None in a Python record. A missing value can affect a single column of a row or an entire row.
There are a few different ways to handle missing values in data engineering:
- Ignore missing values and hope they don’t affect the results too much.
- Impute missing values, which means filling in the missing values with some estimate. One popular imputation method is called mean imputation, where the missing values are replaced with the mean of the non-missing values. This can be effective if the data is not too heavily skewed and there are not too many missing values.
- Multiple imputation, which generates several plausible estimates for each missing value and then combines the results. This can be more accurate than mean imputation, but it is also more computationally expensive.
- Delete the rows or columns that contain missing values. This can be effective if there are only a few missing values and they are concentrated in a few rows or columns rather than spread throughout the data.
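To make the difference between deletion and mean imputation concrete, here is a minimal, framework-independent sketch in plain Python. The sample records and the helper names drop_missing and impute_mean are hypothetical and are not part of VDK; None is assumed to mark a missing value.

```python
from statistics import mean

# Hypothetical sample records; None marks a missing value
records = [
    {"city": "Pisa", "temperature": None},
    {"city": "Milano", "temperature": 18.0},
    {"city": "Paris", "temperature": 14.0},
]


def drop_missing(rows):
    """Deletion: keep only the rows in which every field has a value."""
    return [row for row in rows if all(value is not None for value in row.values())]


def impute_mean(rows, column):
    """Mean imputation: fill missing values in `column` with the mean of the observed values."""
    observed = [row[column] for row in rows if row[column] is not None]
    fill_value = mean(observed)
    # Build new dicts so the input rows are left untouched
    return [
        {**row, column: fill_value if row[column] is None else row[column]}
        for row in rows
    ]


print(drop_missing(records))                # Milano and Paris only
print(impute_mean(records, "temperature"))  # Pisa gets 16.0
```

Deletion loses the Pisa record entirely, while mean imputation keeps it at the cost of an estimated value. The plugin described in the rest of this article uses deletion.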
Missing values in VDK: extending the IIngesterPlugin interface
To clean the data, I’ll implement a custom VDK plugin that extends the IIngesterPlugin interface provided by VDK.
I’ll perform the following operations:
- Creating the skeleton
- Configuring the plugin
- Implementing the VDK plugin
- Writing a test
- Running a test
The following figure shows how the implemented components are related to each other:
1. Creating the skeleton and 2. Configuring the plugin
First, you must create the skeleton, which contains the directory tree of your plugin. Refer to my previous article for more details on how to create a VDK skeleton and configure the plugin.
3. Implementing the VDK plugin
The plugin implements the IIngesterPlugin interface and, in particular, its pre_ingest_process() method, which receives the payload before it is ingested. For more details about the IIngesterPlugin interface, refer to the VDK GitHub repository.
import logging

import pandas as pd
from vdk.api.plugin.plugin_input import IIngesterPlugin
from vdk.internal.builtin_plugins.run.job_context import JobContext

_log = logging.getLogger(__name__)


class IngestWithoutMissingValues(IIngesterPlugin):
    """
    Create a new ingestion mechanism for ingesting without missing values
    """

    def __init__(self, context: JobContext):
        self._context = context

    def pre_ingest_process(self, payload):
        # Drop every record that contains at least one missing value (None/NaN)
        metadata = IIngesterPlugin.IngestionMetadata({})
        df = pd.DataFrame(payload)
        df.dropna(inplace=True)
        return df.to_dict(orient="records"), metadata
I save the script as a file named ingest_without_missing_values.py.
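If you want to check the cleaning logic without installing pandas or VDK, the following sketch reproduces in plain Python roughly what the dropna() call does to the payload: it keeps only the records free of None/NaN values and returns them with an empty metadata dictionary. The function names are hypothetical and this is not VDK code. Note that empty strings are not treated as missing, which differs from the SQL-based approach described later in this article.

```python
import math
from typing import Any, Dict, List, Tuple


def is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing; empty strings are kept."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def pre_ingest_drop_missing(
    payload: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return the records that have no missing values, plus empty metadata."""
    # A DataFrame built from the payload has one column per key seen in any record,
    # so a record that lacks a key is also treated as having a missing value
    columns = {key for row in payload for key in row}
    cleaned = [
        row for row in payload
        if not any(is_missing(row.get(col)) for col in columns)
    ]
    return cleaned, {}


payload = [
    {"city": "Pisa", "country": None},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
    {"city": "Roma", "country": ""},
]
cleaned, metadata = pre_ingest_drop_missing(payload)
print([row["city"] for row in cleaned])  # ['Milano', 'Paris', 'Roma']
```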
Now, I edit the missing_values_plugin.py script to register my plugin as a new ingestion method:
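from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.run.job_context import JobContext
from ingest_without_missing_values import IngestWithoutMissingValues  # adjust the path to your package

@hookimpl
def initialize_job(context: JobContext) -> None:
    # Register the plugin under the ingestion method name "MISSINGVALUES"
    context.ingester.add_ingester_factory_method(
        "MISSINGVALUES", lambda: IngestWithoutMissingValues(context)
    )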
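The lambda passed to add_ingester_factory_method() is a factory: a zero-argument callable that builds an IngestWithoutMissingValues object when it is called, capturing the job context from the enclosing initialize_job() hook. The toy registry below is a hypothetical simplification (it is not how VDK stores ingesters internally) that only illustrates the pattern: registering a factory defers object creation until the ingestion method is actually requested.

```python
from typing import Callable, Dict


class ToyIngesterRegistry:
    """Hypothetical, simplified registry; NOT VDK's actual implementation."""

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def add_ingester_factory_method(self, method: str, factory: Callable[[], object]) -> None:
        # Only the factory is stored; no plugin object is created yet
        self._factories[method] = factory

    def create(self, method: str) -> object:
        # The plugin object is built on demand, when the method is requested
        return self._factories[method]()


class DummyPlugin:
    instances = 0

    def __init__(self, context: str) -> None:
        DummyPlugin.instances += 1
        self.context = context


registry = ToyIngesterRegistry()
job_context = "job-context"  # stands in for the JobContext captured by the lambda
registry.add_ingester_factory_method("MISSINGVALUES", lambda: DummyPlugin(job_context))
print(DummyPlugin.instances)  # 0: registering does not instantiate the plugin
plugin = registry.create("MISSINGVALUES")
print(DummyPlugin.instances, plugin.context)  # 1 job-context
```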
4. Writing a test
As a further step, I write a test. First, I create test_table, which contains two columns, city and country:
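import os
from unittest import mock

from vdk.plugin.sqlite import sqlite_plugin
from vdk.plugin.sqlite.ingest_to_sqlite import IngestToSQLite
from vdk.plugin.sqlite.sqlite_configuration import SQLiteConfiguration
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner

# Adjust this import to match your plugin's package layout
from ingest_without_missing_values import IngestWithoutMissingValues


def test_missing_values(tmpdir):
    # Keep the SQLite database file inside pytest's temporary directory
    db_dir = os.path.join(str(tmpdir), "vdk-sqlite.db")
    with mock.patch.dict(
        os.environ,
        {
            "VDK_DB_DEFAULT_TYPE": "SQLITE",
            "VDK_SQLITE_FILE": db_dir,
        },
    ):
        runner = CliEntryBasedTestRunner(sqlite_plugin)
        runner.invoke(
            [
                "sqlite-query",
                "--query",
                "CREATE TABLE test_table (city TEXT, country TEXT)",
            ]
        )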
Then I define test data with three rows, one of which contains a missing value:
def test_missing_values(tmpdir):
    ...
    payload = [
        {"city": "Pisa", "country": None},
        {"city": "Milano", "country": "Italia"},
        {"city": "Paris", "country": "France"},
    ]
Next, I invoke my plugin to drop the rows with missing values, and I ingest the cleaned payload into test_table:
def test_missing_values(tmpdir):
    ...
    # The context argument is not used by pre_ingest_process(), so the runner is passed here
    pre_ingest = IngestWithoutMissingValues(runner)
    cleaned_payload, metadata = pre_ingest.pre_ingest_process(payload)
    mock_sqlite_conf = mock.MagicMock(SQLiteConfiguration)
    sqlite_ingest = IngestToSQLite(mock_sqlite_conf)
    sqlite_ingest.ingest_payload(
        payload=cleaned_payload,
        destination_table="test_table",
        target=db_dir,
    )
Then, I query the table and compare the result with the expected output:
def test_missing_values(tmpdir):
    ...
    # Runs inside the mock.patch.dict block, so sqlite-query reads db_dir
    result = runner.invoke(
        [
            "sqlite-query",
            "--query",
            "SELECT country FROM test_table",
        ]
    )
    output = result.stdout
    assert output == (
        "country\n"
        "---------\n"
        "Italia\n"
        "France\n"
    )
Finally, I install the new plugin in editable mode and run the tests:
pip install -e /path/to/plugin
cd /path/to/plugin
pytest
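To sanity-check the expected result without the VDK test utilities, the sketch below reproduces the same scenario with Python's built-in sqlite3 module and an in-memory database. It does not exercise IngestToSQLite or the sqlite-query command; it only confirms that dropping the Pisa row leaves Italia and France in test_table.

```python
import sqlite3

payload = [
    {"city": "Pisa", "country": None},
    {"city": "Milano", "country": "Italia"},
    {"city": "Paris", "country": "France"},
]

# Same rule as the plugin: keep only the records without None values
cleaned = [row for row in payload if all(value is not None for value in row.values())]

# An in-memory database replaces the SQLite file used by the VDK test
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test_table (city TEXT, country TEXT)")
conn.executemany(
    "INSERT INTO test_table (city, country) VALUES (:city, :country)", cleaned
)
countries = [row[0] for row in conn.execute("SELECT country FROM test_table ORDER BY rowid")]
conn.close()
print(countries)  # ['Italia', 'France']
```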
Missing values in VDK: using VDK commands
Alternatively, I can implement the missing-value plugin as a VDK command. I described how to implement a plugin using VDK commands in my previous article, so here I’ll focus only on the differences.
First, I define the step that drops missing values:
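from vdk.api.job_input import IJobInput


class MissingValuesDropper:
    def __init__(self, job_input: IJobInput):
        self.__job_input = job_input

    def drop_missing_values(self, table: str, column: str):
        # Delete every row whose column is NULL or an empty string.
        # Table and column names are interpolated into the SQL, so pass only trusted names.
        query = f"DELETE FROM {table} WHERE {column} IS NULL OR {column} = '';"
        self.__job_input.execute_query(query)


def run(job_input: IJobInput) -> None:
    # Read the table and column names passed to the data job as arguments
    table = job_input.get_arguments().get("table")
    column = job_input.get_arguments().get("column")
    dropper = MissingValuesDropper(job_input)
    dropper.drop_missing_values(table, column)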
The step deletes from the table every row in which the given column is NULL or an empty string. Both the table name and the column name are passed to the step as input arguments.
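Two details of this step are worth checking: the DELETE removes rows whose column is either NULL or an empty string, and the table and column names are inserted into the SQL with an f-string. Because identifiers cannot be bound as query parameters, it is prudent to validate them. The sketch below runs the same query against an in-memory SQLite database. FakeJobInput and build_drop_query are hypothetical helpers that only mimic the get_arguments() and execute_query() calls used above; they are not VDK's IJobInput.

```python
import re
import sqlite3


def build_drop_query(table: str, column: str) -> str:
    """Build the same DELETE statement as the step, after validating the identifiers."""
    for name in (table, column):
        # Identifiers cannot be bound as SQL parameters, so allow only simple names
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", name):
            raise ValueError(f"invalid SQL identifier: {name!r}")
    return f"DELETE FROM {table} WHERE {column} IS NULL OR {column} = '';"


class FakeJobInput:
    """Hypothetical stand-in for IJobInput, backed by an in-memory SQLite database."""

    def __init__(self, arguments: dict):
        self._arguments = arguments
        self.connection = sqlite3.connect(":memory:")

    def get_arguments(self) -> dict:
        return self._arguments

    def execute_query(self, query: str):
        cursor = self.connection.execute(query)
        rows = cursor.fetchall()  # fetch before committing
        self.connection.commit()
        return rows


job_input = FakeJobInput({"table": "cities", "column": "country"})
job_input.connection.executescript(
    """
    CREATE TABLE cities (city TEXT, country TEXT);
    INSERT INTO cities VALUES ('Pisa', NULL), ('Roma', ''), ('Paris', 'France');
    """
)
args = job_input.get_arguments()
job_input.execute_query(build_drop_query(args["table"], args["column"]))
remaining = job_input.connection.execute("SELECT city FROM cities").fetchall()
print(remaining)  # [('Paris',)]
```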
Next, I add the drop_missing_values() function to preprocessing_plugin.py:
import json
import os

import click
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.run.cli_run import run

# Hypothetical import path: the data job directory that contains the step above
from . import drop_missing_values_step


# Drop missing values
@click.command(
    name="drop-missing-values",
    help="Execute a SQL query against a configured database and drop missing values related to a column.",
    no_args_is_help=True,
)
@click.option(
    "-t",
    "--table",
    help="The name of the table.",
    default="my_table",
    type=click.STRING,
)
@click.option(
    "-d",
    "--column",
    help="The name of the column to clean.",
    required=True,  # an empty column name would produce invalid SQL
    type=click.STRING,
)
@click.pass_context
def drop_missing_values(ctx: click.Context, table: str, column: str):
    # Pass the options to the data job step as JSON-encoded arguments
    args = dict(table=table, column=column)
    ctx.invoke(
        run,
        data_job_directory=os.path.dirname(drop_missing_values_step.__file__),
        arguments=json.dumps(args),
    )


@hookimpl
def vdk_command_line(root_command: click.Group) -> None:
    ...
    root_command.add_command(drop_missing_values)
The command receives the table name and the column to clean as input, and passes them to the data job step as JSON-encoded arguments.
Finally, I write a test similar to the one I described for data formatting.
Summary
Congratulations! You have just learned how to implement a data-cleaning plugin in VDK! This article covered two techniques: the first implements the IIngesterPlugin interface, and the second implements the plugin as a VDK command.
You can find more information about VDK plugins in the VDK official repository. You can also use the plugins already available in VDK!
The code for the example described in this post is in my GitHub repository.