End to End IoT Gateway Tech Stack for Agriculture 4.0

FastAPI, Docker, InfluxDB, Caprover, GitHub Actions

Baysan
CodeX
6 min read · Dec 16, 2023

Introduction

In this article, I will walk through the essential tools I used to build our own IoT gateway for our Agriculture 4.0 project. I will not dig deeply into the code itself; the goal is to help you understand how the components can be orchestrated. I have also included the project's resources.

Image by Baysan — Dashboard with dummy data.

Resources

I also uploaded a video on my YouTube Channel to explain the basics of the IoT Gateway project.

Unfortunately, it’s only in Turkish for now…

You can access the repo by using the link below.

Backend

I developed a FastAPI app to receive data from the sensors. The app sits as a bridge between the IoT devices and our database.

# routers/iot.py
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from schemas.iot_requests import MeasurementRequest
from models.measurement import Measurement
from helpers.influxdb2 import write_to_influxdb2

iot_router = APIRouter()


@iot_router.post("/measure")
async def measure(iot_measurement_request: MeasurementRequest):
    measurement = Measurement(
        iot_measurement_request.iot_device,
        iot_measurement_request.measurement,
        iot_measurement_request.value,
    )
    if write_to_influxdb2(
        measurement.to_influxdb_point(),
    ):
        return JSONResponse(
            status_code=200,
            content={"status": "ok", "message": "Measurement processed successfully!"},
        )
    else:
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "message": "Error while processing measurement!",
            },
        )
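
The `Measurement` model and its `to_influxdb_point()` method are not shown in this article. To give a rough idea of what such a conversion involves, here is a minimal sketch that renders a measurement as InfluxDB line protocol using only the standard library. The class name `MeasurementSketch`, the `to_line_protocol()` method, and the choice of `iot_device` as a tag and `value` as a field are assumptions for illustration; the real project may well build a `Point` object with the official client instead.

```python
import time
from dataclasses import dataclass, field


def _escape_tag(text: str) -> str:
    """Escape the characters that are special in line-protocol tag keys/values."""
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def _escape_measurement(text: str) -> str:
    """Escape the characters that are special in a line-protocol measurement name."""
    return text.replace(",", "\\,").replace(" ", "\\ ")


@dataclass
class MeasurementSketch:
    """Hypothetical stand-in for the project's Measurement model."""

    iot_device: str
    measurement: str
    value: float
    # Unix timestamp in nanoseconds, the default precision for line protocol.
    timestamp_ns: int = field(default_factory=time.time_ns)

    def to_line_protocol(self) -> str:
        # Layout: <measurement>,<tag_key>=<tag_value> <field_key>=<field_value> <timestamp>
        name = _escape_measurement(self.measurement)
        device = _escape_tag(self.iot_device)
        return f"{name},iot_device={device} value={float(self.value)} {self.timestamp_ns}"


sample = MeasurementSketch("farm_1", "temperature", 23.5, timestamp_ns=1700000000000000000)
print(sample.to_line_protocol())
# prints: temperature,iot_device=farm_1 value=23.5 1700000000000000000
```

Storing the device as a tag (indexed) and the reading as a field is a common layout for this kind of data, but it is a design choice, not something the article confirms.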

For simple authentication, I created a header-based authentication mechanism. It will be used during the MVP phase, and I definitely plan to replace it with something more robust later.

# middlewares/auth.py
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from settings import app_settings
from helpers.logger import get_logger

logger = get_logger("middlewares")


class BaysanAuthHeaderMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next: callable):
        if request.url.path in app_settings.public_urls:
            return await call_next(request)
        if (
            request.headers.get(app_settings.middleware_acceptance_header)
            == app_settings.middleware_acceptance_value
        ):
            response = await call_next(request)
            response.headers[
                app_settings.middleware_acceptance_header
            ] = app_settings.middleware_acceptance_value
            return response
        else:
            logger.warning(
                "Unauthorized request to endpoint %s from %s",
                request.url.path,
                request.client.host,
            )
            return JSONResponse(
                content={
                    "status": "error",
                    "message": "You can not access this endpoint",
                },
                status_code=403,
            )
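
A plain `==` comparison is fine for an MVP, but ordinary string comparison can return as soon as it finds the first differing character. One small hardening step, sketched here, is to compare the header value with `hmac.compare_digest` from the standard library, which is designed so that its running time does not reveal where the values differ. The helper name `is_authorized` and the header name used in the example are hypothetical.

```python
import hmac
from typing import Mapping, Optional


def is_authorized(headers: Mapping[str, str], header_name: str, expected_value: str) -> bool:
    """Return True only if the header is present and equals the expected secret."""
    received: Optional[str] = headers.get(header_name)
    if received is None:
        return False
    # Compare as bytes: compare_digest accepts str only when it is pure ASCII.
    return hmac.compare_digest(received.encode("utf-8"), expected_value.encode("utf-8"))


print(is_authorized({"X-Example-Key": "s3cret"}, "X-Example-Key", "s3cret"))  # prints: True
print(is_authorized({"X-Example-Key": "wrong"}, "X-Example-Key", "s3cret"))  # prints: False
```

Inside the middleware, `request.headers` could be passed as the `headers` argument, since it behaves like a mapping of strings.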

# middlewares/__init__.py
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from .auth import BaysanAuthHeaderMiddleware
from settings import app_settings

APP_MIDDLEWARES = [
    Middleware(BaysanAuthHeaderMiddleware),
    Middleware(CORSMiddleware, allow_origins=app_settings.allowed_origins),
]

if app_settings.https_redirect_forced:
    APP_MIDDLEWARES.append(Middleware(HTTPSRedirectMiddleware))
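
The `settings` module is imported throughout the code but is not shown in this article. Here is a minimal sketch of how such settings could be loaded from environment variables. Only the attribute names come from the code above; the environment variable names, the defaults, and the `AppSettingsSketch` class are assumptions for illustration, and the actual project may use a different mechanism.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


def _split_csv(raw: str) -> List[str]:
    """Turn "a, b,c" into ["a", "b", "c"], dropping empty entries."""
    return [item.strip() for item in raw.split(",") if item.strip()]


@dataclass(frozen=True)
class AppSettingsSketch:
    """Hypothetical settings object exposing the attributes the app reads."""

    app_name: str
    public_urls: List[str]
    allowed_origins: List[str]
    middleware_acceptance_header: str
    middleware_acceptance_value: str
    https_redirect_forced: bool

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "AppSettingsSketch":
        return cls(
            app_name=env.get("APP_NAME", "iot-gateway"),
            public_urls=_split_csv(env.get("PUBLIC_URLS", "/health,/docs,/openapi.json")),
            allowed_origins=_split_csv(env.get("ALLOWED_ORIGINS", "*")),
            middleware_acceptance_header=env.get("AUTH_HEADER", "X-MY-AUTH-KEY"),
            # No default for the secret: a missing value raises KeyError at startup.
            middleware_acceptance_value=env["AUTH_VALUE"],
            https_redirect_forced=env.get("HTTPS_REDIRECT_FORCED", "false").lower() == "true",
        )


example = AppSettingsSketch.from_env({"AUTH_VALUE": "change-me", "HTTPS_REDIRECT_FORCED": "true"})
print(example.https_redirect_forced, example.public_urls)
# prints: True ['/health', '/docs', '/openapi.json']
```

Passing the environment in as a mapping keeps the loader easy to exercise in tests without touching the real process environment.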

The app also exposes a health check endpoint.

# main.py
from fastapi import FastAPI
from settings import app_settings
from helpers.logger import get_logger
from middlewares import APP_MIDDLEWARES
from routers.iot import iot_router


logger = get_logger("main")

app = FastAPI(
    middleware=APP_MIDDLEWARES,
    title=app_settings.app_name,
    summary="IoT Gateway",
    version="1.0",
)

app.include_router(iot_router, prefix="/iot", tags=["iot"])


@app.get("/health", status_code=200)
async def healthcheck():
    logger.info("Healthcheck endpoint called")
    return {
        "status": "ok",
        "message": "App is healthy!",
        "app_name": app_settings.app_name,
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)

Database

I was originally going to choose QuestDB as our time-series database (TSDB). However, InfluxDB seemed easier for me to configure and deploy, so I chose InfluxDB version 2. It works well for us for now, but I think I will eventually replace it with QuestDB, because QuestDB is queried with SQL, which I already know. For our testing purposes, InfluxDB is simple to use and makes it easy to create dashboards.
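
To make the query-language difference concrete, here is a small sketch that builds the same question, the 5-minute average reading of one device over the last hour, once as a Flux query for InfluxDB 2 and once as SQL for QuestDB. The bucket name, the tag and field names, and the QuestDB table layout (one table per measurement with `iot_device`, `value`, and `timestamp` columns) are assumptions for illustration. The sketch does no input escaping, so it is only suitable for trusted values.

```python
def flux_mean_query(bucket: str, measurement: str, device: str) -> str:
    """Flux: a pipeline of functions chained with the |> operator."""
    return (
        f'from(bucket: "{bucket}")\n'
        "  |> range(start: -1h)\n"
        f'  |> filter(fn: (r) => r._measurement == "{measurement}")\n'
        '  |> filter(fn: (r) => r._field == "value")\n'
        f'  |> filter(fn: (r) => r.iot_device == "{device}")\n'
        "  |> aggregateWindow(every: 5m, fn: mean)"
    )


def questdb_mean_query(table: str, device: str) -> str:
    """QuestDB: ordinary SQL plus the SAMPLE BY time-series extension."""
    return (
        "SELECT timestamp, avg(value) AS mean_value\n"
        f"FROM {table}\n"
        f"WHERE iot_device = '{device}'\n"
        "  AND timestamp > dateadd('h', -1, now())\n"
        "SAMPLE BY 5m"
    )


print(flux_mean_query("dev", "temperature", "farm_1"))
print(questdb_mean_query("temperature", "farm_1"))
```

Both queries ask for the same result; the SQL version will simply look more familiar to anyone coming from a relational database.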

Image by Baysan

DevOps & Infra

Infra

I am happy that I had the opportunity to use Caprover in one of my projects, and I really fell in love with the tool. I have always been eager to learn Kubernetes, but I think it is too complicated for smaller projects like this one. I also tried Nomad, but it did not suit us. Then I found Caprover, and I have already started using it for three other projects.

Image by Baysan

It also provides network and server monitoring dashboards.

Image by Baysan

DevOps

For CI & CD pipelines, I use GitHub Actions; one workflow for CI and one for CD.

name: CI - Image Deploy To Docker Hub

on:
  push:
    branches:
      - "main"
  pull_request:
    branches:
      - "main"

jobs:
  docker:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:alpine
        ports:
          - 5432:5432
        env:
          POSTGRES_PASSWORD: mysecretpassword
          POSTGRES_DB: postgres
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      dev_influxdb2:
        image: influxdb:2.7.3
        ports:
          - "8086:8086"
        env:
          DOCKER_INFLUXDB_INIT_MODE: setup
          DOCKER_INFLUXDB_INIT_USERNAME: admin
          DOCKER_INFLUXDB_INIT_PASSWORD: Passw0rd!.
          DOCKER_INFLUXDB_INIT_ORG: dev
          DOCKER_INFLUXDB_INIT_BUCKET: dev
          DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: admin
    steps:
      - name: Checkout code
        uses: actions/checkout@v2 # Add this step to check out your source code
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.11 # Choose your desired Python version
      - name: Install dependencies
        run: pip install -r src/requirements.txt # Install your app's dependencies
      - name: Create .env file # Create a .env file for your development environment
        run: mv src/.env.dev src/.env
      - name: Run pytest
        run: pytest src # Adjust this command to run your pytest tests
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: "${{github.workspace}}" # Use the correct context path
          push: true
          tags: ${{ secrets.DOCKERHUB_IMAGE_NAME }}:latest
          file: docker/Dockerfile

The CD workflow uses the Caprover action to deploy the latest image to the server.

name: CD - Deploy to Test Server

on:
  workflow_run:
    workflows: ["CI - Image Deploy To Docker Hub"]
    branches: [main]
    types:
      - completed

jobs:
  deploy_to_caprover:
    runs-on: ubuntu-latest
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    steps:
      - name: Login to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      - name: Deploy Image to Caprover
        uses: caprover/deploy-from-github@v1.1.2
        with:
          server: "${{ secrets.CAPROVER_SERVER }}"
          app: "${{ secrets.CAPROVER_APP_NAME }}"
          token: "${{ secrets.CAPROVER_APP_TOKEN }}"
          image: "${{ secrets.CAPROVER_IMAGE }}:latest"

Tests

I chose pytest as my test framework.

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from main import app
from settings import (
    app_settings as _app_settings,
    logging_settings as _logging_settings,
)


@pytest.fixture
def client():
    return TestClient(app)


@pytest.fixture
def headers():
    return {
        _app_settings.middleware_acceptance_header: _app_settings.middleware_acceptance_value
    }


@pytest.fixture
def app_settings():
    return _app_settings


@pytest.fixture
def logging_settings():
    return _logging_settings

By testing each part of the codebase, I can be confident that my latest changes do not break anything.

# tests/api/test_iot_endpoints.py


def test_endpoint_measure(client, headers, mocker):
    """Tests the measure endpoint."""
    data = {
        "iot_device": "farm_1",
        "measurement": "temperature",
        "value": 23.5,
    }

    # Patch the name where the router looks it up (routers/iot.py imports it
    # with "from helpers.influxdb2 import ..."), not where it is defined.
    mocker.patch("routers.iot.write_to_influxdb2", return_value=True)

    response = client.post(
        "/iot/measure",
        json=data,
        headers=headers,
    )
    assert response.status_code == 200
    assert response.json() == {
        "status": "ok",
        "message": "Measurement processed successfully!",
    }


def test_endpoint_measure_fail(client, headers, mocker):
    """Tests the measure endpoint to fail."""
    data = {
        "iot_device": "farm_1",
        "measurement": "temperature",
        "value": 23.5,
    }
    # Simulate a failed write by patching the router's reference.
    mocker.patch("routers.iot.write_to_influxdb2", return_value=False)

    response = client.post(
        "/iot/measure",
        json=data,
        headers=headers,
    )

    # The endpoint answers with 500 when the write fails.
    assert response.status_code == 500
    assert response.json() == {
        "status": "error",
        "message": "Error while processing measurement!",
    }
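
One detail worth knowing when mocking with `mocker.patch`: a patch replaces a name in one specific module. When a module does `from helpers.influxdb2 import write_to_influxdb2`, it holds its own reference to the function, so the patch has to target the module where the name is looked up. The sketch below demonstrates this with the standard library's `unittest.mock` (which pytest-mock wraps) and two throwaway in-memory modules; the module names `demo_helpers` and `demo_router` are made up for the demonstration.

```python
import sys
import types
from unittest import mock

# Build two throwaway modules in memory so the demo is self-contained.
helpers = types.ModuleType("demo_helpers")
exec("def write(point):\n    return True\n", helpers.__dict__)
sys.modules["demo_helpers"] = helpers

router = types.ModuleType("demo_router")
exec(
    "from demo_helpers import write\n"
    "def handle():\n"
    "    return 200 if write('point') else 500\n",
    router.__dict__,
)
sys.modules["demo_router"] = router

# Patching where the function is defined leaves the router's own reference untouched.
with mock.patch("demo_helpers.write", return_value=False):
    status_when_patching_definition = router.handle()

# Patching where the function is looked up changes what the router calls.
with mock.patch("demo_router.write", return_value=False):
    status_when_patching_lookup = router.handle()

print(status_when_patching_definition, status_when_patching_lookup)  # prints: 200 500
```

For the gateway, the equivalent target would be the router module that imports the write helper.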

I also have manual tests. I use scripts like this one for local testing.

# manual_test.py
import random
import httpx

BASE_URL = "http://localhost:8080"
HEADERS = {"X-MY-AUTH-KEY": "AC-KAPIYI-BEZIRGAN-BASI"}

IOT_DEVICE_LIST = ["device_1", "device_2", "device_3"]
MEASUREMENT_NAME_LIST = ["temperature", "humidity", "pressure"]


def make_request(method, endpoint, json=None):
    return httpx.request(method, BASE_URL + endpoint, json=json, headers=HEADERS)


# Generate a random measurement payload
def generate_data():
    return {
        "iot_device": random.choice(IOT_DEVICE_LIST),
        "measurement": random.choice(MEASUREMENT_NAME_LIST),
        "value": random.uniform(0, 100),
    }


# Send random measurements in a loop until interrupted (Ctrl+C)
while True:
    data = generate_data()
    try:
        response_measurement = make_request("POST", "/iot/measure", json=data)
        print(response_measurement.json())
    except Exception as e:
        print(e)

Finally

I hope you enjoyed this article. I know the code shown here does not cover every aspect of the project, but I wanted to keep the article short. Visit the repo if you want to see all the parts.

You can find my links below to follow me on other platforms.

Kind regards

Lifelong learner & Developer. I use technology that helps me. mebaysan.com