Setting up a Big Data SQL querying system

Generate Parquet Data -> Save to S3 -> Query via Thrift

Sai Peddy
6 min read · Feb 20, 2018

Current Infrastructure:

We built and currently manage a platform that gives analysts access to 90 days of data from approximately 102 data sources, a number that keeps growing. The volume currently averages ~7TB per day. We learned quite a few lessons from building a system of this size and making it stable, but I’ll save those for another post, so keep an eye out 😄. Although this system gives analysts an impressive view into the data, it still does not cover our customer’s requirements: having 13 months of data available and being able to run SQL-like queries against that data. Cue the work on a SQL query engine. We looked at a few tools for this work; in this series of blog posts I will talk about one of them, Spark Thrift. However, there is much more to it than the querying system: there is also the question of managing and using the data.

The following series of blog posts will be broken up as follows:

  1. Creating Structured Data — creating Parquet Data (Covered in this blog)
  2. Using S3 as a datastore
  3. Setting up a Thrift Server — To query your data!

The hardest part of any system over time is the data. Data comes in all shapes and forms, which can mean that you need to convert it into a format that suits your goal. This series of blog posts should walk you through setting up a simple system to query data via Thrift. However, creating the Parquet data might be the hardest aspect, since it depends on your data.

The rest of this blog will talk about our approach to generating parquet data — providing a code sample for you to do it yourself.

Our Decisions

Remember the requirement to have 13 months of queryable data? The first part of that is having the data. We narrowed the storage format down to two candidates, Parquet and ORC. Looking into it further, both formats were fairly similar (a stackoverflow discussion on the differences here), but Parquet has existed longer and we have sources that are heavily nested, so we went with Parquet. The code to generate either format is essentially the same, which means we can change in the future if we need to. Other important decisions: S3 would be our datastore, and we would start with Spark Streaming to get live data into a usable format, then apply the same approach to historical data.

Our Journey

To produce columnar data, you must know the schema of the data ahead of time. Lucky for us, we started enforcing a schema on our data not too long ago. This was vital for us to even think about generating Parquet data for over 100 sources. We used POJOs to represent and enforce the schema on top of the data. Now for some concrete steps in creating Parquet data.

Steps:

  1. Create a Schema for your data (We will skip this — and perform step 2 manually)
  2. Generate a StructType (Spark’s schema) from your schema (e.g. a POJO) or manually
  3. Convert Spark RDDs into DataFrame with StructType
  4. Save as parquet or orc

Tips on creating a schema:

  • Know the fields in your data
  • Know the type of each field (timestamp, IP address, text, etc.)
  • Know whether a field is optional — may or may not come in
  • Be aware of nested elements and lists of elements

Creating Parquet via Spark Walkthrough

Let’s give ourselves a sample to work with. Assume you have an RDD of messages like the following:

{
  "dest_ip": "20.123.123.12",
  "dest_port": 8000,
  "isBoolean": true,
  "timestamp": "2018-07-08T17:48:40Z"
}

The record contains four fields, each with a different datatype. Let’s also assume that dest_ip is nullable, i.e. an optional field. We will now walk through a code sample that creates a StructType representing this record; the Spark docs can be pretty useful here. The documentation shows how to do it in Scala; the code samples here will be in Java.
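The following is a minimal sketch of the four steps for the sample record, not a definitive implementation. It assumes Spark 2.x on the classpath, and that the caller supplies the SparkSession and the RDD of JSON strings. The class and method names, the ingest_time column, the append save mode, and the field types chosen for dest_port and isBoolean are illustrative assumptions.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.current_timestamp;
import static org.apache.spark.sql.functions.date_format;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper class; session setup and s3a configuration are not shown.
final class ParquetWriterSketch {

    // Step 2: build the StructType by hand. The third argument marks a field as nullable.
    static StructType buildSchema() {
        return DataTypes.createStructType(new StructField[] {
            // There is no IP datatype, so the address is stored as a string.
            DataTypes.createStructField("dest_ip", DataTypes.StringType, true),
            DataTypes.createStructField("dest_port", DataTypes.IntegerType, false),
            DataTypes.createStructField("isBoolean", DataTypes.BooleanType, false),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, false)
        });
    }

    // Steps 3 and 4: convert the RDD of JSON strings into a Dataset and save it.
    static void writeAsParquet(SparkSession spark, JavaRDD<String> messages, String outputPath) {
        // FAILFAST raises an error as soon as a record does not match the schema.
        // Note: json(JavaRDD<String>) is deprecated in newer Spark versions in
        // favor of json(Dataset<String>).
        Dataset<Row> dataset = spark.read()
            .schema(buildSchema())
            .option("mode", "FAILFAST")
            .json(messages);

        // Add the ingest time plus year/month/day columns to partition on.
        // date_format keeps the zero padding (e.g. month=02).
        Dataset<Row> withIngestColumns = dataset
            .withColumn("ingest_time", current_timestamp())
            .withColumn("year", date_format(col("ingest_time"), "yyyy"))
            .withColumn("month", date_format(col("ingest_time"), "MM"))
            .withColumn("day", date_format(col("ingest_time"), "dd"));

        // Assumption: append, so repeated runs add files instead of failing.
        withIngestColumns.write()
            .mode(SaveMode.Append)
            .partitionBy("year", "month", "day")
            .parquet(outputPath); // swap for .orc(outputPath) to write ORC
    }
}
```

A call might look like ParquetWriterSketch.writeAsParquet(spark, messages, "s3a://testbucket/parquetFiles/"), where the bucket and prefix are placeholders.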

Simple: you have started to save your data in S3. For this demo you might want to change that S3 location to something local, for example /${home_path}/parquetTesting/${data_name}/. You will need to set some additional parameters in your Spark job for the code to be fully functional. Note: the code skips over steps such as setting up your Spark session and the s3a configuration.

I’ll touch briefly on the different parts of the code — some of the decisions made in the code may not be important or required for your use case, so feel free to adjust.

The StructType is generated first so that we can feed it in as a schema in order to create a Dataset. A StructType can be thought of as a list of StructFields (aka the fields in the schema). A StructField can only be one of the supported datatypes; as you can see in the example above, IP is not one of them, so that field is saved as a string instead.

Moving further down the code where we work with the dataset — there are a few aspects to cover.

  1. Creation of the dataset
  2. Adding additional columns
  3. Writing the dataset

In this example, we assumed that all of our data is in JSON format, so it is easy to convert using the .json method; other formats are also supported. We enable .option("mode","FAILFAST"), which is very useful during development and could potentially be useful for production logging. It informs you immediately if the data and the schema do not match, which usually means that the schema must be changed to fit the data or that data is coming in incorrectly.

Once you have a dataset, it can essentially be treated as a table with columns, so if you want to add more information, this is the perfect opportunity. For our short-term use case, we wanted to be able to partition the data based on when it was ingested. Therefore we add the current timestamp and also extract year, month, and day as columns.

Finally, there are two key things to know about saving the dataset. partitionBy lets you choose the columns of your dataset to partition your data by. This is key when you query your data: you can use these partitions to narrow down the data you actually need to search. .parquet specifies the format you are saving your data in. It can just as easily be replaced by .orc, which makes it very easy to swap formats further down the line. If you chose to implement all the additional steps (e.g. partitions), you should expect the following output.

testbucket
|---parquetFiles
    |---year=2017
    |   |---month=12
    |       |---day=25
    |           |---part0000
    |           |---part0001
    |---year=2018
        |---month=02
            |---day=17
                |---part0000
                |---part0001
Limitations

I want to bring up a few specific limitations that we encountered in our journey.

  1. Timestamps with too much granularity
  2. Corrupted parquet data

Timestamps are vital for systems like this, since querying on a timeframe is very common. That is why it is important to be aware of a bug that we discovered during the testing process: timestamps with more than 3 fractional-second digits (finer than milliseconds) do not work as expected.

Let’s take the timestamp 2017-07-30T14:41:09.068184Z. In the process of converting this timestamp to a dataset, it is recorded as 2017-07-30T14:42:17.184Z. As you can tell, most of the timestamp is perfectly fine, but there is a weird discrepancy in the minute and second values. What is happening is that the first 3 digits of the fractional part are treated as seconds and added to the timestamp, so the timestamp is advanced by 68 seconds. The last 3 digits are treated as milliseconds.

The limitation stems from the class DataFrameReader and its use of the java.text.SimpleDateFormat. Here is a stack overflow post that helped us uncover it.
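The behavior can be reproduced outside of Spark with java.text.SimpleDateFormat alone. The sketch below is only an illustration: the pattern with a literal 'Z' is an assumption chosen for the demo and not necessarily the pattern Spark uses, and the java.time comparison is included just to show what a parser that treats the digits as a fraction of a second returns.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical demo class for the timestamp granularity issue.
final class TimestampGranularityDemo {

    // Assumed pattern for illustration: millisecond precision with a literal 'Z'.
    private static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    /** Parses with SimpleDateFormat and formats the result back in UTC. */
    static String parseWithSimpleDateFormat(String text) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN, Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            // "SSS" reads every digit after the dot as one millisecond count,
            // so ".068184" becomes 68184 ms and the lenient calendar rolls
            // the extra 68 seconds into the seconds and minutes.
            Date parsed = format.parse(text);
            return format.format(parsed);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
        }
    }

    /** Parses with java.time, which treats the digits as a fraction of a second. */
    static String parseWithInstant(String text) {
        return Instant.parse(text).toString();
    }

    public static void main(String[] args) {
        String input = "2017-07-30T14:41:09.068184Z";
        System.out.println(parseWithSimpleDateFormat(input)); // prints 2017-07-30T14:42:17.184Z
        System.out.println(parseWithInstant(input));          // prints 2017-07-30T14:41:09.068184Z
    }
}
```

One possible workaround, under the same assumptions, is to truncate timestamps to millisecond precision before they reach the reader.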

The second limitation is that if you stop the job in the middle of writing Parquet data, your data could be corrupted. We have not had a chance to look into this too closely. However, when you start querying via Thrift and corrupted data is part of your query range, the query can fail.

Sai Peddy

Data Engineer | Love to Learn | Interested in…too many things