BLE Communication with PyQt5 Based GUI

Ali Camdal
5 min readFeb 26, 2023

--

In this article we are going to find out a way to establish a communication between a BLE server and a client GUI. The BLE server that we are going to use is based on ESP32’s NimBLE framework. You can visit my last article to create this server on your ESP32. User interface that we will use is developed using Qt5 Designer on Debian Distro of Linux OS. Communication codes and UI file can be found at this GitHub Repo. Let’s dive into codes and design.

BLE Communication UI

As can be seen above, design is very simple to use and understand. Since I am going to use this design on my RC car project, it contains a direction indicator which is basically gray triangles that indicates the movement of RC car. In thi UI we have three different buttons that allow us to scan the BLEs, connect the selected device and disconnect from our device. R button stands for refreshing the combo box that we will use the store our scanned BLE devices.

Simple BLE scandemo.

Above GIF shows simple BLE operations using the UI. Once we clicked to R button it starts the scanning process for BLE devices. When scanning is done, we can see the list of found devices in the combo box. Then we can select one of them and click connect to establish a connection between the server and our device. If everything is going as planned, connection status label turns to green and changed to “Connected” that indicates we have a connection and we can send data to the server. As I mentioned before I am going to use this GUI for my RC car project. That is why you can see four triangles at direction indicator area. When I press W, A, S, D, those triangles turns to green. Last of all, disconnect button breaks the connection between two devices. After disconnection we can see that the connection status label marked as red and changed to disconnected.

Now we have an idea about the GUI, then let’s dive into code.

from PyQt5 import QtWidgets, uic
from PyQt5.QtCore import Qt
from threading import Thread
from queue import Queue
from time import sleep
from PyQt5.QtGui import QPixmap
from bleClass import BLE

class Ui(QtWidgets.QMainWindow):
def __init__(self):
super(Ui, self).__init__()
uic.loadUi('ble.ui', self)

self.ble = BLE()
self.devices = None
self.target_device = None
self.BLEOut = None
self.BLEOut_old = None

self.pixMaps = {
"gray_up" : QPixmap("assets/triangle_gray_up.png"),
"green_up" : QPixmap("assets/triangle_green_up.png"),
"gray_down" : QPixmap("assets/triangle_gray_down.png"),
"green_down" : QPixmap("assets/triangle_green_down.png"),
"gray_right" : QPixmap("assets/triangle_gray_right.png"),
"green_right" : QPixmap("assets/triangle_green_right.png"),
"gray_left" : QPixmap("assets/triangle_gray_left.png"),
"green_left" : QPixmap("assets/triangle_green_left.png")
}
self.ble.setUpdateHandler(self.updateConnStatus)
self.updateConnStatus(False)
self.btnRefresh.clicked.connect(self.refresh)
self.btnConnect.clicked.connect(self.connect)
self.btnDisconnect.clicked.connect(self.disconnect)
self.show()

def updateConnStatus(self, status) -> None:
if status:
self.lblConStats.setText("Connected")
self.lblConStats.setStyleSheet("background-color: lightgreen")
self.lblConStats.adjustSize()
else:
self.lblConStats.setText("Disconnected")
self.lblConStats.setStyleSheet("background-color: red")
self.lblConStats.adjustSize()

def closeEvent(self, event):
reply = QtWidgets.QMessageBox.question(
self, 'Quit?',
'Are you sure you want to quit?',
QtWidgets.QMessageBox.Yes , QtWidgets.QMessageBox.No
)

if reply == QtWidgets.QMessageBox.Yes:
event.accept()
# To kill two threads and disconnet from BLE.
self.disconnect()
else:
event.ignore()

def controlThread(self) -> None:
while True:
if not self.ble.isConnected():
break
if self.BLEOut is not None and self.BLEOut != self.BLEOut_old:
self.ble.pushQueue(self.BLEOut)
self.BLEOut_old = self.BLEOut
sleep(0.01)

def keyPressEvent(self, event) -> None:
if self.ble.isConnected():
if event.key() == Qt.Key_D:
print("Right")
self.BLEOut = "r3"
self.img_right.setPixmap(self.pixMaps["green_right"])
elif event.key() == Qt.Key_A:
print("Left")
self.BLEOut = "r2"
self.img_left.setPixmap(self.pixMaps["green_left"])
elif event.key() == Qt.Key_W:
print("Up")
self.BLEOut = "r0"
self.img_up.setPixmap(self.pixMaps["green_up"])
elif event.key() == Qt.Key_S:
print("Down")
self.BLEOut = "r1"
self.img_down.setPixmap(self.pixMaps["green_down"])

