Combining Hadoop and Spark in a Data Processing Pipeline

Published in cdapio, Apr 24, 2019

March 21, 2017

Tony Duarte is the Training Specialist at Cask — where he defines, writes, and delivers courses on the Cask Data Application Platform (CDAP). Tony has spent over a decade working with Big Data — using Hadoop and Spark at Cloudera, Hortonworks, and MapR. Before becoming a Big Data Trainer, Tony was a Senior Kernel Engineer doing Operating Systems development and Database Kernel development.

CDAP includes an Application Development Framework so that Developers can build entire Applications with existing Big Data technologies — technologies such as Apache Hadoop, Apache Spark, Apache HBase, Apache Hive and more. CDAP has been used by Fortune 50 customers to help them do Data Ingestion and Data Egress from their data lakes and to help them build custom applications. And since it has been designed for Big Data pipelining, Cask has also provided a variety of reusable pipeline stages for Machine Learning and Data Manipulation. All you have to do is drag them into your pipeline with our graphical tool.

At the core of the API is the CDAP Workflow — for scheduling your Hadoop or Spark Jobs on Apache YARN — potentially in a processing pipeline. You can use it to build and deploy your pipeline — whether your pipeline is a single Spark Streaming job or a hundred MapReduce jobs that you’ve designed as plug-in microservices. Here at Cask, we think of ourselves as providing the first microservice architecture for Big Data.

Overview of a CDAP Workflow

The CDAP Workflow object has some great features:

  • You can leverage your existing knowledge of Hadoop and Spark.
  • The framework handles scheduling on YARN; you just use a cron-style syntax.
  • You can deploy using a graphical interface, reducing the chance of mistakes in production.
  • You can run your own code or one of our configurable plugins from the graphical interface.

Code Walk-Through — Scheduling both Spark and Hadoop Jobs

Perhaps the best way to understand how you can use the CDAP App Development Framework is by looking at some code that will compile and run. This code will create a CDAP Application for a very simple pipeline. First it runs a Spark job to write some output files and then it runs a MapReduce job to read those files and process them. This pipeline can be deployed, scaled, and monitored using either the Graphical User Interface or REST calls. And when this is defined as an individual component of a pipeline, it can be substituted out as a microservice using the graphical interface.

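// Imports assume the CDAP 4.x package layout (co.cask.cdap.api.*); adjust them to your CDAP version.
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.schedule.Schedules;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class CdapWorkflowApp extends AbstractApplication {
  @Override
  public void configure() {
    addSpark(new MySparkRunr());   // (1)
    addMapReduce(new MyMR());
    addWorkflow(new MyWF());       // (2)
    scheduleWorkflow(Schedules.builder("every5Min").createTimeSchedule("*/5 * * * *"), "MyWF");   // (3)
  }

  public static class MyWF extends AbstractWorkflow {   // (4)
    @Override
    public void configure() {
      addSpark(MySparkRunr.class.getSimpleName());   // (5)
      addMapReduce(MyMR.class.getSimpleName());
    }
  }

  public static class MySparkRunr extends AbstractSpark {   // (6)
    @Override
    public void configure() {
      setMainClass(MySparkMain.class);   // (7)
    }
  }

  public static class MySparkMain implements JavaSparkMain {   // (8)
    @Override
    public void run(JavaSparkExecutionContext cdapJsec) throws Exception {   // (9)
      JavaSparkContext jsc = new JavaSparkContext();
      JavaRDD<String> myStrRdd = jsc.parallelize(Arrays.asList("1", "2", "3"), 2);
      myStrRdd.saveAsTextFile("/tmp/SparkOut");   // (10)
      jsc.stop();
    }
  }

  public static class MyMR extends AbstractMapReduce {   // (11)
    @Override
    public void initialize() throws Exception {   // (12)
      MapReduceContext cdapContext = getContext();
      Job job = cdapContext.getHadoopJob();
      job.setMapperClass(Mapper.class);   // (13)
      job.setReducerClass(Reducer.class);
      job.setMapOutputKeyClass(LongWritable.class);
      job.setMapOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path("/tmp/SparkOut"));   // (14)
      FileOutputFormat.setOutputPath(job, new Path("/tmp/HadoopOut"));
    }
  }
}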

(1) We set up our pipeline in the configure method. Here we are adding Spark, MapReduce, and a CDAP Workflow as components of our pipeline. No jobs are run on the cluster by this code. We are simply identifying components which might be used in our pipeline. We can use explicit REST commands or the graphical management interface to run a job or pipeline on the YARN cluster.
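
As a rough illustration of the REST route mentioned in (1), the sketch below builds the kind of request that would start the Workflow. Everything about the URL is an assumption not stated in this article: a local CDAP router on port 11015, the "default" namespace, an application name equal to the class name CdapWorkflowApp, and a lifecycle path of the form /v3/namespaces/.../workflows/.../start. Check the CDAP REST documentation for your version before relying on it. The class name StartWorkflowSketch is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class StartWorkflowSketch {
    // Assumed values: local CDAP router, default port 11015, "default" namespace.
    static final String BASE = "http://localhost:11015/v3/namespaces/default";

    // Builds (but does not send) a POST request asking CDAP to start a workflow.
    static HttpRequest startRequest(String app, String workflow) {
        URI uri = URI.create(BASE + "/apps/" + app + "/workflows/" + workflow + "/start");
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = startRequest("CdapWorkflowApp", "MyWF");
        // prints "POST http://localhost:11015/v3/namespaces/default/apps/CdapWorkflowApp/workflows/MyWF/start"
        System.out.println(request.method() + " " + request.uri());
        // To submit it against a running CDAP instance (Java 11+):
        // java.net.http.HttpClient.newHttpClient()
        //     .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

The request is only built and printed here, so the sketch runs without a cluster; sending it is left commented out.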

(2) This CDAP Workflow component can sequence Spark and/or Hadoop jobs. The class referenced here will have its own configure method to define the specific jobs and their execution sequence.

(3) CDAP Workflows can be run manually, with REST calls, or they can be run on a schedule. Here we are using cron-style notation to create a schedule. As before, nothing is actually running on the cluster until we explicitly start either the Workflow or its schedule.
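
To make the cron-style notation in (3) concrete: the first field of "*/5 * * * *" is the minute field, and a step of 5 selects every fifth minute of the hour. The sketch below expands only that kind of step field and is not how CDAP parses schedules; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CronStepSketch {
    // Expands a cron step field (an asterisk, a slash, then a number) over the range [min, max].
    static List<Integer> expandStep(String field, int min, int max) {
        if (!field.startsWith("*/")) {
            throw new IllegalArgumentException("this sketch only handles step fields");
        }
        int step = Integer.parseInt(field.substring(2));
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        List<Integer> values = new ArrayList<>();
        for (int v = min; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        // The minute field is the first of the five space-separated cron fields.
        String minuteField = "*/5 * * * *".split(" ")[0];
        // prints [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
        System.out.println(expandStep(minuteField, 0, 59));
    }
}
```

With the remaining four fields left as wildcards, the schedule fires at each of those minutes in every hour of every day.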

(4) This is the definition of the CDAP Workflow. It will have a configure method to define what it does.

(5) This Workflow will run a Spark job and then run a MapReduce job.

(6) This is the object referenced in the CDAP Application’s configure method. It will define the entry point for the Spark job.

(7) The entry point for the Spark job is specified in this configure method.

(8) This is where you will place your Spark code. Both Java and Scala are supported. You can use any Spark code that you want — we are simply scheduling jobs on your cluster.

(9) A run() method is used to define the entry to your Spark code. You get two context objects — one from CDAP and one from Spark. The CDAP context will provide handles and methods to access CDAP data structures which might be used in a pipeline.

(10) This Spark code will save data to files. Because the RDD has two partitions, it creates /tmp/SparkOut/part-00000 and /tmp/SparkOut/part-00001.
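
The two part files in (10) come from the second argument to parallelize, which asks for two partitions. The plain-Java sketch below mimics how a three-element list could be cut into two contiguous slices using integer arithmetic on the slice boundaries. It is an illustration of the idea, written under the assumption that Spark slices local collections this way, and it does not call Spark; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionSliceSketch {
    // Cuts items into numSlices contiguous slices; slice i covers
    // [i * n / numSlices, (i + 1) * n / numSlices).
    static <T> List<List<T>> slice(List<T> items, int numSlices) {
        List<List<T>> slices = new ArrayList<>();
        int n = items.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            slices.add(new ArrayList<>(items.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        List<List<String>> slices = slice(Arrays.asList("1", "2", "3"), 2);
        for (int i = 0; i < slices.size(); i++) {
            // prints "part-00000 -> [1]" then "part-00001 -> [2, 3]"
            System.out.println(String.format("part-%05d", i) + " -> " + slices.get(i));
        }
    }
}
```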

(11) This is where you define your MapReduce job. You will use a CDAP context object, which provides access to the Hadoop Job object so that you can set up your Hadoop job.

(12) The initialize method is where you do your actual Hadoop job setup.

(13) To save lines of code in this example I’m using the built-in Hadoop 2.x identity mapper and identity reducer. They are the default implementation of a mapper and reducer which you typically override in your production code. They simply copy input to output.

(14) I’m getting input from the prior Spark job. The output should be a single file /tmp/HadoopOut/part-r-00000 since I’m using the Hadoop default of one reducer.
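
To see what "copy input to output" means for the records in (13) and (14), here is a plain-Java simulation of the job. It rests on assumptions the example does not spell out: that the job keeps Hadoop's default text input and output formats (so each input record is a byte offset plus a line, and each output line is the key, a tab, and the value), and that the two Spark part files contain "1" and "2", "3" respectively. It does not use Hadoop, and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IdentityJobSketch {
    // Mimics text input: one (byte offset, line) record per line of a file.
    static void readRecords(String fileContent, Map<Long, List<String>> grouped) {
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            // Identity map: the record is passed through unchanged.
            grouped.computeIfAbsent(offset, k -> new ArrayList<>()).add(line);
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
        }
    }

    static List<String> run(List<String> inputFiles) {
        // The TreeMap stands in for the shuffle, which sorts and groups records by key.
        Map<Long, List<String>> grouped = new TreeMap<>();
        for (String file : inputFiles) {
            readRecords(file, grouped);
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : grouped.entrySet()) {
            for (String value : entry.getValue()) {
                // Identity reduce: each value is written back out with its key.
                output.add(entry.getKey() + "\t" + value);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Assumed contents of part-00000 and part-00001 from the Spark step.
        run(List.of("1\n", "2\n3\n")).forEach(System.out::println);
    }
}
```

Under these assumptions the single reducer output holds three lines keyed by byte offset rather than the bare values. In a real job the relative order of values that share a key is not guaranteed.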

Building and Running this code

After downloading the example you will need CDAP to run it. You can download and install the CDAP SDK by following the instructions in the CDAP Developer’s Manual available at http://docs.cask.co/cdap. Check out the Getting Started section for CDAP SDK.

If you have questions, you can reach out to the CDAP User Community group, where you can engage with members of the community on feature discussions, announcements, and more.
