Why combine asynchronous and distributed calculations to tackle the biggest data quality challenges?

Martin Delobel
Decathlon Digital
Published in
9 min readOct 10, 2023

In this article, co-written by Ayoub FAKIR, we’ll explain how ZIO and Spark can solve some difficult situations in data pipelines.

1. ZIO and Spark: Immutability, Asynchronous Calculations and Distributed Systems

a. What is ZIO?

In Scala, and more precisely in the functional programming world, ZIO is a powerful library that allows to deal with large concurrency and speed.

Purely functional, ZIO is designed to solve complex problems easily. You can handle every error, never leaking resources with asynchronous code. The library provides concurrent code that scales easily, without locks or deadlocks, with maximal laziness and resource safety.

By choosing ZIO, you embrace a safe and side-effect-free code. Its signature, ZIO[R, E, A] shows us that it has three parameters:

  • R — the environment type. The effect that we’re describing needs an environment that is optional (would be Any when optional). The domain describes what is required to execute this Task.
  • E — the error with which the effect may fail.
  • A — the data type that is returned in case of success.

b. What is Spark?

Apache Spark is an open-source, distributed processing system for big data workloads. It is widely used for executing data engineering, data science, and machine learning. It uses in-memory caching and optimized query execution.

Despite its endorsement in the majority of data engineering processes. It has some limitations. Only 1 DAG can be run at a time and not asynchronously.

c. Immutability and Distributed Systems: What a couple!

Immutability in programming is when a variable is created and assigned a value, it does not change throughout the execution of the program.

Generally speaking, immutability is a concept that is not new to the world of computer science. It has been around for a long time and has been used in many different areas. However, it is only recently that it has been gaining popularity in the world of distributed systems. We will explore the concept of immutability and how it can be used in distributed systems.

First off, immutability is the backbone of the so-called pure functions. Those functions are predictable. In other words, they always yield the same output given the same input. This also leads them to be composable, safer, and easily tested.

Now the idea of immutability is important in the context of Distributed Systems. One of the characteristics of such systems is that data is transferred all the time between servers and nodes. Traditionally it’s also one of the reasons why bottlenecks usually occurred during network and data transit. The benefit of immutability then is that we do not have to worry about a value being changed unexpectedly, for example, because of a bug. This allows us to stop worrying about implementing difficult and useless, in this context, concepts such as locks and synchronization, which are also costly in terms of performance.

Immutability also means having no mutable shared states. In Big Data systems, for instance, data is usually spread between different workers to perform a calculation. Having no mutable shared state means being able to effectively write concurrent or asynchronous code effectively.

Last, but not least, when done properly, code exclusively using immutability and pure functions makes it in terms concise, maintainable, and more expressive.

d. Asynchronous programming as a medium to scale.

A good system is a system that’s able to scale, ideally without unnecessary pain. Asynchronous programming is one effective way of scaling Big Data systems and allows us to achieve performance and better resource utilization.

To illustrate the better resource utilization part, the mix of Spark and ZIO has been beneficial for us.
Spark is a framework allowing us to distribute calculations on top of tools like Hadoop or Kubernetes, across multiple executors and machines. However, when having for instance, two instructions like the following:

