# Overview of efficiency concepts in Big Data Engineering

Big data systems operate differently from traditional relational databases: indexes and keys are usually absent, and distributed-systems concerns tend to have the upper hand. Nevertheless, there are specific ways to work with big data, and understanding how best to operate on these types of datasets can prove key to unlocking insights.

### Map Reduce

Map reduce is one of the fundamental paradigms of big data. Understanding map reduce gives insight into how parallel operations and processing work at large scale.

As its name indicates, map reduce is based on two sets of tasks: map tasks and reduce tasks.

The **Map** task is responsible for “mapping” key-value pairs, essentially translating a set of key-value pairs into a different domain, i.e. an intermediate set of key-value pairs for processing purposes.

Let’s take the example above and say we wanted to count the number of times each category occurs in the dataset. By default, the input treats each row as a new key and the content of the row as its value. The map task’s role, in this case, is to convert each row in the following fashion: (row1, “12\tTech”) -> (“Tech”, 1). The following would be the output of the map task:

This can be implemented the following way in Python:
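A minimal sketch of such a mapper, assuming tab-separated rows of the form `id<TAB>category`. The sample rows and the `map_category` name are illustrative assumptions, not the original dataset or code:

```python
def map_category(row_key, row_value):
    """Map one input row to an intermediate (category, 1) pair."""
    _id, category = row_value.split("\t")
    return (category, 1)


# Hypothetical input: (row key, raw row content) pairs.
rows = [
    ("row1", "12\tTech"),
    ("row2", "33\tFood"),
    ("row3", "34\tConstruction"),
    ("row4", "45\tTech"),
    ("row5", "55\tFood"),
]

mapped = [map_category(key, value) for key, value in rows]
print(mapped)
```

The row key is ignored here because only the category matters for counting; a different job could just as well emit a different key.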

The **Reduce** task, in turn, is responsible for processing these records. In the example we have provided, it aggregates the records for each key and counts the number of occurrences, giving the following output:

If we were to implement this reducer function on the output obtained in the map phase, it would look something like this in Python:
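A minimal sketch of such a reducer, assuming the map phase emitted one `(category, 1)` pair per row. The `reduce_counts` name and the sample pairs are illustrative assumptions:

```python
from collections import defaultdict


def reduce_counts(pairs):
    """Sum the values of every (key, value) pair that shares a key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Hypothetical map output, one (category, 1) pair per input row.
mapped = [("Tech", 1), ("Food", 1), ("Construction", 1), ("Tech", 1), ("Food", 1)]

counts = reduce_counts(mapped)
print(counts)
```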

In a **distributed system**, the interaction between the map phase and the reduce phase becomes harder: the basic map reduce operations need to be extended by a shuffling phase, and can also be extended by a combiner phase for additional efficiency.

The **shuffling** phase ensures that a given key obtained after the map phase is always sent to the same reducer for aggregation. Keys are therefore hashed and split for distribution across the different reducers.

If we had 3 reducers, in the case of the example explained above we would have the record allocation shown above. We can see, for instance, that the reducer handling Construction has to process significantly less data than the other reducers. An uneven number of records per reducer is one of the major causes of slowdown when executing distributed map reduce operations.
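The key-to-reducer assignment can be sketched as a hash taken modulo the number of reducers. This is an illustration under assumptions: CRC32 stands in for whatever hash a real framework uses, and the function names and sample pairs are hypothetical.

```python
import zlib
from collections import defaultdict


def assign_reducer(key, num_reducers):
    """Pick a reducer for a key with a deterministic hash (CRC32 here).

    Python's built-in hash() is avoided because it is randomized per
    process for strings, so two workers could disagree on the owner.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(pairs, num_reducers):
    """Group mapped pairs by the reducer that owns their key."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[assign_reducer(key, num_reducers)].append((key, value))
    return dict(buckets)


# Hypothetical map output.
mapped = [("Tech", 1), ("Food", 1), ("Construction", 1), ("Tech", 1), ("Food", 1)]

for reducer_id, records in sorted(shuffle(mapped, 3).items()):
    print(reducer_id, len(records), records)
```

Printing the number of records per reducer makes any skew visible: a reducer that owns a rare key receives far fewer records than one that owns a frequent key.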

The **combiner** phase is an optimization on top of map reduce for distributed systems. It performs a local reduce operation on the workers running the map phase, saving network I/O.
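A rough sketch of what a combiner does on a single map worker, assuming a count-style job where partial sums can be safely merged later. The `combine` name and the sample pairs are illustrative:

```python
from collections import Counter


def combine(mapped_pairs):
    """Pre-aggregate one mapper's output locally, before the shuffle."""
    local = Counter()
    for key, value in mapped_pairs:
        local[key] += value
    return list(local.items())


# Hypothetical output of a single map worker.
mapper_output = [("Tech", 1), ("Tech", 1), ("Food", 1), ("Tech", 1)]

combined = combine(mapper_output)
print(len(mapper_output), "pairs before,", len(combined), "pairs after")
```

Only one pair per distinct key leaves the worker, which is where the network saving comes from. This works for counts and sums; operations that cannot be merged from partial results would need a different combiner or none at all.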

### Sorting

Sorting provides some unique properties in computer programming, and these properties carry over when dealing with larger datasets: having datasets pre-sorted provides significant efficiencies when performing lookups and grouping operations.

Let’s take a look at the impact of sorting on lookups. Imagine we have the same dataset stored in two different tables, one sorted by id and the other shuffled. The dataset comprises 5 records. We want to extract the category for id 55.

In the case of the sorted dataset, we can use a bisection algorithm: go directly to the middle record, check the id, and since it is smaller than the one we are looking for, look only at the subsequent records. Using this algorithm, we would have checked at most 3 records in this specific scenario. In the case of the unsorted dataset, we could also start at the middle record, but we would not have any sense of direction, and in the worst-case scenario we would have to check all the records in the dataset before retrieving the correct one.
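The difference can be sketched by counting how many records each approach inspects. The id values below are assumptions chosen to match a 5-record dataset containing id 55, and a plain left-to-right scan stands in for the undirected search over the shuffled table:

```python
def binary_search(sorted_ids, target):
    """Return (index, probes); index is None when target is absent."""
    lo, hi, probes = 0, len(sorted_ids) - 1, 0
    while lo <= hi:
        mid = (lo + hi) // 2
        probes += 1
        if sorted_ids[mid] == target:
            return mid, probes
        if sorted_ids[mid] < target:
            lo = mid + 1   # target can only be in the upper half
        else:
            hi = mid - 1   # target can only be in the lower half
    return None, probes


def linear_search(ids, target):
    """Scan every record in order, counting how many were inspected."""
    probes = 0
    for index, value in enumerate(ids):
        probes += 1
        if value == target:
            return index, probes
    return None, probes


sorted_ids = [12, 33, 34, 45, 55]      # hypothetical ids, sorted
shuffled_ids = [34, 12, 45, 33, 55]    # same ids, worst case for a scan

print(binary_search(sorted_ids, 55))   # (4, 3): found after 3 probes
print(linear_search(shuffled_ids, 55)) # (4, 5): found after 5 probes
```

With 5 records the gap is small, but the number of probes grows logarithmically for the bisection and linearly for the scan.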

Another efficiency gained from sorting is its impact on groupings. Take, for instance, the map reduce paradigm. Let’s say we wanted to count the number of records in each category in the previous dataset, and to better illustrate the example, let’s imagine that we had workers whose memory can only hold 3 records at a time.

In the table on the left, the dataset is sorted by id but unsorted by category, while in the table on the right it is sorted by category.

In the left example, records 12, 33 and 34 are pushed to worker a, which counts the number of records in each category and adds each newly found category to a dictionary. The resulting output of worker a is *{‘Tech’: 1, ‘Food’: 1, ‘Construction’: 1}*. The output of worker b is a similar dictionary: *{‘Tech’: 1, ‘Food’: 1}*. These results then have to be consolidated through an aggregation step that adds up each worker’s output.

In the right example, records 12, 45 and 34 are pushed to worker c, which counts them by category, outputting *{‘Tech’: 2, ‘Construction’: 1}*; meanwhile worker d outputs *{‘Food’: 2}*. Since the workers have no aggregation keys in common, no aggregation step is necessary and the results of each worker can simply be appended to each other.

Besides reducing the need for an aggregation step, the other advantage of pre-sorting the dataset is that it reduces the memory needed to count each category in the workers. In the case of the left table, workers a and b needed to hold 3 and 2 keys in memory respectively, while in the case of the table sorted by category only 2 and 1 keys needed to be held in memory.
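The memory argument can be pushed one step further with a single-worker sketch: when the input is already grouped by category, a streaming count only needs to remember the current key. This is an illustration using `itertools.groupby`, not the worker setup described above, and the sample column is an assumption:

```python
from itertools import groupby


def count_sorted(categories):
    """Count runs of equal keys; only the current key is held in memory."""
    for category, run in groupby(categories):
        yield category, sum(1 for _ in run)


# Hypothetical category column, already sorted (grouped) by category.
sorted_categories = ["Tech", "Tech", "Construction", "Food", "Food"]

print(list(count_sorted(sorted_categories)))
```

`groupby` only merges adjacent equal values, so the same function applied to an unsorted column would emit several partial counts for the same category, which is exactly the case that requires an extra aggregation step.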

### Partitions and Buckets

**Partitioning** and **bucketing** are ways to divide a table into different files that can be read individually, so that retrieval does not require a full table scan. One of the key advantages of partitioning is the ability to quickly insert or erase all the data relevant to a given partition without impacting the rest of the table.

**Buckets** are an extension of the partition concept: they offer a further subdivision of partitions using a hash algorithm. They allow for more efficient retrieval of sample data, and more efficient distribution of work and of join operations.
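As a rough illustration of how a record ends up in a given file, the sketch below derives a partition directory from one column and a bucket number from the hash of another. The `column=value` directory naming follows the Hive convention; the choice of columns, the CRC32 hash, the bucket count and the `locate` name are all assumptions made for the example.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 2  # hypothetical bucket count


def locate(record):
    """Return the (partition directory, bucket number) a record belongs to."""
    partition_dir = f"category={record['category']}"
    bucket = zlib.crc32(str(record["id"]).encode("utf-8")) % NUM_BUCKETS
    return partition_dir, bucket


# Hypothetical records.
records = [
    {"id": 12, "category": "Tech"},
    {"id": 33, "category": "Food"},
    {"id": 34, "category": "Construction"},
    {"id": 45, "category": "Tech"},
    {"id": 55, "category": "Food"},
]

layout = defaultdict(list)
for record in records:
    layout[locate(record)].append(record["id"])

for (partition_dir, bucket), ids in sorted(layout.items()):
    print(partition_dir, bucket, ids)
```

A query filtering on `category` only has to open the matching directory, and a lookup by `id` within it only has to open the bucket the id hashes to.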

In some systems, such as Hive, it is possible to insert data using **static or dynamic partitioning**. With static partitioning, the partition key into which the data will be inserted is specified when loading the data, while with dynamic partitioning it is inferred from the content of the data being loaded.

### Joins

Join operations tend to be fairly expensive at scale, and knowing when to use which type of join can have a significant impact on performance.

**Sort-merge joins:** The sort-merge join is an algorithm that finds all the results satisfying the join condition across two datasets. It does so by first sorting both datasets by the join columns, and then scanning them in order to see up to what point the condition still matches. As such, the algorithm benefits from having the tables sorted by the join columns beforehand, or from having a partition column as part of the join condition.
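A minimal sketch of the idea, restricted to an equality condition on a single key for brevity. Rows are assumed to be `(key, value)` tuples, and the function name and sample data are illustrative:

```python
def sort_merge_join(left, right):
    """Equi-join two lists of (key, value) tuples with a sort-merge pass."""
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Find the end of the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lkey:
                j_end += 1
            # Pair every left row of the run with every right row of the run.
            while i < len(left) and left[i][0] == lkey:
                for k in range(j, j_end):
                    out.append((lkey, left[i][1], right[k][1]))
                i += 1
            j = j_end
    return out


left = [(2, "b"), (1, "a"), (2, "c")]
right = [(2, "x"), (3, "y"), (2, "z")]
print(sort_merge_join(left, right))
```

If both inputs arrive already sorted on the join key, the two `sorted` calls become the only avoidable cost, which is why pre-sorted tables benefit this algorithm.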

**Hash joins:** Hash joins, contrary to sort-merge joins, can only be used for equi-joins, i.e. joins on an equality condition. The algorithm starts by computing the hash of the join columns for each row of the smaller of the two tables. The resulting hashes are smaller than the original tuples contained in the table, and in most cases can fit in memory. The algorithm then goes through the rows of the other table, computes their hash values and performs a lookup on the pre-computed values of the smaller table.
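The build and probe steps can be sketched with a Python dictionary, which hashes the join key internally. Rows are assumed to be `(key, value)` tuples; the function name and sample tables are hypothetical:

```python
from collections import defaultdict


def hash_join(small, large):
    """Equi-join: build a hash table on the small side, probe with the large."""
    table = defaultdict(list)
    for key, value in small:            # build phase, small table only
        table[key].append(value)
    out = []
    for key, value in large:            # probe phase, one lookup per row
        for small_value in table.get(key, []):
            out.append((key, small_value, value))
    return out


# Hypothetical tables: a small dimension table and a larger fact table.
categories = [(1, "Tech"), (2, "Food")]
events = [(1, "e1"), (2, "e2"), (1, "e3"), (3, "e4")]

print(hash_join(categories, events))
```

The larger table is streamed once and never held in memory, while only the smaller one has to fit in the hash table.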

**Map-side join** or **reduce-side join:** These are different types of join with specific characteristics. Map-side joins typically require the data used for the join to fit on a single worker machine. They are generally more efficient, as you are able to filter the dataset earlier in the process. Reduce-side joins, on the other hand, can handle larger volumes of data and are generally more expensive in terms of operations.

### Compression and file-formats

One of the main differences between big data technologies and traditional databases is the heavy reliance on denormalized data models. This increases the impact of compression and file formats on the performance of the platform.

**Compression** proves to be really important when dealing with big data: denormalized data models make the space used explode when data is left uncompressed. For instance, one of my DoubleClick (DV360) file exports ended up at 3 GB as an uncompressed CSV; gzipping it reduced the data to a mere 40 MB.

This type of compression makes it cheaper for workers to transfer data from disk to RAM, at the cost of the CPU cycles needed to decompress the files in memory. These tradeoffs can be particularly impactful in big data systems, where data is not always colocated with the workers processing it.
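The effect is easy to reproduce on a small scale with Python's standard `gzip` module. The CSV content below is made up for the illustration: it simply repeats the same advertiser and campaign names on every row, as a denormalized export would.

```python
import gzip

# Hypothetical denormalized export: the same advertiser and campaign
# names are repeated on every row.
header = "date,advertiser,campaign,impressions\n"
rows = "".join(
    f"2020-01-{day % 28 + 1:02d},Acme Corp,Spring Sale,{day * 7}\n"
    for day in range(10_000)
)
raw = (header + rows).encode("utf-8")

compressed = gzip.compress(raw)
ratio = len(raw) / len(compressed)
print(len(raw), "bytes raw,", len(compressed), "bytes gzipped, ratio", round(ratio, 1))

# Decompression costs CPU time but restores the exact original bytes.
restored = gzip.decompress(compressed)
```

The exact ratio depends on the data; the more repetition a file contains, the better it compresses.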

The typical big data compression formats include GZIP, Snappy and LZO. Each of these compression formats offers a different tradeoff between compression ratio and CPU cycles needed, and you should choose one according to your use case. GZIP, for instance, tends to be more appropriate for archival/cold storage, while Snappy and LZO are a better fit for hot storage.

**File formats:** big data on Hadoop offers a few different file formats, the most typical being ORC, Parquet and Avro. Each of these provides a different level of performance, depending on your ecosystem and read vs. write orientation.

ORC and Parquet, for instance, are columnar data formats that prove more efficient for read performance, while Avro is a row-based file format that tends to be more efficient for write performance.

In terms of system orientation, Hive and Presto tend to favor the ORC file format, Impala and Apache Drill the Parquet file format, while Kafka and Druid favor the Avro file format.

### Probabilistic Data Structures

Probabilistic data structures help computation on large sets of data by providing memory-efficient approximations of certain operations. The three most typical are the Count-Min Sketch, Bloom filters and HyperLogLog.

**Count-Min Sketch:** leverages independent hash functions to estimate the frequency of items. One particularity of this algorithm is that the estimate it provides is always greater than or equal to the true value.

The algorithm works by counting occurrences across n hash functions, each divided into m buckets. Each time an item appears in the dataset, we increase a counter for each matching hash function/bucket combination. Once the data structure is set up, we obtain the frequency estimate of an item by taking the minimum counter value across its matching hash function/bucket combinations.

Let’s take the example shown above with 3 hash functions and 3 buckets each. Each hash function is independent: bucket 3 of hash 1 is incremented by 1 to 5, bucket 1 of hash 2 is incremented to 6, and bucket 2 of hash 3 is incremented to 3. If we wanted an estimate of the frequency of (food, pasta), we would take the minimum value of these counters, which is 3 (hash 3 / bucket 2).

In the best-case scenario, at least one of the buckets has no overlap with any other tuple, in which case the Count-Min Sketch value equals the true value. When there is an overlap in every bucket, the value is overestimated. One way to decrease this error rate is to increase the number of hashes and buckets.
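A compact sketch of the structure, under assumptions: the "independent" hash functions are approximated by salting SHA-256 with the row number, and the class name, default sizes and sample items are hypothetical.

```python
import hashlib


class CountMinSketch:
    def __init__(self, depth=3, width=16):
        self.depth = depth    # number of hash functions (rows)
        self.width = width    # number of buckets per hash function
        self.table = [[0] * width for _ in range(depth)]

    def _bucket(self, row, item):
        """Bucket of `item` for hash function `row` (salted SHA-256)."""
        digest = hashlib.sha256(f"{row}:{item}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.table[row][self._bucket(row, item)] += count

    def estimate(self, item):
        """Minimum counter across rows; never below the true frequency."""
        return min(
            self.table[row][self._bucket(row, item)] for row in range(self.depth)
        )


# Hypothetical stream of (category, item) tuples.
items = [("food", "pasta")] * 3 + [("food", "steak")] * 2 + [("tech", "phone")]

cms = CountMinSketch()
for item in items:
    cms.add(item)

print(cms.estimate(("food", "pasta")))
```

Collisions can only add to a counter, never subtract from it, which is why taking the minimum gives the tightest available upper bound.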

The advantage of such an approach is particularly clear in distributed systems: instead of having to perform a series of sequential reduce operations, the Count-Min Sketch can perform the computation fully in parallel by computing different hash functions on different workers. The number of buckets can be set so as to use, at most, the available memory of each worker.

**Bloom filter:** a space-efficient, probabilistic data structure that can tell you whether an element is contained in a set. One particularity of Bloom filters is that false negatives are not possible: the filter returns either "possibly in the set" or "definitely not in the set". Bloom filters are based on a similar approach to the Count-Min Sketch algorithm, and also rely on multiple hash functions to provide an estimate.

Each independent hash function maps an element to one of n buckets, each containing a single boolean flag (0 or 1). The Bloom filter algorithm consists of two types of operation: a get operation and a put operation. The get operation retrieves the flags in the buckets given by the hash functions; if all of these buckets return 1, the element is probably in the set. If at least one of the values is not 1, the element is definitely not in the set. The put operation consists of switching the flag to 1 in the corresponding bucket of every hash function.

In the example above, we initialize the Bloom filter by turning 3 specific buckets green. If we wanted to check whether or not (food, steak) is already present in the set, we would need to check buckets 1, 5 and 7. Since only bucket 7 out of these 3 buckets is already initialized, (food, steak) was not already present in the set.
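The get and put operations can be sketched as follows. As an assumption for the example, the independent hash functions are approximated by salting SHA-256 with a seed, all of them index into one shared array of flags, and the class name and sizes are hypothetical:

```python
import hashlib


class BloomFilter:
    def __init__(self, num_buckets=64, num_hashes=3):
        self.num_buckets = num_buckets
        self.num_hashes = num_hashes
        self.flags = [0] * num_buckets

    def _buckets(self, item):
        """Yield one bucket index per hash function (salted SHA-256)."""
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_buckets

    def put(self, item):
        for bucket in self._buckets(item):
            self.flags[bucket] = 1

    def get(self, item):
        """True means 'possibly in the set', False means 'definitely not'."""
        return all(self.flags[bucket] for bucket in self._buckets(item))


bloom = BloomFilter()
seen = [("food", "pasta"), ("tech", "phone")]
for item in seen:
    bloom.put(item)

print(bloom.get(("food", "pasta")))   # always True: no false negatives
print(bloom.get(("food", "steak")))   # False, unless all its buckets collide
```

Flags are only ever switched from 0 to 1, so an element that was put can never later be reported as absent.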

**HyperLogLog:** an algorithm for approximating the number of distinct elements in a given dataset, i.e. it estimates the cardinality of the dataset. Unlike an exact algorithm, which requires a large amount of memory, HyperLogLog is able to compute estimates of the distinct count with limited memory usage.
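A simplified sketch of the idea, following the commonly published form of the estimator: the first `p` bits of a hash select a register, each register keeps the longest run of leading zeros seen in the remaining bits, and a harmonic mean of the registers gives the estimate. The SHA-1 based 64-bit hash, the default `p=12` and the class name are assumptions for the illustration, and production implementations add further bias corrections not shown here.

```python
import hashlib
import math


class HyperLogLog:
    def __init__(self, p=12):
        self.p = p                  # number of bits used to pick a register
        self.m = 1 << p             # number of registers
        self.registers = [0] * self.m

    def add(self, item):
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        x = int.from_bytes(digest[:8], "big")          # 64-bit hash value
        index = x >> (64 - self.p)                     # first p bits
        rest = x & ((1 << (64 - self.p)) - 1)          # remaining 64 - p bits
        # Position of the leftmost 1-bit in `rest`, counted from 1.
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[index] = max(self.registers[index], rank)

    def count(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)          # valid for m >= 128
        raw = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction: fall back to linear counting.
            return self.m * math.log(self.m / zeros)
        return raw


hll = HyperLogLog()
for i in range(50_000):
    hll.add(f"user-{i}")
for i in range(10_000):             # duplicates do not change the estimate
    hll.add(f"user-{i}")

print(round(hll.count()))
```

Memory usage is fixed by the number of registers (4096 small integers here), regardless of how many elements are added, whereas an exact distinct count would have to remember every element seen.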

### Wrap up

There are particularities to operating big data at scale. It pays to understand the concept of map reduce, how sorting, partitions and buckets can impact overall computation efficiency, and the tradeoffs between the different types of joins, compression and file formats. The use of probabilistic data structures adds another layer of tradeoffs between accuracy and performance.