Photo by David Dibert: https://www.pexels.com/photo/time-lapse-photography-of-vehicles-passing-near-building-635609/

Big Data And Computing: A Beginners Approach

Hadoop Based Applications

Abhijit Roy
Published in Geek Culture
11 min read · Feb 10, 2023


Over the years, Hadoop’s development has made Big Data and distributed computing much easier to use. It is used in many fields for many purposes, and to meet those needs a number of supporting tools and application interfaces have been built on top of it. If you are unfamiliar with Hadoop and MapReduce, it is best to give my previous articles a quick read first to understand important concepts like HDFS, MapReduce, and Hadoop 2.x.

Some of the most used applications built on Hadoop are:

  1. Apache Hive
  2. HBase
  3. Oozie
  4. Apache Spark
  5. Pig
  6. Apache Sqoop
  7. Apache Flume

We are going to look closely at some of them.

Apache Hive

Apache Hive is a schema-based data warehouse and ETL tool that facilitates reading, writing, and managing large datasets residing in HDFS. It is a Java-based application and uses a Java-based driver connection to execute queries and fetch their results. Hive stores the data in a distributed system, but we can analyze and query it in a SQL-like language called Hive Query Language (HQL). It is designed for scalability, extensibility, performance, and fault tolerance. Hive uses HDFS to store the data and MapReduce to execute the transformation logic of a query. Keep in mind that Hive is not a relational database; it only behaves like one from the user’s point of view. It does keep a structured schema, that is, the tables and their storage format are defined before the data is loaded.

Hive is an OLAP (Online Analytical Processing) system, that is, it is meant for data analysis and complex queries on large, multi-dimensional data coming from multiple databases.

Hive has a number of components that let it function:

  1. Driver: It is the main unit that receives the query from the user. It interacts with the client UI or CLI. It accepts the request and returns the result to the user as well. It monitors the execution of the query throughout its life cycle.
  2. Metastore: This is an RDBMS that contains the metadata of all the tables in Hive and their locations in the backend Hadoop file system. It holds the schema of each table, its attributes, and the HDFS mappings, as well as the serializers and deserializers required to read and write that table’s data in HDFS. The metadata is kept in an RDBMS because modifying the schema of a Hive table requires CRUD operations, and an RDBMS serves these with low latency. HDFS files are write-once and cannot be updated in place, so we can’t keep the metadata there together with the data itself.
  3. Compiler: Once the driver accepts the query, it needs a plan to execute it, so it forwards the query to the compiler. The compiler parses the HQL query and asks the metastore for the schema and HDFS mappings. With that information it compiles the query, devises an execution plan, and sends it back to the driver. The plan is a DAG (Directed Acyclic Graph).
  4. Execution Engine: It is responsible for executing the query and returning the result to the driver. The driver sends the DAG to the execution engine, which asks the metastore for the serialization and deserialization information of the required table. Once received, it creates a MapReduce job and invokes the MapReduce engine. Serialization converts structured data into a stream of bytes, and deserialization converts those bytes back to the original form. MapReduce needs this because the data has to flow over the network to reach different nodes, so the rows of the table must be converted to bytes before they can be streamed. Having a serializer and deserializer lets us convert the data without depending on the underlying storage format. Serialization also helps to transfer and process data in a more compact and faster manner.
  5. Client: The client can interact with Hive through a CLI or a UI hosted on a Hive Thrift server, or directly through an ODBC or JDBC driver.
HIVE architecture
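
The serializer and deserializer idea from the Execution Engine step can be illustrated with a minimal Python sketch. This is not Hive’s own SerDe interface (which is written in Java); the `serialize_row` and `deserialize_row` helpers and the two-column schema are hypothetical, chosen only to show a row being turned into bytes for transport and rebuilt on the other side.

```python
import struct

# Hypothetical schema for illustration only: (user_id: int, name: str).

def serialize_row(user_id: int, name: str) -> bytes:
    """Pack a row into bytes: 4-byte id, 2-byte name length, UTF-8 name."""
    encoded = name.encode("utf-8")
    return struct.pack(">iH", user_id, len(encoded)) + encoded

def deserialize_row(payload: bytes) -> tuple[int, str]:
    """Rebuild the original row from the byte payload."""
    user_id, length = struct.unpack(">iH", payload[:6])
    name = payload[6:6 + length].decode("utf-8")
    return user_id, name

row = (42, "alice")
wire_bytes = serialize_row(*row)        # what would travel over the network
restored = deserialize_row(wire_bytes)  # what the receiving node reconstructs
```

The receiver only needs to know the byte layout, not how the table is stored on disk, which is the independence from the underlying structure described above.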

Oozie

Oozie is a tool for running a job, or a sequence of multiple complex jobs in order, to achieve a bigger task as a pipeline on a Hadoop distributed system. Oozie is mainly used to run workflow jobs. A workflow job is a sequence of actions arranged in a Directed Acyclic Graph (DAG), where one action generally depends on the result of the previous one. The Oozie executor generally runs Java JARs that contain the MapReduce code. The workflow is triggered and executed by the Hadoop engine, and its actions run on multiple worker nodes in a distributed manner.

Workflow files are XML files that define the tasks and reference the bash and Java files to run. Configuration values live in a separate file called the properties file. The workflow needs the address of the NameNode, for example, which can be supplied from the properties file through a placeholder such as ${NameNode}.

<workflow-app xmlns = "uri:oozie:workflow:0.4" name = "simple-Workflow">
    <start to = "Create_External_Table" />

    <!-- Step 1: create the external table -->
    <action name = "Create_External_Table">
        <hive xmlns = "uri:oozie:hive-action:0.4">
            <job-tracker>xyz.com:8088</job-tracker>
            <name-node>hdfs://rootname</name-node>
            <script>hdfs_path_of_script/external.hive</script>
        </hive>
        <ok to = "Create_orc_Table" />
        <error to = "kill_job" />
    </action>

    <!-- Step 2: create the ORC table -->
    <action name = "Create_orc_Table">
        <hive xmlns = "uri:oozie:hive-action:0.4">
            <job-tracker>xyz.com:8088</job-tracker>
            <name-node>hdfs://rootname</name-node>
            <script>hdfs_path_of_script/orc.hive</script>
        </hive>
        <ok to = "Insert_into_Table" />
        <error to = "kill_job" />
    </action>

    <!-- Step 3: copy the data into the table -->
    <action name = "Insert_into_Table">
        <hive xmlns = "uri:oozie:hive-action:0.4">
            <job-tracker>xyz.com:8088</job-tracker>
            <name-node>hdfs://rootname</name-node>
            <script>hdfs_path_of_script/Copydata.hive</script>
            <param>database_name</param>
        </hive>
        <ok to = "end" />
        <error to = "kill_job" />
    </action>

    <!-- Any failing action transitions here and stops the workflow -->
    <kill name = "kill_job">
        <message>Job failed</message>
    </kill>

    <end name = "end" />
</workflow-app>

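The above is an example of an Oozie workflow. It runs three Hive actions in sequence, and a failure in any of them routes the workflow to the kill node.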
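
To make the graph structure of such a workflow concrete, here is a small Python sketch that reads a workflow definition and follows its success transitions from the start node to the end node. It only inspects the XML and is not how Oozie itself executes anything. The embedded `WORKFLOW_XML` is a trimmed, hypothetical workflow (the Hive action bodies are left out, so it is not deployable), and `success_path` is a helper name invented for this example.

```python
import xml.etree.ElementTree as ET

# Trimmed, hypothetical workflow used only for illustration.
WORKFLOW_XML = """
<workflow-app xmlns="uri:oozie:workflow:0.4" name="simple-Workflow">
  <start to="Create_External_Table"/>
  <action name="Create_External_Table">
    <ok to="Create_orc_Table"/>
    <error to="kill_job"/>
  </action>
  <action name="Create_orc_Table">
    <ok to="end"/>
    <error to="kill_job"/>
  </action>
  <kill name="kill_job"><message>Job failed</message></kill>
  <end name="end"/>
</workflow-app>
"""

NS = {"wf": "uri:oozie:workflow:0.4"}

def success_path(xml_text: str) -> list[str]:
    """Follow the <ok to=...> transitions from <start> until the <end> node."""
    root = ET.fromstring(xml_text.strip())
    # Map each action name to the node it moves to on success.
    ok_target = {
        action.get("name"): action.find("wf:ok", NS).get("to")
        for action in root.findall("wf:action", NS)
    }
    end_name = root.find("wf:end", NS).get("name")
    current = root.find("wf:start", NS).get("to")
    path = []
    while current != end_name:
        path.append(current)
        current = ok_target[current]
    return path

path = success_path(WORKFLOW_XML)
```

Each action has exactly one success edge and one error edge, so the happy path through the graph is a simple chain, while every error edge converges on the kill node.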

There are 3 basic types of Oozie jobs:

  1. Workflow Jobs: The ones we were talking about. They are sequences of jobs or actions represented by a DAG (Directed Acyclic Graph).
  2. Coordinator Jobs: They are workflow jobs that are triggered based on time or data availability.
  3. Bundle Jobs: These are groups of workflow or coordinator jobs.

Apache HBase

Apache HBase is a column-oriented NoSQL database. Loosely speaking, it stores the transpose of what a table- or schema-based database such as MySQL stores: values are grouped by column rather than by row. It is horizontally scalable and especially good for sparse data. Apache HBase was developed using the ideas behind Google’s Bigtable, with HDFS as its storage layer, and is an open-source database under the Apache Software Foundation.
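
Why this layout suits sparse data can be sketched with plain Python dictionaries. This is only a mental model, not the HBase client API: the `put` and `get` helpers, the row keys, and the `info:name` style column names are made up for illustration, and real HBase features such as cell versioning by timestamp and sorted row keys are left out.

```python
from collections import defaultdict

# row key -> {"family:qualifier": value}; only cells that exist are stored.
table: dict[str, dict[str, str]] = defaultdict(dict)

def put(row_key: str, column: str, value: str) -> None:
    """Write a single cell."""
    table[row_key][column] = value

def get(row_key: str, column: str):
    """Read a single cell, or None if it was never written."""
    return table.get(row_key, {}).get(column)

put("user1", "info:name", "Asha")
put("user1", "info:email", "asha@example.com")
put("user2", "info:name", "Ravi")   # user2 has no email cell at all

stored_cells = sum(len(cols) for cols in table.values())
```

A fixed-schema table would hold a NULL for the missing email of `user2`, whereas here the absent cell simply takes no space.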

For more details, please refer here.

Apache Spark

Spark was started by Matei Zaharia at UC Berkeley’s AMPLab in 2009. In 2013, the project was donated to the Apache Software Foundation. It is a fast and lightweight cluster computing platform based on the basic idea of Hadoop, though it is neither a part nor an extension of Hadoop. Spark may use Hadoop for cluster management (YARN) and storage (HDFS); having said that, Spark has its own cluster computing engine, so Hadoop is optional. Spark improves on Hadoop’s MapReduce framework by introducing in-memory computation. In MapReduce, the mappers and reducers write their intermediate results to disk and read them back from disk. This results in a large number of read-write operations, which are very time-consuming. Spark’s approach is to keep the intermediate results in memory instead of writing them back to the file system, which saves a lot of time.

Spark supports many types of jobs: offline/batch jobs, online jobs, streaming jobs, SQL queries, and even machine learning and graph-based algorithms. Four main components sit as service interfaces on top of Spark Core, which contains the execution engine, in-memory storage, and basic functionality, and together they cater to many different fields:

  1. Spark SQL: Used for querying structured data with SQL.
  2. Spark Streaming: Used for processing live data streams.
  3. Machine Learning: Spark’s MLlib library helps users to implement Machine learning applications on a distributed system.
  4. GraphX: Used for processing graph-based algorithms on a distributed system.

Spark provides APIs in Java, Scala, Python, and R, so applications can be written in whichever of these languages suits us.

Spark Fundamental Units

Spark operates on two abstract concepts on a basic level:

  1. DAG or Directed Acyclic Graph: Every job, irrespective of its type, is converted to a DAG, that is, a chain of steps or smaller tasks, which can then be executed far more efficiently using parallel computing.
  2. RDD or Resilient Distributed Datasets: They are immutable, distributed, and logically partitioned dataset objects. They serve as the fundamental data structures of Spark. These units are used to store data across several servers or cluster nodes. They are fault-tolerant and support self-recovery. There are a few ways of creating an RDD:
    a. From a stable source of data, such as a file in HDFS or a dataframe
    b. From other RDDs after performing some operations
    c. Parallelizing from existing collections: sc.parallelize(collection)
    Basically, the DAG represents the plan of actions that are executed to create the RDDs. If those RDDs are somehow lost, they can be quickly rebuilt by performing the same operations on the RDDs of the previous steps. Thus, RDDs self-recover. As we have seen already, Spark’s big advantage comes from storing data and computing in memory, so RDDs are provisioned and stored in memory.
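
The self-recovery idea can be sketched in a few lines of plain Python. This is a toy model, not PySpark: `SketchRDD`, `compute`, and `lose_data` are hypothetical names, and everything runs in a single process. Each object remembers its parent and the function that produced it, so a lost result can be rebuilt from the previous step.

```python
class SketchRDD:
    """Toy stand-in for an RDD: immutable data plus the recipe that built it."""

    def __init__(self, parent=None, fn=None, source=None):
        self.parent = parent      # previous RDD in the lineage, if any
        self.fn = fn              # function applied to the parent's data
        self.source = source      # original collection for a root RDD
        self._cache = None        # in-memory copy of the computed data

    def map(self, f):
        return SketchRDD(parent=self, fn=lambda data: [f(x) for x in data])

    def filter(self, pred):
        return SketchRDD(parent=self, fn=lambda data: [x for x in data if pred(x)])

    def compute(self):
        """Return the data, rebuilding it from the lineage if it is missing."""
        if self._cache is None:
            if self.parent is None:
                self._cache = tuple(self.source)
            else:
                self._cache = tuple(self.fn(self.parent.compute()))
        return self._cache

    def lose_data(self):
        """Simulate losing the in-memory copy, e.g. when a node fails."""
        self._cache = None

numbers = SketchRDD(source=range(1, 6))
squares = numbers.map(lambda x: x * x)
even_squares = squares.filter(lambda x: x % 2 == 0)

first = even_squares.compute()
even_squares.lose_data()
recovered = even_squares.compute()   # rebuilt from `squares`, not reloaded
```

Because every step is a deterministic function of the step before it, replaying the recipe gives back exactly the data that was lost.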

Spark Architecture

Spark’s architecture contains 4 essential components, which govern all types of jobs running on Spark and help deliver results quickly.

Spark Architecture
  1. Driver: The driver runs on the master node and is responsible for running and maintaining all jobs and tasks. It includes basic components like the DAG scheduler and the task scheduler. Whenever Spark is launched, the driver creates a Spark context that holds the basic details of the application along with several configuration options. The driver is tasked with breaking down an application job into a chain of smaller tasks and scheduling them for execution. It uses a cluster manager to find executors to run and complete the tasks.
  2. Cluster Manager: This is the resource manager responsible for allocating resources when the driver requests them to schedule and execute a task. Typically we use Hadoop YARN or Kubernetes as the cluster manager, or Spark’s standalone scheduler on local systems.
  3. Spark Executors: They are the basic workers of Spark, responsible for executing the tasks and for storing data in their cache and on their disks. Executors have to register themselves with the driver to let it know of their presence. As seen in the diagram, executors have multiple slots and disks. The number of slots available in an executor depends on the number of cores on the executor machine. After creating the DAG and breaking the job into smaller tasks, the driver assigns each task to a slot on some executor. So, if a job is divided into 3 tasks, they are scheduled into whichever slots on whichever executors happen to be available. Other slots of the same executor may be running different tasks at the same time, which lets Spark process different jobs concurrently. When data is loaded into the executor, the task launches in its scheduled slot, and once the job is done the data is removed. Basically, data moves from executor to executor according to the driver’s plan, activating and deactivating executors.
  4. Worker Node: These are the nodes of the cluster that host the executors.
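
The relationship between cores, slots, and task assignment can be sketched as follows. This is a deliberately simplified model and an assumption on my part, not Spark’s scheduler: the `Executor` class and `schedule` function are hypothetical, and the "most free slots first" rule stands in for the real placement logic, which also weighs factors such as data locality.

```python
from dataclasses import dataclass, field

@dataclass
class Executor:
    name: str
    cores: int                                   # one slot per core
    running: list = field(default_factory=list)  # tasks occupying slots

    def free_slots(self) -> int:
        return self.cores - len(self.running)

def schedule(tasks, executors):
    """Assign each task to the executor with the most free slots."""
    assignment, pending = {}, []
    for task in tasks:
        target = max(executors, key=lambda e: e.free_slots())
        if target.free_slots() == 0:
            pending.append(task)        # wait until a slot is released
            continue
        target.running.append(task)
        assignment[task] = target.name
    return assignment, pending

executors = [Executor("exec-1", cores=2), Executor("exec-2", cores=1)]
assignment, pending = schedule(["t1", "t2", "t3", "t4"], executors)
```

With three slots in total, three tasks start immediately and the fourth waits, which is why the number of cores bounds how many tasks run in parallel.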

Spark’s Modes of Execution

Spark can operate in 3 modes:

  1. Cluster Mode: In this case, the cluster manager accepts the job request from the client. It first launches the driver inside the cluster, which generates the Spark context and creates the entry point for the program to run.
  2. Client Mode: In this case, the driver keeps running on the client machine, which is called a gateway machine. The Spark job is submitted to the driver, which then uses the cluster manager.
  3. Local Mode: The entire application runs on a single machine.

Spark’s Functioning With Data

  1. We have seen Spark work with RDDs, but it doesn’t use MapReduce directly, so how does it work?
  2. We have also seen that Spark uses DAGs, so what do they contain?

These are the questions we are left with now. Let’s try to answer them.

Spark performs two kinds of operations on data to get results: Transformations and Actions. Both are available through low-level APIs. Let’s look at them in detail:

Transformations: They are functions that take RDDs as input and produce one or more RDDs by performing some operations on them, without changing the input RDD. Transformations are lazy by nature, i.e., they don’t take effect immediately. Spark records the transformations in the DAG in order and maintains them until they are needed. There are two types of transformations:

  1. Narrow Transformations: These transformations are like one-to-one mappings. Each partition of the output RDD depends on a single partition of the input RDD, so no data has to move between nodes. For example: filter(), map().
  2. Wide Transformations: These transformations are like many-to-many mappings. Each partition of the output RDD may depend on many partitions of the input RDD, so data has to be shuffled across the cluster. For example: groupBy(), join().

Actions: They are functions that trigger eager execution in Spark. They produce non-RDD results and return the final results to the driver. When we call an action, it executes everything recorded in the DAG, including the transformations maintained on those RDDs, to get the final results. For example: count(), reduce().

Now we have seen the types of operations, but these operations take place across several worker nodes of the cluster. The question arises: how does Spark share variables among so many different nodes?

To understand this, we need to understand the types of shared variables in Spark. There are two types of shared variables used by Spark:
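
The difference between lazy transformations and eager actions, and between narrow and wide transformations, can be sketched with a toy partitioned dataset in plain Python. This is not the PySpark API: `LazyDataset` and its methods are hypothetical names, and the "shuffle" is just an in-process regrouping by key.

```python
from collections import defaultdict

class LazyDataset:
    """Toy partitioned dataset: records transformations, runs them on an action."""

    def __init__(self, partitions, plan=()):
        self.partitions = partitions   # list of lists, one per partition
        self.plan = plan               # recorded transformations, in order

    # Narrow transformations: each output partition needs one input partition.
    def map(self, f):
        return LazyDataset(self.partitions, self.plan + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self.partitions, self.plan + (("filter", pred),))

    # Wide transformation: records with the same key must be brought together.
    def group_by(self, key_fn):
        return LazyDataset(self.partitions, self.plan + (("group_by", key_fn),))

    def _run(self):
        parts = [list(p) for p in self.partitions]
        for kind, fn in self.plan:
            if kind == "map":
                parts = [[fn(x) for x in p] for p in parts]
            elif kind == "filter":
                parts = [[x for x in p if fn(x)] for p in parts]
            else:  # group_by: move every record to the partition owning its key
                shuffled = [defaultdict(list) for _ in parts]
                for p in parts:
                    for x in p:
                        key = fn(x)
                        shuffled[hash(key) % len(parts)][key].append(x)
                parts = [sorted(d.items()) for d in shuffled]
        return parts

    # Actions: trigger execution and return plain Python values to the caller.
    def collect(self):
        return [x for p in self._run() for x in p]

    def count(self):
        return len(self.collect())

calls = []

def traced_square(x):
    calls.append(x)   # lets us observe when the function really runs
    return x * x

data = LazyDataset([[1, 2, 3], [4, 5, 6]])
pipeline = data.map(traced_square).filter(lambda x: x > 1).group_by(lambda x: x % 2)
calls_before_action = len(calls)    # nothing has executed yet
groups = dict(pipeline.collect())   # the action runs the whole recorded plan
```

Building `pipeline` only appends entries to the plan; `traced_square` is not called until `collect()` runs. The `map` and `filter` steps touch each partition independently, while `group_by` is the only step that moves records between partitions.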

  1. Broadcast: These variables are cached on all nodes. Whenever a broadcast variable is created, a read-only copy of its value is saved on every node of the cluster.
  2. Accumulator: These variables are used to aggregate values from all across the cluster. For example, say we have a counter variable that counts occurrences of something, but the event it is counting can occur on any node of the cluster. To maintain such a record, we use accumulators. An accumulator has a member named value: tasks on any node can add to it, while the driver reads the aggregated result. Like a broadcast variable, it is shared across the whole cluster, but the information flows in the opposite direction, from the workers back to the driver.
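
A single-process Python sketch can show the two roles side by side. The `Broadcast` and `Accumulator` classes below are simplified stand-ins written for this example, not Spark’s own classes, and `worker_task` is a hypothetical function that plays the part of a task running on one node.

```python
class Broadcast:
    """Read-only value; in a real cluster every node would cache its own copy."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

class Accumulator:
    """Add-only counter: workers call add(), the driver reads value."""

    def __init__(self, initial=0):
        self.value = initial

    def add(self, amount):
        self.value += amount

def worker_task(partition, stop_words, blank_lines):
    """Runs on one 'node': filters words and reports the blank lines it saw."""
    kept = []
    for line in partition:
        if not line.strip():
            blank_lines.add(1)          # contribute to the shared count
            continue
        kept.extend(w for w in line.split() if w not in stop_words.value)
    return kept

stop_words = Broadcast(frozenset({"the", "a"}))   # shipped to every worker
blank_lines = Accumulator()                       # aggregated back at the driver
partitions = [["the quick fox", ""], ["a lazy dog", "", "jumps"]]
words = [w for p in partitions for w in worker_task(p, stop_words, blank_lines)]
```

The broadcast value flows from the driver out to the workers and is only read there, while the accumulator collects contributions from the workers and is read once at the driver.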

Spark’s Flow of Operations at Runtime

A job on being submitted to Spark follows these steps to run to completion:

  1. The client submits code with transformations and actions to the driver, which creates a logical DAG or plan of action.
  2. With the help of the Spark context and optimizers such as Catalyst, the driver converts the logical plan into a physical execution plan, breaking the big job down into a chain of smaller, asynchronous tasks.
  3. Driver contacts the cluster manager to negotiate resources. On the basis of workload and available slots in the executors, slots are allocated.
  4. Then the driver schedules and executes the tasks on those executors.

Conclusion

In this article, we have seen in detail some of the applications based on Hadoop that have become significantly important in the big data world. In the next articles, we will try to dive deeper into Spark applications.

Happy Reading!


I am a Computer Science and Technology Graduate from NIT, Durgapur. Find Me at https://abhijitroy1998.wixsite.com/abhijitcv