Building a Custom Flume Interceptor

Brandon Kvarda
Mar 15, 2016 · 8 min read

Flume is a powerful tool that can be leveraged as part of a data ingestion pipeline. Reading through the docs, you can get a pretty good understanding of what Flume is capable of and how to put the standard channels, sources, sinks, and interceptors together. But what if you wanted to customize one of those components, or write your own?

If you’re a Java developer, this task is likely easy (and you’re probably not reading this). If you’re coming from a DBA or operational background, perhaps you need a little more explanation as to how to accomplish this task, and looking at the source code may not be good enough. The intention of this post is to explain how to build a simple custom Flume interceptor for non-developers, and to detail what is actually happening to make this possible. To make it as easy as possible, I’m going to suggest a specific set of tools and requirements.

Requirements:

  • Maven
  • IntelliJ IDE
  • Java JDK
  • CDH 5.x (Cloudera) — I used the Cloudera Quickstart VM
  • Kafka service installed on VM
  • Flume service installed on VM

Installing Maven

Maven is a build tool for Java and Scala. It uses a “Project Object Model” (POM) to describe a project, its dependencies, and how it should be built. That model lives in a pom.xml file, which is where you specify the dependencies. For the purposes of this tutorial, Maven will download our dependencies (the Flume libraries), compile our source code, and package it as a JAR file.

Instructions for installing Maven can be found here

Installing IntelliJ

IntelliJ is a multipurpose IDE and an alternative to other Java-centric IDEs such as Eclipse. We are going to use IntelliJ to create our custom interceptor project; this is where we will write our code.

Instructions for installing IntelliJ can be found here

Creating the Custom Interceptor

Launch IntelliJ. You’ll be creating a new project. New → Project → Maven

For project name, I called mine CustomInterceptor.

You will be prompted for a few things:

  • GroupId
  • ArtifactId
  • Version

For the purposes of this project, these can be fairly arbitrary. Mine were:

  • GroupId: com.cloudera.bkvarda
  • ArtifactId: custom-interceptor
  • Version: 1.0

These settings generate a bare-minimum pom.xml.

On the left pane, you should see a basic pom.xml file with the settings you specified above. You will need to include the Flume libraries as a dependency, which you can do by adding this inside the <project> element of your pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0</version>
    </dependency>
</dependencies>

You may be prompted at this point to enable auto-import of dependencies; you’ll want to say yes. If these dependencies aren’t downloaded, IntelliJ can’t resolve the Flume classes and you’ll get all sorts of errors in your CustomInterceptor code.

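Now let’s set up the directory structure. On the left pane, you should see a <Your Project Name> → src → main → java directory structure. You may add directories under java here, but remember that they will matter later when you write your config file: the directories under java become your Java package name, and the package plus your class name is what Flume uses to find your interceptor. My full structure looks like: CustomInterceptor → src → main → java → com → cloudera → bkvarda → flume → interceptor, which corresponds to the package com.cloudera.bkvarda.flume.interceptor.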
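To make that mapping concrete, here is a small, self-contained Java sketch (not part of the interceptor itself) showing how a package name turns into a directory path under Maven’s default src/main/java source root, and into the fully qualified class name you’ll later reference in flume.conf. PackageLayout and its methods are hypothetical helpers for illustration only.

```java
// Hypothetical helper: shows how a Java package maps to Maven's source layout
// and to the fully qualified class name used later in flume.conf.
final class PackageLayout {

    // Maven's default source root for main (non-test) code.
    static final String SOURCE_ROOT = "src/main/java";

    // Each dot in the package name becomes a directory level.
    static String sourceFile(String packageName, String className) {
        return SOURCE_ROOT + "/" + packageName.replace('.', '/') + "/" + className + ".java";
    }

    // Fully qualified name: package, a dot, then the simple class name.
    static String qualifiedName(String packageName, String className) {
        return packageName + "." + className;
    }

    public static void main(String[] args) {
        String pkg = "com.cloudera.bkvarda.flume.interceptor";
        // prints src/main/java/com/cloudera/bkvarda/flume/interceptor/CustomInterceptor.java
        System.out.println(sourceFile(pkg, "CustomInterceptor"));
        // prints com.cloudera.bkvarda.flume.interceptor.CustomInterceptor
        System.out.println(qualifiedName(pkg, "CustomInterceptor"));
    }
}
```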

Now let’s write our code. Right-click on the directory you want to place your custom interceptor in (for me, interceptor) and create a new Java class. My class is called CustomInterceptor.java, and it looks like this:

package com.cloudera.bkvarda.flume.interceptor;

/**
 * Created by bkvarda on 3/13/16.
 */

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomInterceptor implements Interceptor {

    @Override
    public void close()
    {

    }

    @Override
    public void initialize()
    {

    }

    @Override
    public Event intercept(Event event)
    {
        byte[] eventBody = event.getBody();

        byte[] modifiedEvent = "world".getBytes();

        event.setBody(modifiedEvent);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events)
    {
        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    public static class Builder implements Interceptor.Builder
    {
        @Override
        public void configure(Context context) {
            // TODO Auto-generated method stub
        }

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }
    }
}
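The Builder’s configure() method is left empty here, so “world” is hard-coded. If you would rather set the replacement text in flume.conf, configure() is the natural place to read it. The sketch below assumes Flume hands the builder an org.apache.flume.Context holding the properties under your interceptor’s prefix (the same way the built-in static interceptor takes its key and value settings), so a line ending in interceptors.i1.replacement = hello would show up as the key replacement, readable with Context.getString(key, defaultValue). To compile without Flume, a plain Map stands in for Context; the property name replacement and the class ConfigurableReplacer are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flume-free sketch of a configurable builder.
// A Map<String, String> stands in for org.apache.flume.Context.
final class ConfigurableReplacer {

    private final byte[] replacement;

    private ConfigurableReplacer(String replacement) {
        this.replacement = replacement.getBytes(StandardCharsets.UTF_8);
    }

    // Replaces any body with the configured bytes; returns a copy so callers
    // cannot mutate the interceptor's internal array.
    byte[] intercept(byte[] body) {
        return replacement.clone();
    }

    static final class Builder {
        private String replacement = "world"; // default when the property is absent

        // In real Flume this would be configure(Context context) using
        // context.getString("replacement", "world"); "replacement" is a made-up key.
        void configure(Map<String, String> context) {
            replacement = context.getOrDefault("replacement", "world");
        }

        ConfigurableReplacer build() {
            return new ConfigurableReplacer(replacement);
        }
    }

    public static void main(String[] args) {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("replacement", "hello");
        Builder builder = new Builder();
        builder.configure(ctx);
        byte[] out = builder.build().intercept("anything".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints hello
    }
}
```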

Let’s talk about this for a second. When you build a custom interceptor in Flume, you are “implementing” the Interceptor interface, and an interface in Java is essentially a contract: a set of methods that any class implementing it must provide.

If you look at the Flume source code for the Interceptor interface, you’ll see the same methods declared there: initialize(), intercept(Event), intercept(List<Event>), and close(), plus a nested Builder interface.
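If the idea of an interface is new to you, this stripped-down sketch may help. MiniInterceptor is a hypothetical stand-in declared locally, using String in place of Flume’s Event and leaving out the nested Builder, so it compiles without Flume. The point is the rule the compiler enforces: a class that says implements must supply a body for every method the interface declares.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in mirroring the method set of Flume's Interceptor
// (String instead of Event, no nested Builder).
interface MiniInterceptor {
    void initialize();
    String intercept(String event);
    List<String> intercept(List<String> events);
    void close();
}

// Implementing the interface means providing every declared method;
// omit one and the class will not compile.
final class Shouter implements MiniInterceptor {

    @Override public void initialize() { }

    // Single-event transformation: upper-case the body.
    @Override public String intercept(String event) {
        return event.toUpperCase(Locale.ROOT);
    }

    // Batch version delegates to the single-event method.
    @Override public List<String> intercept(List<String> events) {
        List<String> out = new ArrayList<>(events.size());
        for (String e : events) {
            out.add(intercept(e));
        }
        return out;
    }

    @Override public void close() { }
}
```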

Looking at the code above, there are a few things to note about how Flume works. First, you need to understand that an interceptor is handed a batch of events, not just a single event. In the method below, we execute the intercept() method for every event in the list we take in, and then return the full list of events:

@Override
public List<Event> intercept(List<Event> events)
{
    for (Event event : events) {
        intercept(event);
    }

    return events;
}
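Because intercept(event) modifies each event in place, ignoring its return value in that loop is harmless here. It starts to matter once an interceptor needs to filter: Flume’s Interceptor javadoc describes returning null from intercept(Event) to drop an event, with the batch method returning a list no larger than its input. A sketch of that pattern, assuming a hypothetical SimpleEvent stand-in for org.apache.flume.Event and an arbitrary rule (drop empty bodies) chosen purely for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical filtering interceptor, written without Flume on the classpath.
final class DropEmptyInterceptor {

    // Stand-in for org.apache.flume.Event, reduced to a mutable body.
    static final class SimpleEvent {
        private byte[] body;
        SimpleEvent(byte[] body) { this.body = body; }
        byte[] getBody() { return body; }
        void setBody(byte[] body) { this.body = body; }
    }

    // Returns null to mean "drop this event"; otherwise passes it through.
    SimpleEvent intercept(SimpleEvent event) {
        return event.getBody().length == 0 ? null : event;
    }

    // Collects return values instead of ignoring them, so dropped events
    // really disappear and the output never grows beyond the input.
    List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> out = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }
}
```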

So what is intercept(event) doing? Well, this:

@Override
public Event intercept(Event event)
{
    byte[] eventBody = event.getBody();

    byte[] modifiedEvent = "world".getBytes();

    event.setBody(modifiedEvent);

    return event;
}

It simply takes in a single event and calls getBody(), which returns the body as a byte[], but then does nothing with the original body at all. It then creates a byte[] called modifiedEvent by converting the string “world” into bytes, uses setBody() to replace the event’s body with modifiedEvent (changing the body of the Flume event from whatever it was before to “world”), and returns the event. The end result is that every single event body is changed to just “world”.
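Most real interceptors do use the original body. Here is a minimal sketch of a transformation that keeps it, assuming bodies are UTF-8 text; BodyRewrite and appendWorld are hypothetical names. It also names the charset explicitly, since the no-argument getBytes() used above relies on the JVM’s platform default.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical body transformation that builds on the original bytes.
// Assumes event bodies are UTF-8 text; binary payloads need other handling.
final class BodyRewrite {

    // Decode the body, append " world", and re-encode with an explicit charset.
    static byte[] appendWorld(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        return (text + " world").getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] in = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(appendWorld(in), StandardCharsets.UTF_8)); // prints hello world
    }
}
```

Inside the interceptor, you would then call something like event.setBody(BodyRewrite.appendWorld(event.getBody())) in place of the hard-coded “world”.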

This is obviously incredibly basic, and that’s the intention. If you want to see how the other “out of the box” interceptors are built, you can compare them in the source code here.

Assuming everything is all set, you should have no errors in IntelliJ. Make sure to save your project. Now we can use Maven to build a JAR for us to use with Flume. Use the command line/terminal to go to your project directory. For me it was ~/IdeaProjects/CustomInterceptor. You should see something like this:

bkvarda-MBP:CustomInterceptor bkvarda$ ls -l
total 32
-rw-r--r--  1 bkvarda  staff  4202 Mar 14 19:56 CustomInterceptor.iml
-rw-r--r--  1 root     staff   130 Mar 14 21:38 README.md
-rw-r--r--  1 bkvarda  staff  1027 Mar 14 19:55 pom.xml
drwxr-xr-x  4 bkvarda  staff   136 Mar 13 17:02 src
drwxr-xr-x  7 root     staff   238 Mar 14 20:56 target

Assuming Maven is set up properly, you should be able to run this successfully:

sudo mvn clean package
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.cloudera.bkvarda:custom-interceptor:jar:1.0
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 12, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building custom-interceptor 1.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ custom-interceptor ---
[INFO] Deleting /Users/bkvarda/IdeaProjects/CustomInterceptor/target
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ custom-interceptor ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 0 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ custom-interceptor ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /Users/bkvarda/IdeaProjects/CustomInterceptor/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ custom-interceptor ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/bkvarda/IdeaProjects/CustomInterceptor/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ custom-interceptor ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ custom-interceptor ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ custom-interceptor ---
[INFO] Building jar: /Users/bkvarda/IdeaProjects/CustomInterceptor/target/custom-interceptor-1.0.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.705 s
[INFO] Finished at: 2016-03-14T22:18:27-07:00
[INFO] Final Memory: 20M/302M
[INFO] ------------------------------------------------------------------------

Now if you navigate to the target directory, you should see your JAR:

bkvarda-MBP:CustomInterceptor bkvarda$ ls -l target
total 8
drwxr-xr-x  3 root  staff   102 Mar 14 22:18 classes
-rw-r--r--  1 root  staff  3879 Mar 14 22:18 custom-interceptor-1.0.jar
drwxr-xr-x  3 root  staff   102 Mar 14 22:18 generated-sources
drwxr-xr-x  3 root  staff   102 Mar 14 22:18 maven-archiver
drwxr-xr-x  3 root  staff   102 Mar 14 22:18 maven-status
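Before copying the JAR anywhere, it can be worth confirming that both classes made it in: javac compiles the nested Builder to its own file, CustomInterceptor$Builder.class. The JDK’s jar tf command lists a JAR’s contents from the shell; the sketch below does the same with java.util.jar.JarFile, keeping only .class entries. JarContents is a hypothetical helper name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the compiled classes packaged in a JAR.
final class JarContents {

    // Returns entry names ending in ".class", in the order stored in the JAR.
    static List<String> classEntries(Path jarPath) {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            for (JarEntry entry : Collections.list(jar.entries())) {
                if (entry.getName().endsWith(".class")) {
                    names.add(entry.getName());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Usage: pass the JAR path, e.g. target/custom-interceptor-1.0.jar
        for (String name : classEntries(Paths.get(args[0]))) {
            System.out.println(name);
        }
    }
}
```

For the project above, you would expect to see both class files under com/cloudera/bkvarda/flume/interceptor/.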

You will need to take this JAR and copy it to /usr/lib/flume-ng/lib/ on your cluster/VM. Once you’ve done this, you’ll need to add your interceptor to your flume.conf file. These are the extra lines I needed to add. If you followed this exactly, it should be the same for you as well:

histingest1.sources.kafka-source1.interceptors = i1
histingest1.sources.kafka-source1.interceptors.i1.type = com.cloudera.bkvarda.flume.interceptor.CustomInterceptor$Builder

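The type value is the fully qualified name of your Builder class: the package (which mirrors your directory structure), then the class name, then $Builder, because Java names a nested class Outer$Inner. Yours will differ if you set up a different directory structure or named your custom interceptor class differently.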
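The $ is not Flume-specific syntax. Java gives a nested class the binary name Outer$Inner, and Flume loads the builder roughly by passing the type string to Class.forName and calling its no-argument constructor (after first checking built-in aliases such as timestamp). That is also why Builder is declared public static: public so Flume’s code in another package can reach it, and static because a non-static inner class can’t be created without an enclosing instance. The sketch below reproduces that lookup with hypothetical stand-in classes; it is an illustration, not Flume’s actual code.

```java
// Hypothetical stand-ins showing how a "type" string resolves to a nested class.
final class OuterInterceptor {

    // Static nested class: instantiable without an OuterInterceptor instance.
    static final class Builder {
        String describe() {
            return "built by " + getClass().getName();
        }
    }

    // Loads a class by its binary name and invokes its no-argument constructor.
    static Object instantiate(String typeName) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(typeName);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // Binary name uses '$' for the nesting, e.g. OuterInterceptor$Builder
        // (prefixed with the package name if this file declares one).
        String typeName = Builder.class.getName();
        System.out.println(typeName);
        Builder builder = (Builder) instantiate(typeName);
        System.out.println(builder.describe());
    }
}
```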

For me, the end result was SequenceFiles on HDFS that looked like this (some binary data):

End result

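For reference, I used a random data generator to publish to a Kafka topic (original). The Kafka source read from that topic, the interceptor changed each body to “world”, and the Kafka channel published the modified events to another topic (modified), from which the HDFS sink wrote the SequenceFiles. My full flume.conf looked like this (though yours may vary):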

histingest1.sources = kafka-source1
histingest1.channels = kafka-channel1
histingest1.sinks = hdfs-sink1

#Kafka Source (reads from origin and turns into event with world as body)
histingest1.sources.kafka-source1.type = org.apache.flume.source.kafka.KafkaSource
histingest1.sources.kafka-source1.zookeeperConnect = localhost:2181
histingest1.sources.kafka-source1.channels = kafka-channel1
histingest1.sources.kafka-source1.topic = original
histingest1.sources.kafka-source1.groupId = flume
histingest1.sources.kafka-source1.interceptors = i1
histingest1.sources.kafka-source1.interceptors.i1.type = com.cloudera.bkvarda.flume.interceptor.CustomInterceptor$Builder

#Kafka Channel (receives world events)
histingest1.channels.kafka-channel1.type = org.apache.flume.channel.kafka.KafkaChannel
histingest1.channels.kafka-channel1.zookeeperConnect = localhost:2181
histingest1.channels.kafka-channel1.brokerList = localhost:9092
histingest1.channels.kafka-channel1.topic = modified
histingest1.channels.kafka-channel1.groupId = flume
histingest1.channels.kafka-channel1.parseAsFlumeEvent = true
histingest1.channels.kafka-channel1.readSmallestOffset = false

#HDFS Sink
histingest1.sinks.hdfs-sink1.channel = kafka-channel1
histingest1.sinks.hdfs-sink1.type = hdfs
histingest1.sinks.hdfs-sink1.hdfs.path = /user/svchadoopp/data/flume_test
histingest1.sinks.hdfs-sink1.hdfs.filePrefix = tag-data
histingest1.sinks.hdfs-sink1.hdfs.fileType = SequenceFile
histingest1.sinks.hdfs-sink1.hdfs.fileSuffix = .seq
histingest1.sinks.hdfs-sink1.hdfs.writeFormat = Text
histingest1.sinks.hdfs-sink1.hdfs.inUsePrefix = _writing_
histingest1.sinks.hdfs-sink1.hdfs.rollInterval = 600
histingest1.sinks.hdfs-sink1.hdfs.rollSize = 132120576
histingest1.sinks.hdfs-sink1.hdfs.rollCount = 0
histingest1.sinks.hdfs-sink1.hdfs.idleTimeout = 0
histingest1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = false
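One architectural detail the config doesn’t make obvious is that the interceptor runs on the source side, as part of handing events to the channel, so what lands in the modified topic (and then in HDFS) has already been rewritten. The toy, single-threaded model below illustrates that ordering under heavy simplification: a BlockingQueue stands in for the Kafka channel, a List for HDFS, and a UnaryOperator for the interceptor. ToyAgent is hypothetical and is not Flume API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical toy model of source -> interceptor -> channel -> sink.
final class ToyAgent {

    static List<String> run(List<String> sourceBatch, UnaryOperator<String> interceptor, int capacity) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(capacity);

        // Source side: run the interceptor over the batch first...
        List<String> intercepted = new ArrayList<>(sourceBatch.size());
        for (String event : sourceBatch) {
            intercepted.add(interceptor.apply(event));
        }
        // ...then put the already-modified events into the channel.
        for (String event : intercepted) {
            if (!channel.offer(event)) {
                throw new IllegalStateException("channel full");
            }
        }

        // Sink side: drain the channel (a real HDFS sink would write files).
        List<String> sink = new ArrayList<>();
        channel.drainTo(sink);
        return sink;
    }

    public static void main(String[] args) {
        List<String> out = run(Arrays.asList("a1", "b2", "c3"), body -> "world", 10);
        System.out.println(out); // prints [world, world, world]
    }
}
```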
