How to upload 50 OpenCV frames into cloud storage within 1 second

How to read RTSP/video frames and upload them to cloud storage asynchronously

Bala Venkatesh
Nov 12, 2020 · 4 min read

USE CASE

For instance, when we analyze a CCTV camera feed, the first step is to read the RTSP stream using OpenCV; then we store the frames somewhere in the cloud for further analysis.

But the problem is, when we upload the frames one after the other into the cloud, each upload takes time, doesn't it?

To get a clear picture of this, I ran an experiment with a Google Cloud Storage bucket and found that a single frame takes about 1.05 seconds to upload. Because of this, we have to wait roughly a second for each response before we can upload the next frame in the row.
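To put the problem in numbers: at the measured ~1.05 seconds per frame, sequential upload time grows linearly with the frame count. A quick back-of-the-envelope check (the 1.05 s figure is from the experiment above):

```python
PER_FRAME_SECONDS = 1.05  # measured upload time for one frame

def sequential_upload_time(frame_count):
    # Each upload must finish before the next one starts,
    # so total time is simply frames x per-frame latency.
    return frame_count * PER_FRAME_SECONDS

print(sequential_upload_time(50))  # roughly 52.5 seconds for 50 frames
```

So the "50 frames in 1 second" goal in the title is hopeless with one-at-a-time uploads.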

I spent some time brainstorming a solution, and here it is!

The solution: we can upload frames asynchronously using Celery.

When we upload frames asynchronously, we lose the frame order, so we use the group and chain concepts in Celery to preserve it.

For those who are new to it: what is Celery?

Celery is one of the most popular background job managers in the Python world.

Celery is compatible with several message brokers, such as RabbitMQ and Redis, which act as the transport between producers and consumers. Celery itself is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, and it supports scheduling as well.

Having clarified the definition, let us see how to configure Celery in Python.

Step 1:- Import all the necessary Celery packages

from celery import Celery
from celery.result import AsyncResult
from celery.result import allow_join_result

Step 2:- Configure the broker and result backend in Celery. I have used Redis for both, so install Redis on your system and make sure it is running. Then start a worker, e.g. `celery -A tasks worker --loglevel=info` (assuming the code below lives in tasks.py).

app = Celery('tasks', backend='redis://guest@127.0.0.1:6379', broker='redis://guest@127.0.0.1:6379')

Step 3:- To call a function asynchronously, we decorate it with @app.task.

Here's the sample Celery code to upload frames into a Google bucket.

@app.task(bind=True, max_retries=30)
def upload_frames_gcs(self, file_name, ts):
    try:
        url = upload_file_to_gcs(file_name)
        # Return the URL together with the frame's timestamp
        # so the order can be recovered later.
        return url, ts
    except Exception as e:
        raise self.retry(exc=e)
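The upload_file_to_gcs helper is not shown in the article. A minimal sketch using the official google-cloud-storage client might look like the following; the bucket name my-frames-bucket and the frames/ prefix are hypothetical, and authentication is assumed to come from the usual GOOGLE_APPLICATION_CREDENTIALS environment variable:

```python
BUCKET_NAME = "my-frames-bucket"  # hypothetical bucket name

def make_blob_name(file_name):
    # Keep frames under a common prefix so they are easy to list.
    return f"frames/{file_name}"

def upload_file_to_gcs(file_name):
    # Imported here so the module loads even on machines without
    # the google-cloud-storage package installed.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket(BUCKET_NAME)
    blob = bucket.blob(make_blob_name(file_name))
    blob.upload_from_filename(file_name)
    return blob.public_url
```

Note that blob.public_url is only reachable by others if the object or bucket is publicly readable; otherwise a signed URL would be needed.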

Step 4:- The following steps are the most important:

We can't simply call the function directly and upload frames asynchronously, because we would lose the frame order after uploading. Instead, we use the chains and groups concepts in Celery to upload frames into a bucket. With this technique we can upload 5 or 10 frames in parallel and still recover the original order of the frames. However, before moving into the code, let us first see what chains and groups are in Celery.
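To see why parallel uploads and ordered results can coexist, here is a broker-free illustration using Python's standard concurrent.futures (fake_upload and the example.com URLs are stand-ins, not the article's real GCS upload): map() runs the uploads in parallel but yields the results in submission order, which is exactly the property we need for video frames.

```python
from concurrent.futures import ThreadPoolExecutor
import random
import time

def fake_upload(frame_name):
    # Stand-in for a real upload: sleep a random amount to
    # simulate variable network latency per frame.
    time.sleep(random.uniform(0.01, 0.05))
    return f"https://storage.example.com/{frame_name}"

frames = [f"frame_{i:03d}.jpg" for i in range(10)]

# Uploads run concurrently on 5 workers, yet the results come
# back in the original frame order.
with ThreadPoolExecutor(max_workers=5) as pool:
    urls = list(pool.map(fake_upload, frames))

print(urls[0])   # https://storage.example.com/frame_000.jpg
print(urls[-1])  # https://storage.example.com/frame_009.jpg
```

Celery's group/chain primitives give the same guarantee, but with the work distributed to worker processes through the broker instead of local threads.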

Chains in Celery

A chain is a primitive that lets us link multiple tasks together into a single signature, so that they are called one after the other, essentially forming a chain of callbacks.

If you are still unsure: in a chain, each link is identified by its task ID in Celery, and each task starts only after the previous one has finished.

Groups in Celery

The group primitive is a signature that takes a list of tasks which should be applied in parallel.

Here is sample code showing how I uploaded frames into a Google bucket using the groups and chains technique in Celery.

jobs = group(upload_frames_gcs.s(file_name, ts) for ts, file_name in file_name_dic.items())
result = jobs.apply_async()

Here I am calling the upload_frames_gcs function inside a group. The .s() creates a signature, a partial invocation carrying the task's arguments, which is what allows Celery to link tasks so that one is called after the other, essentially forming a chain of callbacks. As a result, we get the results of the whole group back through a single task.

Step 5:- You may wonder how to get the frame URLs after uploading with Celery. It's simple: the result variable holds the task ID of the group, and we can use that task ID to fetch the results.

However, be careful to check the status of each task first; once a task has completed, we can get its frame URLs.

def taskid_status(task_id_array):
    frames_array = []
    # Iterate over a copy so we can safely remove finished tasks
    # from the original list while looping.
    for task in list(task_id_array):
        if task.successful():
            task_id_array.remove(task)
            with allow_join_result():
                for results in task.join():
                    frame_dic = {}
                    frame_dic['frame_url'] = results[0]
                    frames_array.append(frame_dic)
    return task_id_array, frames_array

In the frames_array variable, you get all the uploaded frame URLs; any tasks still pending remain in task_id_array to be checked again later.

I have tested the performance with several different test cases.

  1. 10 frames take 0.77 to 0.82 sec to upload to Google Storage.
  2. 15 frames take 0.9 to 1.0 sec to upload to Google Storage.
  3. 30 frames take 0.7 to 0.8 sec to upload to Google Storage.

This shows that increasing the number of frames makes little difference to the upload time, because Celery runs the tasks concurrently across multiple worker processes.
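Using the article's own measurements, we can quantify the gain for the 30-frame case: roughly 30 x 1.05 s sequentially versus about 0.8 s for the parallel batch.

```python
sequential_seconds = 30 * 1.05  # 30 one-at-a-time uploads at the measured rate
parallel_seconds = 0.8          # measured batch time for 30 frames above
speedup = sequential_seconds / parallel_seconds
print(round(speedup))  # roughly 39x faster
```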

If you want further assistance or help, please feel free to contact me at balavenkatesh.com 📬 I am happy to help you. 🖥️

Bala Venkatesh

Written by

I have a passion for understanding technology at a fundamental level and Sharing ideas and code. * Aspire to Inspire before I expire* https://balavenkatesh.com

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com
