Sushil Kumar

Apache Beam is a unified data processing model that is both programming-language and runner agnostic.

Source: https://beam.apache.org/

You can write your Beam pipeline in any of the supported programming languages (Java, Python, Go, etc.) and run it against any of the supported runners (Dataflow, Spark, Flink, Samza, etc.)

Apache Beam also has built-in support (known as I/O transforms) for a myriad of data storage systems. One such system is Google’s BigQuery.

Apache Beam supports reading from and writing to BigQuery tables with a built-in transform. The problem with this connector is that it is not type safe. What I mean by type safe is that each row being read or written is represented by the type TableRow, which is essentially a Map<String, Object>. If you want to read values from a row, you need to know the exact type of each column to safely typecast it. Below is an example of one such typecast from the connector’s documentation page.
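Here is a minimal sketch of that pattern in Scala, assuming the weather table with a FLOAT column max_temperature used in the Beam documentation example:

import com.google.api.services.bigquery.model.TableRow

// TableRow is essentially a Map[String, AnyRef]; to read a column we
// must already know its runtime type and cast manually.
def maxTemperature(row: TableRow): Double =
  row.get("max_temperature").asInstanceOf[Double]

// A wrong guess (e.g. asInstanceOf[String]) still compiles,
// but fails at runtime with a ClassCastException.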

We need to typecast the column read from TableRow

As you can see, to read a column value out of a TableRow we need to know its exact type to use it correctly.

In comes Scio, the Scala SDK for Apache Beam, developed and open sourced by Spotify. Out of the box, Apache Beam does not offer a Scala SDK, but after Spotify adopted Beam in their workflow they developed one that is both easy and expressive to use for data engineers and scientists alike. There is an awesome talk by a data engineer at Spotify explaining the origins of Scio within Spotify.

Scio adds a type-safe BigQuery IO API that represents each row as a case class generated automatically at compile time. Scio uses Scala macro annotations and BigQuery dry runs to generate the case classes.
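A minimal sketch of the idea (the table name is illustrative, and the expansion shown is approximate):

import com.spotify.scio.bigquery._

// At compile time the macro dry-runs against BigQuery, fetches the
// table's schema, and expands Row into (roughly) a case class like
//   case class Row(station_number: Option[Long], mean_temp: Option[Double], ...)
@BigQueryType.fromTable("bigquery-public-data:samples.gsod")
class Row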

In this post we are going to learn about the different annotations that Scio provides to read from and write to BigQuery in a type-safe manner.

Table of Contents

  1. Pre-requisites
  2. Typed reads from BigQuery
  3. Typed writes to BigQuery
  4. Conclusion

So let’s get started.

1. Pre-requisites

Setting up Scio dependencies

In this demo I’m going to use IntelliJ with SBT to set up my Scio project. You can choose your favorite IDE and build tool to start.

Add the following dependencies to your build.sbt file.

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % "0.6.1",
  "com.spotify" %% "scio-test" % "0.6.1" % "test",
  "org.apache.beam" % "beam-runners-direct-java" % "2.6.0"
)

addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)

Note the Scala macro paradise compiler plugin, which adds support for macro annotations on Scala versions older than 2.13. Just be sure to choose a compatible Scala version in order to use this plugin.

Setting up Intellij for Scio

IntelliJ does not play well with the case classes generated by the Scio macros, hence you need to install the Scio plugin for IntelliJ to make it happy.

You also need to set the -Dbigquery.project property in the Scala Compile Server settings.
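For example, add the following to the compile server’s JVM parameters (the project ID is a placeholder):

-Dbigquery.project=<YOUR-GCP-PROJECT-ID>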

Setting BigQuery dataset project

2. Typed reads from BigQuery

Finding Top 10 Programming Languages by Repo Count

The first macro annotation that we are going to cover is @BigQueryType.fromTable, which helps in reading rows from a BigQuery table given its name.

For the purpose of this illustration, I’m going to use the BigQuery GitHub dataset and apply different transformations to gain insight into Scio’s inner workings.

This macro annotation takes the table name and maps all the columns to properties of the generated Row case class. It is particularly helpful if you wish to read the entire table. Note that we use the typedBigQuery[T] method on ScioContext to create a typed SCollection (analogous to a PCollection). Once we have a typed SCollection we can use the properties of the Row class and get complete type safety in all subsequent operations. Any type-related errors are caught at compile time.
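A minimal sketch of such a pipeline, assuming the public table bigquery-public-data:github_repos.languages (the field names follow that table’s schema; the exact generated types are illustrative):

import com.spotify.scio._
import com.spotify.scio.bigquery._

object TopLanguages {
  // Row (and a nested case class for the repeated `language` record)
  // is generated at compile time from the table's schema.
  @BigQueryType.fromTable("bigquery-public-data:github_repos.languages")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()                  // SCollection[Row]
      .flatMap(_.language)                   // repeated record -> nested case classes
      .flatMap(_.name)                       // nullable column -> Option[String], no casts
      .countByValue                          // (language, repo count)
      .top(10)(Ordering.by[(String, Long), Long](_._2))
      .flatMap(identity)
      .map { case (lang, count) => s"$lang: $count" }
      .saveAsTextFile(args("output"))

    sc.close()
  }
}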

Now let’s run the pipeline. Since we are using a BigQuery source we won’t be able to run the code using the DirectRunner; we need to run it on the DataflowRunner.

Pass the project, runner and output parameters in your run configuration.

--project=<DATAFLOW-PROJECT-ID> --runner=dataflow --output=gs://<YOUR-BUCKET>/scio_bigquery_top_lang

If you face any errors while submitting the Dataflow job, follow my previous post on Dataflow here.

Once the pipeline succeeds, go to the output bucket and check the output.

Top 10 Languages by Repo Count

This result is consistent with what I computed in one of my earlier posts.

Finding which month has the highest tornado count

Next we are going to see the @BigQueryType.fromQuery annotation, which derives the schema information using a BigQuery SQL query. For this example we are going to use the GSOD dataset.

In this example we need just two columns, month and tornado, and none of the others, hence we use the fromQuery syntax.
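A minimal sketch, assuming the public sample table bigquery-public-data:samples.gsod (legacy SQL syntax; the column types follow that table’s schema):

import com.spotify.scio._
import com.spotify.scio.bigquery._

object TornadoesByMonth {
  // The case class is generated from the query's result schema,
  // so Row has exactly two fields: month and tornado.
  @BigQueryType.fromQuery(
    "SELECT month, tornado FROM [bigquery-public-data:samples.gsod]")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()
      .filter(_.tornado.getOrElse(false))  // keep rows where a tornado occurred
      .flatMap(_.month)                    // nullable INTEGER -> Option[Long]
      .countByValue                        // (month, tornado count)
      .map { case (month, count) => s"$month: $count" }
      .saveAsTextFile(args("output"))

    sc.close()
  }
}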

Submit the Dataflow job.

Pass the project, runner and output parameters in your run configuration.

--project=<DATAFLOW-PROJECT-ID> --runner=dataflow --output=gs://<YOUR-BUCKET>/scio_bigquery_tornadoes

Once the job finishes, check the output.

Looks like the end of the year experiences the maximum number of tornadoes

3. Typed writes to BigQuery

Finding the top programming language per repository

The last annotation is @BigQueryType.toTable. It helps us write typed results back to BigQuery.

You can also use the @description annotation on classes and fields; it gets propagated to the BigQuery schema.
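A minimal sketch, again reading the github_repos.languages table from earlier (the Result fields and the bytes-based “top language” heuristic are illustrative):

import com.spotify.scio._
import com.spotify.scio.bigquery._
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}

object TopLanguagePerRepo {
  @BigQueryType.fromTable("bigquery-public-data:github_repos.languages")
  class Row

  // The output table's schema is derived from this case class;
  // @description annotations are propagated to the BigQuery schema.
  @BigQueryType.toTable
  @description("Top programming language per GitHub repository")
  case class Result(@description("Repository name") repo_name: String,
                    @description("Language with the most bytes") top_language: String)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()
      .flatMap { r =>
        // Pick the language with the most bytes in each repo.
        val langs = r.language.flatMap(l => l.name.map(n => (n, l.bytes.getOrElse(0L))))
        for {
          repo <- r.repo_name
          top  <- if (langs.isEmpty) None else Some(langs.maxBy(_._2)._1)
        } yield Result(repo, top)
      }
      .saveAsTypedBigQuery(
        args("outputTable"),
        writeDisposition = WriteDisposition.WRITE_TRUNCATE,
        createDisposition = CreateDisposition.CREATE_IF_NEEDED)

    sc.close()
  }
}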

Run the code, passing the following parameters in your run configuration.

--project=<DATAFLOW-PROJECT-ID> --runner=dataflow --outputTable=<BIGQUERY-DATASET>.TopLangPerRepo

Pay attention to the WriteDisposition and CreateDisposition arguments. For the sake of the demo I’m using the WRITE_TRUNCATE write disposition, which should always be avoided in production systems.

Make sure your dataset is created before you run the job. Once the job completes you’ll see the table TopLangPerRepo in your dataset. Run a sample query to check the contents of the table.

SELECT * FROM `<PROJECT-ID>.<BIGQUERY-DATASET>.TopLangPerRepo` LIMIT 100

Schema of the table gets auto-generated

4. Conclusion

And there you have it: completely type-safe BigQuery operations in Apache Beam using Scio. You must have noticed that we did not get any runtime exceptions due to wrong typecast operations; all the errors were confined to compile-time checks.

If you find any errors in the code or have any questions, feel free to drop a comment below.

Till then Happy Coding! :)
