Exploring Beam SQL on Google Cloud Platform

Did you know that a new feature was recently rolled out for Apache Beam that allows you to execute SQL directly in your pipeline? Well, don’t worry folks because I missed it too. It’s called Beam SQL, and it looks pretty darn interesting.

In this article, I’ll dive into this new feature of Beam and see how it works by using a pipeline to read a data file from GCS, transform it, and then perform a basic calculation on the values contained in the file. Far from a complex pipeline, I agree, but you’ve got to start somewhere, right?

<tl;dr> You can now inject SQL directly into your Apache Beam/Dataflow pipeline (using the Java SDK only), which means having to write a lot less code. It’s still very new and probably not ready for production workloads just yet. But, it’s certainly something to keep an eye on as it quickly matures. Full source code for the example below is here.


Procrastinating

Yes, I also own a hammock just like Homer’s

After seeing Beam SQL pop up in the docs a few months ago, I’d been meaning to test drive it for a while. But, being summer here in Australia, I was finding it rather hard to pull myself away from lounging around in my hammock all day. I was also really busy baby proofing the house (again) for the arrival of our latest ankle-biter in April.

But this week I found some time between commuting to work and meetings to finally have a proper poke around with Beam SQL, and put it through its paces. So, what’s it all about then? Well, a good place to start is the official documentation blurb:

Beam SQL allows a Beam user (currently only available in Beam Java) to query bounded and unbounded PCollections with SQL statements. Your SQL query is translated to a PTransform, an encapsulated segment of a Beam pipeline. You can freely mix SQL PTransforms and other PTransforms in your pipeline.

Let’s break that statement down y’all:

  1. It’s only available in the Java SDK. *audible murmurs/groans/hisses from the Pythonista & Gopher readers*
  2. It can be used with bounded and unbounded PCollections, i.e. streaming and batch. That’s neat.
  3. It takes your SQL and automagically generates some code under the hood for you. This is a good way to keep lazy developers like myself happy. The less code I need to write the better.

In addition to that blurb, here’s another one that tells us Beam SQL is essentially based on Apache Calcite:

Calcite provides the basic dialect underlying Beam SQL. We have added additional extensions to make it easy to leverage Beam’s unified batch/streaming model and support for complex data types.

So, you’ll need to use Calcite’s dialect of SQL when using it in Beam. Check out the Apache Calcite SQL docs here. However, for all intents and purposes, you can simply write SQL in your pipelines to work on the data flowing through them.

Got it? Great. Let’s move on.

Now that you know what it’s all about, let’s have a look at the code needed to get a simple pipeline that uses Beam SQL up and running. To test it, I devised a cunning data pipeline:

  1. Read a CSV file from GCS that has some useless data in it.
  2. Run a SqlTransform over the content of said file, and perform an aggregation and sum — using SQL.
  3. Write the results back to another file in GCS.
  4. Test the results using good ‘auld Bash.

Step 1 — Read a file

The first thing we need is some sample data to push through the pipeline. I usually turn to the public datasets in BigQuery whenever I need some dummy data, or if I want to test something. One of the entries in the public datasets is the Wikipedia benchmark page view tables.

These tables are great for quick tests and demos on GCP. They are simple to understand and easily relatable to an audience. Also, the tables range in size from 1K rows all the way up to 100B rows (6TB). Wowsers. So, you can easily scale and test your workloads with absolutely no changes needed.

The input data from BigQuery for Beam SQL to munch on — just 1K rows to start with

To start with, we simply export the smallest of the tables (1K rows) to GCS as a CSV file from the BigQuery console. Of course, we could just read the table directly from BigQuery in the pipeline (using BigQueryIO), but I can’t think of a use case that would require using Beam/Dataflow to pull data from BigQuery in order to run SQL over it. You’d be much better off just querying it directly in BigQuery! 😀

The first part of the code constructs the pipeline and creates the read from GCS into a PCollection<String>. Simples:
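A minimal sketch of that first step, assuming the Beam Java SDK is on the classpath (the bucket path and step name are illustrative placeholders, not the original source):

```java
// Sketch: build the pipeline and read the CSV file from GCS
// into a PCollection<String>, one element per line.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);

PCollection<String> lines =
    p.apply("read_from_gcs", TextIO.read().from("gs://my-bucket/input.csv"));
```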

To work with Beam SQL, you must convert the type of your PCollection to a Row. Don’t confuse this with a BigQuery TableRow object though. They are different things altogether. So, the next thing we need to do is run a ParDo over the PCollection<String>, which we obtained from the initial file read from GCS, turn each element into a Row object, and then finally set the values for each element:
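A sketch of that conversion, assuming a SCHEMA with a string lang field and a numeric views field, and that the language sits at index 4 and the views at index 6 of the exported CSV (as in the snippets later in the post); the variable names are illustrative:

```java
// Sketch: turn each CSV line into a Beam Row matching SCHEMA.
// The regex splits on commas that are not inside double quotes.
PCollection<Row> rows = lines
    .apply("string_to_row", ParDo.of(new DoFn<String, Row>() {
      @ProcessElement
      public void processElement(@Element String line, OutputReceiver<Row> out) {
        String[] vals = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
        out.output(Row.withSchema(SCHEMA)
            .addValues(vals[4], Long.valueOf(vals[6]))
            .build());
      }
    }))
    .setRowSchema(SCHEMA); // tells downstream SQL what the Rows look like
```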

Check out that awesome Regex. I’ve no idea what it does. But, it looks cool.
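For the curious, it’s the classic trick of splitting on commas only when they fall outside double quotes: the look-ahead counts the quotes remaining on the line, and an even count means the comma isn’t inside a quoted field. A plain-Java demo (no Beam required; the sample line is made up):

```java
import java.util.Arrays;

public class CsvSplit {
    // Split on commas that are NOT inside double-quoted fields.
    public static final String CSV_REGEX = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";

    public static void main(String[] args) {
        String line = "2018,1,1,wikipedia,en,\"Hello, world\",42";
        String[] vals = line.split(CSV_REGEX, -1);
        // The comma inside "Hello, world" is left alone: 7 fields, not 8.
        System.out.println(Arrays.toString(vals));
        System.out.println(vals[4] + " " + vals[6]); // → en 42
    }
}
```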

SQL is dope

This guy thinks SQL is dope. I can’t remember his name, but apparently he’s kind of a big deal.

Now that we have a PCollection<Row> object, we can finally run some sweet, sweet SQL over the contents of that collection. Who doesn’t love a bit of SQL, huh?

Remember that the elements will be distributed across all the workers in the Dataflow cluster. But, we don’t need to care about that because it’s all abstracted away. Using SQL also means we don’t need to write boilerplate-y GroupByKeys, Combines, etc. in Java. SQL makes it much easier to express what we’d like to do with the data:
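The SQL step itself is a single transform. In current Beam releases this is SqlTransform.query, and a single input PCollection is addressed in the query as PCOLLECTION (the column alias below is my own naming, not necessarily the original):

```java
// Sketch: run SQL directly over the schema'd PCollection<Row>.
PCollection<Row> results = rows.apply("transform_sql",
    SqlTransform.query(
        "SELECT lang, SUM(views) AS total_views "
            + "FROM PCOLLECTION "
            + "GROUP BY lang"));
```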

In this example, I’m simply aggregating by language and summing the views for each one. As simple as this sounds, under the hood a lot of work needs to be done to make this happen on a distributed cluster. And it’s something I’m not smart enough to understand. But once again, the complexity of this is hidden from users like us. You don’t need to worry about it — until you have a bug and you need to debug that is! Mwaaaah!
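Ignoring the distributed machinery entirely, the semantics of the query boil down to a per-key sum. A plain-Java sketch of the same aggregation (illustrative names and toy data, nothing Beam-specific):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupBySum {
    // Group rows by language (index 0) and sum the views (index 1) —
    // the hand-written equivalent of GROUP BY lang + SUM(views).
    public static Map<String, Long> sumViewsByLang(List<String[]> rows) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String[] row : rows) {
            totals.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[]{"en", "10"},
            new String[]{"de", "5"},
            new String[]{"en", "7"});
        System.out.println(sumViewsByLang(rows)); // → {en=17, de=5}
    }
}
```

Beam generates a distributed version of exactly this kind of aggregation from the query, which is the bit we no longer have to write.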

Collecting the results

We’ve now got the bulk of the pipeline written. The last thing we need to do is collect the results of the SQL statement, and store them somewhere. This could be anywhere really (e.g. BigQuery, Datastore, Memorystore etc.), but for brevity’s sake I simply write the results back as a CSV file to GCS.

To do this we need to convert the SQL Row object back to a String so we can write it out to GCS. Hitting it with a simple ParDo makes this a walk in the park. Then one more apply writes the result file out to GCS (without sharding, so we get just one file).
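A sketch of those last two applies (the output path is a placeholder, and the total_views column name assumes the query aliased its sum that way):

```java
// Sketch: flatten each result Row back to "lang,total_views" text,
// then write a single unsharded CSV file to GCS.
results
    .apply("row_to_string", MapElements.into(TypeDescriptors.strings())
        .via((Row row) -> row.getString("lang") + "," + row.getInt64("total_views")))
    .apply("write_to_gcs", TextIO.write()
        .to("gs://my-bucket/output")
        .withSuffix(".csv")
        .withoutSharding()); // one output file instead of N shards
```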

And you’re done folks!

Crunch time

Worst picture ever for a blog post

Putting it all together, here’s the pipeline in all its Java glory:
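The full source is linked at the top of the post; as a stand-in, here’s a condensed, best-effort sketch of the whole thing (paths, step names and the total_views alias are my own placeholders, and the SqlTransform API reflects current Beam releases rather than the author’s exact code):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

public class BeamSqlDemo {
  public static final Schema SCHEMA = Schema.builder()
      .addStringField("lang")
      .addInt64Field("views")
      .build();

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("read_from_gcs", TextIO.read().from("gs://my-bucket/input.csv"))
        // CSV line -> Row (language at index 4, views at index 6)
        .apply("string_to_row", ParDo.of(new DoFn<String, Row>() {
          @ProcessElement
          public void processElement(@Element String line, OutputReceiver<Row> out) {
            String[] vals = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
            out.output(Row.withSchema(SCHEMA)
                .addValues(vals[4], Long.valueOf(vals[6])).build());
          }
        })).setRowSchema(SCHEMA)
        // the sweet, sweet SQL
        .apply("transform_sql", SqlTransform.query(
            "SELECT lang, SUM(views) AS total_views FROM PCOLLECTION GROUP BY lang"))
        // Row -> CSV line, then one unsharded output file
        .apply("row_to_string", MapElements.into(TypeDescriptors.strings())
            .via((Row r) -> r.getString("lang") + "," + r.getInt64("total_views")))
        .apply("write_to_gcs", TextIO.write()
            .to("gs://my-bucket/output").withSuffix(".csv").withoutSharding());

    p.run();
  }
}
```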

The final piece of the puzzle is actually running the pipeline on GCP. Now, you could run it locally using the DirectRunner, but where’s the fun in that? Actually, snark aside, executing it locally is a great way to develop and debug your Beam/Dataflow pipelines. Don’t be like me and run your pipelines on GCP just for fun — your boss won’t like you very much when they get the bill.

Anyway, after kicking off the pipeline with the DataflowRunner this is what happens:

The world’s most simple, linear data pipeline

Drilling down into the transform_sql step, we can clearly see the steps that the SQL generated for us in the pipeline. Remember, I didn’t need to code these steps. I just wrote some simple SQL to express my intent:

Drilling down into the generated steps via the SQL transform. Lovely.

Even more computering

With the pipeline run, and the output written to GCS, it’s time to validate the results of the magical SQL code. Once I downloaded the output.csv file from the GCS bucket, I dutifully hit it with some wonderfully dodgy Bash. Yes, yes, I know I could have queried the file in GCS directly from BigQuery (a federated query) and written some SQL to check the results. But, doing it with Bash makes me look like I know how this computering thing works.

Firstly, let’s check that the 10 most popular languages line up between what the pipeline produced and what BigQuery spits out. The left terminal is the Beam SQL results, and the right terminal is hitting the original table in BigQuery:

Don’t judge me for using Red Sands — or a Mac. Pleeeeease.

That looks good. Now, let’s make sure Beam SQL calculated the correct number of aggregations for the language field, i.e. distinct values:

88 is the magic number

Finally, let’s double check the total sum of views to make sure it’s 100% correct:

Is there anything Bash can’t do?
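The three checks above work on any such lang,views CSV. Here they are against a toy stand-in file with made-up numbers (the real output.csv would be downloaded from the GCS bucket first):

```shell
# Toy stand-in for the pipeline's output.csv (lang,total_views).
cat > output.csv <<'EOF'
en,4990236853
de,120000
fr,90000
EOF

# 1. Top languages by views
sort -t, -k2,2 -rn output.csv | head

# 2. Number of distinct languages
#    (compare to COUNT(DISTINCT language) in BigQuery)
cut -d, -f1 output.csv | sort -u | wc -l

# 3. Total sum of views
awk -F, '{ total += $2 } END { print total }' output.csv
```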

Everything lines up with the original data that I exported from BigQuery into GCS.

I like it when things work.

Scaling

1 billion rows!

As awesome as all that is, however, 1K rows isn’t really that impressive, now is it? Instead, let’s run the exact same pipeline on 1 billion rows (~100GB) and let the Dataflow cluster scale up to 10 workers. That’s much more fun!

That’s just ridiculous: the pipeline now chugging on 7M records p/s

The pipeline took about 25 minutes to process 1B rows. That’s not bad at all. Now, we need to perform some checks on the results just like before:

Some big numbers popping out now, but there’s a problem

Making a boo-boo

Whoopsie! The calculation for the en value was much higher in BigQuery than it was in the result of my pipeline, whereas the other values were just fine. Luckily, it didn’t take me that long to figure out the problem (thanks again Stack Overflow). Looking at the number — this time with commas — it’s much easier to see why it all went pear-shaped:

4,990,236,853

Remember our pipeline was written in Java. Yes, JAVA! Got it yet? That’s right. I had bootstrapped my pipeline by copying the example from the Beam documentation, and that sample code calls addInt32Field(). Then, I had unwittingly used Integer.valueOf() when setting the value.

An int in Java is 32 bits, so the max value is 2³¹-1, i.e. 2,147,483,647. As such, because the sum for the en value was so large, the JVM was wrapping back around to -2,147,483,648 when it topped out at the max value. Ahh, gotta love Java!
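The wrap-around is easy to reproduce in isolation (plain Java, nothing Beam-specific):

```java
public class IntOverflow {
    public static void main(String[] args) {
        int sum = Integer.MAX_VALUE; // 2_147_483_647, i.e. 2^31 - 1
        sum += 1;                    // silently wraps, no exception thrown
        System.out.println(sum);     // → -2147483648

        // The same arithmetic is fine once promoted to 64 bits.
        long longSum = (long) Integer.MAX_VALUE + 1;
        System.out.println(longSum); // → 2147483648
    }
}
```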

Alas, the fix was simply to use addInt64Field() and instantiate a Long object to store the values in Java instead of an Integer. This gives you just a tad more headroom to play with, as a Long is 64 bits with a max value of 2⁶³-1, i.e. 9,223,372,036,854,775,807.

So, a quick fix to the code and it was time to run the pipeline again.

[..]
public static final Schema SCHEMA = Schema.builder()
    .addStringField("lang")
    .addInt64Field("views") // <-- need something bigger than 2^31
    .build();
[..]
Row appRow = Row
    .withSchema(SCHEMA)
    .addValues(vals[4], Long.valueOf(vals[6])) // <-- need a Long
    .build();
[..]
Fixed. Hallelujah!

This time the results looked much better, and the value for en is an appropriately sized number. Wonderful.

exit();

This is becoming a bit of a tradition now, closing out my posts with a picture of my dog

We just took a look at Beam SQL and how it works. It was relatively easy to get going and put it through its paces. That said, however, this was a very simple example using basic aggregation and summing. It would be interesting to see how it handles joins and more complex operations. Finally, I only tested it using a batch workflow. I’d be curious to see how it works with a streaming source, e.g. Pub/Sub or Kafka.

It’s also important to note that it’s a very new feature and still maturing. For that reason, I’d be wary of using it in production workloads just yet — unless you thoroughly test it for your use case(s). Also, being so new, there’s very little material on it aside from the Beam documentation itself.

However, this is certainly something to keep an eye on if you’re using Beam/Dataflow as part of your data analytics workloads. It may help solve a lot of problems and prove useful to many developers and data engineers. It’s got a lot of potential, but most importantly, the KSQL folks can’t say Beam is missing this functionality anymore. 😀