Streaming Pixel Server Data to Azure EventHubs via the Kafka Streaming Protocol | QueryClick

Phil Cumner
QueryClick Tech Blog
7 min readDec 11, 2020

Originally posted to my personal blog here.

As part of the Microsoft Partner Hack in November 2020, I decided to use this opportunity to try out a new method of ingesting Fluentd logs.

What is Fluentd?

Fluentd is a log collector which takes a declarative config file containing input (or “source”) and output information. Wikipedia defines it as:

a cross platform open-source data collection software project

The main idea is it allows developers to collect log information and send it to a given endpoint of their wishes without having to worry about the implementation of the log collection service itself.

How do we use it at QueryClick?

At QueryClick, we ingest clickstream data to perform advanced analytics and have a few ways of doing this. Our most recent method is a pixel server that serves a tiny, 1x1 pixel on a client’s webpage and collects various information about the customers browsing the client’s website. Nginx handles the requests and deals with cookies and Fluentd takes Nginx’s access log and sends it to our data stores. Currently, it simply dumps line delimited JSON objects in Azure Blob Storage.

However, I wanted to see if we could utilise more of Azure’s services to our advantage. Having recently become Azure qualified, I wanted to see what alternatives I could develop.

Using Azure EventHubs

Azure EventHubs (AEH) is Azure’s “real time data ingestion tool” and integrates easily with other Azure services. It also features the ability to easily monitor data streams and configure multiple event consumers to then ingest the incoming data. One thing that we use daily at QC is Databricks and, through Azure certification revision, I learned that Spark Streaming integrates with AEH easily. So naturally it was AEH that I wanted to try out.

Plan

The plan was as follows:

  • Create an AEH instance to ingest clickstream data
  • Change the Fluentd output plugin to send data to a Kafka endpoint instead of Azure Blob Storage
  • Insert AEH connection information into the output plugin for Fluentd
  • Create a Databricks test cluster with a notebook attached to see whether we could ingest the clickstream events from AEH

TLDR: it works a treat

Why Kafka?

Yes, I know an AEH Fluentd output plugin exists. However, we’re big fans of open source software (OSS) here at QC so I wanted to try it with Kafka. We also have a RabbitMQ instance running internally too so wanted to try out AMQP initially but seeing as AEH has native Kafka support, I thought I’d roll with that first.

Implementation

So, what does this look like in practice?

Azure EventHubs Setup

First, I have to set up the AEH instance:

Create AEH instance

An AEH topic also needs to be created (yes, I know they’re not called topics, they’re called EventHubs. But that’s just confusing so I will refer to them as topics).

Create AEH topic

Fluentd Setup

Then, I have to configure Fluentd to utilise the out_kafka2plugin. It comes built in with the td-agent installation of Fluentd, however, our pixel server uses Docker Compose with a Fluentd Docker image. Luckily, the official Docker image for Fluentd offers instructions for how to install plugins on their supplied image. This requires creating your own image so I followed the supplied instructions, added a gem install fluentd-plugin-kafka2in it, and built the image locally (to be uploaded to Azure Container Repository once fully productionised).

This local image was then used in the docker-compose.yml in the pixel server:

services:
fkafka:
image: custom-fluentd:latest
restart: unless-stopped
volumes:
- ./fluentd/fluent.conf:/fluentd/etc/fluent.conf
- ./logs:/var/logs
env_file:
- .env

Then, in the mounted fluent.conffile, I added the new output plugin with AEH endpoint and credentials added under a <match>statement. I found this repo that helped.

<match nginx.access>
@type copy
<store>
@type kafka2
# list of seed brokers
# port 9093 is used by Kafka
brokers <AEH_NAME>.servicebus.windows.net:9093
use_event_time true
# buffer settings
buffer_type file
buffer_path /var/log/td-agent/buffer/kafka
flush_interval 3s
# topic settings
default_topic <TOPIC_NAME>
<format>
@type json
</format>
# producer settings
max_send_retries 1
required_acks -1
# using default OS certs for SSL
ssl_ca_certs_from_system true
username $ConnectionString
password "<SHARED_ACCESS_KEY>"
</store>
<store>
@type stdout
</store>
</match>

