Index Parquet with Morphlines and Solr

Every once in a while I run into something that I know is possible but can't find evidence that anyone has actually done. When that happens, I feel compelled to figure it out myself and share it so that others don't have to spend copious amounts of time getting it working. What follows is an example detailing how you can use Morphlines to index Parquet files (that is, compressed Parquet files) into Solr for searching.

Ingredients:

  • Java IDE (I prefer IntelliJ)
  • A Cloudera Hadoop cluster or quickstart VM (I used the v5.7.0 quickstart VM)
  • On that cluster/VM: Impala/Hive, Solr, and all required underlying services

If you want to jump straight to my example Solr schema, example Morphlines configuration, and example Morphlines driver code go here!

Step 1: Let’s Create Some Parquet Files

Before we can index Parquet files, we first need to have some. Using the impala-shell or something like Hue, create a simple table like so:

CREATE TABLE solrtest (id string, value string) STORED AS parquet;

Once we have the table, let's insert some data into it, which will create a Parquet file:

INSERT INTO solrtest VALUES ("1","hello"),("2","world"),("3","navi");

Validate that a Parquet file has been created. You can use the WebHDFS UI, the Hue file browser, or the HDFS client to do so:

hdfs dfs -ls /user/hive/warehouse/solrtest
Parquet files generated in /user/hive/warehouse/solrtest
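If you want to go a step further and peek at the actual rows from Java (the same Avro view of the data that Morphlines will consume later), here is a minimal sketch using the parquet-avro reader. I'm assuming the pre-Apache parquet.avro package that CDH 5.x bundles in /usr/jars; newer Parquet releases moved this class to org.apache.parquet.avro and use a builder instead. The class name and file path are just placeholders.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroParquetReader;

public class PeekParquet {
    /** Usage: java ... PeekParquet hdfs://quickstart.cloudera:8020/user/hive/warehouse/solrtest/<your_file>.parq */
    public static void main(String[] args) throws Exception {
        // AvroParquetReader surfaces each Parquet row as an Avro GenericRecord,
        // which is the same representation the readAvroParquetFile command works with.
        AvroParquetReader<GenericRecord> reader =
                new AvroParquetReader<GenericRecord>(new Path(args[0]));
        try {
            GenericRecord row;
            while ((row = reader.read()) != null) {
                System.out.println(row); // GenericRecord prints as JSON-like text
            }
        } finally {
            reader.close();
        }
    }
}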

Step 2: Creating the Solr Collection

We have some parquet files to index, but won’t be able to index anything without first creating an appropriate Solr collection. From the command line, the first step is to pull the example configuration files like so:

solrctl instancedir --generate $HOME/solr_configs

This will obviously put the solr_configs folder in your home directory. There are a great many files in there that you can spend a lot of time tweaking:

Partial listing of config files in $HOME/solr_configs/conf

For this, we’re only going to modify the schema.xml. If you’re following along, you can modify the file to match the schema.xml that I created HERE. Otherwise, you’ll have to modify it so that it indexes the fields and metadata that your parquet file(s) contain.

Once we’ve modified the schema.xml file, we upload the instance directory (aka instance configuration files) to Zookeeper like so:

solrctl instancedir --create solrtest $HOME/solr_configs/

Finally, create the collection:

solrctl collection --create solrtest -s 1 -c solrtest

Then verify that the collection exists. If you didn't get an error response back from the Solr server, it's probably fine. To double-check, you can navigate to the Solr UI (on the quickstart VM, it's http://localhost:8983/solr).

Validating our collection exists. Yours will show "Num Docs: 0".

Step 3: Morphlines Intro

The first two steps were pretty simple. This is where it gets challenging: there is very little documentation specific to the work we want to do (indexing Parquet files).

Before we start working on this, let's first go over what Morphlines are at a high level. Morphlines are part of the "Kite SDK", which you can read about in more detail here. You can think of Morphlines as two things. First, it's a Java library with a set of commands that accomplish a variety of transform and load tasks, particularly within the Hadoop ecosystem. Second, the library takes in a Morphlines configuration file that defines an ordered chain of those transformations in a JSON-like format. In this way, you can use the Morphlines library while writing little or no Java code at all. Here is an example Morphlines configuration file (which you can find here as well):

# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : collection1

  # ZooKeeper ensemble
  zkHost : "127.0.0.1:2181/solr"
}

# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more (potentially
# nested) commands. A morphline is a way to consume records (e.g. Flume events,
# HDFS files or Spark RDDs), turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on the way to
# a target application such as Solr.
morphlines : [
  {
    # Name used to identify a morphline. E.g. used if there are multiple
    # morphlines in a morphline config file
    id : morphline1

    # Import all morphline commands in these java packages and their subpackages.
    # Other commands that may be present on the classpath are not visible to this
    # morphline.
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
      {
        # Parse Avro container file and emit a record for each Avro object
        readAvroContainer {
          # Optionally, require the input to match one of these MIME types:
          # supportedMimeTypes : [avro/binary]

          # Optionally, use a custom Avro schema in JSON format inline:
          # readerSchemaString : """<json can go here>"""

          # Optionally, use a custom Avro schema file in JSON format:
          # readerSchemaFile : /path/to/syslog.avsc
        }
      }

      {
        # Consume the output record of the previous command and pipe another
        # record downstream.
        #
        # extractAvroPaths is a command that uses zero or more Avro path
        # expressions to extract values from an Avro object. Each expression
        # consists of a record output field name (on the left side of the
        # colon ':') as well as zero or more path steps (on the right hand
        # side), each path step separated by a '/' slash. Avro arrays are
        # traversed with the '[]' notation.
        #
        # The result of a path expression is a list of objects, each of which
        # is added to the given record output field.
        #
        # The path language supports all Avro concepts, including nested
        # structures, records, arrays, maps, unions, etc, as well as a flatten
        # option that collects the primitives in a subtree into a flat list.
        extractAvroPaths {
          flatten : false
          paths : {
            id : /id
            username : /user_screen_name
            created_at : /created_at
            text : /text
          }
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # convert timestamp field to native Solr timestamp format
      # e.g. 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z
      {
        convertTimestamp {
          field : created_at
          inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"]
          inputTimezone : America/Los_Angeles
          outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
          outputTimezone : UTC
        }
      }

      # Consume the output record of the previous command and pipe another
      # record downstream.
      #
      # Command that deletes record fields that are unknown to Solr
      # schema.xml.
      #
      # Recall that Solr throws an exception on any attempt to load a document
      # that contains a field that isn't specified in schema.xml.
      {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
        }
      }

      # log the record at DEBUG level to SLF4J
      { logDebug { format : "output record: {}", args : ["@{}"] } }

      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : ${SOLR_LOCATOR}
        }
      }
    ]
  }
]

This morphline reads log data in Avro format, extracts the fields it needs, converts the 'created_at' timestamp field to a different format, and indexes each record into Solr. If you strip out all of the comments, all of that work fits in a roughly 20-line file without any code at all, which is pretty cool.

As I've already mentioned, though, Morphlines is two things, and even with a configuration file you still need something to parse that file and do the work. That something is usually referred to as a driver program. In CDH, Flume agents often act as the driver program for Morphlines (see here). In this example, we'll write our own driver program by leveraging the example from the Kite SDK site (with some modifications). Don't worry, it's not that hard once you know what to do. First, though, we'll create our Morphlines configuration file.

Step 4: Creating the Morphlines Configuration File

The Kite SDK site defines all of the transformations that can be accomplished through the Morphlines conf file. You can look through them all here. For this example we know we need to do the following:

  • Read a (compressed) Parquet file
  • Parse out the fields and values
  • Load the records into Solr

We can accomplish all of this in a configuration file like this (source code here):

SOLR_LOCATOR : {
  # Name of solr collection
  collection : solrtest

  # ZooKeeper ensemble; edit this for your cluster's ZooKeeper hostname(s)
  zkHost : "localhost:2181/solr"

  # The maximum number of documents to send to Solr per network batch (throughput knob)
  # batchSize : 1000
}

morphlines : [
  {
    id : solrTest
    importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.cloudera.**", "org.apache.solr.**"]

    commands : [

      # Read the Parquet data
      {
        readAvroParquetFile {
          # For Parquet files that were not written with the parquet.avro package
          # (e.g. Impala Parquet files) there is no Avro write schema stored in
          # the Parquet file metadata. To read such files using the
          # readAvroParquetFile command you must either provide an Avro reader
          # schema via the readerSchemaFile parameter, or a default Avro schema
          # will be derived using the standard mapping specification.

          # Optionally, use this Avro schema in JSON format inline for projection:
          readerSchemaString : """
            {
              "type": "record",
              "name": "my_record",
              "fields": [
                {"name": "id", "type": ["null", "string"]},
                {"name": "value", "type": ["null", "string"]}
              ]
            }
          """
        }
      }

      {
        extractAvroPaths {
          flatten : true
          paths : {
            id : /id
            value : /value
          }
        }
      }

      { logDebug { format : "output record {}", args : ["@{}"] } }

      { sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }

      # load the record into a Solr server or MapReduce Reducer.
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }

    ]
  }
]

It looks a little squishy on this page, so I'd recommend looking at it on GitHub. Essentially, we're using "readAvroParquetFile" to read in and uncompress a Parquet file with the schema defined by "readerSchemaString". If you don't define a readerSchemaString and are using a Parquet file created by Impala, Morphlines will still parse the data but will pass the values to Solr as bytes, which in our case isn't what we're looking for. Once the Parquet file is read, the fields and values are mapped and flattened (extractAvroPaths), the output record is logged, and ultimately the records are loaded into Solr. This configuration file works; now it's time to write the driver program that runs it for us.
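A side note: the inline readerSchemaString is just ordinary Avro schema JSON, so if the morphline fails to compile it can be handy to sanity-check the schema on its own first. Here is a minimal sketch using Avro's Schema.Parser (Avro is already pulled in by the Kite dependencies); the class name is just a placeholder:

import org.apache.avro.Schema;

public class ReaderSchemaCheck {
    public static void main(String[] args) {
        // The same JSON that gets pasted into readerSchemaString in the morphline conf
        String readerSchema = "{ \"type\": \"record\", \"name\": \"my_record\", \"fields\": ["
                + "{\"name\": \"id\", \"type\": [\"null\", \"string\"]},"
                + "{\"name\": \"value\", \"type\": [\"null\", \"string\"]}"
                + "] }";
        // Schema.Parser throws a SchemaParseException on malformed JSON or bad types,
        // which is much easier to debug here than inside a morphline run
        Schema schema = new Schema.Parser().parse(readerSchema);
        for (Schema.Field field : schema.getFields()) {
            System.out.println(field.name() + " : " + field.schema());
        }
    }
}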

Step 5: Writing the Morphlines Driver

In IntelliJ, create a project with dependencies managed by Maven. All of my source can be found here. Really, only two things are needed here: the pom.xml and the driver code (MorphlineParquetIndexer.java).

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.cloudera.bkvarda</groupId>
  <artifactId>morphlinetest</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-morphlines-core</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-core</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-morphlines-hadoop-core</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-morphlines-hadoop-parquet-avro</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-morphlines-avro</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-morphlines-solr-core</artifactId>
      <version>1.0.0-cdh5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-hadoop-cdh5-dependencies</artifactId>
      <version>1.0.0-cdh5.7.0</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>maven-hadoop</id>
      <name>Hadoop Releases</name>
      <url>https://repository.cloudera.com/content/repositories/releases/</url>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>org.cloudera.bkvarda.MorphlineParquetIndexer</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

The most important thing to note in the above file is the repository URL for the Cloudera-specific dependencies; without it, Maven will not be able to find the dependencies specific to your version of CDH. Second, you need to change the version of anything in the "org.kitesdk" groupId to match the version of CDH you will be running this on. Since I am running CDH 5.7.0, all of my Kite dependency versions are "1.0.0-cdh5.7.0". To see why this matters, look at the Kite JARs that ship with CDH like this:

ls /usr/jars | grep kite
Included Kite JARS in CDH5.7.0 — these will all be in your classpath

If you don’t specify the correct versions, your driver program will ultimately be looking for classes that don’t exist and will throw a corresponding error.
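A quick way to confirm which Kite jar actually wins at runtime is to ask the JVM where it loaded a Kite class from. This is just a small diagnostic sketch (the class name is a placeholder), but it makes version mismatches easy to spot:

import org.kitesdk.morphline.api.MorphlineContext;

public class WhichJar {
    public static void main(String[] args) {
        // Prints the jar that MorphlineContext was loaded from, e.g. a kite-morphlines-core
        // jar under /usr/jars, so you can compare it against the version in your pom.xml.
        System.out.println(MorphlineContext.class.getProtectionDomain()
                .getCodeSource().getLocation());
    }
}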

MorphlineParquetIndexer.java:

/**
 * Created by bkvarda on 6/5/16.
 */
package org.cloudera.bkvarda;

import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.Record;
import java.io.File;
import java.io.IOException;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.Notifications;
import org.apache.log4j.BasicConfigurator;
import org.kitesdk.morphline.hadoop.parquet.avro.ReadAvroParquetFileBuilder;

public class MorphlineParquetIndexer {

    /** Usage: java ... <morphline.conf> <dataFile1> ... <dataFileN> */
    public static void main(String[] args) throws IOException {
        // Debug logging
        BasicConfigurator.configure();

        // compile the morphline.conf file on the fly
        File morphlineFile = new File(args[0]);
        String morphlineId = "solrTest";
        MorphlineContext morphlineContext = new MorphlineContext.Builder().build();
        Command morphline = new Compiler().compile(morphlineFile, morphlineId, morphlineContext, null);

        // process each input data file
        Notifications.notifyBeginTransaction(morphline);
        try {
            for (int i = 1; i < args.length; i++) {
                String file = args[i];
                Record record = new Record();
                record.put(ReadAvroParquetFileBuilder.FILE_UPLOAD_URL, file);
                Notifications.notifyStartSession(morphline);
                boolean success = morphline.process(record);
                if (!success) {
                    System.out.println("Morphline failed to process record: " + record);
                }
            }
            Notifications.notifyCommitTransaction(morphline);
        } catch (RuntimeException e) {
            Notifications.notifyRollbackTransaction(morphline);
            morphlineContext.getExceptionHandler().handleException(e, null);
        }
        Notifications.notifyShutdown(morphline);
    }
}

This driver takes a Morphlines configuration file as its first argument and one or more Parquet files as the remaining arguments. The Parquet files can be on the local filesystem (using a relative path) or on HDFS (using a full HDFS URI).

As you can see, there really isn't much code, and most of it was copied from the Kite SDK example on their website. The primary difference is what needs to go into the morphline record handed to the readAvroParquetFile command. Whereas the Kite SDK example reads a file into a buffered input stream and attaches that as the value of the key "ATTACHMENT_BODY", this driver only needs a string with the file's location, set as the value of the key "FILE_UPLOAD_URL". It took me some time digging through the Kite source code here to figure that out.
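To make the difference concrete, here is a small side-by-side sketch. The ATTACHMENT_BODY variant is a paraphrase of the Kite SDK example rather than a verbatim copy, and the class name and paths are placeholders:

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.hadoop.parquet.avro.ReadAvroParquetFileBuilder;

public class RecordStyles {

    // Kite SDK website style: open the file yourself and attach the stream as the
    // record body, which is what commands like readAvroContainer expect to consume.
    static Record attachmentBodyStyle(String localPath) throws IOException {
        Record record = new Record();
        record.put(Fields.ATTACHMENT_BODY,
                new BufferedInputStream(new FileInputStream(localPath)));
        return record;
    }

    // This driver's style: readAvroParquetFile opens the file itself, so the record
    // only needs the file's location (relative local path or full HDFS URI).
    static Record fileUploadUrlStyle(String pathOrUri) {
        Record record = new Record();
        record.put(ReadAvroParquetFileBuilder.FILE_UPLOAD_URL, pathOrUri);
        return record;
    }
}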

Step 6: Build the JAR

Once everything looks good and compiles, let’s build a JAR so we can run this on our cluster. From the project directory:

mvn package
Building our JAR

Then we can copy it to our cluster. For me the steps were like this:

cd target
scp -P 2222 morphlinetest-1.0-SNAPSHOT.jar cloudera@localhost:/home/cloudera
copying JAR to VM (my local port 2222 is forwarded to port 22 on the VM)

Then ssh to the cluster and copy the JAR to the classpath (the location containing all of the other JAR dependencies):

ssh -p 2222 cloudera@localhost
sudo cp morphlinetest-1.0-SNAPSHOT.jar /usr/jars/
Copying JAR to classpath

Step 7: Run the Morphline Program

With your morphline conf file on the cluster, you can now run the program against one of the Parquet files created at the beginning of this tutorial:

java -jar /usr/jars/morphlinetest-1.0-SNAPSHOT.jar test_morphlines.conf hdfs://quickstart.cloudera:8020/user/hive/warehouse/solrtest/a74f6c5de6637b2a-83eb3dbf97c15f9d_175813274_data.0.parq

We are executing our JAR with our conf file (test_morphlines.conf) as the first argument and our Parquet file (using a fully qualified HDFS URI) as the second argument. Once you execute it, you might see some incredibly verbose logging. Check that there are no errors; if all goes well, you'll see entries like this:

Logs showing output records being parsed and correctly processed
Log showing the number of records that were processed
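If the log volume from BasicConfigurator is overwhelming, one option is to raise the root log level while keeping the Kite packages at DEBUG so the morphline's logDebug command stays visible. This is a hedged tweak to the driver's logging setup (it assumes the Kite commands log under the org.kitesdk.morphline package, since SLF4J loggers are named after their classes):

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogSetup {
    // Call this at the top of main() instead of BasicConfigurator.configure() alone.
    static void configureLogging() {
        BasicConfigurator.configure();
        // Quiet the Hadoop/Parquet/Solr internals...
        Logger.getRootLogger().setLevel(Level.INFO);
        // ...but keep the output of the morphline's logDebug command visible.
        Logger.getLogger("org.kitesdk.morphline").setLevel(Level.DEBUG);
    }
}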

You can validate that the records have been indexed by querying within the Solr console:

Parquet records indexed by Solr. Yours should look like the bottom two records.
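If you'd rather check from code than from the Solr console, here is a minimal SolrJ sketch that queries the collection through ZooKeeper, the same way the morphline's solrLocator does. It assumes the SolrJ 4.x client that matches the Solr version in CDH 5.7 (add a solr-solrj dependency for your CDH version); the class name is a placeholder:

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class SolrTestQuery {
    public static void main(String[] args) throws Exception {
        // Connect via the same ZooKeeper path used in the morphline's SOLR_LOCATOR
        CloudSolrServer solr = new CloudSolrServer("localhost:2181/solr");
        solr.setDefaultCollection("solrtest");

        QueryResponse rsp = solr.query(new SolrQuery("*:*"));
        System.out.println("Indexed documents: " + rsp.getResults().getNumFound());
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("id") + " -> " + doc.getFieldValue("value"));
        }
        solr.shutdown();
    }
}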

Wrapping Up

With this, you will have created a Solr collection, a Morphline configuration file, and a Morphline driver program, and you will have indexed a compressed Parquet file into Solr. Obviously this is a simple example, but it gives you an idea of how to accomplish the task, and I've provided all of the source code you need to get started. Good luck! As always, questions or comments are appreciated.
