October 13, 2016
This summer as an intern at Cask, I had the opportunity to work on Cask Hydrator. Since its launch in 2015, Cask Hydrator has been a broadly used and important application on CDAP to help users easily build and run big data pipelines. I helped evolve Hydrator further by adding the Action function to it.
The Need for Actions
A Hydrator pipeline allows users to move data from various sources to disparate sinks, with the optional capability to transform data in-flight. The data in the pipeline can be processed record-by-record or at a whole feed level. Often, our customers have found the need to orchestrate pre-defined actions before the start of the processing pipeline or after the processing pipeline has completed. These actions generally provide operations such as:
- Delete HDFS Directories,
- Archive HDFS Directories,
- Dump a Database Table,
- Bulk upload a Database Table, etc.
In other situations, we have seen customers implement business logic in scripts or custom tools. This is partly because the expertise in their team is in writing SQL and Hive queries, R/Python/shell scripts, etc., and partly because of security restrictions in their environment: they cannot expose internal details, so they get their job done using custom tools. In such cases customers look for an orchestration framework which executes their pre-built tools.
Therefore, we added a new type of plugin called ‘Action’ to support the above use cases and more to the Hydrator Batch Data Pipeline.
Let’s walk through a complete end-to-end use case to show how the ‘Action’ plugin type can be used in the field. Consider processing sensor data generated by various T-mobile devices for the same vendor. Since the data is generated from different devices, we want to normalize it (map it to the same format) before uploading to Hadoop. Given the devices are provided by the same vendor, they most likely have some sort of internal Normalizer tool which understands the business rules required to perform the mapping operation. Furthermore, to avoid exposing the rules to the outside world, we want to execute the Normalizer tool on a secured UNIX machine. After executing the tool, the normalized data is copied to HDFS. Hive queries are then executed to perform complex joins, after which the data is stored in Hive tables. These Hive tables are then exported to a relational database, such as MySQL, for reporting needs. Once the export is done, the HDFS files are cleared.
Next, I am going to discuss how to set up a Hydrator pipeline that stitches all the above Actions together:
- Action plugin responsible for SSH into the secured UNIX machine and executing the Normalizer tool.
- Action plugin to copy the normalized files to HDFS.
- Action plugin that executes the Hive scripts to create the Hive tables.
- Part of the pipeline responsible for copying the Hive table data to MySQL.
- Action plugin that deletes the files generated by HDFSCopy.
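The ordering above can be pictured as a simple sequential runner. The Python sketch below is only an illustration, not Hydrator code: the `run_pipeline` helper is hypothetical, `HiveToMySQL` is a made-up label for the data-moving part of the pipeline, and the rule that a failed stage stops the run is an assumption made for the example.

```python
from typing import Callable, List, Tuple

# Each stage is a (name, callable) pair; the callable returns True on success.
Stage = Tuple[str, Callable[[], bool]]


def run_pipeline(stages: List[Stage]) -> List[str]:
    """Run stages in order and return the names of the stages that completed.

    Assumption for this sketch: a failing stage stops the run,
    so every later stage is skipped.
    """
    completed = []
    for name, action in stages:
        if not action():
            break
        completed.append(name)
    return completed


def ok() -> bool:
    """Stand-in action that always succeeds; a real one would do the work."""
    return True


USE_CASE_STAGES: List[Stage] = [
    ("Normalizer", ok),    # SSH into the secured machine and run the tool
    ("HDFSCopy", ok),      # copy the normalized files to HDFS
    ("HiveQuery", ok),     # run the Hive scripts that build the tables
    ("HiveToMySQL", ok),   # hypothetical label for the Hive-to-MySQL export
    ("HDFSDelete", ok),    # delete the files written by HDFSCopy
]
```

Calling `run_pipeline(USE_CASE_STAGES)` returns the five stage names in the order listed, which is the order the use case requires.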
Now that we know how it’s being used, let’s take a look under the hood at how Actions work. As you know, Hydrator Studio allows users to construct a logical pipeline using a sleek drag-and-drop UI. When the pipeline is deployed, a planner analyzes the logical view and materializes it into a CDAP Workflow. Depending on the stages added in the logical view, the Workflow will have one or more MapReduce or Spark jobs for physical execution.
Action types do not fit into the MapReduce or Spark paradigm, since they are used to execute arbitrary code. To support the Action type in Workflows, a new type of node was introduced: CustomAction (extended from AbstractCustomAction).
The Action plugins in Hydrator now map to the CustomAction in Workflow. There is a one-to-one mapping between Action plugins in the logical view and CustomActions in the physical view. So, for the example in the use case above, four CustomActions are generated: the Normalizer, HDFSCopy, HiveQuery, and HDFSDelete stages.
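To make the logical-to-physical mapping concrete, here is a toy planner in Python. It is a sketch under stated assumptions and not the CDAP planner: the `LogicalStage`, `PhysicalNode`, and `plan` names are hypothetical, the `HiveSource` and `MySQLSink` stage names are invented for the example, and collapsing each run of consecutive non-Action stages into one batch job is a simplification.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class LogicalStage:
    name: str
    plugin_type: str  # "action", "source", "transform" or "sink"


@dataclass(frozen=True)
class PhysicalNode:
    kind: str                 # "CustomAction" or "BatchJob"
    stages: Tuple[str, ...]   # names of the logical stages this node runs


def plan(stages: List[LogicalStage]) -> List[PhysicalNode]:
    """Toy planner: one CustomAction per Action stage, and each run of
    consecutive non-Action stages collapsed into a single batch job."""
    nodes: List[PhysicalNode] = []
    pending: List[str] = []  # non-Action stages waiting to be grouped

    def flush() -> None:
        # Emit the grouped non-Action stages, if any, as one batch job.
        if pending:
            nodes.append(PhysicalNode("BatchJob", tuple(pending)))
            pending.clear()

    for stage in stages:
        if stage.plugin_type == "action":
            flush()
            nodes.append(PhysicalNode("CustomAction", (stage.name,)))
        else:
            pending.append(stage.name)
    flush()
    return nodes


LOGICAL_VIEW = [
    LogicalStage("Normalizer", "action"),
    LogicalStage("HDFSCopy", "action"),
    LogicalStage("HiveQuery", "action"),
    LogicalStage("HiveSource", "source"),  # hypothetical stage name
    LogicalStage("MySQLSink", "sink"),     # hypothetical stage name
    LogicalStage("HDFSDelete", "action"),
]
```

Running `plan(LOGICAL_VIEW)` yields four `CustomAction` nodes, one per Action plugin, plus a single batch job for the Hive-to-MySQL part of the pipeline.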
Starting with CDAP 3.5, the following Action plugins come pre-packaged. Download CDAP today to give them a try!
The SSH Custom Action plugin establishes an SSH connection with a remote machine and executes some commands on that machine. This is meant to allow users to go into remote machines and execute commands to set up the environment properly for the pipeline. For example, users may want to run a script that executes joins between tables in an SQL database.
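As a rough picture of what such an action does, the Python sketch below shells out to the OpenSSH client. It is not the plugin's implementation: the `build_ssh_command` and `run_remote` helpers are hypothetical, and it assumes an `ssh` binary on the PATH with key-based authentication already configured.

```python
import subprocess
from typing import List


def build_ssh_command(host: str, user: str, remote_command: str,
                      port: int = 22) -> List[str]:
    """Build the argument list for the OpenSSH client."""
    return ["ssh", "-p", str(port), f"{user}@{host}", remote_command]


def run_remote(host: str, user: str, remote_command: str,
               port: int = 22) -> int:
    """Run the command on the remote machine and return its exit status.

    Assumption: authentication is handled outside this sketch,
    for example through an SSH key the client already knows about.
    """
    argv = build_ssh_command(host, user, remote_command, port)
    completed = subprocess.run(argv, check=False)
    return completed.returncode
```

For example, `run_remote("secure-host.example.com", "etl", "/opt/tools/normalizer")` would run a normalizer script on that machine; the host, user, and path here are placeholders.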
HDFS Action (move and delete)
The HDFS Custom Action plugin either moves or deletes files in the same directory in HDFS (Hadoop Distributed File System) on the cluster that is running the pipeline. One use case for this plugin is archiving data in HDFS that has just been processed by the pipeline, as described in the example above.
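The move-or-delete behavior can be sketched in Python with the local filesystem standing in for HDFS. This is an illustration only: the `move_or_delete` helper and its parameters are hypothetical, and a real action would use an HDFS client rather than `pathlib` and `shutil`.

```python
import shutil
from pathlib import Path
from typing import List, Optional


def move_or_delete(source_dir: Path, operation: str,
                   dest_dir: Optional[Path] = None,
                   pattern: str = "*") -> List[str]:
    """Move or delete the files in source_dir whose names match pattern.

    Returns the sorted names of the files that were handled.
    """
    if operation not in ("move", "delete"):
        raise ValueError(f"unknown operation: {operation}")
    if operation == "move" and dest_dir is None:
        raise ValueError("move requires dest_dir")

    handled = []
    # sorted() materializes the listing before any file is changed.
    for path in sorted(source_dir.glob(pattern)):
        if not path.is_file():
            continue  # leave subdirectories alone
        if operation == "move":
            dest_dir.mkdir(parents=True, exist_ok=True)
            shutil.move(str(path), str(dest_dir / path.name))
        else:
            path.unlink()
        handled.append(path.name)
    return handled
```

Archiving processed data then amounts to a `"move"` of the matching files into an archive directory, while cleanup is a `"delete"` on the same source directory.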
The Database Custom Action plugin establishes a connection with a database and executes a command. If a user wants to query the contents of a table before or after a pipeline adds or deletes records in it, this plugin can be added to the start or end of the pipeline.
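The before-and-after pattern described here can be sketched in Python with an in-memory SQLite database standing in for the real one. The `run_database_action` and `demo` helpers, the `readings` table, and its contents are all hypothetical and exist only for this example.

```python
import sqlite3
from typing import List, Tuple


def run_database_action(connection: sqlite3.Connection,
                        statement: str) -> List[Tuple]:
    """Execute one SQL statement and return any rows it produces."""
    with connection:  # commits on success, rolls back on error
        cursor = connection.execute(statement)
        return cursor.fetchall()


def demo() -> Tuple[int, int]:
    """Count rows before and after a stand-in 'pipeline' adds records."""
    connection = sqlite3.connect(":memory:")
    try:
        run_database_action(
            connection, "CREATE TABLE readings (device TEXT, value REAL)")
        # Action placed at the start of the pipeline.
        before = run_database_action(
            connection, "SELECT COUNT(*) FROM readings")[0][0]
        # Stand-in for the pipeline body writing two records.
        connection.executemany(
            "INSERT INTO readings VALUES (?, ?)", [("d1", 1.5), ("d2", 2.5)])
        connection.commit()
        # Action placed at the end of the pipeline.
        after = run_database_action(
            connection, "SELECT COUNT(*) FROM readings")[0][0]
        return before, after
    finally:
        connection.close()
```

Here `demo()` reports the row count seen by the action at the start of the run and the count seen by the action at the end.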
My Experience at Cask
When I first started on this project, I did not anticipate the level of independence and freedom that I would be given to design and implement the plugins. This was initially daunting and unexpected, but, through awesome support from the fantastic engineers at Cask, it turned into a fun and incredibly rewarding project that I loved working on. It felt awesome demoing a feature that I was responsible for. If this kind of experience and project piques your interest in any way, I would highly recommend being a part of #LifeatCask. Click here to apply for the Summer 2017 internship program.