MQTT. Apache Kafka. InfluxDB. SQL. IoT harmony.
The rapidly growing number of interconnected devices shows that the Internet of Things (IoT) is a fast-maturing technology. The digital economy has its own currency, and that currency is data. As with standard currencies, data is only valuable if you can use it. The IoT makes you data rich. However, having the data is not enough; you need to be able to analyze it and take the appropriate action.
In this entry, you will see how Apache Kafka and Lenses can boost your productivity by providing the tools to build end-to-end data pipelines with just a few lines of SQL code.
To top that off, how does it sound to see your entire Apache Kafka-native data pipeline in one interactive graph?
I can visualize all my data pipelines over Apache Kafka in an interactive graph?
Yes, by the end of this article you will see the graph in action.
The first interactive graph of Apache Kafka-native data pipelines is here.
Imagine a vast network of sensors pushing various measurements via the MQTT protocol to a Mosquitto cluster. Using the Apache Kafka ecosystem, the data is first imported into topics via Kafka Connect, then Kafka Streams comes into play to analyze the streams of data, and the results are pushed into an InfluxDB instance via Kafka Connect.
Let’s put together one data flow to see how quick and easy it is to build a solution for monitoring the temperature and humidity levels. Furthermore, using the redux-lenses-streaming Javascript library for bridging Kafka via SQL, you will see how you can build richer Web applications by hooking into live Kafka data.
If you prefer watching a video to reading an article, you can do so:
Lenses
Lenses is the core element bringing everything together in a unified platform that allows you to build and monitor your data pipelines. You can get the free Developer Edition from http://www.landoop.com/downloads/.
Lenses is a data streaming platform built on top of Apache Kafka, allowing you to stream, analyze and react to your data faster. You can boost your business by delivering faster and more easily, without having to be a Kafka-savvy person.
Leveraging the platform’s Kafka Connect management capabilities, Kafka connectors and its powerful 3-tier SQL engine for Apache Kafka, you can build a streaming ETL in minutes.
LSQL (or Lenses SQL) is a powerful SQL engine for Apache Kafka covering both batch and streaming capabilities. What that means is simple: you can write Kafka Streams applications with LSQL, or you can browse Kafka topic data with ease.
At a very high level, here’s the rundown of the major components that fall under the LSQL engine umbrella:
LSQL has made writing Kafka Streams applications and browsing Kafka topics a lot easier.
Before going any further, make sure you have your Lenses instance ready. In case you don’t have it already, you can follow this link to get your developer edition. Please note you need Docker available on your machine. When you register for the free Developer Edition, you get an email from us with the details of how to run it. The command is similar to this:
docker run -e ADV_HOST=127.0.0.1 -e \
LICENSE_URL="https://milou.landoop.com/download/lensesdl/?id={YOUR_OWN_KEY}" \
--rm -p 3030:3030 -p 9092:9092 -p 2181:2181 -p 8081:8081 \
-p 9581:9581 -p 9582:9582 -p 9584:9584 -p 9585:9585 \
landoop/kafka-lenses-dev
For our data pipeline, we need the latest MQTT source connector, which handles JSON payloads and can set the Kafka message key to the value of our sensor id, for example. The MQTT source deployed with the current free edition contains an older version of the connector which doesn’t allow setting the Kafka message key. The next release will have this covered. Meanwhile, to get the latest code, please follow these steps:
- Download the MQTT connector jar from our Stream Reactor GitHub repository.
- Store the jar in a known location (we are going to use it when we start the Docker process), for example:
~/work/mqtt-connector/
You need to start the Docker image with the bundled MQTT connector disabled and the latest jar mounted. Here is the command to do so:
docker run -e ADV_HOST=127.0.0.1 \
-e LICENSE_URL="https://milou.landoop.com/download/lensesdl/?id={YOUR_OWN_KEY}" \
--rm -p 3030:3030 -p 9092:9092 -p 2181:2181 -p 8081:8081 \
-p 9581:9581 -p 9582:9582 -p 9584:9584 -p 9585:9585 \
-e DISABLE=mqtt \
-v ~/work/mqtt-connector:/connectors/mqtt-connector \
landoop/kafka-lenses-dev
You might have noticed the two extra parameters added to the command received in the email:
- `-e DISABLE=mqtt` disables the already packaged MQTT connector.
- `-v ~/work/mqtt-connector:/connectors/mqtt-connector` mounts your local folder and makes the latest code available to the Docker instance.
Run the command. The Docker image delivers the Kafka environment required for local development: 1 Kafka Broker, 1 Kafka Connect Worker, 1 ZooKeeper node, Schema Registry and, of course, Lenses. You don’t have to install or configure anything else to start coding against Apache Kafka. It takes around 45 seconds for the environment to become available. Make sure you allocate at least 4 GB of RAM to your Docker process. Open your favorite browser and navigate to http://localhost:3030 to access the Web user interface.
MQTT cluster and the sensor input data
The first requirement for our data pipeline is having sensor data. To mimic a real network of sensors sending data over MQTT, a data generator application is provided. To avoid setting up an MQTT Mosquitto cluster, the application embeds a lightweight MQTT-compliant broker by leveraging the Moquette library.
Below is the shape of the data a sensor sends. By the way, the generator code is written in Kotlin.
data class Sensor(
    val id: String,           // the sensor unique identifier
    val temperature: Double,  // the temperature in Celsius
    val humidity: Double,     // the humidity level
    val timestamp: Long)      // the time in ms since Jan 1, 1970 UTC
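For reference, here is a minimal sketch, not part of the generator itself, showing the JSON shape one such reading serialises to. It assumes the Sensor class above plus Jackson databind on the classpath, and the sensor id and values are purely illustrative:

import com.fasterxml.jackson.databind.ObjectMapper

fun main() {
    // An illustrative reading; the real values are produced by the generator below
    val reading = Sensor("SB01", 23.4, 38.1, System.currentTimeMillis())
    // Prints something like:
    // {"id":"SB01","temperature":23.4,"humidity":38.1,"timestamp":1515169487715}
    println(ObjectMapper().writeValueAsString(reading))
}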
The code is quite easy to follow even if you haven’t used Kotlin before. First, the MQTT broker is started on the given port. For simplicity it allows anonymous connections:
val mqttBroker = Server()
val properties = Properties()
properties.put(BrokerConstants.PORT_PROPERTY_NAME, port)
properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, "true")
mqttBroker.startServer(properties)
Then, in a while loop, the code generates and sends data as if there were 4 IoT sensors. For the purpose of this demo, we keep the number of sensors small, since it makes it easier to follow the data.
fun publishSensorData(topic: String, payload: ByteArray): Unit {
val fixedHeader = MqttFixedHeader(
MqttMessageType.PUBLISH,
false,
MqttQoS.AT_MOST_ONCE,
false,
0)
val varHeader = MqttPublishVariableHeader(topic, 0)
val msg = MqttPublishMessage(
fixedHeader,
varHeader,
Unpooled.copiedBuffer(payload))
mqttBroker.internalPublish(msg, "client1")
}
fun rand(from: Int, to: Int): Int {
return random.nextInt(to - from) + from
}
fun generateSensorData(sensorId: String, prev: Sensor): Sensor {
return Sensor(sensorId,
prev.temperature + random.nextDouble() * 2 + rand(0, 2),
prev.humidity + random.nextDouble() * 2 * (if (rand(0, 9) % 2 == 0) -1 else 1),
System.currentTimeMillis())
}
val dataMap = sensorIds.map { it ->
val sensor = Sensor(it, 23.0, 38.0, System.currentTimeMillis())
it to sensor
}.toMap()
try {
while (true) {
sensorIds.forEach { sensorId ->
val data = generateSensorData(
sensorId,
dataMap.get(sensorId)!!)
val json = JacksonJson.toJson(data)
publishSensorData(topic, json.toByteArray())
}
Thread.sleep(500)
}
} catch (e: Exception) {
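// swallow the exception (e.g. an interruption of Thread.sleep) and stop publishing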
}
You can find the code on GitHub. To save you the time of building it, the application artifact is provided here. Once you have extracted the archive (zip or tar) on your machine, navigate to the folder and run the following command:
./bin/mqtt-sensor-data 1883 /sensor_data
There are two parameters provided: the former is the port number to bind the MQTT broker to, and the latter is the topic name to send the data to. Running the application should print content similar to this on your console:
Received message on /sensor_data with payload : {...}
Received message on /sensor_data with payload : {...}
Received message on /sensor_data with payload : {...}
There is data to work with now.
Import the data from the MQTT cluster into Kafka
Apache Kafka comes with the Connect framework which allows moving data in and out of Kafka reliably and at scale. We will use Landoop’s MQTT source connector to ingest the sensor data into Kafka. You can find more about the connector by reading the documentation here.
The connector subscribes to the MQTT topic and writes the data it receives to a Kafka topic. Its behavior is driven by an intuitive SQL-like configuration we call KCQL (Kafka Connect Query Language), which is part of the LSQL engine.
This greatly reduces the complexity of managing the configuration and, at the same time, makes it easy to understand what work the connector is doing.
Let’s create the MQTT source connector instance and see the data flowing through to the target Kafka topic. Make sure you are logged in to the Lenses Web UI, and navigate to the Connectors page (you will find the link in the left-hand side menu). Once loaded, the page looks like this:
To add a new connector, click the `+New Connector` button (at the top right of the page). On the screen presented to you, select the MQTT entry from the `Sources` list. Once that is done, you will be taken to a different page prompting you to provide the connector configuration. Take the settings below and paste them into the configuration editor.
name=Mqtt-Sensor-Source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.kcql=INSERT INTO sensor_data SELECT * FROM /sensor_data WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
connect.mqtt.connection.clean=true
connect.mqtt.connection.timeout=1000
connect.mqtt.connection.keep.alive=1000
connect.mqtt.client.id=lenses_mqtt_sensor
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://127.0.0.1:1883
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Before continuing, let’s pause a moment to understand what the configuration describes. The `connect.mqtt.hosts=tcp://127.0.0.1:1883` entry specifies the MQTT connection details. If you started the data generator on a port other than 1883, update this value accordingly.

The KCQL statement (see below) instructs the connector to read from the MQTT topic `/sensor_data` and push the entire payload to the Kafka topic `sensor_data`. The resulting Kafka message will have its key set to the value of the incoming payload field `id`, the sensor unique identifier; a reading from sensor SB01, for example, becomes a Kafka record keyed by "SB01". Translating the MQTT message into a Kafka message is handled by the `JsonSimpleConverter` class: it takes the JSON MQTT payload and converts it to a Connect Struct, which is handed over to the Connect framework to push to Kafka.
INSERT INTO sensor_data
SELECT * FROM /sensor_data
WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
WITHKEY(id)
We are ready now to create the connector. Click the `Create connector` button and wait for the Connect framework to do its work and spin up the MQTT source task. Once it is running (it should take a few seconds for the task to be allocated; we use only 1 in this case), you can navigate to
http://localhost:3030/#/topics/sensor_data to see the data as it arrives in Kafka.
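If you prefer verifying from code rather than the UI, here is a minimal consumer sketch. It assumes a recent kafka-clients library (2.0+) on the classpath and the broker exposed on 127.0.0.1:9092, as in the Docker command above; the consumer group id is just illustrative:

import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties()
    props.setProperty("bootstrap.servers", "127.0.0.1:9092")
    props.setProperty("group.id", "sensor-data-peek")   // illustrative consumer group
    props.setProperty("auto.offset.reset", "earliest")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("sensor_data"))
        repeat(10) {
            // print each record's key (the sensor id) and its JSON value
            for (record in consumer.poll(Duration.ofSeconds(1))) {
                println("key=${record.key()} value=${record.value()}")
            }
        }
    }
}

You should see the same JSON documents the connector wrote, keyed by the sensor id.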
The data pipeline is quickly taking shape. Next, we want to analyze the data and calculate the average temperature and humidity values while retaining the min/max over a given time window.
Kafka Streams App with LSQL
To analyze the data stream, we need a Kafka Streams application. Using LSQL, we can quickly define and run such an application in minutes. Apart from allowing SQL to describe your application, the Lenses platform also takes care of running and scaling the resulting Kafka Streams applications.
For the Developer Edition, the KStream app runs in the same process as the platform main process. The Enterprise version offers two additional modes for execution and scaling: Connect and Kubernetes. You can find out more from the documentation or you can see it in action here.
Running, monitoring and scaling the Kafka Streams apps defined via LSQL is provided out of the box.
To create the application processing the data stream, navigate to the `Processors` page. SQL processor is the platform terminology for a Kafka Streams application written with LSQL. Press the `New Processor` button to create the application. Grab the code below, paste it into the editor, and name the processor `IoT`:
SET `auto.offset.reset`='latest';
SET autocreate = true;
SET `commit.interval.ms` = 3000;
INSERT INTO sensor_data_avg
WITH
avgStream as
(
SELECT STREAM
count(*) as total,
sum(temperature) as temperatureTotal,
sum(humidity) as humidityTotal,
min(temperature) minTemperature,
max(temperature) maxTemperature,
min(humidity) minHumidity,
max(humidity) maxHumidity,
_key as sensorId
FROM `sensor_data`
WHERE _ktype=STRING and _vtype=JSON
GROUP BY tumble(2,s),_key
)
SELECT STREAM
sensorId,
temperatureTotal/total as avgTemperature,
humidityTotal/total as avgHumidity,
minTemperature,
maxTemperature,
minHumidity,
maxHumidity
FROM avgStream
You don’t have to be an Apache Kafka expert or a JVM developer to write a stream processing application.
Let’s go over the code. The SQL-like syntax will be translated into a Kafka Streams application which calculates the average, min and max values for temperature and humidity. The calculations are done over a time window of 2 seconds. This means that every 2 seconds there will be a result for each sensor that sent data.
Before the `INSERT` statement there are three `SET` statements:

- `SET autocreate = true;` instructs the SQL engine to create the target topic if it does not already exist.
- `SET auto.offset.reset = 'latest';` means the application will process data from the latest messages onward.
- `SET commit.interval.ms = 3000;` sets the frequency with which the task's position (the offsets in the source topics) is saved.
You might ask yourself: why two `SELECT` statements? The simple answer is: calculating the average. We are working on supporting an `avg` function; once that is available, the above code will be even easier to write.

The first `SELECT` statement builds a KStream instance which calculates, for each sensor it receives data for (see the key being referenced in `GROUP BY tumble(2,s), _key`):

- the total number of messages received over the 2s interval
- the sum of all the temperature and humidity values
- the min/max values for temperature and humidity

Using the data generated by this first statement, the second `SELECT` statement calculates the average temperature and humidity by computing, for example, `temperatureTotal/total as avgTemperature`. If a sensor reported 4 readings totalling 100.0 degrees Celsius in a window, the resulting average is 100.0 / 4 = 25.0.
By clicking now on the `Create Processor` button you should end up with the web page looking similar to this:
The stream topology graph is provided for each SQL processor.
Now that the processor is up and running, you should see data already being pushed to the target topic. The UI displays the data arriving in the target topic in real time. Furthermore, you can interact with each node in the graph.
Simple, quick and easy.
To complete the data pipeline, a Kafka Connect InfluxDB sink will persist the stream analysis results.
InfluxDB for your IoT time-series data
InfluxDB is an open source time series database able to handle high write and query loads. Written in Go, it can handle large amounts of data such as application metrics, IoT sensor data, and real-time analytics. At Landoop we provide the most advanced Kafka Connect Sink for it. You can find more about the sink capabilities in the documentation.
Before setting up the sink, you need to have an instance of InfluxDB running. The best and fastest way to do so is to use the existing Docker image for it. Run this command to provision your instance:
sudo docker run -p 8086:8086 \
-v influxdb:/tmp/influxdb \
influxdb:1.3.0
Once the Docker image is running, you need to create the database to insert the data into. Run the following command to create a database named `iot`:
curl -XPOST "http://localhost:8086/query" --data-urlencode "q=CREATE DATABASE iot"
To provision the sink, navigate to the UI Connectors page as you did for the source. This time, when adding a new connector, select InfluxDB from the Sinks list. When the UI prompts for the connector configuration, paste the information below:
name=iot-influxdb-sink
tasks.max=1
connector.class=com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector
connect.influx.url=http://localhost:8086
connect.influx.db=iot
topics=sensor_data_avg
connect.influx.kcql=INSERT INTO sensorMeasure SELECT * FROM sensor_data_avg WITHTAG (id, poweredBy=Lenses)
connect.influx.username=""
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
Using the configuration above, the sink knows to connect to the local InfluxDB instance and insert the data into the `iot` database. The `connect.influx.kcql` configuration entry specifies, via a simple `INSERT` statement, the target InfluxDB measurement and the source Kafka topic to read the data from. The `WITHTAG` keyword instructs the sink to attach two tags to each inserted data point: the first one, `id`, contains the sensor unique identifier; the second one, `poweredBy`, is a constant. Since the KCQL doesn't specify a timestamp field (InfluxDB requires a timestamp for each data point), each point gets the time at insertion, expressed in nanoseconds.
Click the `Create Connector` button to see the connector instance provisioned and the computed averages written to InfluxDB. Kafka Connect will allocate the task and the connector will start pushing data to the time-series store.
To query the data from InfluxDB run the following command:
curl -XPOST "http://localhost:8086/query?pretty=true" --data-urlencode "db=iot" --data-urlencode "q=SELECT * FROM sensorMeasure"
and you should see content similar to the one below being printed on your console:
{
"results": [
{
"statement_id": 0,
"series": [
{
"name": "sensorMeasure",
"columns": [
"time",
"avgHumidity",
"avgTemperature",
"maxHumidity",
"maxTemperature",
"minHumidity",
"minTemperature",
"poweredBy",
"sensorId"
],
"values": [
[
"2018-01-05T16:24:47.715Z",
36.910297678496086,
25.008838646379814,
38.26483448074582,
25.83512146042607,
36.128449761591284,
24.193354778778644,
"Lenses",
"\"SB01\""
],....
Voila! Our data pipeline is now running. You had the patience and curiosity to get this far, but there are two more points to touch upon, and they are quite important.
Let your Web app tap into Kafka live data
Having data flowing through the database is great, but what if you want to tap into the data as it is computed and present it in your Web application?
That could lead to a richer user experience, and your users will greatly appreciate real-time dashboards and charts.
The platform comes with a JavaScript library to do just that. Soon to be open sourced, it allows you to connect to Kafka and get live data into your browser by leveraging Redux and LSQL.
A sample application is provided to showcase what you can achieve. You can find the code on GitHub. Once you have downloaded it locally, run:
npm install
npm run start-dev
and then navigate to http://localhost:8000/lenses/. Use the connection string wss://localhost:3030/api/kafka/ws and the admin/admin credentials to open a connection. Authentication is required by default when accessing the REST API the platform exposes. You can find more about the library and the endpoints in the documentation. Type the following LSQL statement in the query editor:
SELECT * FROM sensor_data_avg WHERE _ktype=STRING and _vtype=JSON
Shortly after you should see live aggregated data being rendered on the chart:
Impressive, isn’t it?!
The cherry on the cake
There is something left for the end of this entry, and it is something we are very happy to have achieved.
What if you were told you can see the entire topology in a nice graph: the Connect source pushing data into a Kafka topic, a SQL processor (a Kafka Streams app) doing the analysis and storing the results in another topic, from which a Connect sink sends them to InfluxDB?
Navigate to http://localhost:3030/#topology and be amazed:
Pretty cool, if you ask me. The more connectors or SQL processors you add, the more complex the graph becomes. You can visualize your entire data flow in one interactive graph!
Lenses is the first product to offer native Apache Kafka data pipelines interactive graph!
Yet another first for Apache Kafka delivered by Landoop.
You can interact with each node in the graph to get more details. As you can see in the image above, the SQL processor has been selected and you get to see its processing flow.
Conclusion
We have seen how the Lenses platform enriches the Apache Kafka ecosystem by providing the tools to easily and quickly set up a data pipeline. There is more to it than just that. You can find out more on the website.
With a few lines of code (LSQL and configuration), one can easily deliver an ETL solution for moving data in and out of Apache Kafka.
The topology view gives you the entire picture of your data flows. We probably can't stress enough the benefits of seeing your data pipelines in one place.
Using the platform, you can deliver your streaming solutions (in this scenario, for IoT) a lot faster and a lot more easily.
Focus on your business requirements, we provide the tools.
Stream. Analyze. React. Lenses!
If you want to learn more about Lenses, just contact us @LandoopLtd or via email info@landoop.com, or simply chat live on www.landoop.com.