Distil-Whisper transcriptions with AWS Step Functions and Elemental MediaConvert

An AWS Step Functions workflow that converts video to audio and creates a transcription with Distil-Whisper deployed on SageMaker

Introduction

Whisper is an automatic speech recognition system trained on 680,000 hours of multilingual and multitask supervised data collected from the web.

Distil-Whisper was proposed in the paper Distil-Whisper: Robust Knowledge Distillation via Large-Scale Pseudo Labelling (submitted on 1 Nov 2023). It is a distilled version of the Whisper model that is 6 times faster, 49% smaller, and performs within 1% WER on out-of-distribution evaluation sets. Unlike Whisper itself, Distil-Whisper only supports English.

The Whisper models approach human-level robustness and accuracy on speech recognition. That makes these models great candidates for creating transcriptions of videos and/or recorded meetings.

In this article we’ll create a pipeline that takes in a video file, extracts the audio from that file, and outputs a transcription. We’ll be using AWS Elemental MediaConvert to create the audio files and Amazon SageMaker to host and run a Distil-Whisper model.

Both extracting the audio from a video and running an inference job (with a payload bigger than 6 MB) are asynchronous jobs. As such, we need to chain those steps together, preferably without idle resources. AWS Step Functions is a good option here.

AWS Step Functions is a cloud service from Amazon Web Services that enables you to coordinate multiple AWS services into serverless workflows, so you can build and update workflows quickly. The documentation lists some typical use cases, e.g. a job poller pattern to manage an asynchronous job with a polling loop. Just what we need (twice).

I’ve created a minimal repository to deploy this solution. Take into account that deploying and using this solution is not fully covered by the AWS free tier.

The repository with all the code can be found here

Transcription pipeline

Before going into the details of each step of the pipeline, let’s walk through it first. The pipeline is triggered by a new video file, e.g. when a file is uploaded to an S3 bucket. A first Lambda function creates a job to extract the audio, based on a pre-defined job template, and submits this job to AWS MediaConvert. While the job is processing, another Lambda function checks its status. As long as the job is still processing, the workflow jumps back to the waiting state. If the media conversion fails, the pipeline jumps to the final fail state; if it succeeds, we can proceed with the actual transcription.

Amazon SageMaker Asynchronous Inference is a feature within SageMaker that manages incoming requests by placing them in a queue for asynchronous processing. This option is ideal for requests with large payload sizes (up to 1 GB) and long processing times (up to one hour). Asynchronous Inference enables you to save on costs by autoscaling the instance count to zero when there are no requests to process, so you only pay when your endpoint is processing requests.

Upon invoking an asynchronous endpoint in Amazon SageMaker, the service responds with a confirmation that the request was accepted. This response includes an InferenceId and the Amazon S3 location (OutputLocation) where the result of the inference will be stored once completed. Together these allow you to track the processing status and ultimately retrieve the inference output from the specified S3 bucket once it becomes available. If the request fails, the error is stored on S3 as well.
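To make the request/response shape concrete, here is a minimal sketch of such an invocation with boto3. The function name submit_transcription and the way the client is passed in are illustrative choices, not taken from the repository; the client is expected to be a boto3 "sagemaker-runtime" client.

```python
def submit_transcription(runtime_client, endpoint_name, audio_s3_uri, timeout_seconds=900):
    """Queue an audio file that already sits on S3 for asynchronous inference.

    `runtime_client` is expected to be boto3.client("sagemaker-runtime").
    """
    response = runtime_client.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=audio_s3_uri,  # the payload is read from S3, not sent inline
        ContentType="audio/x-audio",
        InvocationTimeoutSeconds=timeout_seconds,  # at most 3600 (one hour)
    )
    # Keep only what the later polling step needs.
    return {
        "InferenceId": response["InferenceId"],
        "OutputLocation": response["OutputLocation"],
        "FailureLocation": response.get("FailureLocation"),
    }
```

Passing the client in as an argument keeps the function easy to exercise with a stub. In a Lambda function you would typically create the client once at module level and reuse it across invocations.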

As with the MediaConvert job, a job polling pattern is implemented to track the state of the inference request.

But before an asynchronous inference request can be made on an extracted audio file, an actual Distil-Whisper model needs to be deployed first.

Let’s dive into the components.

MediaConvert

AWS MediaConvert is a file-based video transcoding service. It lets you build flexible file-based video workflows with full control over video quality and predictable pay-as-you-go billing. We’ll be using this service to extract the audio track from the video.

First we’ll create a job template JSON file. The following template will create an .mp4 audio file given a video file.

{
  "Settings": {
    "OutputGroups": [
      {
        "Name": "File Group",
        "OutputGroupSettings": {
          "Type": "FILE_GROUP_SETTINGS",
          "FileGroupSettings": {
            "Destination": "<S3_BUCKET/PATH_TO_OUTPUT_FOLDER>"
          }
        },
        "Outputs": [
          {
            "AudioDescriptions": [
              {
                "CodecSettings": {
                  "Codec": "AAC",
                  "AacSettings": {
                    "Bitrate": 96000,
                    "CodingMode": "CODING_MODE_2_0",
                    "SampleRate": 48000
                  }
                },
                "AudioSourceName": "Audio Selector 1"
              }
            ],
            "ContainerSettings": {
              "Container": "MP4",
              "Mp4Settings": {}
            }
          }
        ]
      }
    ],
    "TimecodeConfig": {
      "Source": "ZEROBASED"
    },
    "Inputs": [
      {
        "TimecodeSource": "ZEROBASED",
        "VideoSelector": {},
        "AudioSelectors": {
          "Audio Selector 1": {
            "DefaultSelection": "DEFAULT"
          }
        }
      }
    ]
  },
  "Name": "extract-audio",
  "Category": "processing",
  "Description": "extracting audio for transcription",
  "Queue": "arn:aws:mediaconvert:eu-central-1:<ACCOUNT_ID>:queues/Default"
}

The Destination in FileGroupSettings will be overwritten in the Lambda function that creates the specific job for each video file. This Lambda function is based on the solution proposed in this post on the AWS blog.
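As a rough illustration of that override, the sketch below copies the "Settings" part of the template and points it at one specific video before submitting the job. The function names build_job_settings and submit_job are hypothetical, and the actual Lambda function in the repository may be organised differently; mediaconvert_client is assumed to be a boto3 MediaConvert client and role_arn the IAM role MediaConvert should assume.

```python
import copy


def build_job_settings(template_settings, input_s3_uri, output_s3_prefix):
    """Return a copy of the template's "Settings" pointed at one specific video."""
    settings = copy.deepcopy(template_settings)  # leave the shared template untouched
    settings["Inputs"][0]["FileInput"] = input_s3_uri
    file_group = settings["OutputGroups"][0]["OutputGroupSettings"]["FileGroupSettings"]
    file_group["Destination"] = output_s3_prefix
    return settings


def submit_job(mediaconvert_client, role_arn, settings):
    """Submit the job and return its id, which the polling step needs later."""
    response = mediaconvert_client.create_job(Role=role_arn, Settings=settings)
    return response["Job"]["Id"]
```

Working on a deep copy means the same template dictionary can be loaded once and reused across Lambda invocations without one video's paths leaking into the next job.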

Deploying a Distil-Whisper endpoint

The Distil-Whisper models are available via Hugging Face, which even provides some boilerplate code to deploy these models to SageMaker. However, the boilerplate creates a synchronous endpoint (which won’t work for larger audio files). The boilerplate code looks like this:

import sagemaker
import boto3
from sagemaker.huggingface import HuggingFaceModel

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

# Hub Model configuration. https://huggingface.co/models
hub = {
    'HF_MODEL_ID': 'distil-whisper/distil-large-v2',
    'HF_TASK': 'automatic-speech-recognition'
}

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
    transformers_version='4.26.0',
    pytorch_version='1.13.1',
    py_version='py39',
    env=hub,
    role=role,
)

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
    initial_instance_count=1,  # number of instances
    instance_type='ml.m5.xlarge'  # ec2 instance type
)

from sagemaker.serializers import DataSerializer

# the generated boilerplate uses 'image/x-image' here; audio input needs 'audio/x-audio'
predictor.serializer = DataSerializer(content_type='audio/x-audio')

# Make sure the input file "sample1.flac" exists
with open("sample1.flac", "rb") as f:
    data = f.read()
predictor.predict(data)

Great! Let’s make some minor adjustments to turn this into an asynchronous endpoint. We’ll need to create an AsyncInferenceConfig.

from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
from sagemaker.serializers import DataSerializer

# results and errors of async requests are written to these S3 prefixes
async_config = AsyncInferenceConfig(
    output_path="s3://<BUCKET_NAME>/whisper/async_inference/output",
    failure_path="s3://<BUCKET_NAME>/whisper/async_inference_failures/output"
)

audio_serializer = DataSerializer(content_type='audio/x-audio')

The deploy call is adjusted as follows:

env = {
    "MODEL_SERVER_WORKERS": "1",
    "MMS_MAX_REQUEST_SIZE": str(500 * 1024 * 1024),   # instead of default ~6.2MiB
    "MMS_MAX_RESPONSE_SIZE": str(500 * 1024 * 1024),  # instead of default ~6.2MiB
}

endpoint_name = "distil-whisper-async"

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    async_inference_config=async_config,
    endpoint_name=endpoint_name,
    serializer=audio_serializer,
    env=env,
)
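The endpoint above keeps one instance running. The scale-to-zero behaviour mentioned earlier has to be configured separately through Application Auto Scaling. The following is only a sketch of what that configuration could look like, not something taken from the repository: the function name, the policy name, the target value of 5 queued requests per instance and the cooldowns are all assumptions, and "AllTraffic" is assumed to be the variant name (the default used by the SageMaker Python SDK).

```python
def scale_to_zero_requests(endpoint_name, variant_name="AllTraffic", max_instances=1):
    """Build the arguments for register_scalable_target and put_scaling_policy."""
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    dimension = "sagemaker:variant:DesiredInstanceCount"
    target = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "MinCapacity": 0,  # allow the endpoint to run without any instance
        "MaxCapacity": max_instances,
    }
    policy = {
        "PolicyName": f"{endpoint_name}-backlog-tracking",  # hypothetical name
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": 5.0,  # assumed: queued requests per instance
            "CustomizedMetricSpecification": {
                "MetricName": "ApproximateBacklogSizePerInstance",
                "Namespace": "AWS/SageMaker",
                "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
                "Statistic": "Average",
            },
            "ScaleInCooldown": 300,   # assumed values, tune to your workload
            "ScaleOutCooldown": 300,
        },
    }
    return target, policy
```

The two dictionaries can then be passed to a boto3 "application-autoscaling" client as client.register_scalable_target(**target) and client.put_scaling_policy(**policy). Note that the AWS documentation describes an additional step scaling policy on the HasBacklogWithoutCapacity metric to wake an endpoint up from zero instances; that part is left out here.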

Job Polling pattern

Both the conversion job and the inference job are asynchronous. To wait for their results a job polling pattern is required, i.e. periodically checking whether the result (or an error state) is available. If neither is available yet, the workflow waits and checks again.

Polling job pattern
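Expressed in Amazon States Language, such a polling loop could look roughly like the definition generated below. This is a simplified sketch rather than the workflow from the repository: the state names, the default 30-second wait and the "$.status" field are assumptions, and the two ARNs stand for the Lambda functions that submit the job and check its status.

```python
import json


def job_poller_definition(submit_arn, check_arn, wait_seconds=30):
    """Amazon States Language for: submit -> wait -> check -> loop or finish."""
    return {
        "StartAt": "SubmitJob",
        "States": {
            "SubmitJob": {"Type": "Task", "Resource": submit_arn, "Next": "Wait"},
            "Wait": {"Type": "Wait", "Seconds": wait_seconds, "Next": "CheckStatus"},
            "CheckStatus": {"Type": "Task", "Resource": check_arn, "Next": "IsDone"},
            "IsDone": {
                "Type": "Choice",
                "Choices": [
                    {"Variable": "$.status", "StringEquals": "success", "Next": "Done"},
                    {"Variable": "$.status", "StringEquals": "failure", "Next": "Failed"},
                ],
                # anything else means the job is still running: wait and check again
                "Default": "Wait",
            },
            "Done": {"Type": "Succeed"},
            "Failed": {
                "Type": "Fail",
                "Error": "JobFailed",
                "Cause": "The asynchronous job reported a failure.",
            },
        },
    }


def to_asl_json(definition):
    """Serialize the definition so it can be pasted into the Step Functions console."""
    return json.dumps(definition, indent=2)
```

In the full pipeline the "Done" state of the first loop would be replaced by the next step (invoking the inference endpoint), followed by a second loop of the same shape.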

The get_job API lets you request the state of a MediaConvert job. A job’s status can be SUBMITTED, PROGRESSING, COMPLETE, CANCELED, or ERROR. We want to keep waiting if the status is SUBMITTED or PROGRESSING, and continue to the next step in the workflow for any of the other states.

import boto3

# job_id is the id returned when the MediaConvert job was submitted
client = boto3.client('mediaconvert', endpoint_url="your-mediaconvert-endpoint")

response = client.get_job(Id=job_id)
job_status = response['Job']['Status']

# Handle the job status and return a "success" or "failure" response
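The handling step left out above could be as small as the mapping below. It is a sketch, not the repository's implementation: the function name and the "waiting" value for jobs that are still running are assumptions, while "success" and "failure" follow the comment in the snippet.

```python
# Statuses for which the workflow should go back to the wait state.
IN_PROGRESS = {"SUBMITTED", "PROGRESSING"}


def summarize_job_status(job_status):
    """Map a MediaConvert job status onto what the Choice state branches on."""
    if job_status in IN_PROGRESS:
        return {"status": "waiting", "job_status": job_status}
    if job_status == "COMPLETE":
        return {"status": "success", "job_status": job_status}
    # CANCELED, ERROR, or anything unexpected ends the workflow
    return {"status": "failure", "job_status": job_status}
```

Treating every unknown status as a failure is a deliberate choice here: it prevents the workflow from polling forever if a status shows up that the code does not know about.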

Async inference requests do not have an API for requesting their current state, even though each request has an InferenceId. Instead you must check whether an inference result (or error) has been written to the S3 locations provided in the response (this is also what the get_result() method in the sagemaker Python package checks).

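import boto3
from botocore.exceptions import ClientError
from urllib.parse import urlparse

s3_client = boto3.client('s3')

def lambda_handler(event, context):
    # handle incoming event (from the previous step); assumed here to be
    # the response of the async invocation
    output_location = event['OutputLocation']
    failure_location = event['FailureLocation']
    parsed = urlparse(output_location)
    output_bucket, output_key = parsed.netloc, parsed.path.lstrip('/')

    try:
        s3_client.head_object(Bucket=output_bucket, Key=output_key)
        return {
            'status': 'success',
            'message': 'Output is available.',
            'InferenceId': event['InferenceId'],
            'OutputLocation': output_location,
            'FailureLocation': failure_location
        }
    except ClientError as e:
        # try the failure location ...
        ...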
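One possible way to complete the elided failure branch is sketched below. It is not the code from the repository: the function names, the "waiting" status and the messages for the non-success cases are assumptions. The S3 lookup is passed in as a function so that the decision logic can be exercised without AWS access.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """'s3://bucket/some/key' -> ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")


def check_inference(event, object_exists):
    """Report 'success', 'failure' or 'waiting' for one async inference request.

    `object_exists(bucket, key)` must return True when the S3 object is present.
    """
    base = {
        "InferenceId": event["InferenceId"],
        "OutputLocation": event["OutputLocation"],
        "FailureLocation": event["FailureLocation"],
    }
    if object_exists(*split_s3_uri(event["OutputLocation"])):
        return {**base, "status": "success", "message": "Output is available."}
    if object_exists(*split_s3_uri(event["FailureLocation"])):
        return {**base, "status": "failure", "message": "Inference failed."}
    return {**base, "status": "waiting", "message": "No result yet."}


def s3_object_exists(bucket, key):
    """One possible `object_exists`, built on head_object."""
    import boto3  # imported lazily so the logic above runs without AWS access
    from botocore.exceptions import ClientError

    try:
        boto3.client("s3").head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as error:
        if error.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
            return False
        raise  # permission problems should not be mistaken for "not ready yet"
```

Inside a Lambda handler this would be called as check_inference(event, s3_object_exists). Re-raising anything other than a missing object means that a misconfigured IAM policy fails the step instead of keeping the workflow polling until it times out.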

When invoking the async endpoint you can also set the maximum time the request may take before it fails with a timeout error (up to one hour).

Chaining the steps

You have the option to define the workflow in a JSON file; however, utilizing the visual builder for AWS Step Functions makes the process much more comprehensible and enjoyable! Plus, you can download the workflow in JSON format and incorporate it into your infrastructure as code.

AWS Step Function — full workflow

You can also fully define this flow with the CDK (Cloud Development Kit). There is one small issue, though: the aws-stepfunctions-tasks library has no construct for invoking a SageMaker endpoint.

If you still want to use the CDK, you need to create a CustomState and write the definition of the SageMaker Async Inference state yourself:

import { Duration } from 'aws-cdk-lib';
import { CustomState } from 'aws-cdk-lib/aws-stepfunctions';

// Calls the SageMaker runtime through the Step Functions AWS SDK integration
const invokeWhisperAsyncInference = new CustomState(this, 'InvokeWhisperAsyncInference', {
  stateJson: {
    Type: "Task",
    Parameters: {
      EndpointName: props.whisperEndpointName,
      ContentType: "audio/x-audio",
      InvocationTimeoutSeconds: Duration.seconds(900).toSeconds(),
      "InputLocation.$": "$.file_location"
    },
    Resource: "arn:aws:states:::aws-sdk:sagemakerruntime:invokeEndpointAsync"
  }
});
Next Steps

With this solution you have an end-to-end workflow that creates the transcription of a video using a state-of-the-art speech-to-text model. You might want to extend it, e.g.

  • Introduce a parallel workflow that inserts and updates a database entry for the transcription of the original video
  • Post-process the transcription, e.g. filtering profanity, removing PII, determining sentiment, etc.
  • In the repository all resources are put in one fat stack. Bringing this to an actual production environment would benefit from a nicer structure.

Keep in mind, Distil-Whisper is English only!

Closing notes

Using just a few AWS services and Step Functions, you can build a coordinated serverless workflow that creates audio transcriptions with state-of-the-art speech-to-text models. The new Distil-Whisper models are smaller and faster, which makes it possible to run them on small M5 instances!

This solution also lets you keep the workflow as it is: when a newer model is published, you can easily swap the model out and leave all other logic untouched.

The repository with all the code can be found here

Have fun! Let me know what you think / how these articles can be improved!


Pieterjan Criel @pjcr
Product & Engineering at Showpad

👨‍👩‍👧‍👦 Dad of two 🇧🇪 Ghent 🎈 @Balloon_inc / @Aicon_inc 👨‍💻 Coding 🧪 Data science 📈 Graph enthusiast 👨‍💻 Principal Engineer @showpad