The Datum VEA — Validate, Evolve and Anonymize your data with Data Schemas
In this article, we cover the integration of JSON Schemas into our data flows, the benefits of using them for VEA (Validation, Evolution & Anonymization), and how they can end up empowering the whole company by taking our data to the next level.
There are many areas where we could apply Data Schemas, from unit testing of data-production systems to endpoint-level payload validation. In this post, we focus on how we detect invalid data and put it into quarantine, how we evolve data to its latest schema version in a streamlined manner, and how we generate de-identified, GDPR-compliant versions.
Quarantine infected data
High-quality data is crucial for producing great data products and for informing data decisions. If we allow even a small portion of non-compliant data into our system, we can end up corrupting our entire data lake.
By contaminated data, we mean different types of data in the same place: different formats, unexpected content, or unexpected events.
To avoid this, we need to validate the data we ingest in real time and provide immediate feedback to producers and consumers, so we can react to the situation.
It’s better to isolate wrong events than end up having a zombie data apocalypse where data cannot be consumed.
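As a minimal sketch of this routing, here is the shape of the idea in Scala. The `isValid` predicate is a placeholder standing in for real JSON Schema validation, and the event fields are illustrative:

```scala
// Minimal sketch: keep valid events flowing and isolate the rest.
// `isValid` is a placeholder for real JSON Schema validation.
case class Event(schema: String, payload: String)

def isValid(e: Event): Boolean =
  e.schema.nonEmpty && e.payload.startsWith("{") // stand-in check only

// Split the incoming batch into (valid, quarantined)
def route(events: Seq[Event]): (Seq[Event], Seq[Event]) =
  events.partition(isValid)

val (ok, quarantined) = route(Seq(
  Event("device-sensor/v1", """{"id": 1}"""),
  Event("", "not-json")
))
```

Quarantined events are kept, not dropped, so producers can inspect and fix them once the schema issue is resolved.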
Evolution — Homogenize all data
The world never sleeps. It changes every time we observe it. That means we need to keep pace with its evolution by changing, redefining and improving how we describe it.
Today we can represent health data in one format, but that format will eventually be deprecated (e.g. FHIR, a standard for health-care data exchange, already has four versions). We will then need to provide new fields, change some content, and even change how we represent or understand the data, which means producing different data. To be able to reuse the previous data alongside the new, we provide a system to evolve the previous format.
Anonymize — Remove sensitive data
Nowadays, with GDPR in place, we take great care when dealing with the sensitive data collected from users who consented to its use for a specific purpose. More importantly, protecting sensitive data is not only a legal but an ethical requirement: with strong internal ethical governance, we pay special attention to minimizing the sensitive data we keep while also minimizing the impact on algorithm accuracy and on the quality of our services.
At Alpha Health, we aim to solve health challenges by building models of human behavior and bringing these models to people through engaging services that help them take more conscious control of their actions and improve their lives.
Since health data is very sensitive, we need to proceed with extreme caution to protect people and their privacy. By anonymizing their data, we can build great products without compromising anyone's privacy.
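As a toy sketch of what anonymization does, here is a Scala stand-in that drops direct identifiers and coarsens a quasi-identifier. The field names are purely illustrative; in practice these rules live in per-schema transformation expressions:

```scala
// Illustrative only: drop direct identifiers (name, email) and
// coarsen a quasi-identifier (birth date -> birth year).
def anonymize(event: Map[String, String]): Map[String, String] = {
  val withoutIds = event - "name" - "email" // drop direct identifiers
  withoutIds.get("birthDate") match {
    case Some(d) => withoutIds.updated("birthDate", d.take(4)) // keep year only
    case None    => withoutIds
  }
}

val anon = anonymize(Map(
  "name"      -> "Jane Doe",
  "email"     -> "jane@example.com",
  "birthDate" -> "1987-05-12",
  "steps"     -> "10432"))
```

The non-sensitive signal (here, the step count) survives untouched, which is what keeps the anonymized data useful for modeling.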
We recommend taking a look at the Data Practices manifesto’s values and principles. Pay special attention to the 9th principle:
9. Consider carefully the ethical implications of choices we make when using data, and the impacts of our work on individuals and society.
For anonymizing data, we use JSLT Expressions too, which we will show in the following sections.
As we can see in the image above, the project structure is quite simple. Each JSON schema has its own evolution and anonymization JSLT expressions. They follow the same path structure and, for each version of the JSON schema, we provide the evolution from the previous version. Thus, we are able to update to the latest one.
We mirror the same structure in the test resources folder, where we define multiple examples for each schema, so we can guarantee that all the schemas and JSLT expressions work as expected.
Schema Validator + Listener
As we explained before, the JSON Schema Validator is a really useful tool that validates JSON data based on specific schemas.
You can either specify up front which schema to validate against, or define the schema reference as a field in the event itself, so the consumer doesn't need to know in advance which type of event it is dealing with.
We combine both approaches: we define the schema reference in the schema property and extract it from the event at validation time; when it is missing, we fall back to a default generic schema.
In the following code, we show how we validate an event by using its schema field and how we perform the validation by using the new Schema Validation Listener feature (which we recently contributed to).
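As a self-contained sketch of that dispatch logic alone, here is the idea in Scala. The registry and the per-schema checks are placeholders standing in for a real JSON Schema library; only the "look up the event's own schema field, else use the generic schema" flow is the point:

```scala
// Sketch: pick the validator named by the event's own "schema" field,
// falling back to a generic check when the field is absent.
// The checks below are placeholders, not real JSON Schema validation.
type Validator = Map[String, Any] => Boolean

val registry: Map[String, Validator] = Map(
  "device-sensor/v1" -> (e => e.contains("device") && e.contains("data"))
)

val genericValidator: Validator = e => e.nonEmpty

def validate(event: Map[String, Any]): Boolean = {
  val validator = event.get("schema")
    .collect { case s: String => s } // schema reference, if present
    .flatMap(registry.get)           // known schema -> its validator
    .getOrElse(genericValidator)     // otherwise the default schema
  validator(event)
}
```

An event that names a schema is held to that schema's rules; an event that doesn't is still checked against the generic one, so nothing skips validation entirely.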
One thing we detected when we launched our validation was the latency of reading and building schemas. To remove this latency, we decided to use Guava's LoadingCache, a useful cache with an expiration time and a maximum size to avoid memory leaks.
Having a cache of all the schemas means we build and read each one only once. Additionally, the expiration time lets us purge the least-used schemas, freeing up memory for the most-used ones.
Using a schema cache reduced our average validation time from 6.3 ms to 1.4 ms.
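In production this is Guava's LoadingCache; as a self-contained illustration of the behaviour we rely on, here is a tiny stand-in that loads on miss and evicts the least-recently-used entry when over capacity (no time-based expiry, for brevity):

```scala
import scala.collection.mutable

// Tiny stand-in for Guava's LoadingCache: load on miss, evict the
// least-recently-used entry when over capacity.
class LoadingCache[K, V](maxSize: Int, loader: K => V) {
  // LinkedHashMap keeps insertion order; remove+reinsert = recency order
  private val entries = mutable.LinkedHashMap.empty[K, V]
  var loads = 0 // how many times the (expensive) loader actually ran

  def get(key: K): V = synchronized {
    val value = entries.remove(key).getOrElse { loads += 1; loader(key) }
    entries.put(key, value)                          // mark most recent
    if (entries.size > maxSize) entries.remove(entries.head._1) // evict oldest
    value
  }
}

val schemas = new LoadingCache[String, String](
  maxSize = 2,
  loader  = key => s"compiled-schema-for-$key") // stands in for read + build

schemas.get("device-sensor/v1")
schemas.get("device-sensor/v1") // second call is served from the cache
```

The second `get` never touches the loader, which is exactly where the 6.3 ms to 1.4 ms saving comes from: schema reading and building happens once per schema, not once per event.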
If you are more interested in whether an event is valid than in the error itself, we recommend the early-failure mode, since it stops validating at the first failure encountered instead of scanning the full event.
Listening to your validations
Knowing whether the data was valid was the first step towards improving its quality. However, since we have complex schemas, we needed something more sophisticated to identify which subschemas were matching our data.
By knowing these subschemas, we can develop better JSLT expressions to evolve and anonymize the events. Our first approach was a JSLT expression full of switch statements; however, after our PR #242 adding the Validation Listeners feature to the JSON Schema Validator was merged, we moved to a dedicated, more detailed JSLT expression per subschema, which lets us avoid ugly switch cases and improves reusability.
To know which subschemas were matching in each field, we modified the original library to track the referenced and combined schemas that matched our events (see PR #256).
One of JSLT's purposes is to provide a transformation language for converting between JSON formats. We use this transformation language both to evolve between schema versions and to anonymize sensitive data.
The JSLT expression above shows how we evolve a Device sensor event from v0 to v1. v0 was a prototype whose fields had short names in an unstructured format: it was missing the schema field, and the creation time was a raw timestamp instead of a date format. By evolving to a new version, our consumers get a much clearer picture of how device sensor events are organized. Plus, date fields are easier to read than timestamps. Another reason for structuring the events into user, device, product and data hierarchies was to start sharing reusable pieces of data between different types of events.
By using the Validation Listeners explained before, we obtain all the subschemas referenced in an event, and then, for each one that has an evolution expression, we apply it on its respective JSON path.
This is really useful when you need to evolve some, but not all, of the subschemas.
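Sketched in Scala, the mechanism looks like this. Paths are reduced to single top-level keys for brevity, and the evolution functions stand in for the per-subschema JSLT expressions:

```scala
// Sketch: apply an evolution only to the sections of the event whose
// subschema has one, leaving every other section untouched.
// "Paths" are single top-level keys here; the field names are illustrative.
type Section = Map[String, String]

val evolutions: Map[String, Section => Section] = Map(
  // v0 used the short name "mdl"; v1 calls it "model"
  "device" -> (d => d + ("model" -> d.getOrElse("mdl", "unknown")) - "mdl")
)

def evolve(event: Map[String, Section]): Map[String, Section] =
  event.map { case (path, section) =>
    path -> evolutions.get(path).map(_(section)).getOrElse(section)
  }

val evolved = evolve(Map(
  "device" -> Map("mdl" -> "fitband-2"),
  "user"   -> Map("id" -> "u-42")))
```

Only the device section is rewritten; the user section passes through unchanged, which is the point when just a few subschemas need to evolve.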
Lambda — Data Ingestion with VEA
Currently, our data-ingestion pipeline is composed of a Kinesis endpoint, a Lambda function that runs the VEA, and multiple Kinesis Firehose streams that dump all incoming events to S3 based on their data origin and state.
Applying VEA at ingestion time gives our data consumers good data (already validated) and, most importantly, homogeneous data (events coming from different versions represented in the same format).
The good thing about using Kinesis, Lambda and Firehose is the flexibility, scalability and speed with which we can implement our system. It allows us to go faster now, so we can go further later.
By defining all the infrastructure with Terraform.io, we are able to replicate it across multiple environments with a single command. Plus, we can keep track of our infrastructure changes, since all our Terraform code lives in our Git repo.
Knowing how our VEA is performing in real time is crucial to understanding how our users interact with the app and in which state/version we receive the data. For this reason, we track performance with Datadog, a really good product that gives full visibility into our applications, with a great alerting system via email or Slack.
Spark — The next evolution
Quite cool, right? We developed a system capable of validating, evolving and anonymizing events in real time. However, what happens when a new schema version is released? We need to evolve the now-outdated Latest events to the new version.
Since the VEA is written in Scala, and we love Spark, we decided to reuse the VEA code as a UDF to rewrite the Latest events into the new format. Easy, cheap and fast.
Next steps + Summary
This was our first, but not our last, iteration of providing a Data Collector with VEA to our people. It has been a big step towards homogenizing good data for developing and feeding great models.
The current Lambda reads from 4 different Kinesis streams and writes to 16 different Firehoses. We would like to simplify ingestion by using a single API Gateway and a single Kinesis stream. This change would allow us to remove our backend endpoint and reduce the number of Kinesis streams feeding the Lambda.
We also need to reconsider how we handle the many Firehoses we have right now, since we are not happy with their output format. It would be better if Firehose wrote using the Hive partition format (year=YYYY/month=MM/day=DD) instead of the flattened one (YYYY/MM/DD).
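For reference, here is the difference between the two layouts for the same date, as a small hypothetical helper (this is not Firehose configuration, just an illustration of the two prefix styles):

```scala
import java.time.LocalDate

// Contrast the flat S3 prefix Firehose writes by default with the
// Hive-style partition prefix that query engines can prune on.
def flatPrefix(d: LocalDate): String =
  f"${d.getYear}%04d/${d.getMonthValue}%02d/${d.getDayOfMonth}%02d"

def hivePrefix(d: LocalDate): String =
  f"year=${d.getYear}%04d/month=${d.getMonthValue}%02d/day=${d.getDayOfMonth}%02d"

val day = LocalDate.of(2020, 3, 7)
```

The key=value form lets engines such as Hive and Spark discover the partitions automatically instead of requiring them to be registered by hand.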