Apache NiFi and Amazon Textract for Machine Learning

Tim Spann
Cloudera
6 min read · Jan 28, 2024

NiFi Processors: GetAwsTextractJobStatus, StartAwsTextractJob.

Introduction

In the real world, we often receive documents, PDFs, faxes and other items that have been scanned or handwritten. This data is needed for training LLMs and for storage in open data lakehouses. Fortunately, Amazon and Cloudera have an easy, no-code solution for processing these unstructured documents into valuable data.

We will use Cloudera DataFlow (powered by Apache NiFi) to ingest these files from regular file systems and from the effectively unlimited cloud storage that is AWS S3. We will then feed PDFs and JPGs to the AWS Textract service via native NiFi processors and send the results to Kafka for further processing by downstream systems. We can also write this data directly to Apache Iceberg.

Building a Flow

Document Ingest — Option 1

Let’s ingest from a local file system, for the case where I have just scanned some documents and have them on my local machine.

Step 1: ListFile — list all the local files.

Step 2: FetchFile — retrieve the file

Step 3: PutS3Object — upload the file to AWS S3 Cloud storage

Document Ingest — Option 2

We have documents stored in AWS S3 Cloud Storage, which happens a lot. This is a great place to store and share files for processing anywhere.

Step 1: ListS3 — list all files in a bucket. This uses the S3 ListObjects V2 API.

Step 2: RouteOnAttribute — filter out anything that is not JPG or PDF.

Step 3: FetchS3Object — fetch each object named in the listing.

Step 4: UpdateAttribute — set the attribute s3.key with the value of our retrieved filename.
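The filter in Step 2 is simple enough to sketch outside of NiFi. The Python below is only an illustration of the rule, not the processor's configuration; the function names are made up for this example, and treating ".jpeg" as JPG is an assumption.

```python
from pathlib import PurePosixPath

# Extensions the flow keeps. The article names JPG and PDF; accepting
# ".jpeg" as well is an assumption of this sketch.
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".pdf"}


def is_textract_candidate(s3_key: str) -> bool:
    """Return True when the key's extension is JPG or PDF (case-insensitive)."""
    return PurePosixPath(s3_key).suffix.lower() in ALLOWED_EXTENSIONS


def filter_listing(keys):
    """Keep only the keys that should continue on to FetchS3Object."""
    return [key for key in keys if is_textract_candidate(key)]


kept = filter_listing(["ingest.jpg", "scans/fax.PDF", "notes.txt", "folder/"])
```

Here `kept` holds only the JPG and PDF keys; the text file and the folder placeholder are dropped before any fetch happens.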

From this point on, the flow is the same for either option.

Step 5: StartAwsTextractJob — we start the AWS Textract job via this processor. The processor requires an AWSCredentialsProviderControllerService configured with a valid login that has permission to run the job. We will be running Document Analysis on the document sent. We need to configure SSL, pick a region such as US East (N. Virginia) and pass in a JSON payload like the one below. Since this is Apache NiFi, we can use Expression Language to do some scripting inside the payload.

StartAwsTextractJob JSON Payload

{
  "ClientRequestToken": "${s3.key:substringBefore('.')}",
  "DocumentLocation": {
    "S3Object": {
      "Bucket": "${s3.bucket}",
      "Name": "${s3.key}"
    }
  },
  "FeatureTypes": [ "TABLES" ],
  "OutputConfig": {
    "S3Bucket": "tspann",
    "S3Prefix": "analysis"
  }
}
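To make the Expression Language in the payload concrete, here is a minimal Python sketch that builds the same JSON for a single S3 object. The helper names are hypothetical and exist only for this illustration; `substring_before` mimics what `substringBefore('.')` does to `s3.key`, returning the whole value when there is no dot.

```python
import json


def substring_before(value: str, separator: str) -> str:
    """Mimic NiFi's substringBefore: the text before the first separator,
    or the whole value when the separator is absent."""
    return value.split(separator, 1)[0]


def build_textract_payload(bucket: str, key: str,
                           output_bucket: str = "tspann",
                           output_prefix: str = "analysis") -> dict:
    """Build the request body shown above for one S3 object."""
    return {
        "ClientRequestToken": substring_before(key, "."),
        "DocumentLocation": {"S3Object": {"Bucket": bucket, "Name": key}},
        "FeatureTypes": ["TABLES"],
        "OutputConfig": {"S3Bucket": output_bucket, "S3Prefix": output_prefix},
    }


payload = build_textract_payload("tspann", "ingest.jpg")
print(json.dumps(payload, indent=2))
```

For `ingest.jpg` the token becomes `ingest`. As far as I recall, Textract restricts ClientRequestToken to letters, digits, hyphens and underscores, so keys containing slashes or spaces would likely need extra cleanup; treat that as something to verify against the API reference.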

Step 6: GetAwsTextractJobStatus — Check the status of our job via AWS Task ID

Step 7: ControlRate (running, throttled status) — if the status is not yet success, pause and then check the job status again.

Step 8: UpdateAttribute — Rebuild the S3 URL

Step 9: EvaluateJsonPath — Build a new output format from some of the returned fields.

Example: $.sdkHttpMetadata.httpHeaders.Date, $.sdkResponseMetadata.requestId, $.documentMetadata.pages

Step 10: SplitJson — Break out sub records via $.blocks

Step 11: SplitRecord — make sure we process a single record at a time.

Step 12: UpdateRecord — our enrichment step to add all the metadata in attributes to our actual JSON record. We add fields like timestamp, unique ID, request date and request ID.

Step 13: PublishKafkaRecord_2_6 1.23.1.2.1.6.0-323 — In lucky step 13, we send our new JSON record to our Cloudera Kafka topic, “awstextractresults”.

Step 14: RetryFlowFile — this runs if publishing to Kafka fails. It lets us retry a few times before routing the FlowFile to a failure path. Often on failure, we can send the records to S3.
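Steps 6 and 7 above form a polling loop: check the job, and if it is still running or throttled, wait and check again. A rough Python equivalent is sketched below. Everything in it is an assumption made for illustration: `get_status` is a hypothetical callable standing in for GetAwsTextractJobStatus, the status strings are placeholders, and the `sleep` parameter plays the role of ControlRate.

```python
import time
from typing import Callable

# Placeholder status names for this sketch; the flow itself only tells us
# that "running" and "throttled" outcomes loop back for another check.
RETRYABLE = {"running", "throttled"}


def wait_for_job(task_id: str,
                 get_status: Callable[[str], str],
                 delay_seconds: float = 5.0,
                 max_checks: int = 60,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status until the job leaves a retryable state."""
    for _ in range(max_checks):
        status = get_status(task_id)
        if status not in RETRYABLE:
            return status       # finished, successfully or not
        sleep(delay_seconds)    # pause before asking again
    raise TimeoutError(
        f"job {task_id} still not finished after {max_checks} checks")
```

Capping the number of checks keeps a stuck job from looping forever, which is the same concern that makes a retry limit useful in Step 14.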

As possible enhancements, you can send various records to AWS S3 via PutS3Object, send JSON records to an Apache Iceberg Open Data Lakehouse via PutIceberg, or write to various other data stores including DynamoDB, Kinesis, SNS, Lambda, SQS, SFTP, Salesforce, Snowflake, Splunk, Apache HBase, Apache Kudu, Apache Hive, Apache Solr, Slack, Discord or Apache Ozone.

Lineage

Output

[ {
  "blockType" : "WORD",
  "confidence" : 99.926735,
  "text" : "the",
  "textType" : "PRINTED",
  "rowIndex" : null,
  "columnIndex" : null,
  "rowSpan" : null,
  "columnSpan" : null,
  "geometry" : {
    "boundingBox" : {
      "width" : 0.016876873,
      "height" : 0.017086018,
      "left" : 0.7919099,
      "top" : 0.47273117
    },
    "polygon" : [ {
      "x" : 0.7921849,
      "y" : 0.47273117
    }, {
      "x" : 0.80878675,
      "y" : 0.47386703
    }, {
      "x" : 0.8084805,
      "y" : 0.4898172
    }, {
      "x" : 0.7919099,
      "y" : 0.4886628
    } ]
  },
  "id" : "c986bae5-69a5-43d3-bb0e-2371fa818c13",
  "relationships" : null,
  "entityTypes" : null,
  "selectionStatus" : null,
  "page" : 1,
  "query" : null,
  "s3url" : "analysis/a057e5a71c7ef2acce9392f690287905a63b584efa8897034875493c825f6f9b/1",
  "jobstatus" : "SUCCEEDED",
  "s3etag" : "0e0c8a6099497a130c778b1d195c3611",
  "s3bucket" : "tspann",
  "awstaskid" : "a057e5a71c7ef2acce9392f690287905a63b584efa8897034875493c825f6f9b",
  "ts" : "1695823847357",
  "filename" : "ingest.jpg",
  "uuid" : "a5af820d-a1a0-424c-8085-d64845e99da8",
  "s3key" : "ingest.jpg",
  "awsrequestid" : "$.awsrequestid",
  "pages" : "1",
  "awsrequestdate" : "Wed, 27 Sep 2023 14:10:46 GMT"
} ]
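The record above is what Steps 10 to 12 produce: each entry in `blocks` becomes its own record, and the metadata held in attributes is copied onto it. The Python below is a minimal sketch of that idea, not the NiFi implementation. The function name and the sample input are invented for the example, while the field names (`ts`, `uuid`, `s3bucket`, `s3key`, `jobstatus`, `pages`) follow the output shown.

```python
import time
import uuid


def split_and_enrich(response: dict, attributes: dict) -> list:
    """Return one flat record per Textract block, with flow attributes added.

    response   -- parsed job result containing a "blocks" list
    attributes -- metadata gathered earlier in the flow
    """
    records = []
    for block in response.get("blocks", []):
        record = dict(block)        # copy so the input stays untouched
        record.update(attributes)   # same metadata on every record
        record["ts"] = str(int(time.time() * 1000))  # epoch millis as text
        record["uuid"] = str(uuid.uuid4())           # unique ID per record
        records.append(record)
    return records


# Hypothetical, heavily trimmed job result used only for this example.
response = {
    "documentMetadata": {"pages": 1},
    "blocks": [
        {"blockType": "WORD", "text": "the", "page": 1},
        {"blockType": "WORD", "text": "their", "page": 1},
    ],
}
attributes = {
    "s3bucket": "tspann",
    "s3key": "ingest.jpg",
    "jobstatus": "SUCCEEDED",
    "pages": str(response["documentMetadata"]["pages"]),
}
records = split_and_enrich(response, attributes)
```

Repeating the metadata on every record costs some space, but it means each Kafka message can be understood on its own by downstream consumers.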

Text Extract JSON Result (1 of many records)

{"blockType":"WORD","confidence":99.89907,"text":"their","textType":"PRINTED","rowIndex":null,"columnIndex":null,"rowSpan":null,"columnSpan":null,"geometry":{"boundingBox":{"width":0.023984551,"height":0.01710292,"left":0.86240387,"top":0.70826226},"polygon":[{"x":0.86279845,"y":0.70826226},{"x":0.8863884,"y":0.7102997},{"x":0.8859504,"y":0.7253652},{"x":0.86240387,"y":0.723304}]},"id":"0a3717fb-8d7f-43fa-9c31-6c36de042f29","relationships":null,"entityTypes":null,"selectionStatus":null,"page":1,"query":null,"s3url":"analysis/a057e5a71c7ef2acce9392f690287905a63b584efa8897034875493c825f6f9b/1","jobstatus":"SUCCEEDED","s3etag":"0e0c8a6099497a130c778b1d195c3611","s3bucket":"tspann","awstaskid":"a057e5a71c7ef2acce9392f690287905a63b584efa8897034875493c825f6f9b","ts":"1695823847365","filename":"ingest.jpg","uuid":"df63d855-dda2-4c43-8a60-b0fd1a4daec7","s3key":"ingest.jpg","awsrequestid":"$.awsrequestid","pages":"1","awsrequestdate":"Wed, 27 Sep 2023 14:10:46 GMT"}
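Since every message is a self-contained JSON record like the one above, a downstream consumer can stay very small. The sketch below assumes the message values have already been read from the topic as strings and keeps only WORD blocks at or above a confidence threshold. The function name, the threshold and the second and third sample messages are hypothetical; the first sample is the record above with most fields removed.

```python
import json


def confident_words(lines, min_confidence=90.0):
    """Yield (page, text, confidence) for WORD blocks at or above the threshold."""
    for line in lines:
        record = json.loads(line)
        if record.get("blockType") != "WORD":
            continue
        confidence = record.get("confidence") or 0.0  # treat null as zero
        if confidence < min_confidence:
            continue
        yield record["page"], record["text"], confidence


# Trimmed stand-ins for Kafka message values; the last two are invented.
messages = [
    '{"blockType":"WORD","confidence":99.89907,"text":"their","page":1}',
    '{"blockType":"WORD","confidence":42.5,"text":"smudge","page":1}',
    '{"blockType":"LINE","confidence":99.1,"text":"their records","page":1}',
]
words = list(confident_words(messages))
```

Only the first message survives the filter: the second falls below the threshold and the third is not a WORD block.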

Apache Kafka Records in Cloudera Streams Messaging Manager (SMM)

Conclusion

By combining Apache NiFi’s data integration capabilities with the Amazon Textract service, you can create powerful machine learning pipelines that analyze text documents and return structured results.

Tim Spann
Cloudera

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/