Scala Types in Scio Pipelines

A pipeline. It’s probably not carrying data. Image via https://pixabay.com/en/pressure-water-line-pipeline-tube-509870

Data pipelines in Apache Beam have a distinctly functional flavour, whichever language you use. That’s because they can be distributed over a cluster of machines, so careful management of state and side-effects is important.

Spotify’s Scio is an excellent Scala API for Beam. Scala’s functional ideas help to cut out much of the boilerplate present in the native Java API.

Scio makes good use of Scala’s tuple types, in particular pairs (x, y). Its PairSCollectionFunctions add some neat, expressive functionality to the standard SCollection to compute values based on pairs.

That capability lets you write really concise code, but can make it hard to make sense of types in the middle of your pipeline. Using Scala’s type keyword to alias common types can bring more clarity to your code.

An Example: Counting in Access Logs

At this point, I think I need an example. Let’s say we’re processing simple web server access logs. I want to know how many times each user accesses each URL and the status code they received.

Here’s an example of a line from our logs:

1.2.3.4,bob,2017-01-01T00:00:00.001Z,/,200

We don’t need to worry too much about where the logs are coming from. Aside from this just being an example, Beam has numerous adapters for different data sources.

I first write a case class and a parse function to turn these useless strings of characters into something nicer to work with.

A parse function for simple access log lines

Now, we can build a pipeline starting with this parse function. We’ll build up the pipeline step by step, detailing the type signature at each point. The type at the end of the pipeline will be indicated with a comment on the next line.

sc.textFile(args("input"))
.map(AccessLog.parseLine)
// SCollection[AccessLog.Entry]

So far so good. Now, let’s map AccessLog.Entry onto the key we want to group by.

sc.textFile(args("input"))
.map(AccessLog.parseLine)
.map(x => (x.userId, x.path, x.statusCode))
// SCollection[(String, String, Int)]

Yuk. Now we need to remember that the first String is the userId, the second is the path and the final Int is the statusCode. It gets worse when we start aggregating, adding more complexity and numbers into the mix.

sc.textFile(args("input"))
.map(AccessLog.parseLine)
.map(x => (x.userId, x.path, x.statusCode))
.countByValue
// SCollection[((String, String, Int), Long)]

This is a very simple pipeline. When you’ve got something more complex it gets harder to keep track of what these types mean, and when you are working with more than one pipeline it’s harder still. The type system can help more than it is, so let’s use it.

Once More, With Type Aliases

OK, so let’s back up, and use Scala’s type keyword to make the type signatures a bit more useful. Our parsing function is a convenient place to introduce additional type information to flow through the pipeline.

Using type aliases to concisely add richer meaning to the parse function’s case classes

That’s it. Everything still type-checks, as the “real” types haven’t changed. Our new aliases will now flow through the pipeline code, allowing us to see what the types really mean at each point. Let’s retrace our steps and see how these new types help us out. This time, I’ll comment the types at each step for brevity.

sc.textFile(args("input"))
.map(AccessLog.parseLine)
// SCollection[AccessLog.Entry]
.map(x => (x.userId, x.path, x.statusCode))
// SCollection[(UserId, Path, StatusCode)]
.countByValue
// SCollection[((UserId, Path, StatusCode), Long)]

Much more useful than ((String, String, Int), Long) ? An IDE like IntelliJ (Ctrl+hover on Win/Linux, Command+hover I’m informed is the Mac shortcut) will tell you what types you’re dealing with as you code.

The type alias syntax is concise too, much better than having to create classes. It’s not a lot of extra thinking or typing for a significant increase in the amount of information you have as you’re writing or debugging a pipeline. The custom parse function early in the pipeline provides a neat starting point to inject this type information and have it flow through the rest of our pipeline.

Finally…

Data pipelines appear everywhere, from purpose-built distributed systems like Beam all the way to just mapping, filtering and folding over plain collections in your everyday Scala code. They often pull types apart into their component pieces to work their magic. Scala’s type aliases give you a concise, almost effortless way to track the meaning of those simple values as they weave their way through your pipelines.

We ❤️ feedback, so if you agree, disagree, have suggestions or have any examples of type aliases in pipelines please leave a response!