Apache NiFi and Amazon Transcribe for Machine Learning

Tim Spann
Cloudera
Jan 27, 2024

NiFi Processors: GetAwsTranscribeJobStatus, StartAwsTranscribeJob

Introduction

Machine learning is revolutionizing the way we extract insights from data, but it often requires substantial data preprocessing and transformation. Apache NiFi, an open-source data integration tool, can streamline this process by efficiently collecting, routing, and transforming data from various sources. When combined with Amazon Transcribe, a cloud-based speech-to-text service, you can create sophisticated machine learning pipelines that process and analyze both textual and spoken data. In this article, we will explore how to use Apache NiFi and Amazon Transcribe in tandem to enhance your machine learning workflows.

What is Apache NiFi?

Apache NiFi is an easy-to-use, powerful data integration tool that enables the automation of data flows between various systems. It excels at collecting, routing, and transforming data in real-time, making it a valuable asset in the context of machine learning. NiFi offers a web-based user interface for designing data flows, and its flexibility allows it to be seamlessly integrated with other services and systems.

What is AWS?

Amazon Web Services (AWS) is Amazon's cloud computing platform, offering on-demand services for storage, compute, and machine learning, including the Amazon Transcribe service we use here.

What types of unstructured data are we sending in?

Valid file formats: MP3, MP4, WAV, FLAC, AMR, OGG, and WebM. I am sending in MP3s to be transcribed; for some future videos I will send in my MP4 videos to be transcribed, and then I can add that text to my YouTube videos.

AWS has a service for transcribing text from audio and video files, which is very helpful for making that text available to train our Large Language Models.

Cloudera has built two processors to enable real-time data pipelines feeding in audio and video files to the Amazon Transcribe service. They are StartAwsTranscribeJob and GetAwsTranscribeJobStatus.

I will walk you through how I use it to process an audio file that I created and make that text available. The AWS service is well documented in the official Amazon Transcribe documentation.

We need to build a JSON payload that controls the job.

Sources can be audio files like mp3, wav, flac, amr, ogg or video files like mp4 or webm.

I am feeding my example from the results of another AWS ML call: in another article, I generate an MP3 file from text via Amazon Polly. I can consume that JSON record from Kafka in NiFi.

{
"outputLocation" : "https://s3.us-east-1.amazonaws.com/s3b/transit.ea8f-4e2a-b193-d0cd79ed463a.mp3",
"awscreationid" : "1694461205017",
"awsoutputuri" : "https://s3.us-east-1.amazonaws.com/s3b/transit.ea8f-4e2a-b193-d0cd79ed463a.mp3",
"speaktext" : "UPDATE: Ridgewood Station: Center-Platform Elevator Returned to Service - Effective Immediately",
"uuid" : "36c12782-6870-4761-9f67-b9a9d71ec055",
"PollyS3OutputBucket" : "tspann",
"awsrequestdate" : "Mon, 11 Sep 2023 19:40:34 GMT",
"languagecode" : "en-US",
"taskstatus" : "completed",
"voiceid" : "Joanna",
"awsrequestid" : "dd2d-454a-899b-c25f7027fd25",
"requestid" : "dd2d-454a-899b-c25f7027fd25",
"awsTaskId" : "ea8f-4e2a-b193-d0cd79ed463a",
"outputformat" : "mp3",
"requestcharacters" : "95",
"texttype" : "text",
"taskid" : "ea8f-4e2a-b193-d0cd79ed463a"
}

There are many ways to feed this flow with a source S3 URI; since I already had this data in S3, it made an easy first choice.
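If you consume the source record from Kafka as I do, a minimal ConsumeKafkaRecord configuration could look like the sketch below. The broker, topic, and group names are illustrative, not from my actual flow.

ConsumeKafkaRecord_2_6
Kafka Brokers : kafka-broker-1:9092
Topic Name(s) : polly-results
Group ID : nifi-transcribe
Record Reader : JsonTreeReader
Record Writer : JsonRecordSetWriter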

Building a Flow

Amazon provides a large number of powerful services for various machine learning tasks. Cloudera Data Flow provides integrations with a number of these so you can easily include them in your streaming data pipelines. I will be looking at several of these in my data flows; this is the second article in the series, covering how to call Amazon Transcribe.

The most important portion of this Transcribe data flow is StartAwsTranscribeJob, which initiates the AWS service. Immediately afterwards we check the status of the job with GetAwsTranscribeJobStatus. If the job succeeded, we move to the next step in the flow and build a new record. If it is still "running" or "throttled", we pause via the ControlRate processor and then check the status again.
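A sketch of one possible ControlRate configuration for that pause (the one-minute duration is illustrative; tune it to your job sizes):

ControlRate
Rate Control Criteria : flowfile count
Maximum Rate : 1
Time Duration : 1 min

The "running" and "throttled" relationships loop through this processor and back into GetAwsTranscribeJobStatus.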

I dynamically build a couple of attributes to set in my payload. You could put these in parameters or make them static if that works for your pipeline.

I create a unique AWS job name by adding a dynamically created UUID to a fixed prefix, nifi-transcribe-job-. I also take an S3 URI I had from elsewhere and prepend s3:// to make it a valid AWS S3 URI.
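A sketch of those UpdateAttribute properties using NiFi Expression Language (s3path is a hypothetical attribute holding the bucket/key portion of the URI; adjust it to whatever attribute carries yours):

UpdateAttribute
jobname : nifi-transcribe-job-${UUID()}
s3uri : s3://${s3path}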

The first step is to start the AWS Transcribe job, over SSL, in the AWS region of our choice; we configure the job via a JSON payload.

There are a lot of options we can configure in the payload. I am starting with a very minimal payload, which you can try as well.

{
"LanguageCode": "en-US",
"Media": {
"MediaFileUri": "${s3uri}"
},
"MediaFormat": "mp3",
"OutputBucketName": "tspann",
"TranscriptionJobName": "${jobname}"
}

A few items are dynamically set: jobname and s3uri. I build these via an UpdateAttribute processor, as shown earlier. I set up an S3 URI that points to my data. I have statically set my output bucket name to "tspann", where my data is stored. You could set this dynamically or make it a parameter.
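Beyond this minimal payload, the underlying StartTranscriptionJob API accepts more options. For example, a sketch that also requests speaker labels and a custom output key (the values are illustrative):

{
"LanguageCode": "en-US",
"Media": {
"MediaFileUri": "${s3uri}"
},
"MediaFormat": "mp3",
"OutputBucketName": "tspann",
"OutputKey": "transcripts/",
"Settings": {
"ShowSpeakerLabels": true,
"MaxSpeakerLabels": 2
},
"TranscriptionJobName": "${jobname}"
}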

We get back an outputLocation in S3 telling us where the output results will be written.

outputLocation

https://s3.us-east-1.amazonaws.com/S3Bucket/nifi-transcribe-job-279d2fae-ecbb-4064-dddd-asd.json

We also get a returned JSON record.

{
"sdkResponseMetadata" : {
"requestId" : "71af018a-ae31–41b3–9c08"
},
"sdkHttpMetadata" : {
"httpHeaders" : {
"Content-Length" : "604",
"Content-Type" : "application/x-amz-json-1.1",
"Date" : "Fri, 15 Sep 2023 20:56:34 GMT",
"x-amzn-RequestId" : "71af018a-ae31–41b3–9c08"
},
"httpStatusCode" : 200,
"allHttpHeaders" : {
"x-amzn-RequestId" : [ "71af018a-ae31–41b3–9c08" ],
"Content-Length" : [ "604" ],
"Date" : [ "Fri, 15 Sep 2023 20:56:34 GMT" ],
"Content-Type" : [ "application/x-amz-json-1.1" ]
}
},
"transcriptionJob" : {
"transcriptionJobName" : "nifi-transcribe-job-ecbb-4064-171f810423d0",
"transcriptionJobStatus" : "COMPLETED",
"languageCode" : "en-US",
"mediaSampleRateHertz" : 8000,
"mediaFormat" : "mp3",
"media" : {
"mediaFileUri" : "s3://tspann/transit.ea8f-4e2a-b193.mp3",
"redactedMediaFileUri" : null
},
"transcript" : {
"transcriptFileUri" : "https://s3.us-east-1.amazonaws.com/s3b/nifi-transcribe-job-279d2fae-ecbb-4064-b99b-171f810423d0.json",
"redactedTranscriptFileUri" : null
},
"startTime" : 1694811365918,
"creationTime" : 1694811365898,
"completionTime" : 1694811373202,
"failureReason" : null,
"settings" : {
"vocabularyName" : null,
"showSpeakerLabels" : null,
"maxSpeakerLabels" : null,
"channelIdentification" : false,
"showAlternatives" : false,
"maxAlternatives" : null,
"vocabularyFilterName" : null,
"vocabularyFilterMethod" : null
},
"modelSettings" : null,
"jobExecutionSettings" : null,
"contentRedaction" : null,
"identifyLanguage" : null,
"identifyMultipleLanguages" : null,
"languageOptions" : null,
"identifiedLanguageScore" : null,
"languageCodes" : null,
"tags" : null,
"subtitles" : null,
"languageIdSettings" : null
}
}

Next we rebuild the record with all the fields I want, extracted from AWS's returned JSON via JSON Path.

We extract all these values into attributes.
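A sketch of an EvaluateJsonPath configuration that pulls these values into attributes (the paths follow the returned JSON above; this is one way to do it, not necessarily my exact flow):

EvaluateJsonPath
Destination : flowfile-attribute
jobstatus : $.transcriptionJob.transcriptionJobStatus
transcriptfileuri : $.transcriptionJob.transcript.transcriptFileUri
starttime : $.transcriptionJob.startTime
completiontime : $.transcriptionJob.completionTime
mediafileuri : $.transcriptionJob.media.mediaFileUri
languagecode : $.transcriptionJob.languageCode
requestid : $.sdkResponseMetadata.requestId
contentlength : $.sdkHttpMetadata.httpHeaders['Content-Length']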

Attribute Values

awsrequestdate : Fri, 15 Sep 2023 20:56:34 GMT
awsrequestid : 71af018a-ae31-4d1b6d8b28cf
completiontime : 1694811373202
contentlength : 604
creationtime : 1694811365898
failurereason : Empty string set
jobstatus : Empty string set
languagecode : en-US
mediafileuri : s3://s3bucketname/transit-ea8f-4e2a-b193-d0cd79ed463a.mp3
nexttoken : Empty string set
pages : Empty string set
requestid : 71af018a-ae31-4d1b6d8b28cf
starttime : 1694811365918
transcriptfileuri : https://s3.us-east-1.amazonaws.com/s3buckname/nifi-transcribe-job-ecbb-4064-b99b-171f810423d0.json

Set the new filename based on the transcribe file name.
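For example, with UpdateAttribute (a sketch using NiFi Expression Language):

UpdateAttribute
filename : ${transcriptfileuri:substringAfterLast('/')}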

Convert attributes to a new record or flow file.

These are the attributes I want:

awsrequestdate,awsrequestid,awsTaskId,completiontime,
contentlength,creationtime,failurereason,jobname,jobnumber,
jobstatus,languagecode,mediafileuri,nexttoken,outputLocation,pages,
requestid,s3uri,starttime,transcriptfileuri,uuid

We rebuild the flow file with our chosen attributes that we already extracted.
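One way to do this is the AttributesToJSON processor; a minimal sketch, using the attribute list above:

AttributesToJSON
Attributes List : awsrequestdate,awsrequestid,awsTaskId,completiontime,contentlength,creationtime,failurereason,jobname,jobnumber,jobstatus,languagecode,mediafileuri,nexttoken,outputLocation,pages,requestid,s3uri,starttime,transcriptfileuri,uuid
Destination : flowfile-content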

We now have a new JSON record, which we send to a transcribe job topic in an Apache Kafka cluster hosted in Cloudera Public Cloud.

Sending this Kafka message returns a success relationship and a msg.count attribute as output. This is the control and result message for the job.
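A sketch of the publish step (the broker and topic names are illustrative):

PublishKafkaRecord_2_6
Kafka Brokers : your-cloudera-public-cloud-brokers:9092
Topic Name : transcribejob
Record Reader : JsonTreeReader
Record Writer : JsonRecordSetWriter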

In the second portion of the data flow, we read the results of the Transcribe service from the JSON file stored in S3.
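Assuming a ListS3 and FetchS3Object pair (a sketch; ListS3 populates the s3.bucket and filename attributes for each new object it finds):

FetchS3Object
Bucket : ${s3.bucket}
Object Key : ${filename}
Region : US East (N. Virginia)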

Attribute Values

mime.type : binary/octet-stream
s3.bucket : tspann
s3.encryptionStrategy : SSE_S3
s3.etag : e8c164476120b7b0fbc56799584a93cd
s3.sseAlgorithm : AES256

We then extract important fields from the file.

Attribute Values

accountid : 007856030109
transcript : Update. Ridgewood station center platform elevator return to service effective immediately.
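These can be pulled with another EvaluateJsonPath against the transcript file (a sketch; the paths follow the record shown below):

EvaluateJsonPath
Destination : flowfile-attribute
accountid : $.accountId
transcript : $.results.transcripts[0].transcript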

Using UpdateRecord, we add some more fields to the JSON result, giving us our final transcribe record.

[ {
"jobName" : "nifi-transcribe-job-ecbb-4064-b99b-171f810423d0",
"accountId" : "007856030109",
"results" : {
"transcripts" : [ {
"transcript" : "Update. Ridgewood station center platform elevator return to service effective immediately."
} ],
"items" : [ {
"start_time" : "0.009",
"end_time" : "0.74",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "Update"
} ],
"type" : "pronunciation"
}, {
"start_time" : null,
"end_time" : null,
"alternatives" : [ {
"confidence" : "0.0",
"content" : "."
} ],
"type" : "punctuation"
}, {
"start_time" : "0.75",
"end_time" : "1.279",
"alternatives" : [ {
"confidence" : "0.965",
"content" : "Ridgewood"
} ],
"type" : "pronunciation"
}, {
"start_time" : "1.289",
"end_time" : "1.95",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "station"
} ],
"type" : "pronunciation"
}, {
"start_time" : "1.96",
"end_time" : "2.46",
"alternatives" : [ {
"confidence" : "0.989",
"content" : "center"
} ],
"type" : "pronunciation"
}, {
"start_time" : "2.47",
"end_time" : "3.019",
"alternatives" : [ {
"confidence" : "0.997",
"content" : "platform"
} ],
"type" : "pronunciation"
}, {
"start_time" : "3.029",
"end_time" : "3.47",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "elevator"
} ],
"type" : "pronunciation"
}, {
"start_time" : "3.48",
"end_time" : "3.94",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "return"
} ],
"type" : "pronunciation"
}, {
"start_time" : "3.95",
"end_time" : "4.019",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "to"
} ],
"type" : "pronunciation"
}, {
"start_time" : "4.03",
"end_time" : "4.76",
"alternatives" : [ {
"confidence" : "0.999",
"content" : "service"
} ],
"type" : "pronunciation"
}, {
"start_time" : "4.769",
"end_time" : "5.239",
"alternatives" : [ {
"confidence" : "0.994",
"content" : "effective"
} ],
"type" : "pronunciation"
}, {
"start_time" : "5.25", "end_time" : "5.94",
"alternatives" : [ {
"confidence" : "0.999","content" : "immediately"
} ],
"type" : "pronunciation"
}, {
"start_time" : null,
"end_time" : null,
"alternatives" : [ {
"confidence" : "0.0","content" : "."
} ],
"type" : "punctuation"
} ]
},
"status" : "COMPLETED",
"mediafileuri" : "s3://tspann/transit-ea8f-4e2a-b193-d0cd79ed463a.mp3",
"starttime" : "1694811365918",
"transcript" : "Update. Ridgewood station center platform elevator return to service effective immediately.",
"s3bucket" : "tspann",
"transcriptfileuri" : "https://s3.us-east-1.amazonaws.com/tspann/nifi-transcribe-job-279d2fae-ecbb-4064-b99b-171f810423d0.json",
"s3filename" : "nifi-transcribe-job-ecbb-4064-b99b-171f810423d0.json",
"requestid" : "71af018a-ae31–41b3–9c08–4d1b6d8b28cf",
"s3uri" : "s3://tspann/transit.ea8f-4e2a-b193-d0cd79ed463a.mp3",
"languagecode" : "en-US",
"ts" : "1694811395002",
"uuid" : "d8bfc0d5-b252–28981f4cf964"
} ]
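A sketch of the UpdateRecord properties that could add those extra fields (illustrative; each record path takes its value from an attribute set earlier in the flow):

UpdateRecord
Replacement Value Strategy : Literal Value
/transcript : ${transcript}
/s3bucket : ${s3.bucket}
/mediafileuri : ${mediafileuri}
/ts : ${now():toNumber()}
/uuid : ${uuid}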

Conclusion

We have shown how easy it is to integrate the AWS Transcribe machine learning service into a standard data flow with Apache NiFi, using Cloudera DataFlow in the public or private cloud.

We can now consume these events from Apache Kafka with tools like Cloudera SQL Stream Builder, Apache Spark, or microservices.
