Handling big, nested, and growing table schemas in data pipelines

Michał Mstowski
Aug 20 · 9 min read

Complicated data schemas are a common problem in companies. They tend to appear when a business unit wants to attach every detail to an entity and then store it all together in one row of data. Unfortunately, data engineers are typically not part of the domain design process, and we often cannot convince decision makers to change a schema once it is established. Because problems with this type of schema can show up at every step of the data pipeline, it is worth spending more time on the design phase and making conscious decisions. In this post, I want to describe how we built a pipeline for this kind of incoming data, and how we arrived at a solution that worked.

Ingesting, storing and serving the data

We faced this kind of challenge recently in our data team. On the ingestion diagram that follows, you can see the simplified version of the pipeline we had to set up. We created a flow that reads the transaction events from Kafka, and writes them to HDFS (as order events). After this, our job was to set up a daily job to collect all transaction IDs from the previous day, call the REST API for the necessary details, and save the data returned by the API to HDFS.

This sounds like a typical data ingestion case, but it turned out not to be so simple. The data returned from the API contains details about everything that is connected with a transaction, such as a detailed product list, payment list, fulfillment, and additional notes or errors returned by the till. The API basically aggregates all the available details from many company services. Our job was not only to ingest and store that data, but also to serve it to business users with solid performance. Serving this data means creating specific views, which are typically built on one column from the root level of the nested structure (e.g., product, payment, etc.).

Below you can see a very small part of the schema that we derived from one day of data. Even with this small amount of data, you can see that the number of nested elements is big, and sometimes it’s hard to understand what’s inside.

You can see that in some cases we have arrays of structs that hold more arrays of structs. Of course the schemas can vary between records, and to make the problem even more interesting, the API team can add new fields in the future. And there’s one final complicating factor: the schema we’re working with is actually about 30 times larger than that.

 |-- products: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- created: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- fulfilment: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- quantity: struct (nullable = true)
 |    |    |    |    |    |-- lineIds: array (nullable = true)
 |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |    |-- size: string (nullable = true)
 |    |    |    |    |-- products: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- quantity: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ids: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |    |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |    |    |-- reason: string (nullable = true)
 |    |    |    |    |-- tracking: struct (nullable = true)
 |    |    |    |    |    |-- number: string (nullable = true)
 |    |    |-- gtin: string (nullable = true)
 |    |    |-- lines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- created: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- lastUpdated: string (nullable = true)
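A listing like the one above can be produced by letting Spark infer the schema from a sample of the JSON. The following is only a minimal sketch, assuming Spark SQL is on the classpath; the two sample records are made up and merely imitate a few of the field names shown above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("schema-sketch").getOrCreate()
import spark.implicits._

// Two made-up records that imitate the API response; real ones are far larger.
val sample = Seq(
  """{"id":"t1","products":[{"gtin":"111","fulfilment":[{"id":"f1","status":"SENT"}]}]}""",
  """{"id":"t2","products":[{"gtin":"222","lines":[{"id":"l1","created":"2020-08-19"}]}]}"""
).toDS()

// Spark scans the records and merges every field it sees into one schema.
val transactions = spark.read.json(sample)
transactions.printSchema()
```

Note that the inferred schema is the union of the fields found in the sample: neither record contains both fulfilment and lines, yet the products element ends up with both.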

How to ingest the data?

As the data integration team, we maintain a lot of data ingestion pipelines, and all the pipelines are Spark jobs. They run daily, and collect the latest available data.

In the code snippet below you can see a simplified version of a typical ingestion job. We have two case classes defined, one that describes the source data, and a second one that describes what the data should look like in our system. Next, there is a reading function that reads data from an external source, which in this case is our REST API. Then there’s a mapping function that makes the adjustments so the data fits our standards, such as changing column names to fit our naming conventions, and adding additional data about the ingestion, such as the ingest timestamp. After that, the only thing left is to write the data to the storage system, and in this case we write it as parquet files to HDFS, and build a Hive external table.

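import org.apache.spark.sql.{Dataset, SparkSession}

// Shape of the data as it arrives from the source
case class DataSource(/* fields elided */)
// Shape of the data as we store it in our system
case class DataTarget(/* fields elided */)

def readData(source: String): Array[DataSource] = {
  // read data from an external source or from HDFS
  ???
}

def mapData(sourceDS: Dataset[DataSource]): Dataset[DataTarget] = {
  // change column names, add the ingestion timestamp, etc.
  ???
}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val source = ???
val targetTable = ???

val sourceData = readData(source)
val targetDS = mapData(sourceData.toDS())
// writeTable is not part of Spark's Dataset API; it stands in for a
// project-specific helper that writes Parquet files to HDFS and
// builds the Hive external table.
targetDS.writeTable(targetTable)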

If you’re comfortable with this process, you may have noticed that this example is missing two parts of our typical pipeline: we don’t know the schema of the source data, and we don’t yet know what the schema of the target data will be. This is because the data returned by the API is an aggregation of data from several different services.

With regard to the first problem, we could derive the schema from the ingested data and create a case class that describes it. The trouble is that we cannot be sure that no additional fields will be added in the future. If that were to happen, a fixed case class would silently drop those fields when reading from the API, which would be quite a problem. For instance, imagine that a business analyst comes to your team and asks for the additional data that should be available. If your case class had not captured that data in the ingestion pipeline for the last few months, and the data was no longer available through the API, you would not be able to ingest it again. If this happens, your team will be in hot water.
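To make the risk concrete, here is a small sketch of what happens when JSON is read with a schema that was fixed in advance. It assumes Spark SQL on the classpath; the field names (including the later-added loyaltyCard) are hypothetical, and an explicit StructType plays the role that the case class would play in the real job.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("fixed-schema-sketch").getOrCreate()
import spark.implicits._

// Schema fixed at design time; it stands in for the DataSource case class.
val knownSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("status", StringType)
))

// The second record carries a field (hypothetical) that the API team added later.
val responses = Seq(
  """{"id":"t1","status":"PAID"}""",
  """{"id":"t2","status":"PAID","loyaltyCard":"LC-42"}"""
).toDS()

val parsed = spark.read.schema(knownSchema).json(responses)
parsed.show()
// The result has only the id and status columns; loyaltyCard is dropped without any error.
```

Nothing fails and nothing is logged, which is exactly why the loss can go unnoticed for months.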

Attempting a JSON solution

The second idea that came to our minds was rather simple. Because the API returns its result as JSON, we could also write JSON to HDFS, and then build an external table on top of that data instead of using Parquet. We could also use GZIP compression for the JSON files, so that they would have a size comparable to Parquet files compressed with the default SNAPPY codec. By saving all the data we retrieve and then deriving a schema from it, we could create a Hive table definition that contains all the fields found in the data up to this point in time.
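A minimal sketch of this idea, assuming Spark SQL on the classpath: the raw responses are written untouched as GZIP-compressed text, and the schema is inferred afterwards from whatever has been stored. The temporary directory and the sample records are placeholders; in the pipeline the path would point to HDFS.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("raw-json-sketch").getOrCreate()
import spark.implicits._

// Placeholder output location; in the pipeline this would be an HDFS path.
val rawPath = Files.createTempDirectory("raw-json").resolve("transactions").toString

// Raw API responses, one JSON document per line, kept exactly as received.
val responses = Seq(
  """{"id":"t1","payments":[{"type":"CARD"}]}""",
  """{"id":"t2","payments":[{"type":"CASH"}],"notes":["till error 17"]}"""
).toDS()

// Write the untouched strings as GZIP-compressed text files.
responses.write.option("compression", "gzip").text(rawPath)

// Later, the schema is inferred from everything that has been stored so far.
val stored = spark.read.json(rawPath)
stored.printSchema()
```

Because the strings are stored as received, a field that appears only in later records (notes here) is still on disk and shows up as soon as the schema is inferred again.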

Initially this looked like a perfect solution, because we wouldn’t lose any data; we would be able to add the fields to the table description, and for all rows containing them, data would be available. Also, we could create case-specific views using this table.

Unfortunately, it all went downhill from there. The table we made, as well as the views, were worse than useless. Querying the views took ages, and often failed.

There were two reasons for this poor performance. The first one is that JSON is a row-oriented structure, which in this case means that even if we build the view using a single array, the whole JSON row needs to be read every time. Some rows were enormous, and both Spark and Hive had problems reading them.

I previously mentioned the second issue: GZIP compression. A GZIP-compressed JSON file is not splittable at all, whereas a Parquet file stays splittable, because SNAPPY compresses the data pages inside the file rather than the file as a whole. On our platform, we try to keep files on HDFS large so as not to clutter the HDFS metadata. This means that when Spark reads one of our files, which are 2–4 GB in size, it has to read all of its HDFS blocks on one executor. Of course, this often ended up exceeding executor memory, quickly followed by failure. But even when it did succeed, it was too slow, because it was not benefiting from distributed processing. Spark was processing everything on one or a few executors, and was not using the full power of the system.
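The effect of a non-splittable file can be observed on a small scale by comparing partition counts. The sketch below assumes Spark SQL on the classpath and deliberately shrinks spark.sql.files.maxPartitionBytes to 1 KB so that a tiny local file behaves like a multi-gigabyte one; the data and paths are made up.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("gzip-split-sketch").getOrCreate()

// A tiny split size (1 KB) so that the effect shows up on a small local file.
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024L)

val base = Files.createTempDirectory("split-sketch")
val plainPath = base.resolve("plain").toString
val gzipPath = base.resolve("gzip").toString

// 5,000 small JSON lines, written once uncompressed and once with GZIP, one file each.
val lines = spark.range(5000).selectExpr("to_json(struct(id)) AS value").coalesce(1)
lines.write.text(plainPath)
lines.write.option("compression", "gzip").text(gzipPath)

// The uncompressed file is cut into many splits; the GZIP file must be read as a whole.
val plainPartitions = spark.read.text(plainPath).rdd.getNumPartitions
val gzipPartitions = spark.read.text(gzipPath).rdd.getNumPartitions
println(s"plain: $plainPartitions partitions, gzip: $gzipPartitions partition(s)")
```

With one partition per GZIP file, a single task has to decompress and parse the entire file, no matter how many executors are available.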

After that spectacular failure, we were thinking about replacing views with tables, and creating daily scheduled transformations for each table. But we didn’t even start this process, because there were over twenty views at that time, and that number was still rising. So if we took this approach, we would have to create and maintain over twenty additional flows each day. Maintaining these jobs would be a huge extra load for our team, and the potential maintenance effort required by this approach could eat into the time allocated for our development work if and when there was any sort of infrastructure failure.

Getting the best of both worlds

In the end we came up with an idea of how to reap the benefits of both formats, JSON and parquet: we simply used them both.

The first step of the data ingestion pipeline stayed the same as described in the previous section, i.e., reading the data from the API and saving JSON files to HDFS. Then, to resolve the performance issues, we added an additional step in the pipeline: we derived the current schema from all the data we had up until this point in time. Using that schema, we read the newest JSON files daily, and saved them to another location as parquet files with the schema defined.
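The two steps can be sketched as follows. This is an illustration under stated assumptions rather than our production job: it assumes Spark SQL on the classpath, uses a temporary local directory instead of HDFS, and the per-day folder layout and sample records are invented for the example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("two-step-sketch").getOrCreate()
import spark.implicits._

val base = Files.createTempDirectory("two-step-sketch")
val jsonRoot = base.resolve("json").toString       // raw JSON, one folder per day (assumed layout)
val parquetRoot = base.resolve("parquet").toString // Parquet copies served to users

// Step 1 (unchanged): raw API responses land as GZIP-compressed JSON.
Seq("""{"id":"t1","products":[{"gtin":"111"}]}""").toDS()
  .write.option("compression", "gzip").text(s"$jsonRoot/2020-08-18")
Seq("""{"id":"t2","products":[{"gtin":"222"}],"payments":[{"type":"CARD"}]}""").toDS()
  .write.option("compression", "gzip").text(s"$jsonRoot/2020-08-19")

// Step 2a: derive the schema from everything stored so far.
val fullSchema = spark.read.json(s"$jsonRoot/*").schema

// Step 2b: read only the newest day with that schema and store it as Parquet.
spark.read.schema(fullSchema).json(s"$jsonRoot/2020-08-19")
  .write.mode("overwrite").parquet(s"$parquetRoot/2020-08-19")

val newestDay = spark.read.parquet(s"$parquetRoot/2020-08-19")
newestDay.printSchema()
```

Reading with the full schema means every daily Parquet folder has the same columns; a day that lacks a field simply gets nulls in that column.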

This simple design gives us the performance we want, and we are also not at risk of losing any data. In the JSON files we store everything we get from the API, and even if someone adds a new field to the API response, we can update the schema for the Parquet creation job and rerun it for the whole dataset. Of course, because of the data volume this particular job is expensive, since we have to scan the table and copy it all at once. But it is not something that has to be run very often.

As a point of comparison, for the views we built on top of the JSON data, the whole dataset was scanned every time the view was queried. With this new solution, we can easily build views on top of the Parquet data, with all the benefits that Parquet gives. Most views are built on only one array from the root level of the nested structure. This means that every time users query one view, they read only around 5% of the data stored in one row. Of course the performance benefit of this change was enormous. Now users can query the data we serve for them with no unexpected failures, and with good query speed.
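A view over a single root-level array might look like the sketch below. It assumes Spark SQL on the classpath and uses a temporary view as a stand-in for the Hive views we serve; the table name, view name, and sample record are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("view-sketch").getOrCreate()
import spark.implicits._

val parquetPath = Files.createTempDirectory("view-sketch").resolve("transactions").toString

// A stand-in for the wide Parquet table: two root-level arrays per transaction.
spark.read.json(Seq(
  """{"id":"t1","products":[{"gtin":"111"},{"gtin":"222"}],"payments":[{"type":"CARD"}]}"""
).toDS()).write.parquet(parquetPath)

spark.read.parquet(parquetPath).createOrReplaceTempView("transactions")

// The view touches only id and products, so the payments column never has to be read.
spark.sql("""
  CREATE OR REPLACE TEMPORARY VIEW transaction_products AS
  SELECT id AS transaction_id, product.gtin AS gtin
  FROM transactions
  LATERAL VIEW explode(products) exploded AS product
""")

spark.sql("SELECT * FROM transaction_products").show()
```

Because Parquet stores each column separately, a query against such a view reads only the columns it selects, which is where the large saving over the JSON-backed views comes from.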

One last important note is that this approach reduced the number of batch jobs we run to only two. This greatly reduces the time we have to spend on maintenance, because if there are any issues, we don’t have to re-run several dozen jobs, just the two.

Conclusion

In conclusion, by understanding both the data and the performance requirements of this particular big data ingestion problem, we were able to iterate on potential designs until we arrived at a solution that worked well for our client. We achieved the desired data redundancy with the compressed JSON files, high-performance queries with Parquet tables and views, robustness to schema changes, and a low maintenance cost. The final solution, described in this post, combines Kafka, Spark, Scala, HDFS, and Parquet tables and views.

VirtusLab

Virtus Lab company blog