Stream OPC UA Into Kafka In Docker
This is part 2 of simulating an OPC environment in Docker.
You can read the first part here:
In this article, we will continue by developing the data stream from the OPC server into Kafka.
Setup
Based on our design from part 1 as below:
Prerequisites
- Python
- Docker
- Kafka
- OPC UA
- Offset Explorer
Kafka with docker-compose
In order to run Kafka in Docker, we need both Zookeeper and Kafka. Confluent provides official images for both, which we can use to set up our Kafka broker. You can read more in the Confluent quickstart.
services:
  data_generator:
    build:
      dockerfile: ./container/datagenerator/Dockerfile
    # entrypoint:
    #   - python
    #   - ./opc_data_generator.py
    container_name: opc_data_generator
    restart: on-failure
    ports:
      - "4840:4840"
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    container_name: kafka
    hostname: kafka
    ports:
      - "9092:9092"
    restart: always
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  opc_producer:
    build:
      dockerfile: ./container/opc/Dockerfile
    entrypoint:
      - python
      - ./opc_producer.py
    container_name: opc_producer
    restart: always
    depends_on:
      - kafka
Here we only use a single node for our Kafka broker. Check this out if you are planning to use a multi-node cluster:
To start our containers, we need to run the following command:
docker-compose up
Create a Kafka Topic
Once our Kafka broker is up, we need to create a Kafka topic to store the real-time streaming events from the OPC server. We are using Offset Explorer 2.3.2 to create topics, as it is free for personal use.
Right-click on Clusters and Add New Connection
Name our cluster opc_server and go to the Advanced tab to set Bootstrap servers to localhost:9092. Next, press Test; once we get this message, click Yes to add the connection.
We will see opc_server added to the Cluster list. Click on Connect or double-click on opc_server to connect.
Once connected, right-click on Topics and select Create Topic, or click the green plus icon. By default, the partition value is 1. Name the Kafka topic opc_server and click the Add button at the bottom right.
Refer to this article to create Kafka Topics using a command.
Create Kafka Producer using Python
Create a directory named opc and a Python file named opc_producer.py.
# Import libraries
import asyncio
import json
import logging
import time

from asyncua import Client
from kafka import KafkaProducer

log = logging.getLogger(__name__)

def on_send_success(record_metadata):
    # Called when Kafka acknowledges the message
    print(record_metadata.topic)
    print(record_metadata.partition)

def on_send_error(excp):
    # Called when the send fails
    log.error('I am an errback', exc_info=excp)

producer = KafkaProducer(bootstrap_servers='kafka:19092')

async def main():
    url = "opc.tcp://host.docker.internal:4840/freeopcua/server/"
    sensorNode = ["ns=2;s=freeopcua.Tags.pressure", "ns=2;s=freeopcua.Tags.temperature"]
    async with Client(url=url) as client:
        while True:
            # Read the current value of every sensor node
            value = []
            for sensor in sensorNode:
                node = client.get_node(sensor)
                value.append(await node.read_value())
            message = {
                "timestamp": time.time(),
                "opc_pressure": value[0],
                "opc_temperature": value[1]
            }
            json_data = json.dumps(message)
            # Send OPC data in JSON format to the Kafka topic
            producer.send('opc_server', json_data.encode()) \
                .add_callback(on_send_success) \
                .add_errback(on_send_error)
            print(message)
            producer.flush()
            # Sleep with asyncio so we do not block the event loop
            await asyncio.sleep(1)

if __name__ == "__main__":
    # Read OPC data and stream it to Kafka
    asyncio.run(main())
First, we import the libraries needed to stream data from the OPC server to Kafka. The two functions that follow (on_send_success, on_send_error) are callbacks: the first confirms which topic and partition a message landed in, and the second logs any delivery failure.
Next, we set our Kafka producer to bootstrap_servers='kafka:19092'. We use kafka as the hostname to reach the Kafka container instead of an IP address, and port 19092 matches the internal listener we set in docker-compose.
The main function acts as a client that reads real-time data from the OPC server we set up before; it is based on the examples in FreeOpcUa/opcua-asyncio. It connects to the OPC server using the URL, reads each sensor node, packs the values into a JSON string, and finally sends the JSON string to our Kafka topic: opc_server.
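The payload construction inside the loop can be isolated into a small helper, which makes the JSON shape explicit. This is a sketch, not part of opc_producer.py; the field names match the message dict there, and the function name is hypothetical.

```python
import json
import time

def build_message(pressure, temperature, ts=None):
    # Pack one OPC reading into the JSON bytes we send to the opc_server topic
    payload = {
        "timestamp": ts if ts is not None else time.time(),
        "opc_pressure": pressure,
        "opc_temperature": temperature,
    }
    return json.dumps(payload).encode("utf-8")
```

Inside the loop it would be used as producer.send('opc_server', build_message(value[0], value[1])).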
To view our data in Kafka, open Offset Explorer, expand Topics, click on our topic opc_server, and click the green button in the Data tab.
By default, Offset Explorer will show your messages and keys in hexadecimal format. However, if your messages are UTF-8 encoded strings, Offset Explorer can show the actual string instead of the regular hexadecimal format.
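This works because we serialize each message with json.dumps and encode it as UTF-8, so the bytes stored in Kafka round-trip cleanly back to a Python dict. A quick sketch with a made-up sample record:

```python
import json

# A sample record as it would appear in the topic (hypothetical values)
raw = b'{"timestamp": 1700000000.0, "opc_pressure": 3.2, "opc_temperature": 21.5}'

# Decode the UTF-8 bytes and parse the JSON back into a dict
record = json.loads(raw.decode("utf-8"))
print(record["opc_pressure"])  # 3.2
```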
We can check the actual data values by clicking on the opc_producer container in Docker Desktop, which shows us its logs.
In the next part, we will ingest the real-time data into a database and present it as a dashboard using Apache Superset.
Tap the 👏 button if you found this article useful!