(* Originally posted on LinkedIn in 2018 *)
Columnar file formats have become the primary storage choice for big data systems, but when I Googled related topics this weekend, most of the articles I found only covered simple query benchmarks and storage footprint comparisons between a particular columnar format and row formats. Sorting is also a critical feature of columnar formats, but its benefits and effective practice have not been emphasized or explained in detail so far. IMHO, using columnar formats without proper sorting is like taking only half the advantage of the underlying file format. I’d like to share my insights on this topic.
Log/track data generated by Internet and e-commerce companies is a major reason for the rise of big data systems, and IoT will spread this pattern to wider industries. Compared with a highly normalized data model, such log/track data is much easier to produce: there is no lookup procedure to normalize dimensional attributes into IDs or surrogate keys, and the data models are sometimes super wide so that various types of loggers can write into the same event/log format (in a sparsely populated way). The logger can therefore have a simpler codebase and can produce data at extremely high QPS/TPS. The footprint of such data has quickly exceeded multi-hundred PB on S3/BigTable/HDFS, and columnar formats (Dremel/Parquet/ORC/Arrow/CarbonData) came to the rescue.
The first half of the advantage of columnar formats is: values are clustered by column, so compression is more efficient (shrinking the storage footprint), and the query engine can push down column projections (reducing read I/O from network and disk by skipping unwanted columns).
The second half of the advantage is: when the row order is properly sorted, the compression ratio reaches the next level, and the query engine can push down filter predicates (to reduce read I/O) and apply vectorized computation. You may think that converting from compressed CSV/Avro to Parquet/ORC already realizes 90% of the benefits of columnar formats, but let me try to convince you that the overhead of sorting the data is absolutely worth it.
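To make the first half concrete, here is a minimal Python sketch (with entirely synthetic values) of the dictionary encoding that columnar formats apply once a column’s values are stored contiguously instead of interleaved with other columns:

```python
# Hypothetical user_agent column: 2,000 distinct ~65-char strings, each
# appearing 20 times (all values are synthetic, for illustration only).
values = [f"Mozilla/5.0 (compatible; SyntheticBot/{i}; +http://example.org/bot)"
          for i in range(2000)] * 20

raw = "\n".join(values).encode()

# Dictionary encoding: store each distinct value once, then a 2-byte
# integer index per row -- only practical because the column is stored
# contiguously rather than interleaved with other columns.
dictionary = sorted(set(values))
index = {v: i for i, v in enumerate(dictionary)}
encoded = ("\n".join(dictionary).encode() +
           b"".join(index[v].to_bytes(2, "big") for v in values))

print(len(raw), len(encoded))  # the encoded column is a fraction of the raw size
```

This is a simplification of what Parquet/ORC do internally, before a general-purpose codec is applied on top.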
Here is a case to illustrate. A typical website log dataset contains: uuid, timestamp, user_id, cookie, user_agent, page_id, referring_site, http_header, … Let’s also benchmark popular analytical queries that filter on page_id:
- The original Avro (deflate-compressed) is ~1.4TB on the file system; a query scans the entire 1.4TB
- Simply converting to ORC (zlib-compressed) yields ~0.9TB; a query scans ~300GB
- Properly sorted ORC (zlib-compressed) yields ~0.5TB; a query scans ~200MB
This kind of website log is often very text-centric and verbose, so it has a bloated size (and stimulates the hunger for big infrastructure to analyze it). In the above case, user_agent contains about 200 characters on average (enough to hold 40 integer columns 😁), cookie is about 100 characters, and page_id is about 40 characters. To save the most storage space, we can sort the rows by 3 columns in order: user_agent, cookie, page_id.
- Stacking the repetitive values of the lengthy user_agent together allows the smallest encoded results.
- Within the same user_agent, the rows are further sorted by cookie, so user_id and ip_address are naturally arranged with the most repetitive values aligned. The compression for these 3 columns is now optimized.
- The next sorting precedence is page_id, because:
- page_id (like product_id in e-commerce) is one of the most used filter criteria for analytics
- web visits are highly skewed towards a relatively small set of hot page_ids
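A toy experiment shows why this sort order pays off. The data is entirely synthetic and Python’s zlib stands in for the file format’s codec, but the effect is the same: sorting lines up repetitive values so the compressor sees long runs instead of scattered duplicates.

```python
import random
import zlib

random.seed(42)

# Synthetic log rows (all values invented): 2,000 distinct user_agents,
# a per-user cookie, and a page_id column.
agents = [f"Mozilla/5.0 (X11; Linux x86_64) rv:{i}.0 Gecko/{20100000 + i}"
          for i in range(2000)]
rows = [(random.choice(agents),
         f"cookie-{random.randrange(5000):05d}",
         f"page_{random.randrange(50):03d}")
        for _ in range(40000)]

def encode(rs):
    return "\n".join("\t".join(r) for r in rs).encode()

unsorted_size = len(zlib.compress(encode(rows)))
# Tuple order is (user_agent, cookie, page_id), matching the sort precedence.
sorted_size = len(zlib.compress(encode(sorted(rows))))
print(unsorted_size, sorted_size)
```

On this synthetic data the sorted layout compresses noticeably smaller; on real logs with 200-character user_agents the gap is far larger.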
The compression improvement from 0.9TB to 0.5TB is 44%, while the simple Avro-to-ORC conversion only achieves a (1.4−0.9)/1.4 ≈ 36% size reduction. In addition to column pruning, Presto/SparkSQL/Hive can all further reduce the bytes scanned from the underlying file system significantly if the filter predicates can be pushed down to the file reader. This is why analytical queries with the filter WHERE page_id IN (?, ?, ?, …) can skip 99% of the data blocks. In other words, the other half of the advantage has not been harvested until you shrink your biggest datasets to a third of their size and observe the unbelievable read-time I/O reduction delivered via PPD (predicate pushdown) against the sorted columnar files. PPD is an important feature of modern computation engines: you can spot the recommendations to use sorted columnar file formats in their performance tuning documents, but they barely elaborate on why and how to sort the data.
You like the awesome storage and performance benefits of sorted columnar files, right? But sorting is an expensive operation, especially for large volumes of data, so you are probably wondering whether it is affordable (will it make data ingestion too heavy or jeopardize the SLA?). Here are some rules I find useful to share:
Never apply a global sort to a big dataset (to optimize the block/index layout of the columnar files). Always apply sharding or range partitioning first, and then sort the rows only within each bucket/partition.
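The shard-then-sort rule can be sketched in a few lines of Python (records and bucket count are made up; a real pipeline would do this with its engine’s bucketing support, with each bucket sorted in parallel):

```python
import hashlib

NUM_BUCKETS = 4  # illustrative; production values are 128/256/512/...

def bucket_of(cookie: str) -> int:
    # Stable hash so the same cookie always lands in the same bucket.
    digest = hashlib.md5(cookie.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_BUCKETS

# Hypothetical records: (cookie, user_agent, page_id).
rows = [(f"cookie-{i % 7}", f"agent-{i % 3}", f"page-{i % 5}")
        for i in range(100)]

# 1) Shard by cookie; store rows in (user_agent, cookie, page_id) key order.
buckets = {b: [] for b in range(NUM_BUCKETS)}
for cookie, agent, page in rows:
    buckets[bucket_of(cookie)].append((agent, cookie, page))

# 2) Sort only within each bucket -- never globally.
for b in buckets:
    buckets[b].sort()

print({b: len(v) for b, v in buckets.items()})
```

Each per-bucket sort touches only a fraction of the data, which is what keeps the overhead compatible with ingestion SLAs.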
Using the web log as an example, we can try one of the following strategies:
- Shard the daily traffic into n buckets (n can be 128/256/512/… as long as each shard contains a reasonable number of records) by cookie first, and then sort each bucket by user_agent, cookie, page_id
- Split the daily traffic into 2 partitions first, one for guests and the other for logged-in users. Then shard the rows by cookie and sort by user_agent, cookie, page_id. This is an extension of the 1st option: we further optimize the file layout when many queries focus only on logged-in users’ activity
- Split the daily traffic into 24 hourly partitions by the virtual column hour_id=00/01/02/…/23, and then sort each partition by user_agent, cookie, page_id. This strategy will not distribute the data as evenly among the hourly partitions (compared with distribution by cookie), but it serves queries very well when a filter on hour is applied
- Split the daily traffic into 24 hourly partitions, and then sort each partition by cookie, ip_address, timestamp. This strategy will not compress as aggressively as the first 2 options, but typically the same cookie also implies the same user_agent, so the overall compression efficiency is still pretty good. It will not provide much PPD benefit for page_id-based predicates, but it is perfect for sessionization and funnel analysis
Using an e-commerce order table as an example, you may consider:
- Shard the entire order table into n buckets by order_line_id, and then sort by order_line_id. This is the Hive ACID bucketed table structure. The compression is not further optimized, but it enables super fast joins and dedupes based on sorted-merge operations, plus fast lookups by order_line_id.
- Partition the transactions by the virtual column order_date (derived from order_timestamp) first, then shard the orders into n buckets by order_id or order_line_id, and finally sort by country_id, product_category_id, product_id, timestamp. If you define country_id and product_category_id as partition columns, you can easily run into the too-many-small-partitions problem for the long-tail countries/categories that have a small number of records. Using sorted columnar files keeps the number of files under control yet still offers effective PPD once country_id and product_category_id are used in the filter clause.
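The sorted-merge dedupe mentioned above can be sketched in a few lines of Python (bucket contents are made up; heapq.merge plays the role of the engine’s streaming merge step over pre-sorted bucket files):

```python
import heapq
from itertools import groupby

# Hypothetical: two files of one bucket, each already sorted by order_line_id,
# as a Hive bucketed + sorted table would store them.
bucket_a = [(1, "v1"), (3, "v1"), (5, "v1")]
bucket_b = [(2, "v1"), (3, "v2"), (6, "v1")]

# Streaming merge of sorted inputs -- no full re-sort, no hash table.
merged = heapq.merge(bucket_a, bucket_b)

# Dedupe in a single pass: keep the greatest record per order_line_id
# (here simply max by tuple order, as a stand-in for "latest version").
deduped = [max(group) for _, group in groupby(merged, key=lambda r: r[0])]
print(deduped)
```

Because both inputs are sorted on the same key, the merge and dedupe are one sequential scan, which is why bucketed + sorted tables make these operations so cheap.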
Using IoT events as an example:
- Partition by date, hour, and topic first, then shard the events into n buckets by device_id and sort by event_type, subject, device_id, event_time. Since subject and event_type are strings often used in filter and group-by clauses, queries can benefit from such a sorting strategy.
- If device_id causes skewness, we can keep the partitioning and sorting configuration but switch to event_id/uuid as the sharding column.
- If there is a string column with lengthy and repetitive values, it is a good candidate for further compression via sorting
- A string with a prefix, such as urn:partition:service:region:account-id:resource-id or /path1/path2/path3/file, can be split into prefix and suffix parts and then sorted by prefix to get a much better compression ratio
- If there is a column with medium cardinality (e.g. fewer than 500K distinct values) that is often used in filter clauses (IN, BETWEEN, =, <, >), we can favor PPD optimization over better compression by sorting by that column first
- If there are multiple columns with low-to-medium cardinality that all appear in filter clauses frequently, sort by the column with the lowest cardinality first, then the next-lowest, and so on in ascending cardinality order.
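The prefix/suffix trick can be demonstrated with synthetic URN-like strings (all values invented) and zlib as a stand-in for the columnar codec:

```python
import random
import zlib

random.seed(7)

# Hypothetical URN-like values: 1,500 distinct long prefixes and a
# high-entropy resource-id suffix.
prefixes = [f"urn:example:{random.getrandbits(128):032x}" for _ in range(1500)]
values = [f"{random.choice(prefixes)}:res-{random.getrandbits(64):016x}"
          for _ in range(20000)]

plain = "\n".join(values).encode()

# Split each value into (prefix, suffix) "columns" and sort by prefix:
# identical prefixes line up in long runs that compress almost for free.
pairs = sorted(v.rsplit(":", 1) for v in values)
split = ("\n".join(p for p, _ in pairs) + "\n" +
         "\n".join(s for _, s in pairs)).encode()

plain_c = len(zlib.compress(plain))
split_c = len(zlib.compress(split))
print(plain_c, split_c)
```

In the unsplit layout, two occurrences of the same prefix are usually too far apart for the compressor’s window; splitting and sorting puts them side by side.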
There is no one-size-fits-all sorting strategy, but we can always justify the effort of sorting by the large storage savings and/or the drastic speedup of popular query patterns. Most big datasets need to go through a compaction or deduplication process anyway, so the additional customized sort overhead (per shard) should not be significant as long as we shard the workload properly. The bottom line: if the data files are generated once then read 1K times or more, sorting will unleash the additional benefits of the columnar format for big datasets.
Before we touch on the caveats of sorted columnar files, I’d like to mention another supercharged weapon — vectorization. Most computation models pivot columnar data blocks back into rows in memory first, and then process those rows. Many analytical and ML operations can be 10~100X faster if the in-memory data representation is also columnar/vectorized. Dremio is built mainly on end-to-end columnar + vectorization; Spark added a vectorized reader and related optimizations in 2.x; Hive and Presto can perform vectorized joins and group-bys if sorted columnar input is provided. However, these optimizations are not yet widely noticed and leveraged, because rows are still the main in-memory format for data processing.
- Caveat 1: Due to the popularity of Parquet, Spark/Hive have better PPD and vectorized-read support for Parquet than for ORC. You need to flatten nested data structures into primitive columns first, then sort the ORC dataset.
- Caveat 2: Sorted columnar files can be very much smaller than Avro or compressed CSV/JSON. If you don’t change the split-size parameter for M/R and Spark jobs, you will either quickly suffer OOM errors, or end up with too few tasks/workers and suffer very long processing times. For example, say the max split size is set to 512MB; typically a 512MB compressed Avro/CSV/JSON data block becomes 1.5GB~1.8GB in memory, which fits in the 2GB JVM container you requested for the M/R or Spark job. However, a 200MB highly-compressed columnar data block may inflate back into 3GB of in-memory rows (if there are a lot of repetitive values). Say you bump the container size up to 8GB so the job can uncompress a 512MB columnar data block into memory; now there is only 1 task/worker to process a data volume that should have been handled by 4~5 tasks/workers. Therefore, you will need to reduce the max split size from 512MB to 128MB and keep the container size between 2~3GB. (Once vectorized processing becomes mainstream, we can ignore this issue.)
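The arithmetic behind this caveat, as a quick sanity check (the inflation ratio is the rough figure from this example, not a universal constant):

```python
# ~200MB highly-compressed columnar block -> ~3GB of in-memory rows,
# per the example above (illustrative, not a measured constant).
columnar_inflation = 3_000 / 200  # = 15x

def in_memory_gb(split_mb: float) -> float:
    """Estimated in-memory row size for one split of a columnar file."""
    return split_mb / 1024 * columnar_inflation

print(in_memory_gb(512))  # a 512MB split needs a ~7.5GB heap
print(in_memory_gb(128))  # a 128MB split fits a 2~3GB container
```

This is why shrinking the max split size, rather than growing the container, keeps task parallelism intact.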
Google Bigtable + BigQuery constantly optimize the sort order in the background based on recent query patterns; Vertica can store data block replicas in different sort orders, so the CBO can pick the optimal replica to serve each query; Netezza and Redshift both have a lot of optimizations for sorted tables. I hope this post, Dilip Kasana’s “Even More Optimized ORC”, and the SF Big Analytics talk can convince you to take full advantage of columnar file formats in your big data system.
(* Disclaimer: The views expressed in this article are those of the author and do not reflect any policy or position of the author’s employers. *)