A Hydrator Python Transform for Python nerds like you and me!
November 18, 2015
Before every CDAP release, we at Cask conduct an internal hackathon to use CDAP and work on interesting features. A few Cask engineers got together and, wanting to open up the capabilities of Cask Hydrator beyond Java developers, decided to build a transformation that uses user-written Python.
Beginning with CDAP release 3.2, the CDAP UI includes Cask Hydrator, which allows users to develop ETL (Extract-Transform-Load) pipelines directly in the CDAP UI. Hydrator pipelines are composed of sources, transforms, and sinks that ingest data from a single source, pass it through multiple transformations, and then write it to one or more sinks. Existing Hydrator transforms are either pre-packaged Java or user-written JavaScript transforms.
Python includes a number of string-handling functions that are well-suited for ETL pipeline use cases. With good reason, there is a large community of Python developers and many Python-related Big Data projects. This post covers how CDAP developers could take advantage of their Python skills and write transformations in Python.
A transformation (or transform for short) is a step in a Cask Hydrator pipeline that takes a single record of data and performs zero or more operations on it, returning a new record to the next step in the pipeline. A transform sits between a data source and one or more sinks, where the records can be written. Transforms can do almost anything with a record, as they have access to CDAP logging and metrics through a context.
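As a sketch of the contract a transform fulfills, a minimal pass-through version might look like this (the `record`, `emitter`, and `context` parameters follow the PythonEvaluator example shown later in this post):

```python
# A minimal pass-through transform: receives one record, performs
# zero operations on it, and emits it unchanged to the next stage.
def transform(record, emitter, context):
    emitter.emit(record)
```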
For instance, you might have data in a source (a file or a stream) that needs a unique ID added to each record. You can do this by reading in each record and using Python’s standard library module, uuid, to create a unique ID before writing the record out to a new location.
Another use case is the validation of records: checking the format of phone numbers and postal codes, the existence (or lack of) specific fields in a record, etc. Though Hydrator comes with an existing Validator transform with an extensive selection of functions, it’s quite possible that it does not fit every use case. With the PythonEvaluator, you can quickly create your own validation functions and transform.
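For instance, a validation transform that drops records with malformed US-style phone numbers or ZIP codes could be sketched as follows. The field names `phone` and `zip` and the regular expressions are illustrative assumptions; adapt them to your own record schema:

```python
import re

# Hypothetical validation transform for the PythonEvaluator: records
# with a malformed phone number or ZIP code are dropped (not emitted).
# The field names 'phone' and 'zip' are assumptions for illustration.
PHONE_RE = re.compile(r'^\d{3}-\d{3}-\d{4}$')
ZIP_RE = re.compile(r'^\d{5}$')

def transform(record, emitter, context):
    if PHONE_RE.match(record.get('phone', '')) and ZIP_RE.match(record.get('zip', '')):
        emitter.emit(record)
```

Records that fail either check are simply not emitted, so they never reach the downstream sinks.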
Using the PythonEvaluator Transform
The PythonEvaluator transform is straightforward to use: you download the source, build and deploy it, and then use it when creating Hydrator pipelines.
Download the source:
$ cd $CDAP_HOME
$ git clone https://github.com/caskdata/hydrator-plugins.git
Build it:
$ cd hydrator-plugins
$ mvn clean package -DskipTests -pl python-evaluator-transform
Deploy it:
$ $CDAP_HOME/bin/cdap-cli.sh load artifact \
    python-evaluator-transform/target/python-evaluator-transform-1.1.0-SNAPSHOT.jar \
    config-file python-evaluator-transform/resources/plugin/python-evaluator-transform.json
$ cp python-evaluator-transform/resources/ui/PythonEvaluator.json ../ui/templates/common/
The plugin is now available when you create a Hydrator Pipeline.
The PythonEvaluator in Action
In this example, we demonstrate the creation of a unique ID for each record that is read in. We use Python’s standard uuid module to create the ID, which gives us many easily-understood, well-documented choices (string or integer, different sizes and formats) for an ID.
This is the Python code being run:
def transform(record, emitter, context):
    import uuid
    record['id'] = uuid.uuid1().hex
    print "Deleted timestamp %s" % record['ts']
    del record['ts']
    context.getMetrics().count('ids.created', 1)
    context.getLogger().info("Created ID: %s" % record['id'])
    emitter.emit(record)
The input schema (on the left side of the illustration) is available to the script as the Python dictionary record. The emitter’s emit method is passed a dictionary that determines the structure of the output schema (on the right side). The input record dictionary can be re-used by the emitter; in that case, any existing fields not modified by the script will be passed to the output schema unmodified.
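To illustrate that pass-through behavior, a transform might modify one field and leave the rest of the record untouched (the field names here are hypothetical):

```python
# Sketch: fields the script does not touch ('name' here) pass through
# to the output record unchanged; only 'status' is added.
def transform(record, emitter, context):
    record['status'] = 'processed'
    emitter.emit(record)
```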
The context object received by the script contains CDAP metrics and logger, allowing the script to access these CDAP services. In addition to providing diagnostics on your plugin, logging and metrics can be used to create business-level metrics, taking your solution beyond those provided by the platform.
In this example, a module from the standard Python libraries (uuid) is imported and used to create a unique ID, which is added to each record before it is passed on to the next stage. The timestamp field generated by the File source plugin is deleted from the record dictionary, and a message is printed using the Python print statement. If you are running your code in the CDAP SDK, that message will appear in the CDAP log file. Though you would not want to do this in a production environment, using Python print statements can be a quick and easy way to test your code when developing in the CDAP SDK.
As the record IDs are created, a metric is incremented, and a message is generated with the new ID. That message will then appear in the CDAP logs and be available through either the CDAP UI or the CDAP RESTful APIs.
This GitHub Gist contains a JSON file for the above ETL pipeline. You can load it into CDAP and try this pipeline yourself.
Under The Hood
The PythonEvaluator uses Jython, a Java implementation of the Python language. Currently, the PythonEvaluator uses Jython version 2.5.2, which determines the version of Python that can be used in the transform. Though very similar to the reference C implementation of Python (CPython), Jython has a few differences to be aware of. The CDAP UI uses a Python-aware version of the popular Ace Editor to display and edit the code for the transform.
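One practical consequence is that Jython 2.5 tracks the Python 2.5 language, so newer syntax is unavailable in your transform code. For example, dict comprehensions (added in Python 2.7) must be written with the dict() constructor instead; a small, self-contained illustration:

```python
# Python 2.5-compatible: build a dict from a generator expression.
pairs = [('a', 1), ('b', 2)]
by_key = dict((k, v) for k, v in pairs)

# The equivalent dict comprehension, {k: v for k, v in pairs},
# is a SyntaxError in Python/Jython 2.5.
```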
Try This Yourself!
If you’d like to try writing Python to transform your data (or writing your own plugin), download the latest CDAP SDK and the PythonEvaluator, and give this feature a spin. Share your questions and experience using CDAP and the PythonEvaluator with the CDAP community at the CDAP user group.