Barbell Velocity Tracking with Detectron2

Diego Granziol
7 min readFeb 1, 2023

--

When I started PureStrength in November 2021, I needed a way to find the key stages of a lift (the start of the deadlift, the bottom of the squat) as part of the company's offering. In discussions with my then powerlifting coach Daniel Jay (who heads up the very awesome Barbell Division in Southampton), it became clear that he was very interested in velocity-based training, i.e. instead of measuring how hard a lift was with something subjective like RPE (rating of perceived exertion) or RIR (reps in reserve), where RPE is, to my understanding, just 10 − RIR, could we give something objective? Can you compare a set of five 180 kg squats today with the same set last week, instead of just asking whether it felt a little bit harder? The idea, which I am sure is already familiar to you, is to use velocity instead: the faster a lift moves, the easier it is. In this post I will simply give you some code (the old prototype code I used), which takes a video as input and tells you, for each rep, how long it took to ascend and descend and whether there was a pause. We use Python and detectron2. For those of you without GPU access, the code is really quite slow on CPU, so here is a link for Google Colab.

First we simply install detectron2

!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'
import torch
from detectron2 import model_zoo

We then simply take a video (we can use one of mine here for example purposes) and simply convert the video into a stream of images

!wget http://www.purestrength.ai/opensource/diego205.mov
import urllib
import os
import cv2
import shutil
from time import time
import numpy as np
# --- Copy the video into a scratch directory and dump every frame as a JPEG ---

# A random directory name keeps repeated runs from colliding with each other.
rng = np.random.default_rng()
val = rng.integers(low=0, high=1000000, size=1)[0]
temp_dir = str(val)
# temp_dir = 'temporary'
os.makedirs(temp_dir, exist_ok=True)

filename = 'diego205.mov'
shutil.copyfile(filename, os.path.join(temp_dir, filename))

# Keep only video files. Match on the real extension (case-insensitively) so
# that e.g. a frame folder called "movie_" is not mistaken for a video.
allowed_extensions = ('.mp4', '.avi', '.mov')
files = []
for file in os.listdir(temp_dir):
    if os.path.splitext(file)[1].lower() in allowed_extensions:
        files.append(file)
    else:
        print('{} not an allowed filetype'.format(file))
print(files)

clean_folders = []
t0 = time()
for f in files:
    print('running on file {}'.format(f))
    # Frame folder name: video name without extension, with spaces and
    # underscores replaced by '-', followed by a single trailing '_'.
    folder_name = os.path.splitext(f)[0].replace(' ', '-').replace('_', '-')
    clean_folders.append(folder_name + '_')
    print(folder_name + '_')
    print('outputs/{}/{}_.mov'.format(temp_dir, folder_name))

    frame_dir = '{}/{}_'.format(temp_dir, folder_name)
    try:
        os.mkdir(frame_dir)
    except FileExistsError:
        # Frames were extracted on a previous run; do not redo the work.
        print('folders already created moving to the next step')
        continue
    print(frame_dir)

    vidcap = cv2.VideoCapture('{}/{}'.format(temp_dir, f))
    try:
        count = 0
        success, image = vidcap.read()
        while success:
            # Save the frame as a zero-padded JPEG so names sort numerically.
            cv2.imwrite('{}/{:04d}.jpg'.format(frame_dir, count), image)
            success, image = vidcap.read()
            print('Read a new frame: ', success)
            count += 1
    finally:
        # Always free the decoder, even if writing a frame fails.
        vidcap.release()
print(time() - t0)

Then we simply initialise the predictor from detectron2, using a pre-trained image segmentation model of mine (which will give the implied bounding boxes). As a side note, I found the segmentation-implied bounding boxes to be much tighter than those of object detection models such as YOLO. Perhaps a side effect of the cross-entropy loss? I have not yet investigated.

!wget purestrength.ai/opensource/mask_rcnn_R_101_FPN_3x.yaml
!wget purestrength.ai/opensource/final_model_3class.pth
!wget purestrength.ai/opensource/Base-RCNN-FPN.yaml
import detectron2
# from detectron2.utils.logger import setup_logger
# setup_logger()

# import some common libraries
import numpy as np
import os, json, cv2, random

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data.datasets import register_coco_instances
from detectron2.utils.visualizer import Visualizer
from detectron2.config import get_cfg
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib.figure import Figure
from mpl_toolkits.axisartist.axislines import Subplot


# Build a predictor from the saved 3-class Mask R-CNN model.
import torch

cfg = get_cfg()
cfg.merge_from_file("mask_rcnn_R_101_FPN_3x.yaml")
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.9  # Set threshold for this model
cfg.MODEL.WEIGHTS = 'final_model_3class.pth'  # Set path model .pth
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 3
# detectron2 targets CUDA by default; fall back to CPU so the notebook still
# runs (slowly) on machines without a GPU.
if not torch.cuda.is_available():
    cfg.MODEL.DEVICE = 'cpu'
