How to Cook Your Data Properly with Great Expectations, an Open-Source Data Quality Tool
Importance of Data Quality
As data engineers, there are challenges we come across when loading raw data into a data warehouse through an ETL/ELT process. Common challenges are incomplete data, inconsistent data, or even duplicate data. These problems occur because of the poor quality of the received data. Poor data quality may result in poor business decisions that drain time and money, causing lost revenue for the whole organization or company.
Data quality (DQ) is a term used to describe data by examining its characteristics. There are many defined characteristics that reflect whether data has good quality or not. The most widely known characteristics used to describe DQ are accuracy, reliability, consistency, and completeness.
- Accuracy: a dimension of DQ that looks at how relevant and accurate data is over time in real-world applications.
- Reliability: a dimension of DQ indicating that the information matches and does not contradict other sources or systems.
- Consistency: a dimension of DQ that indicates how consistent and precise the data is throughout a dataset.
- Completeness: a dimension of DQ that assesses the completeness of a dataset by determining if all anticipated data is included.
Implementing DQ checks helps us data engineers detect the level of data quality and take action to resolve issues. DQ checks can be implemented before or after the data is processed by the ETL/ELT pipeline, depending on the business process for the ingested data.
Great Expectations
To help us build DQ checks without starting from scratch, there are many DQ tools, each with additional features that give more insight and provide built-in actions.
Great Expectations (GX) is an open-source DQ tool that can be implemented in Python with its official library. GX comes with the ability to validate, document, and profile given data.
For the validation itself, the GX library already provides various test cases (functions). It uses an "assert what you expect" concept to define which expectations the given data should comply with. The full catalog of available test cases can be seen in the Expectation Gallery.
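For illustration, a single expectation reads almost like an assertion about the data. The minimal sketch below uses a hypothetical column named country_code:
from great_expectations import expectations as gxe
# "Assert what you expect": every value in the hypothetical country_code column must be non-null
expectation = gxe.ExpectColumnValuesToNotBeNull(column="country_code")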
Before we implement GX, keep in mind that there are two different GX products: GX Cloud and GX Core. GX Cloud gives the intuitive experience of a fully managed SaaS solution and can be considered their commercial product. On the other hand, GX Core is the open-source tool that can be used in Python scripts or in any Python notebook, such as Jupyter Notebook.
Setup Sample Data & Environment
I recommend using a free dataset for the sample data; public datasets can be found on Kaggle or any other website that provides free datasets.
After getting the sample data, I suggest starting the project with a Python virtual environment, since we implement GX with a Python script; this also avoids conflicts with libraries and dependencies installed for other Python projects. Then install all the package requirements for GX. The list below shows the libraries and versions I used for this implementation, and a short sketch of creating the environment follows the list.
- Python — 3.10.12
- Pandas — 2.1.4
- Great Expectations — 1.0.5
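As a quick sketch, creating and activating a virtual environment in a Unix-like shell looks like this:
python -m venv .venv
source .venv/bin/activate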
Assuming Python is already installed and the virtual environment is activated in the workspace, the next step is to install the GX library (GX Core):
pip install great_expectations
The command above also installs the pandas library, so installing GX alone completes the prerequisites.
There are additional dependencies if we want to connect to a database or a cloud service to get the data for the DQ check. All available additional dependencies can be seen here.
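For example, connecting to Google Cloud Storage (as done later in this article) needs the GCS dependencies; assuming the gcp extra provided by the library, it would be installed like this:
pip install 'great_expectations[gcp]'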
Implementation of GX
Start by creating an empty folder with any name, such as script/, and a Python file inside that folder with any name, for instance gx_main.py.
mkdir script && cd script
touch gx_main.py
Create a data context
A data context is like a profile that stores the selected data, the selected test cases, and any configuration needed when implementing GX.
Since this is an initial project, and GX Core, unlike GX Cloud, doesn't come with an existing data context, all scripts or projects that use GX Core should start by creating a data context.
Back to the Python script created earlier: put the code below into gx_main.py and run it.
import great_expectations as gx
import shutil
import os
# Remove the previous context (necessary when re-creating the data context)
if os.path.exists("./context/gx"):
shutil.rmtree("./context/gx")
# Create GX data context
context = gx.get_context(mode="file", project_root_dir="./context")
print(type(context).__name__)
The output of the code should be:
FileDataContext
"FileDataContext" corresponds to the file mode selected when creating the data context. According to the GX documentation, there are two data context modes:
- File Data Context: a Data Context that stores configuration information as YAML files within a file system. It allows the data context to be reused (including by other Python scripts) with the same configured Expectation Suites, Data Sources, and Checkpoints.
- Ephemeral Data Context: a Data Context that stores configuration information in memory. This mode is intended for environments that lack permissions, such as when writing to the file system is not possible, or when you just want to validate ingested data temporarily without saving the results (see the sketch after this list).
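A minimal sketch of creating an Ephemeral Data Context, in case writing to the file system is not an option:
import great_expectations as gx
# Ephemeral mode keeps all configuration in memory; nothing is written to disk
context = gx.get_context(mode="ephemeral")
print(type(context).__name__)  # EphemeralDataContext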
Create a data source
There are multiple ways for GX to read your data for the DQ implementation. According to the GX documentation, GX can read or connect to your data via SQL, a File System, or a Data Frame.
Connecting the data means creating a data source in the existing data context. The code below uses the File System connection from Google Cloud Storage as the data source. Add it to gx_main.py and run the code.
# Define data source parameters
data_source_name = "gx_from_gcs"
bucket_name = "$BUCKET_NAME"
gcs_options = {}
# Create GX data source
data_source = context.data_sources.add_pandas_gcs(
name=data_source_name,
bucket_or_name=bucket_name,
gcs_options=gcs_options
)
print(data_source)
The output of the code should be:
bucket_or_name: $BUCKET_NAME
gcs_options: {}
id: afc092eb-****-****-****-0cd2ff0bdf32
name: gx_from_gcs
type: pandas_gcs
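If the data lives on the local file system instead of Google Cloud Storage, a minimal alternative sketch (assuming the CSV files sit in a local ./data directory) could be:
# Alternative: create a file-system data source that reads CSV files
# from a local ./data directory instead of a GCS bucket
local_source = context.data_sources.add_pandas_filesystem(
    name="gx_from_local",
    base_directory="./data"
)
The later steps of adding a CSV asset and a batch definition stay the same as shown below.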
Create a data asset
A Data Asset is a collection of related records within a Data Source. For data connected through a File System, GX provides two kinds of Data Assets: File Data Assets and Directory Data Assets.
Currently, Directory Data Assets are only supported for data frames generated with Spark. Since this implementation uses pandas as the data frame engine, File Data Assets is the proper type.
# Define data asset parameters
asset_name = "World Population"
gcs_prefix = "temporary_article_purposes/"
# Create GX data asset to data source
file_csv_asset = data_source.add_csv_asset(
name=asset_name,
gcs_prefix=gcs_prefix,
gcs_recursive_file_discovery=True
)
print(file_csv_asset)
The output of the code should be:
connect_options:
gcs_prefix: temporary_article_purposes/
gcs_recursive_file_discovery: true
id: afc092eb-****-****-****-0cd2ff0bdf32
name: World Population
type: csv
Create a batch definition
A Batch Definition selects which records from the created Data Asset the DQ checks should run on. Batch Definitions can be configured either to provide all of the records in a Data Asset or to subdivide the Data Asset based on a date.
Based on the prerequisites prepared earlier, the selected records come from a free dataset on Kaggle called the World Population Dataset.
Since the data source is defined from Google Cloud Storage, it is assumed the dataset has already been uploaded to Google Cloud Storage under the specific path given in the code.
# Define batch parameters
batch_definition_name = "World Population 2022"
batch_definition_path = "world_population.csv"
# Get GX batch definition to data asset
batch_definition = file_csv_asset.add_batch_definition_path(
name=batch_definition_name, path=batch_definition_path
)
batch = batch_definition.get_batch()
print(batch.head())
The output of the code should be:
Rank CCA3 Country/Territory Capital Continent 2022 Population 2020 Population 2015 Population 2010 Population 2000 Population 1990 Population 1980 Population 1970 Population Area (km²) Density (per km²) Growth Rate World Population Percentage
0 36 AFG Afghanistan Kabul Asia 41128771 38972230 33753499 28189672 19542982 10694796 12486631 10752971 652230 63.0587 1.0257 0.52
1 138 ALB Albania Tirana Europe 2842321 2866849 2882481 2913399 3182021 3295066 2941651 2324731 28748 98.8702 0.9957 0.04
2 34 DZA Algeria Algiers Africa 44903225 43451666 39543154 35856344 30774621 25518074 18739378 13795915 2381741 18.8531 1.0164 0.56
3 213 ASM American Samoa Pago Pago Oceania 44273 46189 51368 54849 58230 47818 32886 27075 199 222.4774 0.9831 0.00
4 203 AND Andorra Andorra la Vella Europe 79824 77700 71746 71519 66097 53569 35611 19860 468 170.5641 1.0100 0.00
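For comparison, if the asset contained one file per day (for instance, a hypothetical naming scheme like world_population_2022-07-01.csv), a date-based Batch Definition could be sketched with a regex whose named groups capture the date parts:
# Hypothetical daily batches, assuming files named world_population_YYYY-MM-DD.csv
daily_batch_definition = file_csv_asset.add_batch_definition_daily(
    name="World Population Daily",
    regex=r"world_population_(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})\.csv"
)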
Create a test suite and test case
Sometimes ingested data needs to be checked against multiple cases, such as values must not be null, a column must exist, or records must be unique. To combine all the test cases applied to a given set of data, GX uses an Expectation Suite.
from great_expectations import expectations as gxe
# Create GX expectation suite (test suite)
suite_name = f"{data_source_name}_suite"
suite = gx.ExpectationSuite(name=suite_name)
# Add GX expectation suite to context
suite = context.suites.add(suite)
After defining the expectation suite, add the desired test cases to it. In GX, a test case is called an Expectation; simply put, it is a test case or expected condition for the ingested data.
As explained in the previous section, GX comes with built-in test cases to cover general data quality needs. All existing built-in expectations can be browsed in the Expectation Gallery.
This implementation uses the various test cases shown in the code below.
# Create GX expectation (test case)
all_test_case = []
# Check Comparison Value
all_test_case.append(
gxe.ExpectColumnPairValuesAToBeGreaterThanB(
column_A="2022 Population",
column_B="2000 Population",
or_equal=True
)
)
all_test_case.append(
gxe.ExpectColumnPairValuesAToBeGreaterThanB(
column_A="2000 Population",
column_B="1980 Population",
or_equal=True
)
)
# Check Data Structure
all_test_case.append(
gxe.ExpectColumnToExist(
column="Rank"
)
)
all_test_case.append(
gxe.ExpectColumnToExist(
column="CCA3"
)
)
all_test_case.append(
gxe.ExpectColumnToExist(
column="Country/Territory"
)
)
all_test_case.append(
gxe.ExpectColumnToExist(
column="Capital"
)
)
all_test_case.append(
gxe.ExpectColumnToExist(
column="Continent"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToBeOfType(
column="Growth Rate",
type_="float64"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToBeOfType(
column="World Population Percentage",
type_="float64"
)
)
all_test_case.append(
gxe.ExpectTableColumnCountToEqual(
value=17
)
)
# Check Data Value
all_test_case.append(
gxe.ExpectColumnValuesToBeUnique(
column="Rank"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToBeUnique(
column="CCA3"
)
)
all_test_case.append(
gxe.ExpectCompoundColumnsToBeUnique(
column_list=[
"Rank", "CCA3", "Country/Territory",
"Country/Territory", "Capital", "Continent"
]
)
)
all_test_case.append(
gxe.ExpectColumnSumToBeBetween(
column="World Population Percentage",
min_value=99,
max_value=100
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="Rank"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="CCA3"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="Country/Territory"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="Capital"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="Continent"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="Growth Rate"
)
)
all_test_case.append(
gxe.ExpectColumnValuesToNotBeNull(
column="World Population Percentage"
)
)
for test_case in all_test_case:
suite.add_expectation(test_case)
Even though it is quite challenging to digest all of the expectations above, many test cases use the same function, only with different columns or parameters.
As a reminder, these are just example expectations for checking the DQ of the ingested data. Each expectation used above is briefly described below:
- Expect Column Pair Values A To Be Greater Than B: checks that values in column A are greater than the values in column B.
- Expect Column To Exist: checks for the existence of a desired column.
- Expect Column Values To Be Of Type: checks that a column contains values of a desired data type.
- Expect Table Column Count To Equal: checks that the total number of columns equals a desired value.
- Expect Column Values To Be Unique: checks that each value in a specific column is unique.
- Expect Compound Columns To Be Unique: checks that each combined value across multiple columns is unique.
- Expect Column Sum To Be Between: checks that the sum of a specific column falls within a desired range.
- Expect Column Values To Not Be Null: checks that each value in a specific column is not null (empty).
Looking back at the characteristics that represent the level of DQ described in the previous section, some of the test cases in the code above already cover them. For example, data completeness is checked by ExpectColumnValuesToNotBeNull(), while data consistency is covered by ExpectColumnValuesToBeOfType() and ExpectColumnToExist().
Although GX provides built-in expectations (test cases), it also offers a feature for creating custom expectations. How to create a custom expectation can be seen here.
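As a rough sketch of that feature, the GX documentation describes building custom expectations by subclassing an existing one and overriding its default attributes. A hypothetical, dataset-specific example (reusing the gxe alias imported earlier) might look like this:
# Hypothetical custom expectation: wrap a built-in expectation with
# a domain-specific name and default parameters for this dataset
class ExpectReasonableGrowthRate(gxe.ExpectColumnValuesToBeBetween):
    column: str = "Growth Rate"
    min_value: float = 0.5
    max_value: float = 1.5
# It could then be added to the suite like any built-in expectation:
# suite.add_expectation(ExpectReasonableGrowthRate())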
Create a validation definition
A validation definition is a pairing between the ingested data (from the batch definition) and the selected expectation suite. It defines which data should be tested and which test suite should be used.
# Create GX validation definition
definition_name = f"{data_source_name}_definition"
validation_definition = gx.ValidationDefinition(
data=batch_definition, suite=suite, name=definition_name
)
# Add validation definition to context
validation_definition = context.validation_definitions.add(
validation_definition
)
# Run validation (run test suite)
validation_results = validation_definition.run()
print(validation_results)
After creating the validation definition, run the validation to execute the test suite; the result of every test case can then be seen by printing the validation run result.
The output of the code should be:
{
"success": true,
"results": [...],
"suite_name": "gx_from_gcs_suite",
"suite_parameters": {},
"statistics": {
"evaluated_expectations": 21,
"successful_expectations": 21,
"unsuccessful_expectations": 0,
"success_percent": 100.0
},
"meta": {
"great_expectations_version": "1.0.5",
"batch_spec": {
"path": "gs://$BUCKET_NAME/temporary_article_purposes/world_population.csv",
"reader_method": "read_csv",
"reader_options": {}
},
"batch_markers": {
"ge_load_time": "20240929T070633.736572Z",
"pandas_data_fingerprint": "2f49d7367f62ab004fd0a4679a76e823"
},
"active_batch_definition": {
"datasource_name": "gx_from_gcs",
"data_connector_name": "fluent",
"data_asset_name": "World Population",
"batch_identifiers": {
"path": "temporary_article_purposes/world_population.csv"
},
"batching_regex": "(?P<path>temporary_article_purposes/world_population.csv)"
},
"validation_id": "afc092eb-****-****-****-0cd2ff0bdf32",
"checkpoint_id": null,
"batch_parameters": null
},
"id": null
}
All the expectation results are stored under the results key. Sadly, it is quite painful to read them all as plain text in a dictionary format. Fortunately, GX already covers this problem and provides a feature that translates the results above into readable, static documentation web pages.
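If only a quick summary is needed before generating any documentation, the failing results can also be filtered directly from the validation run. This small sketch assumes the result objects expose the success flags and expectation configs seen in the printed output above:
# Collect only the test cases that failed from the validation run
failed_results = [r for r in validation_results.results if not r.success]
print(f"Failed expectations: {len(failed_results)} of {len(validation_results.results)}")
for result in failed_results:
    print(result.expectation_config)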
GX Data Docs
This process generates static HTML web pages whose store backend can point to a specific directory in the chosen environment. The store backend can be a local environment or a cloud service environment, as described in the GX Documentation.
Create a checkpoint
Generally, a checkpoint in GX is created to execute one or more validation definitions and then run a set of actions based on the validation results of each one. One of the actions that can follow a validation definition run is updating the Data Docs.
So, to make sure the Data Docs are updated when the validation definition completes, this checkpoint adds an action to update the Data Docs with the following code.
action_list = [
# This Action updates the Data Docs static website with the Validation
# Results after the Checkpoint is run.
gx.checkpoint.UpdateDataDocsAction(
name="update_all_data_docs",
),
]
# Create GX checkpoint
checkpoint_name = f"{data_source_name}_checkpoint"
checkpoint = gx.Checkpoint(
name=checkpoint_name,
validation_definitions=[validation_definition],
actions=action_list,
result_format={"result_format": "COMPLETE"},
)
# Add GX checkpoint to context
checkpoint = context.checkpoints.add(checkpoint)
From the code above, note the result_format argument passed when creating the checkpoint. The result format sets the level of detail for your validation definition results; the list of result formats provided by GX can be seen in the GX Documentation.
This implementation uses the COMPLETE result format, which means the validation result of each test case includes all available information to explain why it failed or succeeded.
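Besides COMPLETE, GX also documents the BOOLEAN_ONLY, BASIC, and SUMMARY result formats, ordered from least to most detail. For example, a leaner checkpoint that only records pass/fail per test case could be created like this:
# A leaner checkpoint variant that only keeps pass/fail flags per expectation
lean_checkpoint = gx.Checkpoint(
    name=f"{data_source_name}_checkpoint_boolean",
    validation_definitions=[validation_definition],
    actions=action_list,
    result_format={"result_format": "BOOLEAN_ONLY"},
)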
Generate data docs
Add the following code and run it for the final step of the implementation.
site_config = {
"class_name": "SiteBuilder",
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
"store_backend": {
"class_name": "TupleFilesystemStoreBackend",
"base_directory": "./data_docs"
},
}
# Add data docs to context
site_name = f"{data_source_name}_docs_site"
context.add_data_docs_site(site_name=site_name, site_config=site_config)
result = checkpoint.run()
Pay attention to the store_backend.base_directory key in the code above. It defines the path where the HTML web pages are stored in the local environment of your workspace. Since the code above uses the ./data_docs path, that folder will exist in the current workspace after the code runs.
Just open the ./data_docs directory; inside it there is an HTML file called index.html. To open the GX Data Docs, simply open that file with a browser. A sample of the GX Data Docs can be seen below.
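Alternatively, since the script still holds the context object, the generated site can usually be opened straight from Python:
# Open the generated Data Docs site in the default web browser
context.open_data_docs()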
The home page of the GX Data Docs lists all validation definition results. Since this implementation only creates one validation definition, there is only one result on that page. To see the details or explanation for a validation definition, just click the result row and you will be directed to a page that contains all the created test cases.
References
https://www.clearpointstrategy.com/blog/data-quality-metrics
https://www.precisely.com/blog/data-quality/5-characteristics-of-data-quality
https://symbio6.nl/en/blog/why-is-data-quality-important#characteristics
https://www.simplilearn.com/data-quality-article
https://medium.com/snowflake/how-to-ensure-data-quality-with-great-expectations-271e3ca8b4b9
https://blogs.halodoc.io/data-quality-pipeline-using-great-expectation-halodoc/
https://www.kdnuggets.com/2023/03/data-quality-dimensions-assuring-data-quality-great-expectations