How to Stream Data from a Movella DOT Wearable Sensor with a Mac and Python
Use Python on your MacBook to get human movement data from your DOT.
Movella DOTs (formerly known as “Xsens” DOTs) are wearable movement-tracking devices about the size of a stack of quarters.
These DOTs have sensors in them (accelerometers and gyroscopes), which track the movement of the person wearing them in every direction (X, Y, and Z).
Note: This tutorial is for connecting to a single DOT. Check out our article on how to connect to multiple DOTs here.
Movella/Xsens provides SDKs for writing code to interact with Movella DOTs. However, if you go to their SDK download page, you’ll see that an SDK for Mac OS computers is missing.
Apple’s Bluetooth documentation has historically been less than great and has only gotten trickier as all Bluetooth code gets hidden behind Core Bluetooth — ergo Movella has no Mac OS SDK.
If you want to connect to your Movella/Xsens DOT from a MacBook with Python code, this tutorial is for you.
If you have a Windows or Linux computer, check out Movella’s official SDKs for these operating systems first, though the code below should still work for you.
For a condensed version of this article with just the code, check out our other article here.
Prerequisite Knowledge
You should know Python and have a rough knowledge of Bluetooth LE. But you can power through this tutorial even if you’re new. The full script for connecting to Movella/Xsens DOTs is available at the end anyway.
Learn more about Bluetooth LE with our article, How to Connect to a Bluetooth Device with a MacBook and Python. This article is also MacBook-focused, but the concepts and code are identical for Windows and Linux computers.
Requirements
- A computer (preferably Mac)
- Python 3
- Bleak (a Python Bluetooth LE library)
- NumPy (a Python math library)
- 1 Movella DOT sensor
End Goal
Make a Python script that gets and prints movement data (accelerometer and gyroscope numbers) from a single Movella DOT in real-time.
Steps
In this tutorial, we will:
0. Install the Bleak and NumPy Python libraries
- “Scan” for the DOT’s Bluetooth address
- Connect to the Movella DOT in Python
- “Subscribe” to measurement data notifications
- Set and Turn on the measurement mode
- “Listen” for data
- Format the data with NumPy
- Print all the data we get from the DOT
The final script is at the bottom of this article.
Let’s get to it.
Step 0: Download and Install Bleak and Numpy
Bleak is the most popular Bluetooth LE library for Python, and it works on Mac, Windows, and Linux.
NumPy helps with math operations on big data in Python. It will make reading the data from the DOTs easy.
To install Bleak and Numpy, run these in Terminal:
pip install bleak
pip install numpy
For more assistance, check out Bleak and NumPy’s installation help pages.
Step 1: Scan for the DOT
Just like every website has a URL and every house has a street address, Bluetooth devices have wireless addresses.
There are some quirks to how these addresses work on Mac computers, but in short, each Bluetooth device will get a UUID, kind of like:
ABCDEFG1-2222-3333-4444-56789AAABBBC
OR
12345678-e89b-12d3-a456-426614174000
This is how your Mac will keep track of the DOT and any other Bluetooth device you connect to.
Note: For Windows and Linux, your device’s address will be a MAC address, which looks like ab:cd:ef:11:22:33
, but functions the same as a UUID.
We’ll use the Bluetooth scanner from our previous tutorial on How to Write a Simple Bluetooth Scanner with a MacBook and Python (Windows version here).
Scanner code:
# scanner.py
import asyncio
from bleak import BleakScanner
async def main():
devices = await BleakScanner.discover()
for device in devices:
print(device)
asyncio.run(main())
Put this in a file called scanner.py
. Before you run it, make sure your computer’s Bluetooth is on and the DOT is on and blinking.
Now, run the scanner. It will run silently for about 10 seconds, then a bunch of UUID addresses will show up on the screen, two of which are some Xsens DOT
s. Your devices’ names may also come up as Movella DOT
.
This gives us the UUID addresses:
509808FF-ECFE-895D-C1FE-BE5AC5DB6204
338312FA-C3D1–183F-325A-0726AFDBEB78
We happen to have two DOTs in our office, but we’ll pick a random one for this tutorial (338312FA-C3D1–183F-325A-0726AFDBEB78
).
For more info on Bluetooth LE, asyncio, and the how device connection works, check out our article, How to Connect to a Bluetooth Device with a MacBook and Python.
Step 2: Connect to the DOT
Start a new Python file called stream.py
. We no longer need scanner.py
, since the scanning part is over, and we only needed it to get the DOT’s UUIDs, which are unchanging.
The rest of the code for this tutorial will go in stream.py
.
This code will connect and disconnect from a single DOT. We’ll fill this in with data streaming functionality in the next steps.
# stream.py
import asyncio
from bleak import BleakClient
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
print(client.is_connected) # prints True or False
asyncio.run(main())
This is similar in structure to the scanner code. The difference is that we connect to the device with BleakClient(device)
.
This is all that is needed to connect to one device. If you run this now with your own DOT’s UUID, the output will be True
or False
, based on whether connection was successful or not.
Above, we connect to the DOT with the line async with BleakClient(address) as client
, and then anything nested within that block is done using the connection. Once that code block exits, the device is disconnected.
#...
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
# In this code block, the connection to the device is live.
# The device will disconnect here, since this is outside the code block.
asyncio.run(main())
Step 3: “Subscribe” to Data Notifications
In this step, we’ll tell our computer and the DOT that we’re going to be listening for movement data. Our computer has to “subscribe” to the message that the DOTs send out (AKA “notifications”) in order to be ready to hear the messages. Then in Step 4, we’ll “listen” for the data.
This is counterintuitive, since our code/laptop is not “asking” the device for its data. Instead, we’re telling the device, “You can start spitting out wireless data over Bluetooth, and we’ll listen for it.”
The alternative of asking for it first would be too slow, because we’d be sending 30–60 “asks” per second. That’s a waste of energy and time for the device. Instead, we’ll only tell it once.
This is also how any real-time data stream works, like YouTube livestreams and live TV. For it to be as real-time as possible, the service (e.g. Twitch, the news channel, phone calls, the DOT) spits out data and hopes we catch most of it.
To start the “subscription” to measurement data, we will use Bleak’s start_notify()
.
A quick refresher on Bluetooth services and characteristics: the overall features of a Bluetooth device are called “services” (e.g. Heart Rate Service, Battery Service, etc.) and within each service are “characteristics” (e.g. battery percentage, is the battery on, is it charging, etc.).
To use start_notify()
, we give it 2 arguments:
- The characteristic we want it to listen for data from
- The function (a callback function that we’ll create) to call whenever our computer “hears” a Bluetooth notification message (the measurement data)
Hint: Callback functions are just functions we give to other functions. They say, “When an event happens, call this other function.” In this case, “When we get a Bluetooth notification, call our custom callback()
function.”
For example:
def our_callback_function(sender, data):
# Print the measurement data or whatever we want here
...
characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
...
async with BleakClient(device) as client:
await client.start_notify(characteristic_uuid, our_callback_function)
...
The above characteristic_uuid
happens to be the UUID for the Short Payload Characteristic, which is the characteristic we need to subscribe to to get Free Acceleration data, per Movella’s DOT BLE Specifications documentation.
In this tutorial, we’re getting data in Free Acceleration mode, though there are 18 other modes, which we cover in the next section and which are in the DOT BLE Specification documentation. It is quirk of DOTs that each of the 18 modes is associated with either a Short, Medium or Long Payload Characteristic, and that particular payload characteristic is what needs to be subscribed to.
Our callback function has to accept two arguments (sender, data)
, since that’s how Bleak works, but it could do anything else we want it to to, such as:
def callback(sender, data):
print("Notification received from DOT")
Or:
def callback(sender, data):
print("Sender: " + sender)
print("Data: " + data)
Or any variation thereof. Whatever we choose, this function will run every time we get data, which could be 30 times a second, 60, etc. — whichever rate our DOT is spitting out data at.
For our example script, we’ll write our “subscription” like so:
# stream.py
import asyncio
from bleak import BleakClient
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
def notification_callback(sender, data):
print(f"Sender: {sender}")
print(f"Data: {data}")
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
print(f"Client connection: {client.is_connected}") # prints True or False
await client.start_notify(short_payload_characteristic_uuid, notification_callback)
asyncio.run(main())
Note the use of f-strings for print
formatting, which you can read more about here.
Above, we added:
- The UUID for the DOT’s Short Payload Characteristic
- The
notification_callback()
function - the
start_notify()
function, which we give the UUID and our custom callback function
If we run the previous chunk of code, nothing has changed. That’s because we only turned on the subscription service, but didn’t tell it to start measuring movement yet. We’ll do that in the next step.
Step 4: Set and Turn on Measurement Mode
This step kills 2 birds with one stone:
- Setting the measurement mode (one of the 18 modes below)
- Turning on the measurement mode
We’ll accomplish both with one Bleak function, write_gatt_char()
, below.
Movella/Xsens DOTs have a bunch of measurement modes, as seen in their documentation (Movella DOT BLE Service Specs PDF).
These modes allow us to get accelerometer and gyroscope data as quaternions, Euler angles, and more. It depends on what kind of math you want to do on the movement data.
We’re going to get Free Acceleration (#6) data in this tutorial, which is just the X, Y, and Z of the accelerometer.
To set and turn on the Free Acceleration measurement mode, we’ll give 2 arguments to write_gatt_char()
:
- Free Acceleration’s “measurement characteristic UUID” (found in the DOT BLE documentation)
- A special binary message
Like so:
measurement_characteristic_uuid = "15172001-4947-11e9-8646-d663bd873d93"
...
async with BleakClient(aaddress) as client:
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_characteristic_uuid, binary_message)
The binary_message
has 3 hex numbers that specify:
\x01
— A default value that the DOT BLE docs say is required\x01
— whether measurement is on or not.01
= on.00
= off.\x06
— specifies that we want Free Acceleration (option6
from the measurement modes list)
The measurement_characteristic_uuid
is also different from the Short Payload Characteristic UUID that we subscribed to in the last step.
The overall logic between Step 3 and Step 4 is that we are subscribing to the Short/Medium/Long Payload Characteristic for a given measurement mode, then we are turning on a separate Measurement Characteristic and giving it the number of the measurement mode we want (in this case, 06
for Free Acceleration). It’s weird, but that’s how Bluetooth LE works.
Here is the code up until this point. But if you run it, you’ll notice one thing: we get no data at all. We’ll fix that with one line in Step 5.
# stream.py
import asyncio
from bleak import BleakClient
measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
def notification_callback(sender, data):
print(f"Sender: {sender}")
print(f"Data: {data}")
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
# Check if connection was successful
print(f"Client connection: {client.is_connected}") # prints True or False
# Subscribe to notifications from the Short Payload Characteristic
await client.start_notify(short_payload_characteristic_uuid, notification_callback)
# Set and turn on the Free Acceleration measurement mode
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_characteristic_uuid, binary_message)
asyncio.run(main())
Step 5: “Listen” for Data
We only need one more line to get data:
await asyncio.sleep(10.0)
This tells asyncio
to do nothing and wait for Bluetooth data to come in. We wait for 10.0
seconds above, but you could choose 1.0
second or 600000.0
seconds.
Add the sleep
statement after the rest of the await
lines of code.
# stream.py
import asyncio
from bleak import BleakClient
measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
def notification_callback(sender, data):
print(f"Sender: {sender}")
print(f"Data: {data}")
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
# Check if connection was successful
print(f"Client connection: {client.is_connected}") # prints True or False
# Subscribe to notifications from the Short Payload Characteristic
await client.start_notify(short_payload_characteristic_uuid, notification_callback)
# Set and turn on the Free Acceleration measurement mode
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)
await asyncio.sleep(10.0) # How long to stream data for
asyncio.run(main())
Running this code will get us a few hundred lines of real measurement data from the DOTs.
Troubleshooting: if you don’t see any sensor data like below, add response=True
as a 3rd argument to write_gatt_char()
.
It shows that the Sender
was the Short Payload Characteristic (UUID 15172004-x-etc...
), and the Data
was a bunch of hex numbers (like \x8d
) in a bytearray
. We’ll convert these bytearray
s in the next step.
Step 6: Convert the Data to Real Numbers
To convert the bytearray
s to real numbers, we’ll use NumPy. NumPy is the master of working with numbers, math, bits, and bytes in Python.
The Measurement Service section (Section 3) of the DOT BLE Specification documentation says that measurement modes with a “short payload length” are 20 bytes in total.
And Section 3.4 specifies that Free Acceleration is 16 bytes of actual data with 4 bytes of padded zeros.
There are 16 bytes of actual data, made up of a timestamp and free acceleration data. Section 3.5 Measurement Data also shows that the actual numbers in free acceleration data are 12 bytes total, meaning that the data is:
- a 4-byte timestamp
- a 4-byte X accelerometer value
- a 4-byte Y accelerometer value
- a 4-byte Z accelerometer value
Now that we know how the data is supposed to be chopped up, let’s use NumPy to convert it.
We’ll use the following function. You don’t need to know how it works to put it in your code.
import numpy as np
...
def encode_free_acceleration(bytes_):
# These bytes are grouped according to Movella's BLE specification doc
data_segments = np.dtype([
('timestamp', np.uint32),
('x', np.float32),
('y', np.float32),
('z', np.float32),
('zero_padding', np.uint32)
])
formatted_data = np.frombuffer(bytes_, dtype=data_segments)
return formatted_data
We’re using data_segments
to make a template of sorts to break up the measurement data (bytes_
) into neat numbers (formatted_data
).
Then instead of this:
\xe3\x17\xf1\xd3\xffQ\xda\xc0\x1ai\xd4@I\x8d\xdb\xc0\x00\x00\x00\x00
We’ll get this:
3555792867, -6.8225093, 6.63783, -6.8609967, 0
Above is the data as timestamp, X, Y, and Z (and a zero for padding that you can ignore).
We’ll also incorporate our new NumPy encoding function into our callback function, so that we print out the real numbers, like so:
def notification_callback(sender, data):
print(f"Sender: {sender}")
print(f"Data: {data}")
print(f"Encoded Free Acceleration: {encode_free_acceleration(data)}")
The full script is now:
# stream.py
import numpy as np
import asyncio
from bleak import BleakClient
measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
def notification_callback(sender, data):
print(f"Sender: {sender}")
print(f"Data: {data}")
print(f'Encoded Free Acceleration: {encode_free_acceleration(data)}')
def encode_free_acceleration(bytes_):
# These bytes are grouped according to Movella's BLE specification doc
data_segments = np.dtype([
('timestamp', np.uint32),
('x', np.float32),
('y', np.float32),
('z', np.float32),
('zero_padding', np.uint32)
])
formatted_data = np.frombuffer(bytes_, dtype=data_segments)
return formatted_data
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
# Check if connection was successful
print(f"Client connection: {client.is_connected}") # prints True or False
# Subscribe to notifications from the Short Payload Characteristic
await client.start_notify(short_payload_characteristic_uuid, notification_callback)
# Set and turn on the Free Acceleration measurement mode
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)
await asyncio.sleep(10.0) # How long to stream data for
asyncio.run(main())
Step 7: Print the Measurement Data
We’re already printing a bunch of stuff along with the data. We’ll narrow it down to just the data.
Remove all of the print statements from the callback function:
def notification_callback(sender, data):
...
We’ll pass data through the NumPy encoder function, then print it, so that all we see is accelerometer data.
def notification_callback(sender, data):
print(encode_free_acceleration(data))
The output of which will be:
And that’s how we stream accelerometer data from a Movella/Xsens DOT.
If you have questions or feedback, email us at protobioengineering@gmail.com or message us on Instagram (@protobioengineering).
The Full Python Script
Stream Free Acceleration data from a Movella DOT:
# Stream data from a single Movella DOT
import numpy as np
import asyncio
from bleak import BleakClient
measurement_characteristic_uuid = '15172001-4947-11e9-8646-d663bd873d93'
short_payload_characteristic_uuid = "15172004-4947-11e9-8646-d663bd873d93"
def notification_callback(sender, data):
print(encode_free_acceleration(data))
def encode_free_acceleration(bytes_):
# These bytes are grouped according to Movella's BLE specification doc
data_segments = np.dtype([
('timestamp', np.uint32),
('x', np.float32),
('y', np.float32),
('z', np.float32),
('zero_padding', np.uint32)
])
formatted_data = np.frombuffer(bytes_, dtype=data_segments)
return formatted_data
async def main():
address = "338312FA-C3D1-183F-325A-0726AFDBEB78" # Movella DOT UUID
async with BleakClient(address) as client:
# Check if connection was successful
print(f"Client connection: {client.is_connected}") # prints True or False
# Subscribe to notifications from the Short Payload Characteristic
await client.start_notify(short_payload_characteristic_uuid, notification_callback)
# Set and turn on the Free Acceleration measurement mode
binary_message = b"\x01\x01\x06"
await client.write_gatt_char(measurement_characteristic_uuid, binary_message, response=True)
await asyncio.sleep(10.0) # How long to stream data for
asyncio.run(main())
Related Resources
- Movella Support and Knowledge Base
- Movella SDKs and Bluetooth Documentation
- AsyncIO in Python: A Complete Walkthrough
Read More in Our eBook
We now have an e-book about how to control and stream real-time data from Bluetooth LE devices. Pay What You Want until June 1st, 2024 on Ko-fi!
Questions and Feedback
If you have questions or feedback, email us at protobioengineering@gmail.com or message us on Instagram (@protobioengineering).
If you liked this article, consider supporting us by donating a coffee.