Apache Zeppelin Scalding Interpreter

Prasad Wagle
3 min readJun 9, 2016

--

My previous post gave an overview of how we use Apache Zeppelin at Twitter. In this post, I would like talk about the Scalding interpreter that enables data analysts to create, share and collaborate on notebooks with Scalding code.

From the scalding wiki — “Scalding is a Scala library that makes it easy to write MapReduce jobs in Hadoop. Scalding is built on top of Cascading, a Java library that abstracts away much of the complexity of Hadoop (such as the need to write raw map and reduce functions).”

Apache Zeppelin is a tool for creating and collaborating on interactive data analytics notebooks. It has an extensible architecture that allows developers to create plugins (called interpreters) that interface with a data processing backend. See Implementation Details section below for code skeleton.

How to build and configure

We are working on licensing issues to make it part of the Zeppelin binary release. For now, you have to get the Zeppelin source and build the Scalding interpreter with the command:

mvn clean package -Pscalding -DskipTests

Scalding interpreter can run in local or hdfs mode. In the local mode, you can access files on the local server and scalding transformation are done locally. In hdfs mode you can access files in HDFS and scalding transformation are run as hadoop map-reduce jobs. The default mode is local.

To run the scalding interpreter in the hdfs mode you have to do the following:

Set the classpath with ZEPPELIN_CLASSPATH_OVERRIDES

In conf/zeppelin_env.sh, you have to set ZEPPELIN_CLASSPATH_OVERRIDES to the contents of ‘hadoop classpath’ and directories with custom jar files you need for your scalding commands.

Set arguments to the scalding repl

The default arguments are: “ — local — repl”.

For hdfs mode you need to add: “ — hdfs — repl”

If you want to add custom jars, you need to use the “-libjars” option. For example: “-libjars directory1/*,directory2/*”. Note that libjars is a comma-separated list of globs. A single-file-path glob file is resolved to itself. A single directory path is equivalent to the glob directory/*.

For reducer estimation, you need to add something like: “-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator”

Set max.open.instances

If you want to control the maximum number of open interpreters, you have to select “scoped” interpreter for note option and set max.open.instances argument.

Example code and results

Print Mode

%scalding
mode

This command should print:

res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)

Test HDFS read

val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
testfile.dump

This command should print the contents of the hdfs file /user/x/testfile.

Test map-reduce job

val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
val a = testfile.groupAll.size.values
a.toList

This command should create a map reduce job and return the number of lines in the file.

Future Work

  • Better user feedback (hadoop url, progress updates)
  • Ability to cancel jobs
  • Ability to dynamically load jars without restarting the interpreter
  • Multiuser scalability (run scalding interpreters on different servers)

Acknowledgments

Thanks to Gera Shegalov, Ruban Monu, Sriram Krishnan, Oscar Boykin, Quay Ly, Sangjin Lee, Joep Rottinghuis, Rohan Ramakrishna, Jason Sprowl and Srikanth Thiagarajan.

More Information

  • Scalding interpreter (local mode) pull request from Sriram Krishnan
  • Scalding interpreter pull request that was recently merged
  • Minor changes to Scalding repo

Implementation Details

Given below is a skeleton of the Zeppelin scalding interpeter code.

// ZeppelinScaldingShell is a subclass of BaseScaldingShell 
// getRepl is similar to process except that it returns a ScaldingILoop instead of calling repl.process.
out = new ByteArrayOutputStream();
PrintWriter printWriter = new PrintWriter(out, true);
ScaldingILoop interpreter = ZeppelinScaldingShell.getRepl(args, printWriter);
interpreter.createInterpreter();
res = interpreter.intp().interpret(code); // code is received from the web browser
output = out.toString(); // this is sent to the web browser

--

--