A few credentials need to be obtained from AEH. Firstly, the broker connection string ( <AEH_NAME>.servicebus.windows.net:9093 ) is here:

AEH name credential location

Add in the topic name ( <TOPIC_NAME>) from earlier. Then, create a shared access key ( <SHARED_ACCESS_KEY ) from the Shared Access Policies tab in AEH. Make sure it has Sendpermissions:

Create shared access key

Databricks Setup

Now for Databricks, I followed this tutorial and ended up with a setup like this:

Databricks streaming setup

You’ll need to create another shared access key that can Listenfrom the previous step for this. Make sure you specify the EntityHubto be the topic name in the connection string. Also, new versions of Spark require the connection string to be encrypted, use sc._jvm.org.apache.spark.eventhubs.EventHubs.Utils.encryptfrom the Spark Context scto achieve that:

conn_string = "<SHARED_ACCESS_KEY>"conf = {
"eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_string)
}

(It’s honestly this simple)

Testing

As mentioned before, I ran a local instance of the pixel server and then used curlto spam it with requests. Then, I monitored the message count in both AEH and Databricks and extracted the message body in Databricks to see what I was getting.

The commands were as followed:

curl --cacert <path_to_cert> "<https://localhost:443/i?param=testing"> --output -

Please note that I have an HTTPS setup for the pixel server including a local configuration for self-signed certificates, hence the — cacert argument with port 443. Another blog to follow.

Results

This honestly worked better than I ever expected.

Firstly, let’s see what it looked like from Fluentd. I used a @type copyto send both the logs to AEH Kafka endpoint and to stdoutso that I could monitor when Fluentd picked up requests. After curling a few times, I received the below:

fkafka_1     | 2020-11-16 08:31:47.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:47+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"3dfd6da7ee3f1d1f5f9700cc248f62bf"} fkafka_1     | 2020-11-16 08:31:48.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:48+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"67264926a924f882ae083e701f8398ac"} fkafka_1     | 2020-11-16 08:31:49.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:49+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"77e9463ffad775fe60c4be9b68fcccb2"} fkafka_1     | 2020-11-16 08:31:50.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:50+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"954ab20affac1abe36fc36a724cb0247"} fkafka_1     | 2020-11-16 08:31:52.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:52+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"35af98d190620bcf7fea8364663af81a"} fkafka_1     | 2020-11-16 08:31:53.000000000 +0000 nginx.access: {"remote":"172.21.0.1","host":"-","user":"-","time":"2020-11-16T08:31:53+00:00","method":"GET","path":"/i?param=testing","code":"200","size":"43","referer":"-","agent":"curl/7.58.0","cookie":"d0a2f0d131908bf742619c89b066b775"}

(Please excuse the formatting, this looked a lot nicer on Confluence)

Now that I’m sure Fluentd picked up the requests, let’s look at the message count metric for AEH in the past 30 minutes:

AEH picking up curled requests

Sure enough, messages were coming through on the AEH topic. Now let’s look at Databricks and see if they’re coming through there:

They’re coming through in Spark Streaming

And extracting the body information:

The information we need extracted

And would you look at that, the messages are coming through perfectly!

Conclusion

I set out in the Microsoft Partner Hackathon to trial a new method for ingesting clickstream data and came out with a working, proof-of-concept after only three days of research. Now just to include this in our pipelines!

So what’s the benefit of this new data ingestion method?

I’m glad you asked, there’s a few positive takeaways from a streaming pipeline like this:

  • We now get live data and can offer this to QueryClick’s clients
  • We don’t need to change our PySpark processing code thanks to Spark Streaming to handle this new form of data
  • We can now monitor traffic as and when it comes in rather than building our own queries for batch data, further powering the metrics we can offer to client teams
  • Failures are instantaneous rather than scheduled — we know immediately when something fails and integrating this pipeline with Azure Monitor will give us alerting possibilities on top of this
  • This new ingestion method also fuels the separation of concerns within our pipeline and this has to be my biggest excitement with this method

Closing Notes

Thank you to Microsoft and the team who organised the Partner Hackathon. As much as virtual hackathons are difficult to organise, I was very impressed by this one. The engineers and cloud solution architects were always at hand to offer help at a moment’s notice and I couldn’t have done it without them.

--

--