Build a Production Ready Face Detection API
Part 2: Refining the API and integrating the final workflow
This is a continuation of the first part of our series: https://medium.com/analytics-vidhya/build-a-production-ready-face-detection-api-part-1-c56cbe9592bf
In Part 1 of this series, we created a simple dockerized face detection API on top of Django and the MTCNN face detection model. In this post, we are going to make our API asynchronous by adopting a microservice architecture and processing the images in the background. I will try as much as possible to abide by the Twelve-Factor App methodology while developing our service.
To make our API asynchronous, we shall use Celery as our task processor, Redis as our result backend, RabbitMQ as our message broker, and PostgreSQL as our persistence layer. We shall also use Minio as our file storage service (Minio is an S3 alternative). We begin by updating our architectural diagram from Part 1 so as to visualize the new design.
Next, we update our docker-compose file (docker-compose.yaml) as shown below:
version: '3'
services:
  api:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./:/django
    ports:
      - "8900:8000"
    env_file: &envfile
      - env.env
    depends_on:
      - broker
      - redis
      - database
  database:
    image: postgres:latest
    volumes:
      - ./db_data/pgdata:/var/lib/postgresql/data/
    environment:
      POSTGRES_USER: django
    ports:
      - "5432:5432"
  broker:
    image: rabbitmq:3.6.16-management-alpine
    env_file: *envfile
    ports:
      - 5672:5672
      - 15688:15672
      - 5671:5671
  redis:
    image: 'bitnami/redis:latest'
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
    ports:
      - "6379:6379"
  worker:
    build: .
    command: ["celery", "worker", "--app=api.celery_app", "--concurrency=2", "--hostname=worker@%h", "--loglevel=INFO"]
    volumes:
      - ./:/django
    links:
      - database
    env_file: *envfile
    depends_on:
      - broker
      - redis
      - database
  minio:
    image: "minio/minio"
    command: server /data
    env_file: *envfile
    ports:
      - "9000:9000"
    volumes:
      - ./storage/minio:/data
We also add an env file, env.env, which contains all our application environment variables. Copy the following into env.env in the project root directory:
RABBITMQ_DEFAULT_USER=user
RABBITMQ_DEFAULT_PASS=weak_password
CELERY_BROKER=amqp://user:weak_password@broker:5672
CELERY_RESULT_BACKEND=redis://redis:6379
MINIO_STORAGE_ENDPOINT=minio:9000
MINIO_ACCESS_KEY=weak_password
MINIO_SECRET_KEY=weak_password
DEBUG="true"
DATABASE_NAME=django
DATABASE_USER=django
DATABASE_PASSWORD=weak_password
DATABASE_SERVICE_HOST=database
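These variables are consumed by the settings shown later in this post. As a quick sketch of the pattern (variable names come from env.env above; the fallback values here are illustrative only):

```python
import os

# Simulate the container environment for this standalone sketch; in the
# real stack, docker-compose's env_file injects these from env.env.
os.environ.setdefault("CELERY_BROKER", "amqp://user:weak_password@broker:5672")
os.environ.setdefault("DEBUG", "true")

# A required setting fails fast with a KeyError if it is missing...
broker_url = os.environ["CELERY_BROKER"]
# ...while an optional one can fall back to a safe default.
debug = os.environ.get("DEBUG", "false").lower() == "true"
```

Failing fast on required settings keeps misconfigured containers from starting half-broken.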
We next integrate Celery into our application and convert the detect_faces function into a background task.
Asynchronous Processing
In order to process our tasks in the background (asynchronously), we shall follow these steps:
1. Add celery to the requirements.txt file; this ensures that Celery is installed inside our application container:
celery==4.2.0
2. Add the Celery config to settings.py by copying the settings below to the bottom of the file:
CELERY = {
    'BROKER_URL': os.environ['CELERY_BROKER'],
    'CELERY_IMPORTS': ('api.tasks.image', ),
    'CELERY_TASK_SERIALIZER': 'json',
    'CELERY_RESULT_BACKEND': os.environ['CELERY_RESULT_BACKEND'],
    'CELERY_RESULT_SERIALIZER': 'json',
    'CELERY_ACCEPT_CONTENT': ['json'],
}
3. Also copy the database settings below into the settings file:
########## DATABASE CONFIGURATION
# See: https://docs.djangoproject.com/en/dev/ref/settings/#databases
DATABASES = {
    'default': {
        'ENGINE': os.getenv('DATABASE_ENGINE', 'django.db.backends.postgresql_psycopg2'),
        'NAME': os.getenv('DATABASE_NAME', ''),
        'USER': os.getenv('DATABASE_USER', ''),
        'PASSWORD': os.getenv('DATABASE_PASSWORD', ''),
        'HOST': os.getenv('DATABASE_SERVICE_HOST', ''),
        'PORT': os.getenv('DATABASE_SERVICE_PORT', 5432)
    }
}
########## END DATABASE CONFIGURATION
4. Create the file api/celery_app.py and copy the following code into it.
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery()
app.conf.update(settings.CELERY)
5. Add the Image model (api/models/image.py).
from django.db import models

class Image(models.Model):
    image_id = models.CharField(max_length=100)
    image_array = models.TextField(null=True)
    name = models.CharField(max_length=100)
    date_created = models.DateTimeField(auto_now_add=True)
6. For each face identified, we want to save it in the database, so we add a Face model (api/models/face.py):
from django.db import models

class Face(models.Model):
    image_id = models.CharField(max_length=100)
    face_id = models.CharField(max_length=100)
    confidence = models.CharField(max_length=200)
    box = models.TextField(null=True)
    keypoints = models.TextField(null=True)
    date_created = models.DateTimeField(auto_now_add=True)
7. Now modify our view so that it only saves the uploaded image and records it in the database before returning a 202 response. Note that we instantiate the model through the alias Image_object (from api.models.image import Image as Image_object) so it does not clash with the Image view class itself.
class Image(APIView):
    def post(self, request, *args, **kwargs):
        image_id = str(uuid.uuid4())
        name = upload_image(request, image_id)
        image = Image_object()
        image.image_id = image_id
        image.name = name
        image.save()
        return Response({"status": "ok"}, status=status.HTTP_202_ACCEPTED)
8. Create the file api/tasks/image.py and copy the code below into it. We have moved the detect_faces function out of the view and converted it into a Celery task.
# The future is now!
import uuid
from django.core.files.storage import default_storage
from mtcnn.mtcnn import MTCNN
from numpy import asarray
from api.models.face import Face
from ..models.image import Image as Image_object
from PIL import Image as PImage
from api.celery_app import app

@app.task(bind=True, name='detect_faces')
def detect_faces(self, *args, **kwargs):
    image_id = kwargs.get("image_id")
    image_object = Image_object.objects.get(image_id=image_id)
    filename = image_object.name
    image = PImage.open(default_storage.open(filename))
    image = image.convert('RGB')
    pixels = asarray(image)
    detector = MTCNN()
    # detect faces in the image
    results = detector.detect_faces(pixels)
    detected_faces = list()
    for result in results:
        # only keep faces with a confidence of 94% and above
        if result['confidence'] > 0.94:
            face_object = Face()
            face_id = str(uuid.uuid4())
            face_object.face_id = face_id
            face_object.image_id = image_id
            face_object.confidence = result['confidence']
            face_object.box = result['box']
            face_object.keypoints = result['keypoints']
            face_object.save()
            detected_faces.append(face_id)
    return detected_faces
The task fetches an image record from the database, loads the image file, runs face detection on it, and then saves each detected face to the database.
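For reference, detector.detect_faces() returns a list of dicts with box, confidence, and keypoints keys. A sketch of the filtering step using mocked results (all numbers below are made up for demonstration):

```python
# Mocked MTCNN-style output: each detection has a bounding box
# [x, y, width, height], a confidence score, and facial keypoints.
results = [
    {"box": [10, 20, 50, 60], "confidence": 0.99,
     "keypoints": {"left_eye": (25, 40), "right_eye": (45, 40),
                   "nose": (35, 50), "mouth_left": (28, 65),
                   "mouth_right": (44, 65)}},
    {"box": [100, 30, 40, 40], "confidence": 0.61, "keypoints": {}},
]

# The task keeps only detections above the 0.94 confidence threshold;
# here only the first mock detection passes.
confident = [r for r in results if r["confidence"] > 0.94]
```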
9. We are almost there. Now let's modify our view to call the detect_faces task.
# The future is now!
import os
import uuid
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.core.files.storage import default_storage
from api.models.image import Image as Image_object
from api.tasks.image import detect_faces

def upload_image(request, image_id):
    img = request.FILES['file']
    img_extension = os.path.splitext(img.name)[-1]
    return default_storage.save(image_id + img_extension, request.FILES['file'])

class Image(APIView):
    def post(self, request, *args, **kwargs):
        image_id = str(uuid.uuid4())
        name = upload_image(request, image_id)
        image = Image_object()
        image.image_id = image_id
        image.name = name
        image.save()
        detect_faces.s(image_id=image_id).delay()
        return Response({"status": "ok"}, status=status.HTTP_202_ACCEPTED)
Now we are ready to test our API! First, ensure that your requirements.txt file looks like the one below:
absl-py==0.8.0
amqp==2.5.1
astor==0.8.0
billiard==3.5.0.5
celery==4.2.0
Django==2.2.5
djangorestframework==3.10.3
gast==0.3.2
google-pasta==0.1.7
grpcio==1.23.0
h5py==2.10.0
importlib-metadata==0.23
Keras-Applications==1.0.8
Keras-Preprocessing==1.1.0
kombu==4.6.4
Markdown==3.1.1
more-itertools==7.2.0
mtcnn==0.0.9
numpy==1.17.2
opencv-python==4.1.1.26
Pillow==6.1.0
protobuf==3.9.1
psycopg2-binary==2.8.3
pytz==2019.2
redis==3.3.8
six==1.12.0
sqlparse==0.3.0
tensorboard==1.14.0
tensorflow==1.14.0
tensorflow-estimator==1.14.0
termcolor==1.1.0
vine==1.3.0
Werkzeug==0.16.0
wrapt==1.11.2
zipp==0.6.0
Now run:
docker-compose up --build
The command might take a while pulling and building the necessary images.
After the command has finished running, open a new terminal and run the following commands:
- Make the database migrations
docker-compose run api python manage.py makemigrations
- Apply the migrations
docker-compose run api python manage.py migrate
- Create a Django admin superuser (the migrations must be applied first), typing in the requested details
docker-compose run api python manage.py createsuperuser
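To send a test request from Python, we can build the multipart upload with the requests library. A sketch (the URL path /api/image/ is an assumption; substitute whatever route your urls.py maps to the Image view, and note that the form field must be named file to match upload_image):

```python
import requests

# Prepare (but don't send) a multipart POST carrying a fake JPEG payload;
# port 8900 is the host port mapped to Django's 8000 in docker-compose.yaml.
req = requests.Request(
    "POST",
    "http://localhost:8900/api/image/",
    files={"file": ("face.jpg", b"\xff\xd8\xff\xe0fake-jpeg-bytes")},
).prepare()

# Once the stack is up, dispatch it with:
#     response = requests.Session().send(req)
```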
After this, sending a POST request to our endpoint returns a 202 Accepted response, and the docker logs confirm that the task was indeed processed in the background.
Let's take stock of where we are with our API. At the moment, we are able to process face detection tasks with the workers, leaving the API container to only receive requests and queue them for later processing. This microservice architecture gives us leverage when we want to scale our application to handle a large number of concurrent requests. There is one drawback though: at the moment we are saving our images to disk, and as we know, containers are transient; when they die, we also lose our uploaded image files. This is where Minio (an Amazon S3 alternative) comes in: we shall use Minio as our image storage service. To learn more about Minio and its capabilities, please refer to the Minio documentation.
To integrate minio into our django application, we shall add the settings below to our settings file (face_detect_api/settings.py)
DEFAULT_FILE_STORAGE = "minio_storage.storage.MinioMediaStorage"
STATICFILES_STORAGE = "minio_storage.storage.MinioStaticStorage"
MINIO_STORAGE_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY')
MINIO_STORAGE_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY')
MINIO_STORAGE_ENDPOINT = os.environ.get('MINIO_STORAGE_ENDPOINT')
MINIO_STORAGE_USE_HTTPS = False
MINIO_STORAGE_MEDIA_BUCKET_NAME = "media"
MINIO_STORAGE_AUTO_CREATE_MEDIA_BUCKET = True
MINIO_STORAGE_STATIC_BUCKET_NAME = "static"
MINIO_STORAGE_AUTO_CREATE_STATIC_BUCKET = True
Also add minio_storage to INSTALLED_APPS in the settings file. Then add the packages minio==4.0.21 and django-minio-storage==0.2.2 to our requirements.txt file.
We now do a final docker-compose up and send a test image to our API. If we navigate to http://localhost:9000 in a browser and log in using the Minio credentials in our env file, we can see that our Django application now uses Minio for storage.
Now that we have set the rails for our API to become scalable, we need a way to notify clients once their images have been processed. We shall do this by adding a callback mechanism.
Callback
We shall make a few modifications to our API so as to enable us to provide a callback after we are done processing an image.
- Modify our Image model to include status, callback_url, and request_id fields (the view and the callback task below both rely on request_id). Our new Image model should look like the one below:
from django.db import models

class Image(models.Model):
    image_id = models.CharField(max_length=100)
    request_id = models.CharField(max_length=100, null=True)
    image_array = models.TextField(null=True)
    name = models.CharField(max_length=100)
    status = models.CharField(max_length=100, default="processing")
    callback_url = models.CharField(max_length=100)
    date_created = models.DateTimeField(auto_now_add=True)
- On our view, we shall now accept a request_id in the payload alongside the callback_url. Our endpoint will accept three parameters: an image file, a request_id, and a callback_url. We shall also add a serializer to validate our inputs. Below is the new view:
# The future is now!
import os
import uuid
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.core.files.storage import default_storage
from django.conf import settings
from minio import Minio
from minio.error import ResponseError
from api.models.image import Image as Image_object
from api.serializers.image_serializer import ImageSerializer
from api.tasks.image import detect_faces

def upload_image(request, image_id):
    # check if the minio bucket exists
    minioClient = Minio(settings.MINIO_STORAGE_ENDPOINT,
                        access_key=settings.MINIO_STORAGE_ACCESS_KEY,
                        secret_key=settings.MINIO_STORAGE_SECRET_KEY,
                        secure=False)
    try:
        minioClient.bucket_exists(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME)
    except ResponseError as err:
        # log, then create the bucket
        minioClient.make_bucket(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME)
    img = request.FILES['image']
    img_extension = os.path.splitext(img.name)[-1]
    return default_storage.save(image_id + img_extension, request.FILES['image'])

class Image(APIView):
    def post(self, request):
        image_serializer = ImageSerializer(data=request.data)
        if image_serializer.is_valid():
            image_id = str(uuid.uuid4())
            request_id = request.data.get("request_id")
            callback_url = request.data.get("callback_url")
            name = upload_image(request, image_id)
            image = Image_object()
            image.image_id = image_id
            image.request_id = request_id
            image.callback_url = callback_url
            image.name = name
            image.save()
            detect_faces.s(image_id=image_id).delay()
            return Response({"status": "ok"}, status=status.HTTP_202_ACCEPTED)
        else:
            return Response(image_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
- Next, we create a callback task that will be invoked once the detect_faces task completes. Paste the code below into api/tasks/image.py:
# additional imports needed by the callback task
import io
import json
import cv2
import requests
import numpy as np
from django.conf import settings
from django.core.files.uploadedfile import InMemoryUploadedFile

@app.task(bind=True, name='detect_faces_callback')
def detect_faces_callback(self, *args, **kwargs):
    image_id = kwargs.get("image_id")
    image_object = Image_object.objects.get(image_id=image_id)
    filename = image_object.name
    output_filename = "detected_faces/" + image_object.name
    faces_on_image = Face.objects.filter(image_id=image_id)
    image = PImage.open(default_storage.open(filename))
    image = np.array(image)
    image = image.copy()
    faces_dict = list()
    for face in faces_on_image:
        faces_dict.append({
            "confidence": face.confidence,
            "box": face.box,
            "keypoints": face.keypoints
        })
        box = json.loads(face.box)
        x1, y1, width, height = box
        x1, y1 = abs(x1), abs(y1)
        x2, y2 = x1 + width, y1 + height
        cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 5)
        cv2.putText(image,
                    "P: " + "{0:.4f}".format(float(face.confidence)),
                    (x1, (y2 + 25)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv2.LINE_AA)
    pil_im = PImage.fromarray(image)
    silver_bullet = io.BytesIO()
    pil_im.save(silver_bullet, format="JPEG")
    image_file = InMemoryUploadedFile(silver_bullet, None, output_filename, 'image/jpeg',
                                      len(silver_bullet.getvalue()), None)
    default_storage.save(output_filename, image_file)
    callback = dict({
        "image_id": image_id,
        "request_id": image_object.request_id,
        "faces": faces_dict,
        "output_image_url": "{host}/api/image/?image_id={image_id}".format(host=settings.API_HOST, image_id=image_id)
    })
    image_object.status = "completed"
    image_object.save()
    try:
        requests.post(url=image_object.callback_url, data=json.dumps(callback))
    except Exception as e:
        # log the exception
        pass
    return image_id
To explain the new task: it retrieves the processed image from our storage service, draws bounding boxes around the detected faces, saves the annotated image, and then posts the results, along with a link to the annotated image, to the callback URL provided for that request.
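Concretely, the body posted to the callback URL is a JSON document shaped like the sketch below (every value is a made-up placeholder, and the host comes from the API_HOST setting):

```python
import json

# Illustrative callback payload, mirroring the `callback` dict built in
# detect_faces_callback; all values below are placeholders.
callback = {
    "image_id": "9b2d6c1e-0000-0000-0000-000000000000",
    "request_id": "client-request-42",
    "faces": [
        {"confidence": "0.9987",
         "box": "[10, 20, 50, 60]",
         "keypoints": "{...}"},
    ],
    "output_image_url": "http://localhost:8900/api/image/"
                        "?image_id=9b2d6c1e-0000-0000-0000-000000000000",
}

body = json.dumps(callback)  # this string is what requests.post() sends
```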
To finalize our API code, we shall add a get method that allows us to download the processed image. To accomplish this, add the method below to our Image view.
def get(self, request):
    if request.GET.get("image_id"):
        image_id = request.GET.get("image_id")
        image_object = Image_object.objects.get(image_id=image_id)
        filename = "detected_faces/" + image_object.name
        image = default_storage.open(filename).read()
        content_type = magic.from_buffer(image, mime=True)
        response = HttpResponse(image, content_type=content_type)
        response['Content-Disposition'] = 'attachment; filename="%s"' % filename
        return response
    return Response({"error": "image_id is required"}, status=status.HTTP_400_BAD_REQUEST)
We shall then chain the detect_faces task with the detect_faces_callback task on the view. The final version of our view looks like this:
# The future is now!
import os
import uuid
import magic
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.core.files.storage import default_storage
from django.http import HttpResponse
from django.conf import settings
from minio import Minio
from minio.error import ResponseError
from celery import chain
from api.models.image import Image as Image_object
from api.serializers.image_serializer import ImageSerializer
from api.tasks.image import detect_faces, detect_faces_callback

def upload_image(request, image_id):
    # check if the minio bucket exists
    minioClient = Minio(settings.MINIO_STORAGE_ENDPOINT,
                        access_key=settings.MINIO_STORAGE_ACCESS_KEY,
                        secret_key=settings.MINIO_STORAGE_SECRET_KEY,
                        secure=False)
    try:
        minioClient.bucket_exists(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME)
    except ResponseError as err:
        # log, then create the bucket
        minioClient.make_bucket(settings.MINIO_STORAGE_MEDIA_BUCKET_NAME)
    img = request.FILES['image']
    img_extension = os.path.splitext(img.name)[-1]
    return default_storage.save(image_id + img_extension, request.FILES['image'])

class Image(APIView):
    def post(self, request):
        image_serializer = ImageSerializer(data=request.data)
        if image_serializer.is_valid() and request.FILES.get("image", None):
            image_id = str(uuid.uuid4())
            request_id = request.data.get("request_id")
            callback_url = request.data.get("callback_url")
            name = upload_image(request, image_id)
            image = Image_object()
            image.image_id = image_id
            image.request_id = request_id
            image.callback_url = callback_url
            image.name = name
            image.save()
            chain(
                detect_faces.s(image_id=image_id) |
                detect_faces_callback.s(image_id=image_id)
            ).delay()
            return Response({"status": "ok"}, status=status.HTTP_202_ACCEPTED)
        else:
            if not request.FILES.get("image", None):
                return Response({"error": "`image` is required"}, status=status.HTTP_400_BAD_REQUEST)
            return Response(image_serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def get(self, request):
        if request.GET.get("image_id"):
            image_id = request.GET.get("image_id")
            image_object = Image_object.objects.get(image_id=image_id)
            filename = "detected_faces/" + image_object.name
            image = default_storage.open(filename).read()
            content_type = magic.from_buffer(image, mime=True)
            response = HttpResponse(image, content_type=content_type)
            response['Content-Disposition'] = 'attachment; filename="%s"' % filename
            return response
        return Response({"error": "image_id is required"}, status=status.HTTP_400_BAD_REQUEST)
Now we are ready to test our asynchronous API.
Testing the async API
In order to test our API, we shall send a Postman request containing a sample image. For the callback, we create a requestbin.com endpoint so that we can inspect the callback posted by the API.
Looking at our requestbin endpoint, we confirm that the callback was posted. The payload contains the output_image_url; when we fetch that image, we see that all the faces were detected and bounded.
At this point, our API is asynchronous and built on a microservice architecture. From here we could morph it into more complex architectures, but for the purposes of this post we shall keep it simple. We are also able to scale our application with ease, and since we are using Docker, we have flexibility when it comes to deployment strategies. In Part 3 of this series, we shall deploy our API to Azure using Kubernetes and test it out.
The code used in this post is available on this GitHub repo https://github.com/urandu/face_detect_api . If you want to test out the API locally, head to the repo and read the description on how to run the API locally.
In case of any questions or comments, you can reach me on Twitter: https://twitter.com/bnamawa