Collecting Data from I/IoT Devices using telegraf

cem akpolat
Published in Nerd For Tech
11 min read · Jun 28, 2023

Telegraf is a tool that collects data from diverse sources over distinct communication protocols, transforms the incoming data in a processing phase, generates aggregated metrics from the processed and collected data, and then forwards the result to data consumers. This concept is well known and similar to Extract-Transform-Load (ETL). What telegraf offers on top is huge support for data-source integration plugins: most of the Industrial IoT (IIoT) protocols such as Modbus, OPC UA, etc. are already implemented for you, and you can use them directly or create a new one by following the well-prepared plugin documentation. Another plus of telegraf is that the whole source is open. The following figure presents the high-level architecture of telegraf, so that you can map the items better.

Telegraf Plugin System

To construct the software architecture given above, the developers chose the Go language to implement telegraf, and the code structure closely mirrors the structure of the figure, which makes it easy to comprehend what happens behind the scenes.

Developing a new plugin for each of these separate operations can easily be done by following the given instructions. This means that if your data source or the available processing operations don't satisfy your requirements, you can create a new plugin and implement the predefined methods. Thanks to this flexibility, there is already some work in this direction, e.g. Starlark, a Python-like language that enables the integration of more complex processing and aggregation functionality, and it is easy to foresee that many other libraries will be built on top of telegraf in the long term.
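As a taste of that extensibility, a Starlark processor can rewrite metrics in flight with a Python-like function. Below is a minimal sketch; the field names (mem_bytes, mem_mb) are made up for illustration, not taken from this tutorial's devices:

```toml
[[processors.starlark]]
  source = '''
def apply(metric):
    # Hypothetical example: derive a megabyte field from a raw byte count
    if "mem_bytes" in metric.fields:
        metric.fields["mem_mb"] = metric.fields["mem_bytes"] / 1048576.0
    return metric
'''
```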

After this gentle and lightweight introduction, the main purpose of this tutorial is to show the different aspects of this program. Here is the tutorial in short, as a list:

  • Define an IoT Use-Case
  • Creating Basic telegraf Environment
  • Adding Devices & Extending telegraf Configurations
  • Putting All Configs and Extending Docker-Compose

Define an IoT Use-Case

An industrial IoT environment is composed of a number of devices and services that build the production pipeline. Many devices generate massive amounts of data, and services process them either to react to changing conditions or to monitor and visualize them. In this use case, our goal is to simulate some of these devices communicating via distinct protocols such as OPC UA, SNMP, HTTP and MQTT, then visualize the data using InfluxDB as well as forward it to other tools such as a Kafka data consumer. To connect all these components, telegraf can play the role of a bridge in between: it receives the data from the devices, transforms it and delivers it to the other tools.

Creating Basic telegraf Environment

The basic setup environment relies on the telegraf and InfluxDB apps within a docker-compose file. You could always install them natively instead, which I mostly don't recommend. For an initial and quick setup, we need the following:

  • telegraf.conf that includes a simple agent configuration, which will be bound to the telegraf container in the next step.
# Agent Configuration
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"

# Input Plugins
[[inputs.cpu]]
  ## no configuration options required

[[inputs.mem]]
  ## no configuration options required

# Output Plugins
[[outputs.file]]
  ## Output file path
  files = ["stdout"]
  • docker-compose.yml that contains the influxdb and telegraf services. Since telegraf pushes its data to InfluxDB, it depends on InfluxDB's start. Depending on a service can be tricky, though: just as a reminder, a service having started doesn't mean that it is fully functional yet.
version: '3'
services:
  telegraf:
    image: telegraf
    volumes:
      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
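Since depends_on only orders container startup and doesn't wait for the service inside to become ready, one common workaround is a healthcheck combined with the long-form depends_on condition. A sketch, under the assumptions that curl is available inside the influxdb image and that your compose version supports service_healthy:

```yaml
services:
  influxdb:
    image: influxdb
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8086/health"]
      interval: 5s
      timeout: 3s
      retries: 10

  telegraf:
    image: telegraf
    depends_on:
      influxdb:
        condition: service_healthy
```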

If you installed telegraf on your OS, you can also execute the line below to start it on the local computer, at least in a macOS environment. Please keep in mind that we build all components with docker-compose at the end of this tutorial.

telegraf --config /path/to/telegraf.conf

Adding Devices & Extending telegraf Configurations

As mentioned above, our intention is to create an environment containing multiple devices that connect to telegraf. The devices are added below as dummy sensors, i.e. all data are generated on the fly. Nevertheless, the important point is to establish the diverse communication protocols with telegraf. If your aim is to bind a real device, you can still use the same config; only the data would be different. The devices to be simulated are: SNMP, HTTP, OPC UA, and MQTT devices.

Integrating SNMP Device

SNMP (Simple Network Management Protocol) is a widely utilized network management protocol for monitoring and managing network devices; it allows us to collect, retrieve and modify device information. In this protocol, the features/items of a device are represented by OIDs, e.g. 1.3.6.1.2.1.1.1.0, each of which is assigned a value. The current version is SNMPv3, and there are plenty of libraries supporting it across programming languages. In our example, we start a Debian-based Docker image on which the SNMP-related packages are installed, and we will use well-known OIDs such as CPU and memory usage, which telegraf then retrieves directly from the agent.
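An OID is simply a dot-separated path through a global naming tree. The short sketch below resolves the well-known prefix of an OID into its registered arc names; the lookup table is a small hand-picked excerpt of the tree, not a complete MIB:

```python
# Resolve the standard prefix of an OID into human-readable arc names.
# ARC_NAMES is a hand-picked excerpt of the global OID registration tree.
ARC_NAMES = {
    (1,): "iso",
    (1, 3): "org",
    (1, 3, 6): "dod",
    (1, 3, 6, 1): "internet",
    (1, 3, 6, 1, 2): "mgmt",
    (1, 3, 6, 1, 2, 1): "mib-2",
    (1, 3, 6, 1, 4): "private",
    (1, 3, 6, 1, 4, 1): "enterprises",
}

def describe_oid(oid: str) -> str:
    """Return the OID with known arcs replaced by their names."""
    parts = tuple(int(p) for p in oid.split("."))
    names = []
    for i in range(1, len(parts) + 1):
        names.append(ARC_NAMES.get(parts[:i], str(parts[i - 1])))
    return ".".join(names)

print(describe_oid("1.3.6.1.2.1.1.1.0"))
# iso.org.dod.internet.mgmt.mib-2.1.1.0  (the standard sysDescr.0)
```

The 1.3.6.1.4.1.2021.* OIDs used below sit under the "enterprises" branch (2021 is the UCD-SNMP enterprise number shipped with snmpd).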

# Dockerfile_snmp_agent

FROM debian:latest

RUN apt-get update && apt-get install -y snmp snmpd

# Copy the SNMP agent configuration file
COPY snmpd.conf /etc/snmp/snmpd.conf

# Expose the SNMP port
EXPOSE 161/udp

# Start the SNMP agent service
CMD ["snmpd", "-f", "-Lo", "-C", "-c", "/etc/snmp/snmpd.conf"]
  • A default snmpd.conf file is added below; it will be copied into the Docker image while the image is being built.
com2sec readonly default public
group MyROGroup v1 readonly
group MyROGroup v2c readonly
view all included .1 80
access MyROGroup "" any noauth exact all none none
syslocation "Location"
syscontact "Contact"

The related part of the telegraf config is given below. The agents key is linked to the docker service name that will be shown in the following sections, and it resolves to the IP address of the Debian container. As seen below, every 10 seconds two SNMP fields, namely CPU and memory usage, will be fetched from the running container.

[[inputs.snmp]]
  agents = ["snmp_agent"]
  version = 2
  community = "public"
  interval = "10s"
  timeout = "5s"

  [[inputs.snmp.field]]
    name = "cpu"
    oid = "1.3.6.1.4.1.2021.11.11.0"  # Example CPU usage OID

  [[inputs.snmp.field]]
    name = "memory"
    oid = "1.3.6.1.4.1.2021.4.6.0"  # Example memory usage OID

Device Communicating over HTTP

Integrating an HTTP device, again with the same purpose as above (CPU and RAM data collection), can easily be done by creating a Flask-based REST API server that uses the well-known psutil library. The following app provides an http://<IP>/metrics GET endpoint and returns JSON data containing the CPU and memory usage.

from flask import Flask, jsonify
import psutil

app = Flask(__name__)

@app.route('/metrics', methods=['GET'])
def get_metrics():
    cpu_percent = psutil.cpu_percent(interval=1)
    memory_usage = psutil.virtual_memory().percent

    metrics = {
        'cpu_percent': cpu_percent,
        'memory_usage': memory_usage
    }

    return jsonify(metrics)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

To dockerize this app, the following Dockerfile is sufficient. requirements.txt includes only two libraries, namely flask and psutil. For production environments, please use gunicorn or another production-grade HTTP server.

# Dockerfile_http_api

FROM python:3

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py .

CMD ["python", "app.py"]

The configuration in the telegraf.conf file is relatively easy: the URL address, the HTTP method, the expected data format and how often the URL should be requested are written.

[[inputs.http]]
  ## inputs.http parses the response body; inputs.http_response only checks status/latency
  interval = "10s"
  name_override = "http_metrics"
  urls = ["http://http_api:8080/metrics"]
  method = "GET"
  timeout = "5s"
  data_format = "json"
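With data_format = "json", telegraf roughly flattens nested JSON objects into individual numeric fields, joining nested keys with an underscore. A rough Python sketch of that idea, not telegraf's actual parser code:

```python
def flatten_json(obj, parent_key=""):
    """Flatten nested dicts the way telegraf's JSON parser roughly does:
    nested keys are joined with '_' and only leaf values become fields."""
    fields = {}
    for key, value in obj.items():
        full_key = f"{parent_key}_{key}" if parent_key else key
        if isinstance(value, dict):
            fields.update(flatten_json(value, full_key))
        else:
            fields[full_key] = value
    return fields

metrics = {"cpu_percent": 12.5, "memory": {"used": 63.1, "free": 36.9}}
print(flatten_json(metrics))
# {'cpu_percent': 12.5, 'memory_used': 63.1, 'memory_free': 36.9}
```

Our /metrics endpoint returns a flat object anyway, so cpu_percent and memory_usage arrive as fields unchanged.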

Device Interacting via MQTT

To realize an MQTT scenario, whose concept relies on pub/sub logic, we need publisher, broker and subscriber components. The publisher will be a Python app that again uses the psutil library and regularly pushes the CPU and RAM measurements to the broker under specific topics. The MQTT broker, in this case mosquitto, forwards these messages to the subscribers (whoever has an interest in this data). The subscriber here is the telegraf software; in other words, telegraf subscribes to the MQTT broker and says "I want to be informed on this data topic whenever you receive anything".

The MQTT client code is added below; as can be seen, it publishes the measurements every 5 seconds in an infinite loop. The broker is called mqtt_broker, and this name will be used in the docker-compose file. You can modify the code w.r.t. your project requirements.

# dummy_sensor.py

import paho.mqtt.client as mqtt
import psutil
import time

broker = "mqtt_broker"  # MQTT broker hostname

client = mqtt.Client()
client.connect(broker)

while True:
    cpu_percent = psutil.cpu_percent(interval=1)
    mem_percent = psutil.virtual_memory().percent

    client.publish("sensor/cpu", cpu_percent)
    client.publish("sensor/mem", mem_percent)

    time.sleep(5)

Dockerizing a Python app was already shown in the HTTP example, and here we see a similar Dockerfile structure. Instead of using a requirements.txt file, the libraries are installed directly in the Dockerfile itself. Nevertheless, you should prefer the requirements.txt approach for the sake of maintainability.

# Dockerfile_dummy_sensor

FROM python:3.9

WORKDIR /app

COPY dummy_sensor.py .

RUN pip install paho-mqtt psutil

CMD ["python", "dummy_sensor.py"]

The MQTT broker requires an additional mosquitto.conf file with at least the following configuration, and the ownership of this file should be changed to UID 1883 (the mosquitto user inside the container) as shown below:

 sudo chown -R 1883:1883 mosquitto.conf

Without this ownership change, you would most probably encounter permission issues, because the mosquitto container cannot read the file.

persistence true
persistence_location /mosquitto/data/

log_dest file /mosquitto/log/mosquitto.log
log_type all

listener 1883
allow_anonymous true

telegraf.conf doesn't require much configuration here: only the broker URL, the MQTT topics to subscribe to, the data format and the data type.

[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt_broker:1883"]  # MQTT broker hostname and port
  topics = ["sensor/cpu/#", "sensor/mem/#"]  # MQTT topics to subscribe to
  data_format = "value"
  data_type = "float"  # required
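The # in the subscribed topics is MQTT's multi-level wildcard: sensor/cpu/# matches sensor/cpu itself and every topic below it, while + would match exactly one level. A simplified stdlib sketch of the matching rule (it ignores edge cases such as $-prefixed system topics):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching: '+' matches one level,
    '#' matches the parent level and everything below it."""
    f_parts = filter_.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True  # '#' swallows the rest of the topic
        if i >= len(t_parts):
            return False  # filter is longer than the topic
        if f != "+" and f != t_parts[i]:
            return False  # literal level mismatch
    return len(f_parts) == len(t_parts)

print(topic_matches("sensor/cpu/#", "sensor/cpu"))       # True
print(topic_matches("sensor/cpu/#", "sensor/cpu/core0")) # True
print(topic_matches("sensor/+", "sensor/cpu/core0"))     # False
```

Since our publisher uses the exact topics sensor/cpu and sensor/mem, plain topic names would also work; the wildcards simply keep the config open for per-core or per-device subtopics.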

Device Communicating via OPC UA

OPC UA, one of the industry-standard protocols, is built on a server-client communication mechanism. The clients run specific code on their platform and share data with the OPC UA server through OPC tags. In our case, we will assume that the clients are connected to the server and update its objects; here, our server simply updates the data field itself. The OPC UA client will not be presented here, but you can see an example of it in the GitHub source code. You may also ask how OPC UA structures its data into nodes, namespaces and objects. This link, or even better the unified-automation website, may be helpful, at least for the initial steps.

Regarding the OPC UA server code below: we first create the server and set the server endpoint that will deliver the data. Three security options are added. A namespace is registered, which might be thought of as the container of all the objects we add in this code. Afterwards, an object called MyObject1 is created and added under the registered namespace. Since this object is empty, a new variable, MyVariable, is created and added to the object. The variable has to be writable, otherwise it would not be possible to change it. In the infinite loop, the latest variable value is fetched (it might have been changed in the meantime), increased by one, and written back. The goal of telegraf is to retrieve the MyVariable value.

from opcua import Server, ua
import time

# Create a new OPC UA server
server = Server()

# Set the server endpoint URL and port
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

# Set the supported security policies
server.set_security_policy([
    ua.SecurityPolicyType.NoSecurity,
    ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
    ua.SecurityPolicyType.Basic256Sha256_Sign])

# Register a namespace
uri = "http://example.com"
idx = server.register_namespace(uri)

# Create a custom object
node = server.get_objects_node()
obj = node.add_object(idx, "MyObject1")

# Create a custom variable
var = obj.add_variable(idx, "MyVariable", 0.0)
var.set_writable()  # Enable write access for the variable

# Start the server
server.start()
print("OPC UA server is running!")

try:
    # Update the custom variable value periodically
    while True:
        value = var.get_value()
        print(value)
        value += 1.0
        var.set_value(value)
        time.sleep(5)
finally:
    server.stop()

The Dockerfile is again similar to the other ones; each time we only install a different library and run a different Python script.

# Dockerfile_opcua_server

FROM python:3.9

WORKDIR /app

COPY opcua_server.py .

RUN pip install opcua

CMD ["python", "opcua_server.py"]

The configuration in telegraf.conf is given below. For the sake of simplicity, we skip the security options. The crucial part here is to reference the right node values: namespace, identifier_type and identifier differ in each case. Basically, with this config telegraf requests from the OPC UA server the MyVariable value that lives in MyObject1 and belongs to the example.com namespace. The OPC UA client code on GitHub follows the same approach, just from the Python perspective.

[[inputs.opcua]]
  name = "opcua"
  endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/"
  security_policy = "None"
  security_mode = "None"
  auth_method = "Anonymous"
  ## Node configurations
  nodes = [
    { name = "status", namespace = "2", identifier_type = "i", identifier = "2" }
  ]
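The nodes entry maps directly onto an OPC UA NodeId, which is often written as a single string such as "ns=2;i=2" (namespace index 2, numeric identifier 2). A small illustrative sketch parsing that string form into the three fields telegraf expects:

```python
def parse_node_id(node_id: str) -> dict:
    """Split an OPC UA NodeId string like 'ns=2;i=2' into the
    namespace / identifier_type / identifier triple used above."""
    namespace = "0"  # default namespace when no 'ns=' part is given
    ident_part = node_id
    if node_id.startswith("ns="):
        ns_field, ident_part = node_id.split(";", 1)
        namespace = ns_field[len("ns="):]
    identifier_type, identifier = ident_part.split("=", 1)
    return {
        "namespace": namespace,
        "identifier_type": identifier_type,  # i (numeric), s (string), g (guid), b (bytestring)
        "identifier": identifier,
    }

print(parse_node_id("ns=2;i=2"))
# {'namespace': '2', 'identifier_type': 'i', 'identifier': '2'}
```

Note that the namespace index is server-assigned at registration time, so "2" here relies on our server registering exactly one custom namespace after the built-in ones.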

Putting All Configs and Extending Docker-Compose

In the previous steps, all devices were presented step by step along with their Python code, Dockerfiles and the required telegraf configuration. Below, we merge all these telegraf snippets into a single configuration file.

# telegraf.conf
[[inputs.opcua]]
  name = "opcua"
  endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/"
  security_policy = "None"
  security_mode = "None"
  auth_method = "Anonymous"
  ## Node configurations
  nodes = [
    { name = "status", namespace = "2", identifier_type = "i", identifier = "2" }
  ]

[[inputs.http]]
  interval = "10s"
  name_override = "http_metrics"
  urls = ["http://http_api:8080/metrics"]
  method = "GET"
  timeout = "5s"
  data_format = "json"

[[inputs.snmp]]
  agents = ["snmp_agent"]
  version = 2
  community = "public"
  interval = "10s"
  timeout = "5s"

  [[inputs.snmp.field]]
    name = "cpu"
    oid = "1.3.6.1.4.1.2021.11.11.0"  # Example CPU usage OID

  [[inputs.snmp.field]]
    name = "memory"
    oid = "1.3.6.1.4.1.2021.4.6.0"  # Example memory usage OID

[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt_broker:1883"]  # MQTT broker hostname and port
  topics = ["sensor/cpu/#", "sensor/mem/#"]  # MQTT topics to subscribe to
  data_format = "value"
  data_type = "float"  # required

[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]  # InfluxDB hostname and port
  token = "replace-with-your-own-token"
  organization = "test-org"
  bucket = "metrics"

To run all dockerized components in a single environment, we create a docker-compose.yml file that includes the required configuration for each device, telegraf and InfluxDB. The only thing left to do is to execute docker-compose up in the terminal.

version: "3"

services:
  http_api:
    build:
      context: .
      dockerfile: Dockerfile_http_api
    ports:
      - 8080:8080

  snmp_agent:
    build:
      context: .
      dockerfile: Dockerfile_snmp_agent
    ports:
      - 161:161/udp

  dummy_sensor:
    build:
      context: .
      dockerfile: Dockerfile_dummy_sensor
    depends_on:
      - mqtt_broker

  opcua_server:
    build:
      context: .
      dockerfile: Dockerfile_opcua_server

  mqtt_broker:
    image: eclipse-mosquitto
    ports:
      - 1883:1883
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - mosquitto_data:/mosquitto/data
      - mosquitto_log:/mosquitto/log

  telegraf:
    image: telegraf
    volumes:
      - ./telegraf.conf:/etc/telegraf/telegraf.conf
    depends_on:
      - mqtt_broker
      - influxdb

  influxdb:
    image: influxdb
    volumes:
      - influxdb_data:/var/lib/influxdb
    ports:
      - 8086:8086

volumes:
  influxdb_data:
  mosquitto_data:
  mosquitto_log:

You can access the whole source code via this GitHub link. In that code you will most probably also see some additional services such as Kafka; those examples may either be added here later or be covered in a new blog post in the near future.

Replacing InfluxDB Configurations

Whenever you start docker-compose, you will notice that at first there is no connection to InfluxDB. This is because the bucket name and token are not correct, since you haven't created them yet. In my previous article, I showed with an example how to configure InfluxDB and how to observe and visualize the incoming data, so I won't repeat the same process in this tutorial.
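One way to skip the manual setup step, assuming the official influxdb:2.x image is used, is to let the container bootstrap itself via its DOCKER_INFLUXDB_INIT_* environment variables; the token and names below are placeholders and must match the values in telegraf.conf:

```yaml
influxdb:
  image: influxdb:2.7
  environment:
    - DOCKER_INFLUXDB_INIT_MODE=setup
    - DOCKER_INFLUXDB_INIT_USERNAME=test
    - DOCKER_INFLUXDB_INIT_PASSWORD=test1234
    - DOCKER_INFLUXDB_INIT_ORG=test-org
    - DOCKER_INFLUXDB_INIT_BUCKET=metrics
    - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=replace-with-your-own-token
```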

Summary

In this article, the telegraf software and how it is used with various IoT devices were presented. The different device communication protocols, their Python scripts and the Dockerfiles were provided. Finally, all these components were brought together through docker-compose so that telegraf collects the data from the devices. One thing worth mentioning from the implementation phase is the huge variety of telegraf plugins for many communication protocols. There are also parts of telegraf that we haven't touched yet, such as processors, aggregators and other outputs, and we haven't given deep insight into telegraf's working mechanism. I hope we can complete these missing pieces in the following articles.
