Data Transfer in Edge

Appu V
Published in NeST Digital
5 min read · Jun 2, 2022

Problem Statement: To transfer data from a device log in real time for analytics.

Approach:

Many streaming and batch platforms exist for real-time and batch analytics. A challenge most industries face is moving data from their edge devices to the streaming platform. Fluentd [1] helps us solve this challenge. Fluentd is a cross-platform, open-source data collection project originally developed at Treasure Data and written primarily in Ruby. Fluentd supports multiple architecture patterns for this problem [2]. Because edge devices are resource-constrained, we will look at the lightweight forwarder-aggregator architecture pattern.

Fluent forwarder aggregator architecture

Before we dive into this architecture, we can start by installing Fluentd locally and tailing [3] a log file. Fluentd can be run as a Docker container. As a prerequisite, Docker must be installed on the system [4].

To run fluentd in docker, you need to create a docker compose file named docker-compose.yaml and copy the following lines.

forwarder docker-compose.yaml

Here you can see two volume mappings. The configuration folder (created in the next step) should be mapped to /fluentd/etc/. The log file can be mapped to any location inside the container, but that location must match the path given in the Fluentd configuration file.

Now we need to create a configuration for tailing a log file. For better maintainability, we will use separate configuration files. First, create a folder named Configuration. It can live anywhere; for convenience, we create it in the same folder as the Docker Compose file. Then we need two configuration files: fluent.conf and fluent_tail.conf. All our settings go into fluent_tail.conf.

Sample Configuration

The configuration above tails a file named device.log located at fluentd/log/files/device.log. It also specifies a position file, which Fluentd uses to record the last position it read. The events are tagged device.log. A match section with the same tag prints them to standard output. Now we need to include this configuration in our fluent.conf.

Sample Configuration:
@include fluent_tail.conf
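To make the role of the position file concrete, here is a minimal Python sketch of the idea: remember the byte offset already read, and on the next pass return only what was appended after it. This is an illustration under our own assumptions, not Fluentd's implementation or its on-disk position file format; the function name read_new_lines and the plain-integer offset file are hypothetical.

```python
import os


def read_new_lines(log_path, pos_path):
    """Return complete lines appended to log_path since the offset saved in pos_path."""
    offset = 0
    if os.path.exists(pos_path):
        with open(pos_path, "r", encoding="utf-8") as pos_file:
            offset = int(pos_file.read().strip() or 0)

    with open(log_path, "rb") as log_file:
        log_file.seek(offset)
        data = log_file.read()

    # Consume only complete lines; a partial trailing line waits for the next call.
    end = data.rfind(b"\n") + 1
    lines = data[:end].decode("utf-8").splitlines()

    # Persist the new offset so a restart resumes where we stopped.
    with open(pos_path, "w", encoding="utf-8") as pos_file:
        pos_file.write(str(offset + end))
    return lines
```

Calling read_new_lines twice in a row returns the new lines the first time and an empty list the second time, which is why a restarted forwarder does not resend old records. The sketch deliberately ignores log rotation and truncation.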

Now we are all set to tail the log file. We can start the container with the following command:

docker-compose up

Once the container starts, its log shows the configuration we provided above.

Fluentd Forwarder Docker Log

The forwarder is now waiting for data to tail, so we need a way to write to the log file. The following Python [5] code writes sample JSON records to the log file at the mapped location.

Python script for data generation
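A minimal sketch of such a generator is shown here. It assumes, based on the sample data, that temperature_sensor2 is the square of temperature_sensor1; the path logs/device.log, the delay, and the function names are hypothetical and should be adapted to whichever host file is volume-mapped into the forwarder.

```python
import json
import time
from pathlib import Path


def make_record(i, serial="033_appu"):
    """Build one sample reading; sensor2 is the square of sensor1 (assumed from the sample data)."""
    return {"temperature_sensor1": i, "temperature_sensor2": i * i, "serial": serial}


def write_records(log_path, count, delay_seconds=1.0):
    """Append `count` JSON records to `log_path`, one record per line."""
    path = Path(log_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    for i in range(count):
        # Reopen in append mode for each record so every line is flushed to disk.
        with path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(make_record(i)) + "\n")
        time.sleep(delay_seconds)


if __name__ == "__main__":
    # Hypothetical host path: point this at the file mapped into the container.
    write_records("logs/device.log", count=4, delay_seconds=0.5)
```

Writing one complete line per record matters because the tail input reads the file line by line.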

Sample Data in file:

{"temperature_sensor1": 0, "temperature_sensor2": 0, "serial": "033_appu"}

{"temperature_sensor1": 1, "temperature_sensor2": 1, "serial": "033_appu"}

{"temperature_sensor1": 2, "temperature_sensor2": 4, "serial": "033_appu"}

{"temperature_sensor1": 3, "temperature_sensor2": 9, "serial": "033_appu"}

Now you should be able to see the same data in the terminal where fluentd forwarder is running.

Sample Data in terminal:

{"message":"{\"temperature_sensor1\": 0, \"temperature_sensor2\": 0, \"serial\": \"033_appu\"}"}

{"message":"{\"temperature_sensor1\": 1, \"temperature_sensor2\": 1, \"serial\": \"033_appu\"}"}

{"message":"{\"temperature_sensor1\": 2, \"temperature_sensor2\": 4, \"serial\": \"033_appu\"}"}

{"message":"{\"temperature_sensor1\": 3, \"temperature_sensor2\": 9, \"serial\": \"033_appu\"}"}

Fluentd Forwarder docker log tail stdout

We stop the container with:

docker-compose down

Next, we will calculate the sum of both sensors. To do this, filters must be added before the match section. The fluent_tail.conf file needs to be modified as follows:

Fluentd data transformation filters

Here the data passes through two filters. The first parses the incoming string as JSON, and the second, the record_transformer [6] filter plugin, calculates the total temperature from the existing fields. The enable_ruby option allows Ruby expressions to be used when transforming the data. Fluentd offers many plugins [7][8], including input, output, and filter plugins. We can also write our own custom plugins in Ruby.
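The effect of the two filters can be mimicked in a few lines of Python. This is only an illustration of the data flow, not Fluentd code; the function names parse_message and add_total_temperature are hypothetical.

```python
import json


def parse_message(event):
    """Mimic the parser step: replace the event with the JSON decoded from its "message" field."""
    return json.loads(event["message"])


def add_total_temperature(record):
    """Mimic the record_transformer step: add total_temperature from the two sensor fields."""
    enriched = dict(record)
    enriched["total_temperature"] = (
        record["temperature_sensor1"] + record["temperature_sensor2"]
    )
    return enriched


# One event as the tail input emits it: the raw line sits in the "message" field.
tailed = {"message": '{"temperature_sensor1": 3, "temperature_sensor2": 9, "serial": "033_appu"}'}
print(json.dumps(add_total_temperature(parse_message(tailed)), separators=(",", ":")))
# prints {"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}
```

The order matters: the sum can only be computed after the string has been parsed into separate fields.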

We can now start both the forwarder and the data generator. The data shown in the terminal will have an additional JSON field.

Sample Data:

{"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}

{"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}

{"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}

{"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}

Fluentd Forwarder docker log transformation

We have successfully tailed a log file and applied a small transformation. Now we need to send this data to another Fluentd instance, which we call the aggregator. As with the forwarder, we can create a Docker container for it.

aggregator docker-compose.yaml

As with the forwarder, the configuration folder is volume-mapped. We need to create configuration files for the aggregator. We can name them fluent.conf and fluent_agg.conf.

Sample Configuration of fluent_agg.conf is as follows:

Sample Configuration

As with the forwarder, we include this file in fluent.conf.

Here the source type is forward [9], and the match section uses the same tag to print events to stdout. We can start the aggregator container with the following command:

docker-compose up

Fluentd Aggregator docker log

Now we need to configure our forwarder to send the data to the aggregator. The match section must be modified as follows:

Sample forward configuration

With this configuration, Fluentd sends data from the forwarder to the aggregator. The output type used here is forward [10]. Once the setup is complete, start the forwarder and the Python script as described earlier. The output we previously saw in the forwarder now appears in the aggregator.

Sample Data:

{"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}

{"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}

{"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}

{"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}

Fluentd Aggregator docker log stdout

Now we will send the output of the aggregator to a file. The match pattern must be modified as follows:

<match device.log>
  @type file
  path /tmp/output/
</match>

Once the data generator and the forwarder are started, the data appears in the output folder inside the aggregator folder. The output file name follows the pattern buffer.<hashstring>.log

Sample data :

2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":0,"temperature_sensor2":0,"serial":"033_appu","total_temperature":0}

2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":1,"temperature_sensor2":1,"serial":"033_appu","total_temperature":2}

2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":2,"temperature_sensor2":4,"serial":"033_appu","total_temperature":6}

2022-05-30T10:19:56+00:00 device.log {"temperature_sensor1":3,"temperature_sensor2":9,"serial":"033_appu","total_temperature":12}
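Each line of the output file carries a timestamp, the tag, and the JSON record. If a downstream job needs to read these files, a line can be split back into its three parts as in the sketch below. It assumes the fields are separated by whitespace (a tab or a space) and that the timestamp is ISO 8601 as in the sample; the function name parse_output_line is hypothetical.

```python
import json
from datetime import datetime


def parse_output_line(line):
    """Split one output line into (timestamp, tag, record)."""
    # Split on the first two whitespace runs only; the JSON record stays intact.
    time_text, tag, record_text = line.strip().split(None, 2)
    return datetime.fromisoformat(time_text), tag, json.loads(record_text)


sample = (
    '2022-05-30T10:19:56+00:00 device.log '
    '{"temperature_sensor1":3,"temperature_sensor2":9,'
    '"serial":"033_appu","total_temperature":12}'
)
timestamp, tag, record = parse_output_line(sample)
print(tag, record["total_temperature"])
# prints device.log 12
```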

The data has been transferred successfully from the device log to the aggregator. The transfer can be secured with mTLS. Fluentd forward also supports high availability [11], and buffers can be configured [12].

In this example, we have sent the aggregator output to stdout and to a file. It can instead be configured for other outputs such as Kafka, S3, Azure Blob Storage, or Elasticsearch.

Advantages

  • Less resource utilization on the edge devices (maximize throughput)
  • Allow processing to scale independently on the aggregator tier.
  • Easy to add more backends (configuration change in aggregator vs. all forwarders)

Disadvantages

  • Dedicated resources required for an aggregation instance

Complete Code is available in Git.

References

1. https://www.fluentd.org/

2. https://fluentbit.io/blog/2020/12/03/common-architecture-patterns-with-fluentd-and-fluent-bit/

3. https://docs.fluentd.org/input/tail

4. https://docs.docker.com/engine/install/ubuntu/

5. https://www.python.org/

6. https://docs.fluentd.org/filter/record_transformer

7. https://docs.fluentd.org/plugin-development

8. https://www.fluentd.org/plugins

9. https://docs.fluentd.org/input/forward

10. https://docs.fluentd.org/output/forward

11. https://docs.fluentd.org/deployment/high-availability

12. https://docs.fluentd.org/configuration/buffer-section

Appu V
NeST Digital

Cloud Data Engineer | MCT | CCDAK | CKAD | AWS | Azure | GCP | Grand Master - Asia Book Of Records