predictor = DefaultPredictor(cfg)

Then, for each frame, we simply make a prediction and compute the bounding box implied by the predicted mask. Note that the bounding box predicted directly by the detectron2 model is quite loose, and using it would degrade the accuracy of the prediction.

import os
from time import time

# Locate the frame folder written by the extraction step. Its name is the
# video name without extension, with spaces and underscores replaced by '-',
# plus a trailing '_' -- mirror that here instead of slicing off 4 characters.
folder = temp_dir + '/' + os.path.splitext(filename)[0].replace(' ', '-').replace('_', '-') + '_'

# Frames are named like 0000.jpg; ignore anything else (e.g. hidden files)
# so the numeric sort below cannot fail on a stray file.
files = [
    name for name in os.listdir(folder)
    if name.lower().endswith('.jpg') and os.path.splitext(name)[0].isdigit()
]
files = sorted(files, key=lambda x: int(x.split('.')[0]))

centroid_topleft_info = {i: [] for i in range(0, 24)}
# frame index -> {instance index: predicted boxes}
bar_info = {}
# frame index -> {class id: [[xmin, ymin, w, h], ...]} derived from the masks
bar_info_predmask = {}


import matplotlib.patches as patches
def bbox2(img):
    """Return the tight bounding box of the non-zero region of a 2-D mask.

    Args:
        img: 2-D array-like; any non-zero / True entry counts as foreground.

    Returns:
        Tuple ``(xmin, ymin, width, height)`` where ``width = xmax - xmin``
        and ``height = ymax - ymin`` (so a single pixel has width and
        height 0).

    Raises:
        ValueError: if the mask contains no foreground pixels.
    """
    rows = np.any(img, axis=1)
    cols = np.any(img, axis=0)
    if not rows.any():
        # Without this guard the fancy indexing below fails with an opaque
        # IndexError on an empty array.
        raise ValueError('bbox2: mask has no foreground pixels')
    ymin, ymax = np.where(rows)[0][[0, -1]]
    xmin, xmax = np.where(cols)[0][[0, -1]]
    return xmin, ymin, xmax - xmin, ymax - ymin
import cv2
# --- Run the predictor on every frame and record mask-implied boxes ---
zeros = '0000'
t0 = time()
for file in files:
    print(file)
    idx = int(file.split('.')[0])
    # bar_info structure -> x_start,y_start,x_end,y_end
    bar_info[idx] = {}
    bar_info_predmask[idx] = {}

    im = cv2.imread('{}/{}'.format(folder, file))
    if im is None:
        # cv2.imread returns None instead of raising on an unreadable file.
        print('could not read {}, skipping'.format(file))
        continue

    outputs = predictor(im)
    predictions = outputs["instances"].to("cpu")

    bar = None  # union of all instance masks in this frame
    for i in range(len(predictions)):
        instance = predictions[i]
        mask = instance.pred_masks[0, :, :]
        # Build the union out-of-place so the first instance's mask is not
        # modified through a shared view.
        bar = mask.clone() if bar is None else bar + mask
        bar_info[idx][i] = instance.pred_boxes

        mask_np = mask.numpy()
        if not mask_np.any():
            # An all-background mask has no bounding box.
            continue
        xmin, ymin, w, h = bbox2(mask_np)
        class_id = int(instance.pred_classes.numpy()[0])
        bar_info_predmask[idx].setdefault(class_id, []).append([xmin, ymin, w, h])

    # Largest box first within each class; later steps take element 0.
    for boxes in bar_info_predmask[idx].values():
        boxes.sort(key=lambda box: box[2] * box[3], reverse=True)

print(time() - t0)

Once we have all the information on the weight bounding boxes, we have one of two tasks at hand: either we need to choose the correct weight or, assuming there is only one, we need to track the weight's centre. We will come back to choosing the correct weight later. Assuming there is only one weight:


# --- Track the centre of the largest box of one class across frames ---
missing_list = 0
thing = []  # one [centre_x, -centre_y] entry per frame with a detection
idx_choice = 0  # class id to follow
for a in range(len(bar_info_predmask)):
    try:
        xmin, ymin, w, h = bar_info_predmask[a][idx_choice][0]
    except (KeyError, IndexError):
        # No detection of the chosen class in this frame.
        print(a)
        missing_list += 1
        continue
    # The y-coordinate is negated; see the surrounding text about image
    # orientation.
    thing.append([xmin + w // 2, -ymin - h // 2])

if not thing:
    raise ValueError('no detections of class {} in any frame'.format(idx_choice))

# The lift moves along whichever axis shows the larger variance. argmax
# returns the first maximum, so a tie (or zero variance) resolves to axis 0
# rather than leaving movement_direction undefined.
axis_variances = np.var(thing, axis=0)
movement_direction = int(np.argmax(axis_variances))
var = float(axis_variances[movement_direction])
print(movement_direction)

However, note that OpenCV has lately become incredibly inconsistent with its flipping of images, hence the minus sign on the y-coordinate can become a plus or vice versa. This is easily fixed by just checking the graph.

# --- Smooth the bar position with an exponential moving average (EMA) ---
cap = cv2.VideoCapture(os.path.join(temp_dir, filename))
framespersecond = cap.get(cv2.CAP_PROP_FPS)
cap.release()  # only the frame rate is needed from the video

print(framespersecond)
if not framespersecond > 0:
    # Later steps divide by the frame rate; fail here with a clear message.
    raise ValueError('could not read a frame rate from {}'.format(filename))

# hand_motion_xy = thing
important_motion = [val[movement_direction] for val in thing]
# offset = idx-cutoff
start_val = important_motion[0]
smoothing_coef = 0.5
# A sample's influence lasts about 1/(1-coef) frames; dividing that by the
# frame rate converts it to seconds.
influence_frames = 1 / (1 - smoothing_coef)
print('influence of frame dies after {} frames which is {} seconds'.format(influence_frames, influence_frames / framespersecond))

# Seed the EMA with the first sample and update from the second sample on, so
# the smoothed series has one value per frame and index i refers to frame i.
ema_important_motion = [start_val]
smoothed_val = start_val
for sample in important_motion[1:]:
    smoothed_val = smoothing_coef * smoothed_val + (1 - smoothing_coef) * sample
    ema_important_motion.append(smoothed_val)

plt.plot(ema_important_motion)
plt.show()
plt.close()

As can be seen here, we apply some exponential moving average smoothing (with a coefficient of 0.5).

Plot of me doing a medium difficulty deadlift (with injury)

I now need a way of isolating the start (ascent start) top (ascent end) and descent (if I am doing tempo work) of the lift.


# --- Mark the frames where the smoothed bar position is almost stationary ---
tolerance = 1  # maximum per-frame change that still counts as "not moving"
velocity = np.diff(ema_important_motion)
# Indices i where |position[i+1] - position[i]| is below the tolerance.
check = np.flatnonzero(np.abs(velocity) < tolerance).tolist()

plt.plot(ema_important_motion)
x_marker = list(check)
y_marker = [ema_important_motion[point] for point in x_marker]
plt.plot(x_marker, y_marker, 'bo')
plt.show()

This method identifies regions of low velocity. We can then “prune” these regions

# --- Prune the low-velocity markers down to the boundaries of each phase ---
# Keep only pairs of neighbouring markers separated by a large position jump:
# these bracket an ascent or a descent.
y_diff = np.diff(y_marker)
x_marker_2 = []
tol = 0.5 * np.max(np.abs(y_diff)) if len(y_diff) else 0.0
for i, jump in enumerate(y_diff):
    if abs(jump) > tol:
        for point in (x_marker[i], x_marker[i + 1]):
            # Two consecutive jumps share a marker; store it once so it does
            # not show up later as a spurious zero-length pause.
            if not x_marker_2 or x_marker_2[-1] != point:
                x_marker_2.append(point)

plt.plot(ema_important_motion)
y_marker_2 = [ema_important_motion[point] for point in x_marker_2]
plt.plot(x_marker_2, y_marker_2, 'bo')
plt.savefig(os.path.join(temp_dir, 'velocityinfo.png'), bbox_inches='tight')
plt.show()

# --- Classify each interval between consecutive boundaries ---
bottom_reps = []  # marker index at which each ascent starts
y_rom = []  # position change covered by each ascent
rep_names = ['descent', 'pause hole', 'pause between reps', 'ascent']
rep_info = {rep: [] for rep in rep_names}  # phase name -> durations in seconds

if y_marker_2:
    miny, maxy = min(y_marker_2), max(y_marker_2)
    for i in range(1, len(x_marker_2)):
        prev_y, cur_y = y_marker_2[i - 1], y_marker_2[i]
        duration = (x_marker_2[i] - x_marker_2[i - 1]) / framespersecond
        if abs(maxy - prev_y) < abs(miny - prev_y):
            # The interval starts nearer the top position.
            if cur_y < prev_y and abs(maxy - cur_y) > abs(miny - cur_y):
                # ...and ends lower, nearer the bottom: going down.
                rep_info['descent'].append(duration)
            else:
                # Still at the top: resting between reps.
                rep_info['pause between reps'].append(duration)
        else:
            # The interval starts nearer the bottom position.
            if cur_y > prev_y and abs(maxy - cur_y) < abs(miny - cur_y):
                # ...and ends higher, nearer the top: rising.
                rep_info['ascent'].append(duration)
                bottom_reps.append(x_marker_2[i - 1])
                y_rom.append(cur_y - prev_y)
            else:
                # Still at the bottom: pausing in the hole.
                rep_info['pause hole'].append(duration)
else:
    print('no rep boundaries found')

for info, durations in rep_info.items():
    print('{} {} \n'.format(info, durations))

# np.mean of an empty list warns before returning nan; report nan directly.
average_ascent = np.mean(rep_info['ascent']) if rep_info['ascent'] else float('nan')
print('average ascent time = {} seconds'.format(average_ascent))
Start of Ascent, End of ascent, pause on top and descent using this algorithm

I of course print out the values for the ascent, pause on top, bottom and descent in seconds, rep for rep.

descent [0.9] 

pause hole []

pause between reps [0.43333333333333335]

ascent [2.2]

average ascent time = 2.2 seconds

What if there are several weights lying around like in a gym?

This makes things a little more complicated, although we have very largely solved this in the PureStrength iOS app, which is free to download. Here is some sample code (this was version 0.0000001 of what we have now), which groups detections into trajectories based on their x-coordinate and picks the trajectory of maximum variance.

# --- Several weights in view: group detections into tracks by x-position and
# --- keep the track whose vertical position varies the most.
from collections import OrderedDict
from copy import deepcopy

# 1. Find the largest number of class-0 detections seen in a single frame and
#    collect the x-coordinates from every frame that reaches that count.
tot = 0
x_vals = []
for idx in range(len(bar_info_predmask)):
    boxes = bar_info_predmask.get(idx, {}).get(0)
    if not boxes or len(boxes) < tot:
        continue
    if len(boxes) > tot:
        # A new maximum: frames collected so far had fewer detections.
        tot = len(boxes)
        x_vals = []
    x_vals.append([box[0] for box in boxes])
print(tot)

if not x_vals:
    raise ValueError('no class-0 detections found in any frame')

# 2. One reference x per detection slot: the mean over the collected frames.
x_vals = sorted(np.mean(column) for column in zip(*x_vals))

# 3. Assign every detection to the track with the nearest reference x.
d = OrderedDict((x_ref, []) for x_ref in x_vals)  # track -> [centre_x, centre_y]
heights = OrderedDict((x_ref, []) for x_ref in x_vals)  # track -> box heights
missing_frames = 0
for idx in range(len(bar_info_predmask)):
    boxes = bar_info_predmask.get(idx, {}).get(0)
    if not boxes:
        missing_frames += 1
        continue
    for xmin, ymin, w, h in boxes:
        index = int(np.argmin([(x_ref - xmin) ** 2 for x_ref in x_vals]))
        # Box centre: the x offset is half the WIDTH (previously x // 2).
        # NOTE(review): y is not negated here, unlike the single-weight
        # track -- confirm which sign downstream code expects.
        d[x_vals[index]].append([xmin + w // 2, ymin + h // 2])
        heights[x_vals[index]].append(h)
frac_frames_missing = missing_frames / len(bar_info_predmask)

# 4. Smooth each track's vertical motion and keep the most variable one.
cap = cv2.VideoCapture(os.path.join(temp_dir, filename))
framespersecond = cap.get(cv2.CAP_PROP_FPS)
cap.release()  # only the frame rate is needed from the video
print(framespersecond)

movement_direction = 1  # vertical axis
# smoothing_coef is the EMA coefficient defined by the single-weight
# smoothing step earlier in the notebook.
if framespersecond > 0:
    # Influence lasts ~1/(1-coef) frames; divide by fps to get seconds.
    influence_frames = 1 / (1 - smoothing_coef)
    print('influence of frame dies after {} frames which is {} seconds'.format(influence_frames, influence_frames / framespersecond))

initial_var = 0
final_ema = None  # stays None if no track shows any movement
height = None
for track_x, track in d.items():
    # hand_motion_xy = thing
    important_motion = [point[movement_direction] for point in track]
    if not important_motion:
        # No detection was ever assigned to this track.
        continue
    # offset = idx-cutoff
    start_val = important_motion[0]
    # Seed with the first sample and update from the second one on, so the
    # series has exactly one value per detection.
    ema_important_motion = [start_val]
    smoothed_val = start_val
    for sample in important_motion[1:]:
        smoothed_val = smoothing_coef * smoothed_val + (1 - smoothing_coef) * sample
        ema_important_motion.append(smoothed_val)
    track_var = np.var(ema_important_motion)
    if track_var > initial_var:
        initial_var = track_var
        final_ema = ema_important_motion
        height = np.mean(heights[track_x])
print('total fraction of missing frames {:3g}'.format(frac_frames_missing))
# return final_ema,height,framespersecond, frac_frames_missing

--

--