TLDR: How to Stream Data from a Movella DOT with a Mac and Python

Proto Bioengineering
5 min read · Apr 2, 2023


Photo by Clément Hélardot on Unsplash

This is a quickstart version of How to Stream Data from a Movella DOT with a Mac and Python. If you’re new to Python and Bluetooth, check out that article.

This code will allow you to get Free Acceleration data (X, Y, and Z of the accelerometer) from a Movella DOT (formerly known as Xsens DOT).

The output of the final script.

Requirements

  • A computer (preferably Mac)
  • Python 3
  • Bleak (a Python Bluetooth LE library)
  • NumPy (a Python math library)
  • 1 Movella DOT sensor

Install Bleak with pip install bleak.

Install NumPy with pip install numpy.

Steps

  1. “Scan” for the DOT’s Bluetooth address
  2. Connect to the Movella DOT in Python
  3. “Subscribe” to measurement data notifications
  4. Set and turn on the measurement mode
  5. “Listen” for data
  6. Format the data with NumPy

Step 1: Scan for the DOT’s Bluetooth address

Scan for the DOT with scanner.py to get its UUID:

# scanner.py

import asyncio
from bleak import BleakScanner

async def main():
    devices = await BleakScanner.discover()
    for device in devices:
        print(device)

asyncio.run(main())

The scanner output is:

We have two DOTs in our office, so we’ve picked one at random: 338312FA-C3D1-183F-325A-0726AFDBEB78.
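When many Bluetooth devices are nearby, the scanner list can get long. Here is a minimal sketch of narrowing it down, assuming the DOT advertises a name containing "DOT" (for example "Movella DOT" or "Xsens DOT"). That naming is an assumption, so check it against your own scanner output. The helper name find_dots() is hypothetical and not part of Bleak.

```python
import asyncio


def find_dots(devices, keyword="DOT"):
    """Return the scanned devices whose advertised name contains `keyword`."""
    # device.name can be None for devices that do not advertise a name
    return [d for d in devices if d.name and keyword in d.name]


async def scan_for_dots():
    # Imported here so find_dots() can be tried out without Bleak installed
    from bleak import BleakScanner

    devices = await BleakScanner.discover(timeout=5.0)
    for device in find_dots(devices):
        print(device.address, device.name)
```

Run the scan with asyncio.run(scan_for_dots()). On a Mac, the printed address is the UUID used in the rest of this article.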

Step 2: Connect to the DOT

# Connect to a Movella DOT

import asyncio
from bleak import BleakClient

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

asyncio.run(main())

Step 3: Subscribe to Data Notifications

Use Bleak’s start_notify() to “subscribe” to the data that the DOT is sending out.

# Connect to a Movella DOT and subscribe to the Short Payload Characteristic

import asyncio
from bleak import BleakClient

short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"

def notification_callback(sender, data):
    print(sender, data)

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

        # Subscribe to notifications from the Short Payload Characteristic
        await client.start_notify(short_payload_characteristic_uuid, notification_callback)

asyncio.run(main())

Step 4: Set and Turn on Measurement Mode

We’re going to stream data in Free Acceleration measurement mode (mode #6 according to the Movella DOT BLE Services Specifications documentation). This will give us the accelerometer’s X, Y, and Z measurements.

# Turn on Free Acceleration measurement mode

import asyncio
from bleak import BleakClient

measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"

def notification_callback(sender, data):
    print("Sender:", sender)
    print("Data:", data)

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

        # Subscribe to notifications from the Short Payload Characteristic
        await client.start_notify(short_payload_characteristic_uuid, notification_callback)

        # Set and turn on the Free Acceleration measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)


asyncio.run(main())

Where binary_message is made up of:

  • \x01 — A default value that the DOT BLE docs say is required
  • \x01 — whether measurement is on or not. 01 = on. 00 = off.
  • \x06 — specifies that we want Free Acceleration (option 6 from the measurement modes list)
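The three bytes above can also be assembled by a small helper instead of being typed by hand. This is a sketch only: build_control_message() and FREE_ACCELERATION are hypothetical names introduced for illustration, and the byte layout is simply the one described in the list above.

```python
def build_control_message(start: bool, mode: int) -> bytes:
    """Build the 3-byte message written to the measurement characteristic."""
    if not 0 <= mode <= 255:
        raise ValueError("mode must fit in a single byte")
    # Byte 1: required default value, byte 2: on/off, byte 3: measurement mode
    return bytes([0x01, 0x01 if start else 0x00, mode])


FREE_ACCELERATION = 6  # mode number used in this article

start_message = build_control_message(True, FREE_ACCELERATION)
stop_message = build_control_message(False, FREE_ACCELERATION)
print(start_message)  # prints b'\x01\x01\x06'
print(stop_message)   # prints b'\x01\x00\x06'
```

start_message is identical to the binary_message used in the script, and stop_message is the same message with the on/off byte set to 00.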

Step 5: “Listen” for Data

Use asyncio.sleep(x) to stream data for x number of seconds.

# Stream data from a Movella DOT for 10 seconds

import asyncio
from bleak import BleakClient

measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"

def notification_callback(sender, data):
    print("Sender:", sender)
    print("Data:", data)

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

        # Subscribe to notifications from the Short Payload Characteristic
        await client.start_notify(short_payload_characteristic_uuid, notification_callback)

        # Set and turn on the Free Acceleration measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)

        # Collect data from the DOT for 10.0 seconds
        await asyncio.sleep(10.0)

asyncio.run(main())

The output:
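The script above simply lets the connection close when the 10 seconds are up. If you would rather switch measurement off explicitly before disconnecting, one possible sketch is below. It assumes the "off" message is the same three bytes with the second byte set to 00, as described in Step 4, and stream_for() is a hypothetical helper name. Whether the DOT needs this explicit stop is not something this article has verified.

```python
import asyncio

measurement_characteristic_uuid = "15172001-4947-11e9-8646-d663bd873d93"
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"


async def stream_for(client, seconds, callback):
    """Stream Free Acceleration data for `seconds`, then turn measurement off.

    `client` is expected to be an already-connected BleakClient.
    """
    await client.start_notify(short_payload_characteristic_uuid, callback)
    await client.write_gatt_char(
        measurement_characteristic_uuid, b"\x01\x01\x06", response=True
    )
    try:
        await asyncio.sleep(seconds)
    finally:
        # Second byte 00 = measurement off (assumed from the Step 4 byte layout)
        await client.write_gatt_char(
            measurement_characteristic_uuid, b"\x01\x00\x06", response=True
        )
        await client.stop_notify(short_payload_characteristic_uuid)
```

Inside the async with BleakClient(address) as client: block, this would be called as await stream_for(client, 10.0, notification_callback).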

Step 6: Format the Data with NumPy

Make a new function, encode_free_acceleration(), which uses NumPy to convert the raw bytes into a timestamp and X, Y, and Z values.

Call this new function inside the print statement in notification_callback().

# stream.py 

import numpy as np
import asyncio
from bleak import BleakClient

measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"

def notification_callback(sender, data):
    print(encode_free_acceleration(data))

def encode_free_acceleration(bytes_):
    # These bytes are grouped according to Movella's BLE specification doc
    data_segments = np.dtype([
        ('timestamp', np.uint32),
        ('x', np.float32),
        ('y', np.float32),
        ('z', np.float32),
        ('zero_padding', np.uint32)
        ])
    formatted_data = np.frombuffer(bytes_, dtype=data_segments)
    return formatted_data

async def main():
    address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID

    async with BleakClient(address) as client:
        # Check if connection was successful
        print(f"Client connection: {client.is_connected}") # prints True or False

        # Subscribe to notifications from the Short Payload Characteristic
        await client.start_notify(short_payload_characteristic_uuid, notification_callback)

        # Set and turn on the Free Acceleration measurement mode
        binary_message = b"\x01\x01\x06"
        await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)

        # Collect data from the DOT for 10.0 seconds
        await asyncio.sleep(10.0)

asyncio.run(main())

The final output:
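To see how the 20 bytes are divided up, the same layout can be unpacked with Python's built-in struct module. This sketch is an alternative to the NumPy function, not a replacement for it. It assumes little-endian byte order, which is what np.frombuffer() uses by default on Intel and Apple Silicon Macs, and the payload below is synthetic, not real sensor data. unpack_free_acceleration() is a hypothetical name.

```python
import struct

# Little-endian: uint32 timestamp, three float32 values, 4 padding bytes
FREE_ACCELERATION_FORMAT = "<Ifff4x"


def unpack_free_acceleration(payload):
    """Return (timestamp, x, y, z) from a 20-byte Free Acceleration payload."""
    return struct.unpack(FREE_ACCELERATION_FORMAT, bytes(payload))


# Synthetic payload for illustration only (not real sensor data)
example = struct.pack(FREE_ACCELERATION_FORMAT, 1000, 0.5, -0.25, 9.75)
timestamp, x, y, z = unpack_free_acceleration(example)
print(timestamp, x, y, z)  # prints: 1000 0.5 -0.25 9.75
```

The struct version returns a plain tuple for one packet at a time, while the NumPy version returns a structured array whose fields can be read by name, for example formatted_data['x'].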

Check out Movella’s DOT BLE Services Specifications documentation as well as their Knowledge Base for more info on coding for Movella/Xsens DOTs.

Learn more about Python’s AsyncIO here.

A more detailed version of this article is also available here.

Read More in Our eBook

We now have an e-book about how to control and stream real-time data from Bluetooth LE devices. Pay What You Want until June 1st, 2024 on Ko-fi!

Questions and Feedback

If you have questions or feedback, email us at protobioengineering@gmail.com or message us on Instagram (@protobioengineering).

If you liked this article, consider supporting us by donating a coffee.
