Shagun Sodhani
Nov 29, 2015

This week I read up on Pig Latin: A Not-So-Foreign Language for Data Processing. This paper, from Yahoo!, describes a new language called Pig Latin, intended to provide a middle ground between declarative, SQL-style languages (which many developers find unnatural) and the procedural map-reduce model (which is very low-level and rigid). It also introduces Pig Pen, a novel, interactive debugging environment.

Overview

A Pig Latin program is a sequence of steps, each carrying out a single high-level transformation. Effectively, the program specifies the query execution plan itself. The program compiles into map-reduce jobs which are run on Hadoop (though other backends can also be plugged in). Pig Latin is more expressive than map-reduce, which is essentially limited to a one-input, two-stage data flow. Moreover, since the map and reduce functions are opaque to the system, optimizations are hard to bake into the system itself. Pig Latin overcomes this limitation as well by letting the programmer order the execution of the individual steps, i.e. specify the execution plan.

Unlike a traditional DBMS, Pig does not require data to be imported into system-managed tables, as it is meant for offline, ad-hoc, scan-centric, read-only workloads. Pig supports User Defined Functions (UDFs) out of the box. Since it targets only operations that can be parallelized, there is no inbuilt support for operations like non-equi joins or correlated subqueries; these can still be performed using UDFs.
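Invoking a UDF looks just like using a built-in function. Here is a minimal sketch, assuming a user-supplied expandQuery UDF that turns a query string into a bag of expanded queries; the jar name, class name, file name and schema are illustrative, not from the paper:

REGISTER myudfs.jar;  -- hypothetical jar containing the UDF
DEFINE expandQuery com.example.udf.ExpandQuery();  -- hypothetical UDF class
queries = LOAD 'queries.txt' USING PigStorage(',') AS (userId: chararray, queryString: chararray, timestamp: long);
-- The UDF is applied to every tuple; thanks to the nested data model its output can be a bag.
expanded_queries = FOREACH queries GENERATE userId, expandQuery(queryString);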

Nested Data Model

Pig supports a flexible, nested data model with four types:

  1. Atom: A simple value like a string or an integer, e.g. ‘medium’
  2. Tuple: A sequence of ‘fields’, each of which can be of any type, e.g. (‘medium’, 12)
  3. Bag: A collection of tuples, e.g. {(‘medium’, 12), ((‘github’, ‘twitter’), 12)}
  4. Map: A collection of key-value pairs where keys are atoms and values can be of any type, e.g. [‘key’ -> ‘value’, ‘anotherKey’ -> (‘another’, ‘value’)]
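To see how these types compose, here is a hedged sketch (the schema, field names and file name are assumptions, not taken from the paper) of loading a relation whose tuples carry an atom, a bag and a map, and of projecting inside the nested fields:

-- Each tuple holds an atom, a bag of (site, count) tuples and a map of properties.
users = LOAD 'users.txt' AS (name: chararray,
                             visits: bag{t: tuple(site: chararray, count: int)},
                             info: map[]);
-- Project the atom, the site field from inside the bag, and a map lookup.
summary = FOREACH users GENERATE name, visits.site, info#'age';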

Inbuilt functions

  1. FOREACH: Specifies how each tuple is to be processed. The semantics require that there be no dependence between the processing of different input tuples, which allows parallel processing.
  2. COGROUP: Suppose we have two datasets:
    results = (query, url) //a query and a result url for it
    revenue = (query, amount) //a query and the revenue generated by it
    Cogrouping these two datasets,
    grouped_data = COGROUP results BY query, revenue BY query;
    grouped_data contains one tuple for each group. The first field of the tuple is the group identifier and the remaining fields are bags, one for each dataset being cogrouped, so the first bag corresponds to results and the second to revenue.
    COGROUP is one level lower than JOIN as it only groups tuples together into nested bags. It can be followed by an aggregation or a cross product (yielding the result expected from a JOIN). GROUP is the same as COGROUP applied to a single dataset.
  3. Nested operations: commands can be nested within a FOREACH command to process the bags inside tuples (see the sketch after this list).
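A hedged sketch of both ideas follows (the file names, schemas and the filter threshold are illustrative; only the COGROUP and nested-FOREACH shapes follow the paper's running example):

results = LOAD 'results.txt' AS (query: chararray, url: chararray);
revenue = LOAD 'revenue.txt' AS (query: chararray, amount: double);

-- COGROUP: one output tuple per query, holding a bag of matching results
-- and a bag of matching revenue entries.
grouped_data = COGROUP results BY query, revenue BY query;

-- Nested operations: commands inside a FOREACH block process the bags of each tuple.
grouped_revenue = GROUP revenue BY query;
query_revenues = FOREACH grouped_revenue {
    big = FILTER revenue BY amount > 100.0;  -- nested command over the per-query bag
    GENERATE group, SUM(big.amount) AS bigRevenue, SUM(revenue.amount) AS totalRevenue;
};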

Other functions include LOAD, STORE, FILTER, JOIN, CROSS, DISTINCT, UNION and ORDER, which behave similarly to their SQL equivalents.

One might ask how Pig differs from an SQL-style query language when it seems to use similar operations. Compare the following queries, which produce the same result: the first is written in SQL (declarative) and the second in Pig Latin (procedural).

SELECT category, AVG(pagerank) FROM urls WHERE pagerank > 0.2 GROUP BY category HAVING COUNT(*) > 10^6

good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);

Implementation

The Pig interpreter parses the commands and verifies that the input files and bags being referred to are valid. It then builds a logical plan for every bag being defined, using the logical plans of the input bags and the current command. These plans are evaluated lazily to allow for in-memory pipelining and filter reordering. The parsing and logical plan construction are independent of the execution platform, while the compilation of the logical plan into a physical plan depends on the execution platform.

For each COGROUP command, the compiler generates a map-reduce job where the map function assigns keys for grouping and the reduce function is initially a no-op. The commands between two subsequent COGROUP commands A and B are pushed into the reduce function of A to reduce the amount of data that has to be materialized between jobs, while the commands before the very first COGROUP are pushed into the map function of the first job. The ORDER command compiles into two map-reduce jobs: the first samples the input data to determine quantiles of the sort key, and the second range-partitions the input data according to those quantiles (providing roughly equal-sized partitions), followed by local sorting in the reduce phase, resulting in a globally sorted file.
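To make this concrete, here is a hedged sketch (the script and the stage annotations are mine, not from the paper) of how a small chain of commands would be split across jobs:

queries  = LOAD 'queries.txt' AS (userId: chararray, queryString: chararray, timestamp: long);
recent   = FILTER queries BY timestamp > 20151101;           -- pushed into the map function of job 1
grouped  = GROUP recent BY userId;                           -- the GROUP/COGROUP boundary of job 1
per_user = FOREACH grouped GENERATE group, COUNT(recent);    -- pushed into the reduce function of job 1
ranked   = ORDER per_user BY $1 DESC;                        -- compiles into two further jobs: sample, then range-partition and sort
STORE ranked INTO 'ranked_users';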

The inflexibility of the map-reduce primitive results in some overheads while compiling Pig Latin into map-reduce jobs. For example, data must be materialized and replicated on the distributed file system between successive map-reduce jobs. When dealing with multiple data sets, an additional field must be inserted in every tuple to indicate which data set it came from. However, the Hadoop map-reduce implementation does provide many desired properties such as parallelism, load balancing, and fault-tolerance. Given the productivity gains to be had through Pig Latin, the associated overhead is often acceptable. Besides, there is the possibility of plugging in a different execution platform that can implement Pig Latin operations without such overheads.

Since the files reside in the Hadoop distributed file system, the LOAD operation can run in parallel. Similarly, parallelism is achieved for the FILTER and FOREACH operations as any map-reduce job runs several map and reduce instances in parallel. The COGROUP operation runs in parallel by repartitioning the output of multiple map instances across multiple reduce instances.

To work with nested bags efficiently, Pig uses Hadoop’s combiner function to achieve a two-tier tree evaluation of algebraic functions, i.e. functions like COUNT, SUM and AVG that can be computed incrementally from partial results. All algebraic UDFs benefit from this optimization; non-algebraic functions, of course, cannot take advantage of it.
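As a hedged illustration (the dataset and field names are assumed), an algebraic aggregate such as AVG lets each map-side combiner emit partial results that the reduce stage then merges:

urls = LOAD 'urls.txt' AS (url: chararray, category: chararray, pagerank: double);
by_category = GROUP urls BY category;
-- AVG is algebraic: combiners emit partial (sum, count) pairs per category,
-- and the reduce stage merges them into the final average.
avg_ranks = FOREACH by_category GENERATE group, AVG(urls.pagerank);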

Pig Pen

Pig Pen is an interactive debugging environment that also helps in constructing Pig Latin programs. Typically, the programmer writes a program, runs it on a dataset or a subset of the dataset (if running over the entire dataset is too expensive) to check for correctness, and changes the program accordingly. Pig Pen can dynamically create a side dataset, called the sandbox dataset, that can be used to test the program as it is being constructed. This dataset aims to be real (i.e. a subset of the actual data), concise, and complete (illustrating the key semantics of each command). While the paper does not go into the details of how this dataset is created, it does mention that Pig Pen starts by taking small random samples of the base data and synthesizes additional tuples to improve completeness.

Within Yahoo, Pig Latin has been used in a variety of scenarios, such as computing rollup aggregates and performing temporal and session analysis. While Pig Latin provides a powerful nested data model and supports UDFs, making it easier to write and debug map-reduce jobs, it does not deal with issues like materializing and replicating data between successive map-reduce jobs. The paper argues that this overhead is acceptable given the numerous advantages Hadoop offers. Pig has come a long way since the paper was written: many new functions have been added, it now ships with an interactive shell called Grunt, and UDFs can be written in various scripting languages rather than just Java. All these changes have made Pig more powerful and accessible than before.