Part 2: Data as a Service: Simplifying Real-Time Data Ingestion and Delivery

Colin Barber
Published in Loblaw Digital
5 min read · Sep 12, 2023

Co-authored by: Colin Barber, Vincent Zi and Indrani Gorti

Missed Part 1? Read it here.

In the fast-paced realm of e-commerce, data isn’t just a valuable asset; it’s the lifeblood that fuels innovation, enhances customer experiences, and drives operational efficiency. Imagine, for a moment, the vast amount of data generated every day at Canada’s largest grocery and e-commerce retailer, Loblaw Digital. From inventory management to personalized recommendations, the insights hidden within this data have the potential to reshape the way we shop and redefine the future of retail.

But harnessing this data isn’t a simple task. It requires a sophisticated approach, one that goes beyond traditional data management. Enter “Data as a Service” (DaaS), a groundbreaking initiative undertaken by Loblaw Digital’s Data Engineering team. In their quest to ingest and deliver data with unparalleled efficiency and robustness, they’ve forged a unified framework powered by Google Cloud Platform (GCP) services.

Understanding Data as a Service

DaaS is an automated and customizable framework that simplifies the ingestion and delivery of data. It eliminates the need for manual intervention and provides a streamlined workflow, allowing Data Engineering teams to spend less time setting up and maintaining pipelines and more time on higher-impact areas such as data modeling and cost optimization.

In the context of product metadata aggregation at Loblaw Digital (LD), the Product Domain Aggregator (PDA) leverages DaaS to consume real-time data from multiple sources.

Automated Data Ingestion

DaaS employs automated data ingestion techniques to collect real-time data from various sources, including the Product Catalog Service (PCS), the Pricing and Promotions Engine (PPE), and the Helios Inventory Service (IS). Each data source publishes messages to PubSub topics. These messages are then ingested by streaming Dataflow pipelines.

To streamline deployments, a single GitLab repository houses the DaaS framework, and Jinja templates are used to generate the CI/CD pipelines for each source. In this way, the same codebase is reused for every pipeline.

Icons by Icons8
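
To make this concrete, here is a minimal sketch of what one of these streaming pipelines could look like. It assumes Scio, Spotify’s Scala API for Dataflow/Beam (the framework isn’t named in this post), and the argument names and subscription are hypothetical stand-ins for the per-source values that the generated CI/CD pipelines would inject.

    import com.spotify.scio._

    // A minimal sketch, assuming Scio; subscription and argument names are hypothetical.
    // Each source (PCS, PPE, IS) gets its own values at deploy time, while the code stays the same.
    object IngestJob {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, args) = ContextAndArgs(cmdlineArgs)

        // Per-source configuration injected by the generated CI/CD pipeline
        val subscription = args("inputSubscription") // e.g. a PCS, PPE or IS subscription path

        sc.pubsubSubscription[String](subscription) // Source -> Raw: streaming read of message payloads
          .map(_.trim)                              // stand-in for the Raw -> Input -> Output steps below
          .debug()                                  // stand-in for writing to the real Destination

        sc.run()
      }
    }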

Data Validation and Consistency

Data integrity is crucial for effective delivery. DaaS ensures consistency by validating incoming data against a predefined Data Contract schema, which defines the rules the data must conform to. Collaboration with upstream teams helps maintain data integrity across the entire process. Check out Part 1 for a sample Data Contract similar to the ones used at LD!

When onboarding a new source, the Data Engineering team works with the upstream team to model the incoming data based on business requirements specific to LD use cases. In Scala, these models can be captured as Case Classes. There are three main benefits to using Case Classes:

  1. We don’t need to rely on an external library, such as Pydantic in Python.
  2. With Scala we can return all errors at once when attempting to convert to a Case Class instead of failing fast at the first error. For example, we can know all missing required fields in a data model instead of just one. This makes pipeline error recovery faster.
  3. Case Classes are infrastructure agnostic, meaning that they can be used no matter the source or destination type (PubSub, BigQuery, BigTable, etc.). Different Codecs are written to transform to and from the same Case Class for different destinations, as sketched below.
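
To illustrate the pattern, here is a sketch with a hypothetical product model rather than an actual LD Data Contract. The conversion collects every problem it finds instead of stopping at the first one, and the Case Class itself stays free of infrastructure-specific types:

    // Hypothetical product model — not LD's actual schema
    final case class ProductInput(sku: String, name: String, price: BigDecimal)

    object ProductInput {
      // Raw -> Input: validate every field, collecting all failures on the Left side
      def fromRaw(raw: Map[String, String]): Either[List[String], ProductInput] = {
        def field(key: String): Either[List[String], String] =
          raw.get(key).toRight(List(s"missing required field: $key"))

        val sku   = field("sku")
        val name  = field("name")
        val price = field("price").flatMap { str =>
          try Right(BigDecimal(str))
          catch { case _: NumberFormatException => Left(List(s"price is not a number: $str")) }
        }

        (sku, name, price) match {
          case (Right(s), Right(n), Right(p)) => Right(ProductInput(s, n, p))
          case _ =>
            // accumulate every error instead of stopping at the first one
            Left(List(sku, name, price).collect { case Left(errs) => errs }.flatten)
        }
      }
    }

    // e.g. ProductInput.fromRaw(Map("sku" -> "123", "name" -> "Apples"))
    //      == Left(List("missing required field: price"))

    // The Case Class stays infrastructure agnostic; a per-destination Codec (hypothetical trait)
    // handles conversion, e.g. one instance renders the model as a BigQuery row, another as a
    // PubSub payload.
    trait Codec[A, B] {
      def encode(a: A): B
      def decode(b: B): Either[List[String], A]
    }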

Once the data models are well defined, we can move onto the transformations within the pipeline.

Icons by Icons8

Fail-safe Ingestion

The diagram above shows the path each record takes in a real-time data pipeline. For data to pass from Source to Destination, it must undergo the following transformations:

  • Source -> Raw
  • Raw -> Input or Error
  • Input -> Output or Error
  • Output -> Destination

We can define each state as follows:

  • Source — PubSub message, BigQuery table row, database record, HTTP request
  • Raw — String, Bytes
  • Input — Case Class describing the Source data model
  • Output — Case Class describing the Destination data model
  • Destination — PubSub message, BigQuery table row, BigTable mutations
  • Error — Rich data model describing different kinds of failure, such as deserialization and validation errors

It is important to capture errors at every transformation step. Thanks to Scala’s functional programming style, we can capture all errors at once instead of failing fast on the first one, as is the norm when using Python or Java.
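
A sketch of these states and transformations, using simplified hypothetical types (the real models and error cases are richer):

    object RecordPath {
      type Raw = String // or Array[Byte]

      final case class Input(sku: String, quantity: Int)     // hypothetical Source data model
      final case class Output(sku: String, inStock: Boolean) // hypothetical Destination data model

      // Error is a rich model in its own right, so failed records explain themselves
      sealed trait PipelineError
      final case class DeserializationError(message: String)   extends PipelineError
      final case class ValidationError(failures: List[String]) extends PipelineError

      // Raw -> Input or Error (a real pipeline would decode JSON here)
      def toInput(raw: Raw): Either[PipelineError, Input] =
        raw.split(',') match {
          case Array(sku, qty) if qty.nonEmpty && qty.forall(_.isDigit) => Right(Input(sku, qty.toInt))
          case _ => Left(DeserializationError(s"cannot parse record: $raw"))
        }

      // Input -> Output or Error
      def toOutput(in: Input): Either[PipelineError, Output] =
        if (in.sku.nonEmpty) Right(Output(in.sku, in.quantity > 0))
        else Left(ValidationError(List("sku must not be empty")))

      // A record reaches the Destination only if every step succeeds; otherwise the
      // Left side carries the error onward.
      def process(raw: Raw): Either[PipelineError, Output] =
        toInput(raw).flatMap(toOutput)
    }

    // e.g. RecordPath.process("ABC123,4") == Right(RecordPath.Output("ABC123", true))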

Failed records are pushed to a deadletter table, along with the associated errors. Typically, we are trying to detect unexpected changes to the data model. When this occurs, we should be able to recover the failed records after updating the data model.

Icons by Icons8

Monitoring, Alerting and Data Recovery

When “bad” data arrives in the pipeline, it cannot be processed. If the data model is different from what we expect, we cannot deliver the data to downstream consumers, or their services might encounter errors.

At the same time, the upstream sources might not keep a copy of the data sent to us, which means that if we don’t recover the failed records, they will be lost forever. The solution is a robust Monitoring, Alerting and Data Recovery strategy, carried out in four steps:

  1. Sending failed records to the deadletter table with the associated errors (see the sketch after this list).
  2. Alerting both upstream and downstream teams when there are new deadletter records.
  3. Working with the upstream team to update the data model.
  4. Reingesting the failed records from the deadletter table.
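
As an illustration, a deadletter record might look something like the sketch below; the field names are hypothetical. Keeping the untouched payload next to its errors is what makes step 4 possible:

    // Hypothetical deadletter record — the real schema is richer
    final case class DeadletterRecord(
      source: String,               // e.g. "pcs", "ppe", "inventory"
      rawPayload: String,           // the original message, untouched, so it can be replayed
      errors: List[String],         // everything that went wrong, not just the first failure
      failedAt: java.time.Instant   // when the record failed
    )

    // Recovery (step 4) is then a matter of re-running the Raw -> Input -> Output
    // transformations over rawPayload once the updated Case Class has been deployed.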

Now that we have covered all aspects of Data as a Service, we can take a look at a downstream application that uses DaaS pipelines to aggregate real-time product metadata!

Stay tuned for Part 3, all about Product Domain Aggregator.
