Streaming your batches

Why and how to convert file feeds into streams

Steve Jones
Data & AI Masters
Published in
8 min readMar 14, 2023

--

Designing new data feeds to just accept that they are receiving files and propagating those files through the architecture is one of the single dumbest things you can do in a new architecture. When you receive files you should look at how you convert them to streams and CDC, this will mean that as applications change you can isolate yourself from change, and it regularly has significant performance benefits.

REDS and the Body of Work

First out here have a read at some stage on the basic REDS pattern, this also includes a description of “Body of Work” which is super important when converting files to streams.

Why the conversion is important

The first thing to explain is why it is important to convert your files to streams. The answer is pretty simple:

Protecting downstream usage from change

If you build your downstream usage around files then its extremely hard to shift it to an event/streaming approach, whereas if a destination needs something more systolic and needs a file, then creating a file from a stream is a trivial exercise.

If you receive a file and then propagate that file you are pushing ETL thinking all the way up your architecture, forcing the next set of consumers to make the same set of decisions. You are also choosing to propagate the problem if that does change to use eventing or streaming and suddenly you’ve got to rebuild all of your pipelines to provide this faster and more effective way of data sharing.

The difference between pure Batch and doing Batch to Event, shows files being received, then the batch flow being timed processing at each stage for everything through to the destination, while the event breaks up the flows, uses BOW to do the PiT and the final write, but always based on events.
Batch to event is about breaking the time-based sync into events

Simply put the reason that the conversion is important is that by doing the conversion at the first moment possible you shift enable everyone else to build a modern data platform, even if you’ve received old school data.

How to do the conversion

Step 0: Stamp it

The first thing you should be doing is stamping the file and the individual records as you move through. If the source system has provided you with a timestamp then this is just a reception record, but if the source doesn’t give us a timestamp this is the first opportunity to record something that will help downstream.

Step 1: What changed?

You’ve received a file, is it a full-pull, is it a daily update, or a “micro-batch” with all the updates from the past hour? Either way your first challenge is to identify what are the actual change events that you are going to create, and what are the bodies of work you are going to enclose them with.

This is where the “Point in Time” view of current state matters, you are comparing the current state for the raw to the received records. For update files you might consider skipping this step, as we know that multiple applications of change only result in the same state. But if you’ve got a high collision rate then this can save time.

Where it really saves time however is on full-pulls. On one program we received files with hundreds of millions of records every day, the previous system and then pushed that all through the ETL pipelines before loading. The new system did a diff and just propagated the changes, which was less than 0.5% of the previous volume. That was a huge saving in processing time.

There are many and varied ways to do the diff. But my two personal favourites are the SHA-1 hash comparison and the timestamp check. If the source system has an “updated timestamp” against a row, then use that. If it doesn’t then when you receive data calculate the hash by concatenating all the fields as a string, then apply the hash function. Either way store a key/value pair of Primary Key and Hash or Timestamp, and order by primary key to give you the fastest possible lookup for the diff.

Step 2: What are my bodies of work?

Next up is calculating the bodies of work, bodies of work are how you ensure that the destination state of a repository is valid, its ensuring you don’t have partial applications. If your received files have transaction IDs in them, unlikely, then you can use those. Otherwise you have two choices

  1. A file is a body of work
  2. Calculate smaller bodies of work

The important element to remember here is that a body of work can span multiple files, for instance an order and its order lines. If you have timestamps you can potentially use a windowing approach, but just be aware that you risk adding complexity for little value. So most of the time I’m going to use the files as the body of work, at least to start with.

Flow showing three files being received (Order, Order Lines, Product), a diff being done on each one, and then a Body of Work surrounding all three, with a sub-body of work around the Order and Order Lines

So here we create a body of work that includes the product updates, and a sub-body of work that ensures orders are loaded before order lines, and that the state is only updated when both are complete. If you can perform more strip down analysis and create individual order/order line SBOWs it can help you down the road, but the main thing here is having the context of bodies of work, important in streaming, linked to events.