spark.read.parquet(“s3://path/to/partition1”).count()
spark.read.parquet(“s3://path/to/partition2”).count()

Spark is only able to run those two instructions sequentially. So the second count() action is only going to run when the first one ends. Those two instructions are synchronous. This is a waste of resources because more often than not, the Cluster we’re running our calculations on is able to handle both instructions at once. Thanks to ZIO, we are able to make those two instructions run in parallel. In fact, we are forcing the Spark Driver to start two DAGs instead of one, using asynchronous programming, as follows:

for {
r1 <- ZIO.attempt(spark.read.parquet(“s3://path/to/partition1”).count())
r2 <- ZIO.attempt(spark.read.parquet(“s3://path/to/partition2”).count())
_ <- r1.join
_ <- r2.join
} yield()

r1 and r2 are fibers, which allow us to perform an effect without blocking the current process, submitting tasks for a new DAG in our case. In short, they activate the asynchronous superpower!

This example can be expanded to a variety of cases. For instance, if our program does some distributed calculations and needs to write data both to S3 and to a DynamoDB table, both Dataframe write operations can be done simultaneously, leading to better performance and resource utilization.

Asynchronous programming also helps to avoid blocking calls (when desired, of course). No need to wait for a response from another process if you don’t need to, ride on!

2. XML Data Cleaning, from garbage to diamonds!

Sometimes fate causes you encounter old and inappropriate formats, not suitable for efficient data processing , e.g., the cumbersome full-text XML one.

The spark-xml library offers us a very simple way to parse XML files with Spark. However, we do not apply any schema or verification on the reading process. The library only allows us to translate XML text into super-agnostic Scala Objects.

Suppose your objective is to make this data available, clean, reliable, and of high quality, the combination of the two technologies mentioned above is a good idea. Let’s deep dive into it.

The first step is the data model creation. With your uncontrolled and unstructured data source, this one will be a little bit special:

final case class UntypedTax(
Amount: Option[String],
Percent: Option[String],
ReasonCode: Option[UntypedReasonCode],
SequenceNumber: Option[String],
TaxGroupID: Option[String],
TaxRuleID: Option[String],
TaxableAmount: Option[UntypedTaxableAmount],
Rounding: Option[UntypedRounding],
_TaxType: Option[String],
_TaxSubType: Option[String],
_TypeCode: Option[String])

As you can see, we use the Option[String] type everywhere. This is the most agnostic type to avoid any problems during data loading. The idea is to be able to load everything, regardless of whether we need an Integer instead of a String, or whether the field is present in the original text file or not. The validation of the data is done afterwards, as we need to log every anomaly in our datasets. Remember, a data pipeline is only as good as its data quality features.

The objective is to go from the above “untyped” case class to a “typed” one below.

final case class Tax(
amount: Double,
percent: Double,
reasonCode: ReasonCode,
sequenceNumber: Int,
taxGroupID: String,
taxRuleID: String,
taxableAmount: TaxableAmount,
rounding: Rounding,
taxType: String,
taxSubType: String,
typeCode: String)

To do that, we will implement some data validators. Part 5 of this article is devoted to it.

3. Functional errors handling: Errors are a set of values, a state, not an enemy.

We all have been told that errors are bad. When your IDE shows red, it must be either bad or you are dumb. What if we told you that neither is true?

Errors are part of our program flows. A bug in the code may cause them, but external systems can also cause them.

  • What happens if your program tries to connect to a database that is not reachable?
  • What if you try to write to a filesystem and don’t have enough permissions to do so?
  • What if you have a memory leak, leading to memory problems you could not anticipate?
  • And what if you are transforming data from CSV files coming from a source you do not control, and some files are corrupt or have bad schemas?

All these questions are legitimate, and there are reasons for these errors. Steven Covey said in his book, The 7 Habits of Highly Effective People, that one should focus on his circle of influence. You can’t control the rain outside, but you can walk out with an umbrella. And so do we, as adept functional programmers. Errors can and will happen, especially when dealing with external systems like databases, filesystems, APIs, you name it. Our role is to effectively deal with those errors and treat them as they are states.

Here is an example. We have a schema composed of a set of column names, types and a bunch of CSV files to which we want to apply this schema and transform them to a better file format, like Parquet. Imagine a raw scenario where we try to apply the schema blindly using Spark. The program crashes when we encounter a line that does not fit our narrative. Imagine having 1 Trillion lines of data to process, and the processing fails for the 100 last lines before the end, after 10 hours of wasted resources? Pretty bad.

Functional Errors Handling, in this case, means that our program should be able to process the lines in those CSV files that match the schema and yield the proper results. Exclude the ones that don’t, and extract them separately into some dead queue, ideally with a reason why those lines could not be processed, e.g. a wrong type in one of the fields, corrupt line, bad number of fields, etc. Functional Errors Handling is just that. Do whatever is right and possible to do, and control what might cause a problem. In our last example, we would have successfully processed 1 trillion lines even with 100 lines of errors. You do the math. And we would have had a separate file, containing the 100 rows that have issues, for later debugging.

spark.read
.option(“sep”, “|”)
.option(“header”, “false”)
.option(“timestampFormat”, “yyyy-MM-dd HH:mm:ss”)
.schema(Encoders.product[D_CUSTOMERS].schema)
.csv(/path/to/customers).as[D_CUSTOMERS]

Another example is when dealing with external databases. Imagine connecting via JDBC to a database, and the connection times out. Maybe the database couldn’t respond temporarily, and if we could wait for 30 seconds, for instance, or retry a couple of seconds, the connection would have been successful, and our program would have run successfully. Functional Errors Handling helps us achieve that. It’s the powerful concept of monads that unlocks this powerful feature!

This can be easily achieved using the following ZIO code snippet:

for {
_ <- ZIO.attempt(readFromDB("jdbc_url"))
.retry(Schedule.recurs(3) && Schedule.exponential(30.second))
}

4. Use case: POSLog transactions validation in a distributed fashion!

At Decathlon, we use the POSLog format, a standard for transaction records. It is used for the physical and digital transactions of more than 1700 stores from 70 countries and more than 70 websites and marketplaces.

Our POSLog file comprises 340 fields, and each data producer is responsible for generating data. Depending on the maturity level of the country, we encounter some disparities, and it is a big data quality challenge for our data engineering teams.

Example of POSLog transaction file (XML)
Example of POSLog transaction file (XML)

The company’s data lake is based on Databricks Medallion Architecture. In this context, we implemented a data validation solution between the Bronze and Silver zones to have cleaned and conform data inside. At this point, we also transformed the XML to .delta, an evolution to the parquet format that allows us, in a nutshell, to treat data files as if they were tables, and opens up the door to different features, including editing data, merging, etc.

Decathlon’s datalake architecture

We chose ZIO Prelude for our data validator methods; in the below example, we are trying to pass from a Option[UntypedRetailTransaction] to a ValidationError or a RetailTransaction.

Example of RetailTransaction validator method (Gist link)

ValidationError is a small object that we created to store a couple of information like the fieldLog which is the name of the class (the object we are validating), the transactionId which is our key and the cause of the error (EMPTY_FIELD in this example). The aim is to send information to a DynamoDB table for logging, monitoring, and alerting.

Example of our DynamoDB table content

5. ZIO Prelude: Cascade Data Validation.

Now that we are familiar with our Validation[ERR, A] data type of Prelude, one can wonder: how does it scale exactly? What if I have nested objects that need to be validated?

All that’s needed is to call the root validator that will, in a cascade fashion, call the others, using the validateWith method:

def validateObject(...): Validation[Error, ValidObject] =
for {
validObject <- Validation.validateWith(
SubObjectValidator1
.validateSubObject(...),
SubObjectValidator2
.validateSubObject(...),
SubObjectValidator3
.validateSubObject(...)
} yield validOBject

Then, let’s say SubObjectValidator1 is also nested as follows:

def validateSubObject(...): Validation[Error, ValidSubObject1] =
for {
validSubOBject1 <- Validation.validateWith(
SubSubObjectValidator(...)
)
} yield validSubObject1

We only need to call validateObject once, and all the underlying validators will be called, and errors accumulated in the Error type.

6. Summary

In this article, we have shown the power of the mix between Apache Spark and ZIO to unlock and mix both powers of asynchronous and distributed programming, allowing us to work with complex data structures to clean and validate data at scale, in a Big Data architecture.

--

--