Did you know that a new feature was recently rolled out for Apache Beam that allows you to execute SQL directly in your pipeline? Well, don’t worry folks because I missed it too. It’s called Beam SQL, and it looks pretty darn interesting.
In this article, I’ll dive into this new feature of Beam, and see how it works by using a pipeline to read a data file from GCS, transform it, and then perform a basic calculation on the values contained in the file. Far from a complex pipeline, I agree, but you’ve got to start somewhere, right?
<tl;dr> You can now inject SQL directly into your Apache Beam/Dataflow pipeline (using the Java SDK only), which means having to write a lot less code. It’s still very new and probably not ready for production workloads just yet. But it’s certainly something to keep an eye on as it quickly matures. The full source code for the example below is here.
After seeing Beam SQL pop up in the docs a few months ago, I’d been meaning to test drive it for a while. But, being summer here in Australia, I was finding it rather hard to pull myself away from lounging around in my hammock all day. I was also really busy baby proofing the house (again) for the arrival of our latest ankle-biter in April.
But this week I found some time between commuting to work and meetings to finally have a proper poke around with Beam SQL, and put it through its paces. So, what’s it all about then? Well, a good place to start is the official documentation blurb:
Beam SQL allows a Beam user (currently only available in Beam Java) to query bounded and unbounded PCollections with SQL statements. Your SQL query is translated to a PTransform, an encapsulated segment of a Beam pipeline. You can freely mix SQL PTransforms in your pipeline.
Let’s break that statement down y’all:
- It’s only available in the Java SDK. *audible murmurs/groans/hisses from the Pythonist & Gopher readers*
- It can be used with bounded and unbounded PCollections, i.e. batch and streaming. That’s neat.
- It takes your SQL and automagically generates some code under the hood for you. This is a good way to keep lazy developers like myself happy. The less code I need to write the better.
In addition to that blurb, here’s another one that tells us Beam SQL is essentially based on Apache Calcite:
Calcite provides the basic dialect underlying Beam SQL. We have added additional extensions to make it easy to leverage Beam’s unified batch/streaming model and support for complex data types.
So, you’ll need to use Calcite’s dialect of SQL when using it in Beam. Check out the Apache Calcite SQL docs here. However, for all intents and purposes, you can simply write SQL in your pipelines in order to work on the data coming into them.
Got it? Great. Let’s move on.
Now that you know what it’s all about, let’s have a look at the code needed to get a simple pipeline that uses Beam SQL up and running. To test it, I devised a cunning data pipeline:
- Read a CSV file from GCS that has some useless data in it.
- Run a SqlTransform over the content of said file, and perform an aggregation and sum, using SQL.
- Write the results back to another file in GCS.
- Test the results using good ‘auld Bash.
Step 1 — Read a file
The first thing we need is some sample data to push through the pipeline. I usually turn to the public datasets in BigQuery whenever I need some dummy data, or if I want to test something. One of the entries in the public datasets is the Wikipedia benchmark page view tables.
These tables are great for doing quick tests and demos on GCP. They are simple to understand and easily relatable to the audience. Also, the sizes of the tables range from 1K rows all the way up to 100B rows (6TB). Wowsers. So, you can easily scale and test your workloads with absolutely no changes needed.
To start with, we simply export the smallest of the tables (1K rows) to GCS as a CSV file from the BigQuery console. Of course, we could just read the table directly from BigQuery in the pipeline (using BigQueryIO), but I can’t think of a use case that would require using Beam/Dataflow to pull data from BigQuery in order to run SQL over it. You’d be much better off just querying it directly in BigQuery! 😀
The first part of the code constructs the pipeline and reads the file from GCS into a PCollection<String>.
To work with Beam SQL, you must convert the element type of your PCollection to Row. Don’t confuse this with a BigQuery TableRow object though. They are different things altogether. So, the next thing we need to do is run a ParDo over the PCollection<String>, which we obtained from the initial file read from GCS, turn each element into a Row object, and set the values for each element.
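To make the parsing step a little more concrete, here’s a minimal, Beam-free sketch in plain Java of what the body of such a ParDo might do with each line. The CsvLineParser and PageView names, the column positions, and the sample line are all made up for illustration; inside a real pipeline you’d build a Beam Row at this point rather than a PageView.

```java
import java.util.Optional;

class CsvLineParser {
    /** Hypothetical typed view of one input line: only the two fields the query needs. */
    static final class PageView {
        final String language;
        final long views;

        PageView(String language, long views) {
            this.language = language;
            this.views = views;
        }
    }

    /**
     * Parses one CSV line. The column positions are parameters because the
     * layout of the exported file is an assumption here. Returns empty for
     * lines that are too short or whose views column is not a number.
     * Note: this is a naive split and does not handle quoted fields.
     */
    static Optional<PageView> parse(String line, int languageCol, int viewsCol) {
        String[] vals = line.split(",", -1);
        if (languageCol < 0 || viewsCol < 0
                || languageCol >= vals.length || viewsCol >= vals.length) {
            return Optional.empty();
        }
        try {
            long views = Long.parseLong(vals[viewsCol].trim());
            return Optional.of(new PageView(vals[languageCol].trim(), views));
        } catch (NumberFormatException e) {
            return Optional.empty(); // e.g. a header row
        }
    }

    public static void main(String[] args) {
        // Made-up sample line; the real file may be laid out differently.
        Optional<PageView> row = parse("2010,5,1,wp,en,Main_Page,1234", 4, 6);
        row.ifPresent(r -> System.out.println(r.language + " -> " + r.views)); // en -> 1234
    }
}
```

Skipping unparseable lines (rather than throwing) is just one possible design choice; it happens to deal with a header row without special-casing it.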
SQL is dope
Now that we have a PCollection<Row> object, we can finally run some sweet, sweet SQL over the contents of that collection. Who doesn’t love a bit of SQL, huh?
Remember that the elements will be distributed across all the workers in the Dataflow cluster. But we don’t need to care about that because it’s all abstracted away. Using SQL also means we don’t need to write boilerplate-y Combines, etc. in Java. With SQL, it’s much easier to express what we’d like to do with the data:
In this example, I’m simply aggregating by language and summing the views for each one. As simple as this sounds, under the hood a lot of work needs to be done to make this happen on a distributed cluster. And it’s something I’m not smart enough to understand. But once again, the complexity of this is hidden from users like us. You don’t need to worry about it, until you have a bug and you need to debug that is! Mwaaaah!
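If the query feels abstract, here’s roughly what that GROUP BY and SUM compute, sketched in plain Java on a single machine. This is not what Beam generates under the hood, and the GroupBySumSketch and PageView names and the sample rows are hypothetical; it only illustrates the semantics of grouping by language and summing views.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupBySumSketch {
    /** Hypothetical input element: the two columns the query touches. */
    static final class PageView {
        final String language;
        final long views;

        PageView(String language, long views) {
            this.language = language;
            this.views = views;
        }
    }

    /** Single-JVM equivalent of grouping by language and summing views. */
    static Map<String, Long> sumViewsByLanguage(List<PageView> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                (PageView r) -> r.language,                    // GROUP BY language
                TreeMap::new,                                  // sorted keys, for stable output
                Collectors.summingLong((PageView r) -> r.views))); // SUM(views)
    }

    public static void main(String[] args) {
        // Made-up rows, purely for illustration.
        List<PageView> rows = Arrays.asList(
                new PageView("en", 10L),
                new PageView("de", 7L),
                new PageView("en", 5L));
        System.out.println(sumViewsByLanguage(rows)); // {de=7, en=15}
    }
}
```

On a distributed runner the same result has to be assembled from partial sums computed on different workers, which is exactly the part the SQL lets you ignore.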
Collecting the results
We’ve now got the bulk of the pipeline written. The last thing we need to do is collect the results of the SQL statement, and store them somewhere. This could be anywhere really (e.g. BigQuery, Datastore, Memorystore etc.), but for brevity’s sake I simply write the results back as a CSV file to GCS.
To do this we need to convert the SQL Row object back to a String so we can write it out to GCS. Just hitting it with a simple ParDo makes this a walk in the park. Then one more apply writes the result file out to GCS (without sharding, so we get just one file).
And you’re done folks!
Putting it all together, here’s the pipeline in all its Java glory:
The final piece of the puzzle is actually running the pipeline on GCP. Now, you could run it locally using the DirectRunner, but where’s the fun in that? Actually, snark aside, executing it locally is a great way to develop and debug your Beam/Dataflow pipelines. Don’t be like me and run your pipelines on GCP just for fun, because your boss won’t like you for very much longer when they get the bill.
Anyway, after kicking off the pipeline with the DataflowRunner this is what happens:
Drilling down into the transform_sql step, it’s easy to see the steps that the SQL generated for us in the pipeline. Remember, I didn’t need to code these steps. I just wrote some simple SQL to express my intent:
Even more computering
With the pipeline run, and the output written to GCS, it’s time to validate the results of the magical SQL code. Once I downloaded the output.csv file from the GCS bucket, I dutifully hit it with some wonderfully dodgy Bash. Yes, yes, I know I could have federated out to the file in GCS from BigQuery and written some SQL to check the results. But doing it with Bash makes me look like I know how this computering thing works.
Firstly, let’s check that the 10 most popular languages line up between what the pipeline produced and what BigQuery spits out. The left terminal is the Beam SQL results, and the right terminal is hitting the original table in BigQuery:
That looks good. Now, let’s make sure Beam SQL calculated the correct number of aggregations for the language field, i.e. distinct values:
Finally, let’s double check the total sum of views to make sure it’s 100% correct:
Everything lines up with the original data that I exported from BigQuery into GCS.
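For anyone who’d rather stay in Java than Bash, the same three checks might look something like the sketch below. It assumes each line of the output file is simply language,views (an assumption about the output format), and the OutputChecks class and the sample lines are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class OutputChecks {
    /** Reads the views column, assumed to be the second field of "language,views". */
    static long viewsOf(String line) {
        return Long.parseLong(line.split(",", -1)[1].trim());
    }

    /** Number of distinct values in the first (language) column. */
    static int distinctLanguages(List<String> lines) {
        Set<String> seen = new HashSet<>();
        for (String line : lines) {
            seen.add(line.split(",", -1)[0].trim());
        }
        return seen.size();
    }

    /** Total of the views column; addExact throws rather than silently wrapping. */
    static long totalViews(List<String> lines) {
        long total = 0L;
        for (String line : lines) {
            total = Math.addExact(total, viewsOf(line));
        }
        return total;
    }

    /** The n lines with the highest views, largest first. */
    static List<String> topLanguages(List<String> lines, int n) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort(Comparator.comparingLong((String line) -> viewsOf(line)).reversed());
        return sorted.subList(0, Math.min(n, sorted.size()));
    }

    public static void main(String[] args) {
        // Made-up stand-in for the contents of the result file.
        List<String> lines = Arrays.asList("de,7", "en,15", "fr,3");
        System.out.println(distinctLanguages(lines)); // 3
        System.out.println(totalViews(lines));        // 25
        for (String line : topLanguages(lines, 2)) {
            System.out.println(line);                 // en,15 then de,7
        }
    }
}
```

In practice you’d feed it the lines of the downloaded file, e.g. via Files.readAllLines(Paths.get("output.csv")), and compare the numbers against what BigQuery reports.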
I like it when things work.
As awesome as all that is however, 1K rows isn’t really that impressive, now is it? Instead, let’s run the exact same pipeline on 1 billion rows (~100GB) and let the Dataflow cluster scale up to 10 workers. That’s much more fun!
The pipeline took about 25 minutes to process 1B rows. That’s not bad at all. Now, we need to perform some checks on the results just like before:
Making a boo-boo
Whoopsie! The calculation for the en value was much higher in BigQuery than it was in the result of my pipeline, whereas the other values were just fine. Luckily, it didn’t take me that long to figure out the problem (thanks again Stack Overflow). Looking at the number (this time with commas), it’s much easier to see why it all went pear-shaped:
Remember our pipeline was written in Java. Yes, JAVA! Got it yet? That’s right. I had bootstrapped my pipeline by copying the example from the Beam documentation, and that sample code calls addInt32Field(). Then, I had unwittingly used Integer.valueOf() when setting the value. An Integer in Java is a signed 32-bit value, so its max value is 2³¹-1, or 2,147,483,647. As such, because the sum for the en value was so large, the JVM was wrapping back around to -2,147,483,648 when it topped out at the max value. Ahh, gotta love Java!
Thankfully, the fix was simply to use addInt64Field() and instantiate a Long object to store the values in Java instead of an Integer. This gives you just a tad more head room to play with, as a Long has a max value of 2⁶³-1, or 9,223,372,036,854,775,807.
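The wrap-around is easy to reproduce without Beam. The sketch below (OverflowSketch is a made-up name, and the inputs are contrived) sums the same values into an int and into a long, and also shows Math.addExact, which throws instead of silently wrapping.

```java
class OverflowSketch {
    /** Sums into an int: silently wraps once the total passes Integer.MAX_VALUE. */
    static int sumAsInt(int[] values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    /** Sums into a long: plenty of head room for these inputs. */
    static long sumAsLong(int[] values) {
        long total = 0L;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] values = {Integer.MAX_VALUE, 1};

        System.out.println(sumAsInt(values));  // -2147483648
        System.out.println(sumAsLong(values)); // 2147483648

        // addExact refuses to wrap and throws instead.
        try {
            Math.addExact(Integer.MAX_VALUE, 1);
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```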
So, a quick fix to the code and it was time to run the pipeline again.
public static final Schema SCHEMA = Schema.builder()
    .addInt64Field("views") // <-- need something bigger than 2^31
Row appRow = Row
    .addValues(vals, Long.valueOf(vals)) // <-- need a Long
This time the results looked much better, and the value for en is an appropriately sized number. Wonderful.
We just took a look at Beam SQL and how it works. It was relatively easy to get going, and put it through its paces. That said however, this was a very simple example using basic aggregation and summing. It would be interesting to see how it does joins and more complex operations. Finally, I only tested it using a batch workflow. I’d be curious to see how it works with a streaming source e.g. PubSub or Kafka.
It’s also important to note that it’s a very new feature and still maturing. For that reason, I’d be wary of using it in production workloads just yet, unless you thoroughly test it for your use case(s). Also, being so new, there’s very little material on it aside from the Beam documentation itself.
However, this is certainly something to keep an eye on if you’re using Beam/Dataflow as part of your data analytics workloads. It may help solve a lot of problems and prove useful to many developers and data engineers. It’s got a lot of potential, but most importantly, the KSQL folks can’t say Beam is missing this functionality anymore. 😀