Using RTSP with OpenCV

Sems Kurtoglu
4 min read · Nov 6, 2023


Hi,

In our previous article, we covered some preliminary information to help us understand RTSP.

Now it’s time to code!


We will build an example RTSP application. It should meet the following requirements:

1. Create a web server with a `/start` endpoint (you can use any library you want).

  • 1.1. It only responds to a POST request.
  • 1.2. When it receives the parameter `rtsp_url`, it starts a new process.

2. The new process reads the video stream from `rtsp_url` using `opencv`.

3. Add a `/stop` endpoint.

  • 3.1. Save the images that have certain characteristics (in this example, the brightest frames)!
  • 3.2. Stop the video reading process.
  • 3.3. Display the saved image in the HTML response.

Let’s get started…

Let’s select the main components we will use in our application:

  1. Threading
  2. FastAPI
  3. OpenCV

Any of these components can be swapped for alternatives; these are simply the ones I chose here.

Name of our application file: application.py
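First, let’s import the libraries we chose. (If they are not installed yet, they are available through pip as fastapi, uvicorn, opencv-python, numpy, and pillow, plus python-multipart, which FastAPI needs in order to read form data.)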

from fastapi import FastAPI, File, UploadFile, Form
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import uvicorn

import cv2
import numpy as np
from PIL import Image
from io import BytesIO
import threading
import os
import time
from datetime import datetime

FastAPI is a modern web framework for building APIs with Python. By calling FastAPI(), we are creating an instance of the FastAPI application, which allows us to define routes, request handlers, and other functionality for our web application.

app = FastAPI()

Since we will save certain images to disk, we mount the current directory as a static directory so that they can be served easily. To do this, we add the following definition to our code.

app.mount("/static", StaticFiles(directory="./"), name="static")

Now let’s add the code that will process the RTSP URL.

def process_video(rtsp_url):
    global frame_buffer
    cap = cv2.VideoCapture(rtsp_url)
    frame_buffer = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break  # the stream ended or a frame could not be read

        # Use the sum of the grayscale pixel values as a brightness score
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        brightness = np.sum(gray_frame)
        frame_buffer.append((frame, brightness))

        # Keep only the frame_limit brightest frames
        if len(frame_buffer) > frame_limit:
            frame_buffer = sorted(frame_buffer, key=lambda x: x[1], reverse=True)[:frame_limit]

        if stop_flag:
            break

    cap.release()

Because an RTSP stream delivers frames continuously, the loop would otherwise never end, so we check the stop_flag value, which is set by the “/stop” request. Each incoming frame is stored together with its brightness (the sum of its grayscale pixel values), and once the buffer grows beyond frame_limit, only the brightest frames are kept.
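The buffer in process_video is re-sorted on every frame once it is full. As a minimal sketch of the same “keep the N brightest” idea, the snippet below uses Python’s heapq module as a min-heap instead of sorting. This is a swapped-in technique, not the method used in this article. The frames are tiny hypothetical grayscale grids written as nested lists so that the example runs without a camera, and `brightness` and `keep_brightest` are names made up for illustration.

```python
import heapq
from itertools import count


def brightness(gray_frame):
    """Sum of all pixel values in a grayscale frame given as rows of ints."""
    return sum(sum(row) for row in gray_frame)


def keep_brightest(frames, limit):
    """Return the `limit` brightest frames, brightest first."""
    heap = []        # min-heap of (brightness, tie_breaker, frame)
    order = count()  # tie-breaker so equal scores never compare the frames
    for frame in frames:
        item = (brightness(frame), next(order), frame)
        if len(heap) < limit:
            heapq.heappush(heap, item)
        else:
            # Push the new item, then pop and discard the darkest one
            heapq.heappushpop(heap, item)
    return [frame for _, _, frame in sorted(heap, reverse=True)]


# Hypothetical 2x2 "frames" standing in for real video frames
frames = [[[10, 10], [10, 10]], [[200, 200], [200, 200]], [[50, 50], [50, 50]]]
top_two = keep_brightest(frames, limit=2)
```

With this approach the buffer never holds more than `limit` frames, and each new frame costs a single heap operation rather than a full sort.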

The start_video function starts a separate thread to process the video stream from the specified RTSP URL. Because the frames are read in a thread separate from the main application, the web server stays free to handle other requests, such as “/stop”, while the stream is being processed.

def start_video(rtsp_url):
    global video_process
    video_process = threading.Thread(target=process_video, args=(rtsp_url,))
    video_process.start()
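A plain global boolean works for stopping the thread, but the standard library also offers threading.Event for signalling a worker. The sketch below shows that alternative, assuming a simple counter loop as a stand-in for reading frames; `worker`, `stop_event`, and `result` are hypothetical names and are not part of the application in this article.

```python
import threading
import time

stop_event = threading.Event()
result = {"iterations": 0}


def worker():
    """Loop until the stop event is set, like the frame-reading loop."""
    while not stop_event.is_set():
        result["iterations"] += 1  # stand-in for cap.read() and processing
        time.sleep(0.01)


thread = threading.Thread(target=worker)
thread.start()

time.sleep(0.05)  # let the worker run briefly
stop_event.set()  # what a /stop handler would do
thread.join()     # wait for the loop to exit
```

An Event can also be cleared with `stop_event.clear()` before a new run, which makes restarting the worker explicit.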

Now we will define the FastAPI endpoints that call these functions. The starting point will be “/start”…

The following snippet handles incoming HTTP POST requests on the /start path. If video processing has not been started, it starts a new thread and sends a response to the client. If it is already running, it returns a response with the message “Video processing is already running.”

@app.post("/start", response_model=dict)
async def start(rtsp_url: str = Form(...)):
    global video_process, stop_flag
    if video_process and video_process.is_alive():
        return {"message": "Video processing is already running."}

    # Reset the flag so a stream started after an earlier /stop keeps running
    stop_flag = False
    start_video(rtsp_url)
    return {"message": "Video processing started."}

Now let’s define a route that will handle an HTTP GET request on the “/stop” path of the FastAPI application. With response_class we specify that this route returns an HTML response.

@app.get("/stop", response_class=HTMLResponse)
async def stop():
    global video_process, frame_buffer, stop_flag
    stop_flag = True
    if video_process and video_process.is_alive():
        video_process.join()

    for i, (frame, _) in enumerate(frame_buffer):
        frame = cv2.resize(frame, (frame_width, frame_height))
        image_path = f"frame_{i}.png"
        cv2.imwrite(image_path, frame)

    frame_buffer = []  # Reset the buffer

    return html_template

The stop() function is an asynchronous handler that runs when an HTTP GET request arrives at the /stop path. It ends the video processing, saves the buffered frames as PNG files, and returns the HTML page that displays them.

The HTML that is returned as the response is given below.

# HTML template for displaying the frames
html_template = """
<!DOCTYPE html>
<html>
<head>
    <title>Video Frames</title>
    <style>
        .grid-container {
            display: grid;
            grid-template-columns: repeat(3, 1fr);
            grid-gap: 10px;
        }

        .grid-item {
            width: 100%;
        }
    </style>
</head>
<body>
    <h1>Top 9 Brightest Frames</h1>
    <div class="grid-container">
        <div class="grid-item">
            <img src="/static/frame_0.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_1.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_2.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_3.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_4.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_5.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_6.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_7.png">
        </div>
        <div class="grid-item">
            <img src="/static/frame_8.png">
        </div>
    </div>
</body>
</html>
"""
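The template repeats the same grid item nine times by hand. As a hedged sketch, the same markup could be generated from the number of frames instead; `build_html` is a hypothetical helper that is not part of the original application, and the file names follow the frame_{i}.png pattern used by the /stop handler.

```python
def build_html(frame_count):
    """Build the results page with one grid item per saved frame."""
    items = "\n".join(
        f'<div class="grid-item"><img src="/static/frame_{i}.png"></div>'
        for i in range(frame_count)
    )
    return f"""<!DOCTYPE html>
<html>
<head>
<title>Video Frames</title>
<style>
.grid-container {{ display: grid; grid-template-columns: repeat(3, 1fr); grid-gap: 10px; }}
.grid-item {{ width: 100%; }}
</style>
</head>
<body>
<h1>Top {frame_count} Brightest Frames</h1>
<div class="grid-container">
{items}
</div>
</body>
</html>
"""


page = build_html(9)
```

Generating the page this way keeps the heading and the number of images in step with frame_limit if that value is ever changed.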

Finally, the last lines, which run FastAPI locally, are:

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

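In addition, the following auxiliary definitions should be included in application.py. Place them above the if __name__ == "__main__": block, because uvicorn.run blocks until the server shuts down and any code after it would not run until then: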

# Global variables to manage video processing and frames
video_process = None
frame_buffer = []
stop_flag = False
frame_limit = 9
frame_width = 300
frame_height = 300

# Check stop_flag
def condition_checker():
    global stop_flag
    while True:
        print(stop_flag)
        if stop_flag:
            break
        time.sleep(1)  # Check every second

# Start condition_checker with threading
condition_thread = threading.Thread(target=condition_checker)
condition_thread.start()

All of the code above can live in a single file (application.py), and you can run it that way with python application.py. Once FastAPI is running, send a POST request containing the rtsp_url parameter to the local address plus /start (for example, http://localhost:8000/start), then request the /stop address to get the output. Of course, the same functionality can be implemented in other ways.
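To make this usage concrete, here is a minimal client sketch that uses only the standard library. It assumes the server is reachable at http://localhost:8000 (the port passed to uvicorn.run) and uses a placeholder RTSP address; `build_start_request` is a hypothetical helper. The actual network calls are left commented out so that the snippet can be run without a server.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8000"  # assumption: server started locally


def build_start_request(rtsp_url):
    """Build the form-encoded POST request that /start expects."""
    body = urlencode({"rtsp_url": rtsp_url}).encode("utf-8")
    return Request(
        f"{BASE_URL}/start",
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


# Placeholder stream address; replace it with a real camera URL
request = build_start_request("rtsp://example.com/stream")

# With the server running:
# print(urlopen(request).read().decode())             # start processing
# print(urlopen(f"{BASE_URL}/stop").read().decode())  # fetch the HTML page
```

The /stop response is an HTML page, so opening that address in a browser is the easiest way to see the saved frames.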

NEXT STEP: RUN THE APPLICATION WITH DOCKER (Coming soon!)


Sems Kurtoglu

Electronic and Telecommunication Engineer & AI Developer