def keyReleaseEvent(self, event) -> None:
if not event.isAutoRepeat() and self.ble.isConnected():
if event.key() == Qt.Key_D:
print("Right Released")
self.img_right.setPixmap(self.pixMaps["gray_right"])
elif event.key() == Qt.Key_A:
print("Left Released")
self.img_left.setPixmap(self.pixMaps["gray_left"])
elif event.key() == Qt.Key_W:
print("Up Released")
self.img_up.setPixmap(self.pixMaps["gray_up"])
elif event.key() == Qt.Key_S:
print("Down Released")
self.img_down.setPixmap(self.pixMaps["gray_down"])
self.BLEOut = "r4"

def createThread(self, **kwargs) -> None:
keys = list(kwargs.keys())

if "target_func" in keys and "args" in keys:
Thread(target = kwargs["target_func"], args = (kwargs["args"], )).start()
elif "target_func" in keys:
Thread(target = kwargs["target_func"], args=()).start()
else:
print("Wrong arguments")

def setComboBox(self) -> None:
self.cmbBles.clear()
self.devices = self.ble.getDeviceList()
for ith, device in enumerate(self.devices):
print(f"Device {ith + 1} : {device.name}")
self.cmbBles.addItem(device.name)

def refresh(self) -> None:
self.btnRefresh.setEnabled(False)
self.createThread(target_func = self.ble.scanDevices)

while not self.ble.isScanDone():
QtWidgets.QApplication.processEvents()

self.setComboBox()
self.btnRefresh.setEnabled(True)

def connect(self) -> None:
target_device_name = self.cmbBles.currentText()
for device in self.devices:
if device.name == target_device_name:
self.target_device = device
self.createThread(target_func = self.ble.connectToDevice, args = self.target_device)
sleep(2)
self.createThread(target_func = self.controlThread)

def disconnect(self) -> None:
self.ble.disconnectFromDevice()

Above we can see the first class file of our GUI. This is called guiClass.py in the Git repo. The class provides us a simple UI and some controlling options to send data to BLE devices. Now we are going through from the beginning. As we can see init method sets some variables and button signals for us. BLE class will be mentioned later. We have classical methods for scanning, connection, and disconnection buttons. Those methods trigger the responsible methods in BLE class. Also we have three events that provided by Qt framework for us. Those are mainly controls the keyboard inputs and user behaivour for closing UI. Then we have a simple method that controls the connection status label. We have gave it to BLE class as callback. When a connection has been established it triggers the updateConnStatus method and changes label.

import asyncio
from bleak import BleakScanner
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from queue import Queue

class BLE:
def __init__(self) -> None:
self.devices = None
self.target_device = None
self.scan_done = False
self.is_connected = False
self.close_flag = False
self.updateStatus = None
self.data_queue = Queue()
self.UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
self.UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
self.UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

def setUpdateHandler(self, func) -> None:
self.updateStatus = func

def handle_rx(self, _: BleakGATTCharacteristic, data: bytearray) -> None:
print("received:", data)

def handle_disconnect(self, _: BleakClient) -> None:
print("Device disconnected.")
self.is_connected = False
self.updateStatus(False)

def isScanDone(self) -> bool:
if self.scan_done:
self.scan_done = False
return True
else:
return self.scan_done

def isConnected(self) -> bool:
return self.is_connected

def scanDevices(self) -> None:
asyncio.run(self.scanner())

def connectToDevice(self, device) -> None:
self.target_device = device
print("Selected Device : ", self.target_device)
asyncio.run(self.setConnection())

def pushQueue(self, data) -> None:
self.data_queue.put(data)

def getDeviceList(self) -> list:
return self.devices

def disconnectFromDevice(self) -> None:
self.data_queue.put("q")

async def communicationTask(self, rx_char) -> None:
while True:
data = self.data_queue.get()
if data == "q":
print("Disconnecting...")
await self.client.disconnect()
break
elif data is not None:
await self.client.write_gatt_char(rx_char, data.encode())
print("send: ", data)

async def scanner(self) -> None:
self.devices = None
self.devices = await BleakScanner.discover()
self.scan_done = True

async def setConnection(self) -> None:
async with BleakClient(self.target_device, disconnected_callback=self.handle_disconnect) as self.client:
await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_rx)

self.is_connected = True
self.updateStatus(True)
nus = self.client.services.get_service(self.UART_SERVICE_UUID)
rx_char = nus.get_characteristic(self.UART_RX_CHAR_UUID)
await self.communicationTask(rx_char = rx_char)

In BLE class we have used Bleak API for accessing, reading, and writing to characteristics. Before everything since our ESP32 BLE server works with Nordic UART protocol, all characteristics set to work in this way. These are can be changed from init method of bleClass.py. We have three async method that are working with Bleak. These methods can done scan, connect and disconnect processes. Data flow are established using a queue. Data that we want to send to BLE devices can be pushed to the queue using pushQueue method. Last but not least, we can handle disconnection process and incoming data with the handlers.

In the GitHub repo we can see a file that is named main.py. This file only invoke the guiClass for starting UI. This project is open to use for any kind of purpose. Thank you for your time to read this article.

Ciao.

--

--