Step 3: Stamp it and send it

Finally you’re going to stamp a record saying that you’ve done the diff, record the number of records you are sending out, the BOW and SBOW sizes, then send out the events. You can do this in three ways:

  1. The Diff file is the event
  2. Send them individually using the BOW/SBOW tags
  3. Send an event that references the Diff file to pickup

Now if you’ve got a LOT of events so its still pretty hump centric then 1 and 3 are liable to be your best options. But if you can pick 2 then it helps in future, and if you are doing BOW/SBOW with events properly then you can do the collation on reception if you need to. So I’ll assume you are doing option 2…

How to receive diff files as events

The other side of this is pretty easy, we’re receiving events which have an SBOW/BOW attached, which means the following incredibly complex process must be done

  1. Append the events to the history files
  2. Confirm if all the events within an SBOW/BOW have been received
  3. If no, keep going unless you’ve hit an error condition
  4. If yes, then mark the SBOW/BOW as able to be viewed and include within the current Point in Time view

The error conditions are what you operationally decide means an SBOW/BOW hasn’t been received correctly. If for instance you said there were 6 order lines, and you received 5 in a minute, but 5 minutes later the SBOW/BOW is still open, that could be an error. You might flag an error if you receive something for a subsequent BOW when you haven’t closed the old one, but be aware that makes assumptions on your underlying eventing infrastructure, so a time-window is always a plan.

Collating before writing — performance hack

One thing I will mention is that collating before writing using events and BOW/SBOW is an easy performance hack. What this basically means is that rather than do the write as a single append per record, you construct the SBOW/BOW in memory first and then write out to the file in a single hit, or use a temporary high-speed store to cache the SBOW/BOW as you construct it. Then you only perform the write on completion. You still have the same error conditions, but you are doing the append to the file in a single hit. The disadvantage of this hack is that it means you make the event steps systolic rather than streaming, but depending on the technology you are using it can have significant performance benefits.

One sub-option in this is to flow the events and transformations all the way through Lazy Conformance and have the bulk write on the destination store only. This means the individual events flow through the architecture but the final state application is done as a large hit when you are talking about user stores. I like doing it this way because the risk of data corruption with CDC that has a proper BOW/SBOW approach is very low, although the history of the events might contain data that isn’t valid, the state of that store does not.

Just to note, some of the more modern data engines out there have some capabilities around PiT calculation and being able to mark BOWs as complete that mean you should check before you optimize, in case your optimization is actually fighting against the engine.

Events mean subscribers, which means users under control

The other reason you want events is that if you’ve built your architecture to support subscriptions to data then propagating change is how people are going to want to receive data. By isolating the inputs and building in the foundations that industrialize the accurate creation of current state then you isolate the destinations from risk. You don’t make end-users have to switch between “we get that file every day at 1pm, so I need to set up a timed load” and “Oh that feed is coming in as events, I need to work out when the state is valid”.

Having people subscribe to events for data products and having that propagate automatically into stores gives you a single pattern which you can industrialize end to end. If you choose to propagate batch, micro-batch and other approaches you are choosing to force destinations to engineer multiple approaches and your infrastructure to be much more visibly. More visible infrastructure means people like to make more choices.

You are choosing to shift to a CDC and event driven data infrastructure, not just because its the most reliable, easiest to patch and most traceable way to do this. Not just because it enables next generation AI use cases to be integrated directly into information flows.

You’re doing it because it reduces choice in a place where choice destroys value.

So, invest a bit in those batch streams and shifting them to CDC, and if you really can’t do that, fake it, fake it so you’re the one who deals with it, and isolates everyone else from change through industrialization of the data foundations.

DALL-E’s view of the Greek God Mercury looking for change to pay at a tollbooth
You wouldn’t want Mercury to use a Tollbooth (from DALL-E)

--

--

Steve Jones
Data & AI Masters

My job is to make exciting technology dull, because dull means it works. All opinions my own.