Running a Fluentd cluster behind an AWS ELB for handling TCP input streams

I recently got the chance to set up a Fluentd cluster behind an AWS Elastic Load Balancer. Some log streams were sent to it from Filebeat and Winlogbeat, and it was not a difficult task to configure the fluent-plugin-beats plugin to handle those inputs.

However, there was another very specific case I had to tackle while setting this up, and this article is about how I managed to do it. We had a TCP input stream that is sent frequently from various applications. A single chunk of data in this TCP stream would appear like this. (I am going to refer to this sample data chunk as an "exception" from here on in this article.)

{
  "time": "2018-11-25 10:30:44.44",
  "type": "exceptions",
  "module": "test_module",
  "name": "ExceptionWithStackTrace",
  "message": "Sample Error Message",
  "stacktrace": "Exception in thread \"main\" java.lang.NullPointerException\r\n    at com.example.myproject.Book.getTitle(Book.java:16)\r\n    at com.example.myproject.Author.getBookTitles(Author.java:25)\r\n    at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
  "errorcode": "00312"
}

Now, this is an exception that I have created with sample data, but it closely resembles what my Fluentd cluster is expected to process.

I would like you to notice the following important details about this exception.

  • It is in JSON format.
  • It has a field called "stacktrace" which contains multiline data.
  • It does not end with a newline (\r\n).

Now, anyone's first choice would be to use the native in_tcp plugin to capture this flow of data into Fluentd. Well, it was my first choice as well. But after carefully analyzing the input data and how the in_tcp plugin is designed to handle it, I concluded that it might not be the right choice for my cluster.

This is what I figured out.

  • The in_tcp plugin expects a delimiter character, defined in your Fluentd configuration, which it uses to determine where one chunk of data ends.

Have a look at the following Fluentd configuration for the in_tcp plugin.

<source>
@type tcp
tag tcp.events # required
format /^(?<field1>\d+):(?<field2>\w+)$/ # required
port 20001 # optional. 5170 by default
bind 0.0.0.0 # optional. 0.0.0.0 by default
delimiter \n # optional. \n (newline) by default
source_hostname_key client_ip
</source>
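As a quick aside, the format regex in that configuration turns each delimited line into named fields which become the record. A minimal sketch, with a made-up line that matches the pattern:

```ruby
# Sketch: the named captures in in_tcp's format regex become record fields.
# "42:hello" is a made-up input line matching /^(?<field1>\d+):(?<field2>\w+)$/.
m = /^(?<field1>\d+):(?<field2>\w+)$/.match("42:hello")

puts m[:field1]  # => 42
puts m[:field2]  # => hello
```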

Now, my "exception" does not contain a newline character to mark the end of the data chunk; what it does end with is a "}". This is where things became more interesting.
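To see why the default newline framing never fires here, consider a stripped-down chunk in the same shape (the payload below is a made-up sample): the buffer contains no delimiter at all, so in_tcp would buffer forever, while the closing brace does mark the end of the document.

```ruby
# Sketch: this chunk arrives with no trailing newline, so delimiter-based
# framing has nothing to split on; the closing "}" is the only end marker.
chunk = "{\"time\": \"2018-11-25 10:30:44.44\", \"errorcode\": \"00312\"}"

puts chunk.index("\n").inspect              # => nil (the delimiter never arrives)
puts chunk.rindex("}") == chunk.length - 1  # => true (the "}" ends the chunk)
```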

I mentioned earlier that I am running this Fluentd cluster behind an AWS Elastic Load Balancer. A vanilla AWS Elastic Load Balancer replaces the source IP in the TCP packet with the load balancer node's IP. So, to preserve the actual client IP address, I had to enable Proxy Protocol on the load balancer, which I did. Afterwards, this is how the exception sent from the ELB to the Fluentd node appeared.

PROXY TCP4 1xx.xxx.xxx.xxx 10.0.0.10 35646 60445\r\n
{
  "time": "2018-11-25 10:30:44.44",
  "type": "exceptions",
  "module": "test_module",
  "name": "ExceptionWithStackTrace",
  "message": "Sample Error Message",
  "stacktrace": "Exception in thread \"main\" java.lang.NullPointerException\r\n    at com.example.myproject.Book.getTitle(Book.java:16)\r\n    at com.example.myproject.Author.getBookTitles(Author.java:25)\r\n    at com.example.myproject.Bootstrap.main(Bootstrap.java:14)",
  "errorcode": "00312"
}

Here,

  • 1xx.xxx.xxx.xxx represents the actual client IP.
  • 60445 is the port that Fluentd is listening on for the TCP streams.
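Because the Proxy Protocol v1 header is a single space-separated line terminated by \r\n, the client address can be recovered by cutting the buffer at the first \r\n and taking the third field. A minimal sketch (192.0.2.15 and the payload are made-up placeholders):

```ruby
# Sketch: extracting the client IP from a Proxy Protocol v1 header.
# Header format: "PROXY TCP4 <src_ip> <dst_ip> <src_port> <dst_port>\r\n".
# 192.0.2.15 is a placeholder client address.
buffer = "PROXY TCP4 192.0.2.15 10.0.0.10 35646 60445\r\n{\"errorcode\": \"00312\"}".dup

header_end = buffer.index("\r\n")
header = buffer.slice!(0, header_end + 2)  # remove the header, "\r\n" included
client_ip = header.split(" ")[2]           # third field is the source IP

puts client_ip  # => 192.0.2.15
puts buffer     # => {"errorcode": "00312"}
```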

So, this left me with no option but to come up with some tweaks, or a new custom plugin, to read this particular exception data, because what Fluentd receives is neither plain JSON nor a well-delimited TCP input that the in_tcp plugin could handle.

Next, I decided to make the following tweaks to the existing in_tcp plugin's start function and install the result as a new plugin in my Fluentd cluster.

newchunk = 1
first = nil
last = nil
remoteip = "unable to locate"
server_create(:in_tcp_server, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn|
  conn.buffer << data
  begin
    log.info "Received: ", data
    if newchunk == 1
      # Strip the Proxy Protocol v1 header
      # ("PROXY TCP4 <src> <dst> <sport> <dport>\r\n"), "\r\n" included.
      proxyheaderend = conn.buffer.index("\r\n")
      proxyheader = conn.buffer.slice!(0, proxyheaderend + 2)
      remoteip = proxyheader.gsub(/\s+/m, ' ').strip.split(" ")[2]
    end
    log.info "remoteip: ", remoteip
    first = conn.buffer.index("{")
    last = conn.buffer.rindex("}")
    if first.nil? || last.nil?
      # No complete {...} document buffered yet; wait for more data.
      log.info "incomplete json received: ", data
      newchunk = 0
      next
    end
    # slice! takes (start, length), so the length is last - first + 1.
    msg = conn.buffer.slice!(first, last - first + 1)
    log.info "sliced json string: ", msg
    @parser.parse(msg) do |time, record|
      unless time && record
        log.error "pattern not match", message: msg
        next
      end
      tag = extract_tag_from_record(record)
      tag ||= @tag
      time ||= extract_time_from_record(record) || Fluent::EventTime.now
      record[@source_hostname_key] = conn.remote_host if @source_hostname_key
      record[@source_ipv4] = remoteip if @source_ipv4
      router.emit(tag, time, record)
    end
    conn.close
    newchunk = 1
    msg
  end
end

You can find the full code here: https://github.com/uchann2/fluent-plugin-json-input/blob/master/lib/fluent/plugin/in_json_input.rb

It is designed to handle the Proxy Protocol header sent from the Elastic Load Balancer and save the client address in the "remoteip" variable, to be used later in Fluentd configurations. It also decides where the exception starts and ends based on the positions of the '{' and '}' characters.
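That brace-based framing can be sketched in isolation like this (extract_json! is a hypothetical helper written for illustration, not part of the plugin; the partial chunks are made up):

```ruby
require 'json'

# Sketch of the brace-based framing: buffer bytes until a complete {...}
# document is present, then slice it out of the buffer and parse it.
# extract_json! is a hypothetical helper, not part of the plugin itself.
def extract_json!(buffer)
  first = buffer.index("{")
  last  = buffer.rindex("}")
  return nil if first.nil? || last.nil?
  buffer.slice!(first, last - first + 1)
end

buffer = "".dup
buffer << "{\"errorcode\": "        # a partial chunk arrives first
puts extract_json!(buffer).inspect  # => nil (keep buffering)
buffer << "\"00312\"}"              # the rest of the chunk arrives
msg = extract_json!(buffer)
puts JSON.parse(msg)["errorcode"]   # => 00312
```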

It is not perfect and could be improved further, but it does the job.