Stream OPC UA Into Kafka In Docker

Muhammad Faiz Noh
Jun 12, 2023


This is part 2 of Simulating OPC environment in Docker

You can read the 1st part here:

In this article, we will continue by streaming data from the OPC server into Kafka.

Setup

We will build on the design from part 1, shown below:

Prerequisite

  • Python
  • Docker
  • Kafka
  • OPC UA
  • Offset Explorer

Kafka with docker-compose

To run Kafka in Docker, we will need both ZooKeeper and Kafka. Confluent provides official images for both, which we can use to set up our Kafka broker. You can read more in the Confluent quickstart.

services:
  data_generator:
    build:
      dockerfile: ./container/datagenerator/Dockerfile
    # entrypoint:
    #   - python
    #   - ./opc_data_generator.py
    container_name: opc_data_generator
    restart: on-failure
    ports:
      - "4840:4840"

  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:5.3.1
    container_name: kafka
    hostname: kafka
    ports:
      - "9092:9092"
    restart: always
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  opc_producer:
    build:
      dockerfile: ./container/opc/Dockerfile
    entrypoint:
      - python
      - ./opc_producer.py
    container_name: opc_producer
    restart: always
    depends_on:
      - kafka
Here we only use a single-node Kafka broker. Check this out if you are planning to run a multi-node cluster:
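As a rough, untested sketch of what a second broker could look like in the same docker-compose file (the service name, ports, and broker ID below are illustrative assumptions): each extra broker needs its own KAFKA_BROKER_ID, host port, and advertised listeners, and with more than one broker you can raise the replication factors above 1.

  kafka2:
    image: confluentinc/cp-kafka:5.3.1
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9093"
    restart: always
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
    depends_on:
      - zookeeper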

To start our containers, we need to run the following command:

docker-compose up
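If you have changed any of the Dockerfiles, or want the stack to keep running in the background, docker-compose's standard --build and -d flags can be added:

docker-compose up -d --build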

Create a Kafka Topic

Once our Kafka broker has been created successfully, we need to create a Kafka topic to store our real-time streaming events from the OPC server. We are using Offset Explorer 2.3.2 to create topics, as it is free for personal use.

Right-click on Clusters and select Add New Connection.

We are using version 5.3.1, but the latest Kafka version selectable here apparently only goes up to 3.2.
Check the connection by clicking on Ping. We will get this message if it is a success.

Name our cluster opc_server and go to the Advanced tab to set the Bootstrap servers to localhost:9092. Next, press Test; once we get this message, click Yes to add the connection.

We will see opc_server added to the Clusters list. Click Connect or double-click on opc_server to connect.

Once connected, right-click on Topics and select Create Topic, or click on the green plus icon. By default, the partition value is 1. Name our Kafka topic opc_server and click the Add button at the bottom right.

Refer to this article to create Kafka topics using the command line, or see the sketch below.
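As a rough sketch, the same topic can also be created with the kafka-topics tool bundled in the Confluent image; the container name and listener below match this setup, but the exact flags may vary with your Kafka version:

docker exec kafka kafka-topics --create \
  --bootstrap-server kafka:19092 \
  --topic opc_server \
  --partitions 3 \
  --replication-factor 1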

Create Kafka Producer using Python

Create a directory named opc and create a Python file named opc_producer.py.

# import libraries
import asyncio
import json
import logging
import time

from asyncua import Client
from kafka import KafkaProducer

# logger used by the error callback
log = logging.getLogger(__name__)


def on_send_success(record_metadata):
    # print which topic and partition the message landed in
    print(record_metadata.topic)
    print(record_metadata.partition)


def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)


# connect to the broker through the internal Docker listener
producer = KafkaProducer(bootstrap_servers='kafka:19092')


async def main():
    url = "opc.tcp://host.docker.internal:4840/freeopcua/server/"
    sensorNode = ["ns=2;s=freeopcua.Tags.pressure", "ns=2;s=freeopcua.Tags.temperature"]

    async with Client(url=url) as client:
        while True:
            value = []
            for sensor in sensorNode:
                node = client.get_node(sensor)
                value.append(await node.read_value())
            message = {
                "timestamp": time.time(),
                "opc_pressure": value[0],
                "opc_temperature": value[1]
            }
            json_data = json.dumps(message)

            # send opc data in json format to the kafka topic
            producer.send('opc_server', json_data.encode()).add_callback(on_send_success).add_errback(on_send_error)
            print(message)
            producer.flush()
            # wait one second without blocking the event loop
            await asyncio.sleep(1)


if __name__ == "__main__":
    # Read OPC data
    asyncio.run(main())

First, we import the libraries needed to stream data from the OPC server to Kafka. The two functions after that (on_send_success, on_send_error) are callbacks: the first prints the topic and partition of a successful send, and the second logs any error raised by the producer.

Next, we set our Kafka producer to bootstrap_servers='kafka:19092'. We use kafka as the hostname (instead of an IP address) to reach the Kafka container, and port 19092 to match the internal Docker listener we defined in docker-compose.

The main function acts as a client that reads real-time data from the OPC server we set up before. It is based on the examples in FreeOpcUa/opcua-asyncio. It connects to the OPC server using the URL, reads each OPC sensor node, packs the values into a JSON string, and finally sends that string to our Kafka topic opc_server.
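The compose file builds this producer from ./container/opc/Dockerfile, which is not shown in this article; a minimal sketch of such a Dockerfile, assuming a slim Python base image and asyncua/kafka-python as the only dependencies, might look like this:

FROM python:3.10-slim

WORKDIR /app

# OPC UA client and Kafka producer libraries (assumed dependencies)
RUN pip install --no-cache-dir asyncua kafka-python

# copy the producer script into the image; adjust the source path to match your build context
COPY opc_producer.py .

# no CMD needed here: docker-compose overrides the entrypoint with "python ./opc_producer.py"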

To view our data in Kafka, open Offset Explorer, click on Topics, select our topic opc_server, and click the green button in the Data tab.

By default, Offset Explorer will show your messages and keys in hexadecimal format. However, if your messages are UTF-8 encoded strings, Offset Explorer can show the actual string instead of the regular hexadecimal format.
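If you want a quick sanity check outside of Offset Explorer, a small kafka-python consumer run on the host (using the external listener on localhost:9092) can print the same messages. This script is only an illustration and not part of the pipeline:

# check_topic.py - quick host-side consumer for the opc_server topic
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'opc_server',
    bootstrap_servers='localhost:9092',  # external Docker listener from docker-compose
    auto_offset_reset='earliest',        # start from the oldest available message
)

for msg in consumer:
    # each message value is the UTF-8 encoded JSON string sent by opc_producer.py
    print(json.loads(msg.value.decode('utf-8')))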

Logs for the Kafka producer

We can check the actual data values by clicking on the opc_producer container in Docker Desktop, which will show us its logs.
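The same logs can also be followed from a terminal with the standard docker CLI, using the container name set in docker-compose:

docker logs -f opc_producer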

In the next part, we will ingest the real-time data into a database and present it as a dashboard using Apache Superset.

Tap the 👏 button if you found this article useful!

