BigQuery — one store to rule them all
At Brigade we are building tools to help empower voters — specifically activists and organizers. We believe one of the keys to accomplishing this lies in data: data about voters and politicians, as well as various other civic datasets.
We love data here. So much so that we've got it stashed in all sorts of places and in various formats. This is of course a problem, as we often want to join across disparate datasets. Before Brigade moved to BigQuery, we used Spark SQL to do cross-database joins — as discussed here in a previous post. By way of Apache Zeppelin and Spark SQL, we gave much of our company the ability to run really sophisticated queries. It worked well for a while — until our datasets got big. And then it broke. It broke a lot.
Not that we don't still love these tools, but we quickly realized they were never going to be stable enough to just work — not given our cluster and dataset sizes. We were either going to have to lock down what our internal customers could do or move to a different tool.
Enter BigQuery
BigQuery is a curious animal. It's a public implementation of Google's Dremel system. Data is stored in a columnar format and queries are parallelized across a very large number of nodes.
You pay (primarily) for the bytes you read, at a rate of $5 per terabyte scanned, but very little for data at rest ($20 per terabyte for storage).
We are now replicating data from all of our datastores (Cassandra, MySQL, Parquet and CSV files) into BigQuery. While we are still early in our use, our query costs — something that might run into the thousands of dollars on Redshift — have been close to zero.
To some, paying per query may sound unattractive, but in reality you may not end up paying much, depending on your usage. BigQuery is a column-oriented database and you only pay for the bytes you read in a query — so if you only hit two of ten columns in a table, you only get charged for reading those two columns. Moreover, the columns are compressed, often taking up much less space than they would in a traditional RDBMS.
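As a rough illustration (the table and column names here are made up), a query like the one below is billed only for the bytes in the two columns it actually references, no matter how wide the table is:

SELECT state, COUNT(*) AS registrations
FROM my_dataset.voters
WHERE registration_date >= DATE '2016-01-01'
GROUP BY state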
Pulling It Together
The tooling around BigQuery definitely lags behind that of similar stores such as Redshift, so we had to build much of what we needed ourselves.
To really make use of BigQuery, we needed to get all of our important data into it. To do this, we started simple by writing a few Spark jobs which pull data from our Rackspace-hosted cluster and write to BigQuery (via Google Cloud Storage). The two libraries we tried out to help us with this were this one from Appsflyer and this one from Spotify.
Each library has its own merits. The Appsflyer library uses JSON encoding to push data to BigQuery, which means we can create tables with a wider set of types (namely DateTime fields). The Spotify client uses Avro behind the scenes, which is far more compact — this means we can copy tables far more quickly. The downside, though, is that Avro doesn't support a DateTime type, so the tables that get created end up turning our user-friendly date types into unix timestamps. We ended up going with Spotify's library because, in addition to the performance benefits, we found it could handle a wider array of source inputs (read: quirky MySQL column types).
To get around the lack of DateTime types, we ended up creating views on top of the tables. The views simply expose the existing fields and convert the timestamps back to something our analysts can work with.
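For example, a view over a hypothetical customers table might wrap a SELECT like the sketch below (the table and column names are made up, and it assumes the timestamps landed as seconds since the epoch; TIMESTAMP_MICROS is the equivalent for microsecond values):

SELECT
  id,
  name,
  TIMESTAMP_SECONDS(created_at) AS created_at
FROM my_dataset.customers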
Our data ingestion flow is pretty straightforward — a set of Spark jobs that run nightly and push full snapshots into BigQuery.
In the simplest case — say, copying data from Parquet to BigQuery — the meat of your code is:
import com.spotify.spark.bigquery._ // brings saveAsBigQueryTable into scope (Spotify's spark-bigquery connector)
val sourceFileDF = sqlContext.read.parquet("/prod/data/myfile/")
// Stages the data in Google Cloud Storage, then issues a BigQuery load job.
sourceFileDF.saveAsBigQueryTable("my-project:my_dataset.my_table")
We built a simple tool to help us with RDBMS replication specifically. We’ve open sourced it and you can get it here (it’s still very rough):
https://github.com/markncooper/mysql-bigquery-replicator
It takes a database, credentials and a whitelist or blacklist. After that it does the rest of the magic of discovering your tables and writing them to BigQuery (via Google Cloud Storage). Big tables automatically get copied a chunk at a time in parallel. Parallelization is done using the primary key of the row and only happens if a given table exceeds a certain size threshold.
Getting to the data
BigQuery gives you a pretty barebones UI through which to load data and run queries. The real power comes from integration with other tools. In our case, we chose to plug in Chartio. Plugging Chartio into BigQuery takes only a few minutes, and its slick UI lets you put together complex reports and dashboards.
One restriction of Chartio is that it doesn't allow big joins across tables stored in different BigQuery datasets. To solve this, we load all our data into the same dataset. To avoid namespace collisions, we ended up prefixing all our tables with "store_database_", e.g. mysql_analyticsdb_customers. A little clunky, but it did the trick.
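With that convention, joins stay inside a single dataset and Chartio is happy. The query below is purely illustrative (mysql_analyticsdb_customers is the example table from above; the pledges table and the column names are made up):

SELECT c.state, COUNT(*) AS pledges
FROM my_dataset.mysql_analyticsdb_customers AS c
JOIN my_dataset.cassandra_civicdb_pledges AS p
  ON p.customer_id = c.id
GROUP BY c.state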
Query Plans and Stats
With all of that in place, our analysts, product managers and engineers can write complex queries and build visualizations to their hearts' content.
Below are the results from the Explain tab you get when running one of our basic join-and-aggregation queries. The query itself took around 50 seconds — not bad for a join with 5 billion rows on one side and 500+ million on the other.
This query also ended up touching ~200 gigabytes of data, which at the $5/terabyte rate works out to around $1 to run.
The Rub
BigQuery isn't right for all problems. Queries take at least a second or two to execute. There are also no indexes, so every query scans the full contents of whatever columns it touches. On the plus side, no pesky indexes to maintain.
Tables are more or less immutable and append-only. While rows have keys, they are only used to guarantee exactly-once streaming semantics within one minute of a write. If you write two updates with the same row key in the same minute, BigQuery will do the de-duplication. If an hour later you write another row with the same row key, you'll end up with two rows in your store. In our case we rewrite tables in their entirety, so this is not a problem — but if it is for you, Google recommends putting a view on top of your table to do the de-duplication and then periodically cleaning up the old rows.
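A minimal sketch of such a de-duplication view, assuming a hypothetical streamed table with a row_key column and an updated_at timestamp (it keeps only the newest version of each key):

SELECT * EXCEPT(rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY row_key ORDER BY updated_at DESC) AS rn
  FROM my_dataset.my_streamed_table
)
WHERE rn = 1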
While there are no indexes, you can organize your data using date partitioning. Say you are only querying the last week of data but have a year's worth in your store: the partitioner ensures you only scan the days you need. This speeds up your queries and dramatically lowers your costs.
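On an ingestion-time partitioned table (the events table below is hypothetical), it is the filter on the _PARTITIONTIME pseudo-column that limits both the scan and the bill to the last week of partitions:

SELECT event_type, COUNT(*) AS events
FROM my_dataset.events
WHERE _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY event_type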
Similar Tools
Other tools we also considered:
- Presto: a distributed SQL query engine Facebook released back in 2013. It sits on top of your existing datastores (HDFS, Cassandra, RDBMSs) and allows for similar cross-database joins.
- Athena: a hosted version of Presto from Amazon. Its pricing is very similar to that of BigQuery.
- Redshift: Amazon's very popular Postgres-like distributed analytics database. It scales really well, though in our case it would be much more expensive, as we would have to pay for the Redshift cluster even when no queries are being run.
- Impala: a fast open-source distributed database from Cloudera that sits on top of HDFS. Much like Redshift, it would have cost us much more, because there is no pay-per-query model for Impala.