Large File Processing using Apache Camel with AWS S3

Harsh Sharma
Published in Globant
5 min read, Apr 23, 2021

Introduction

Apache Camel is an Open Source integration framework that empowers you to quickly and easily integrate various systems consuming or producing data. Camel supports most of the Enterprise Integration Patterns from the excellent book by Gregor Hohpe and Bobby Woolf, and newer integration patterns from microservice architectures to help you solve your integration problem by applying best practices out of the box.

For more details, visit the Apache Camel site.

In an earlier post, my colleague Rakesh Ravlekar explained the approach we followed.

In this post, I will share the details of a large-file processing use case.

Use case

Our requirement was to read a large file (> 1.5 GB) from an AWS S3 bucket via Camel, process it, and then upload the generated output file (> 1.5 GB) to the destination S3 bucket.

Learning by example

Learning with a concrete example is always easier, so in this post I'll show you how to create a Camel route for this scenario.

  1. Reading a file from S3: there are two options.
  • Reading the whole file: this is only suitable for files smaller than 25 MB.

Ex — aws2-s3://test-bucket?s3Client=#client&repeatCount=1&deleteAfterRead=false&fileName=testfile.dat

  • Reading the file in chunks: set the Camel S3 operation option to getObjectRange, which downloads only the specified byte range of an object (the same idea as the HTTP Range header). This is the option we use in this post to read the large file.

Ex — aws2-s3://test-bucket?s3Client=#client&repeatCount=1&deleteAfterRead=false&fileName=testfile.dat&operation=getObjectRange

2. Uploading a file to an S3 bucket: again there are two options.

  • Uploading the whole file: this is only suitable if the file is smaller than 25 MB.

Ex — aws2-s3://test-bucket?s3Client=#client

  • Multipart upload: to upload a file in parts, set two additional options, multiPartUpload and partSize. Use this for files larger than 25 MB. The part size is configurable; I set it to 10 MB (10485760 bytes), so each uploaded part is 10 MB.

Ex — aws2-s3://test-bucket?s3Client=#client&multiPartUpload=true&partSize=10485760
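To make the sizing concrete, here is a minimal plain-Java sketch of the arithmetic behind the two upload options. MultipartPlan and its methods are hypothetical helpers for illustration, not part of Camel or the AWS SDK, and the 25 MB threshold is simply the figure used in this post.

```java
// Hypothetical helper for illustration only; not a Camel or AWS SDK class.
final class MultipartPlan {
    static final long MIB = 1024L * 1024L;
    // Assumption: the 25 MB limit mentioned in the post, expressed in bytes.
    static final long MULTIPART_LIMIT = 25 * MIB;

    /** Returns true when the file is larger than the single-upload limit. */
    static boolean needsMultipart(long fileSize) {
        return fileSize > MULTIPART_LIMIT;
    }

    /** Number of parts, rounded up so a trailing partial part is counted. */
    static long partCount(long fileSize, long partSize) {
        if (fileSize < 0 || partSize <= 0) {
            throw new IllegalArgumentException("fileSize must be >= 0 and partSize > 0");
        }
        return (fileSize + partSize - 1) / partSize;
    }

    public static void main(String[] args) {
        long fileSize = 1536 * MIB; // a 1.5 GB file
        long partSize = 10 * MIB;   // 10485760, as in the URI above
        System.out.println(needsMultipart(fileSize));      // prints "true"
        System.out.println(partCount(fileSize, partSize)); // prints "154"
    }
}
```

With a 10 MB part size, a 1.5 GB file needs 154 parts, which is well within S3's limit of 10,000 parts per multipart upload.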

More details about the Camel AWS S3 component can be found in its documentation.

To read the file in chunks we need the file length, so that we can split it into ranges. How do we get the file length? This is where the AWS S3 SDK comes in: we make an additional HeadObjectRequest to S3, as below:

HeadObjectRequest headObjectRequest = HeadObjectRequest.builder().bucket("test-bucket").key("testfile.dat").build();
HeadObjectResponse headResponse = s3Client.headObject(headObjectRequest);
// contentLength() returns a Long; keep it as a long, because an int overflows for files larger than about 2 GB
long fileLength = headResponse.contentLength();

Note: S3 operations require an S3Client, which you can create with the help of the ApacheHttpClient library.

Once we have the file length, we define the ranges according to our own logic.
For example, if the file size is 8192000 bytes and we want four equal ranges, the range list contains 4 objects: (0, 2047999), (2048000, 4095999), (4096000, 6143999), and (6144000, 8191999). We iterate over this list of ranges using Camel's split(body()).
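The range logic described above can be sketched in plain Java as follows. This is only one possible implementation under stated assumptions: the shape of ItemDto (inclusive from/to offsets with getFrom() and getTo()) is inferred from the route later in this post, and this createRangeData takes the file length and chunk size as parameters, which the original method may not do.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch; the real ItemDto and createRangeData may differ.
final class RangeSplitter {

    /** Inclusive byte range [from, to]; shape assumed from the route's getFrom()/getTo(). */
    static final class ItemDto {
        private final long from;
        private final long to;

        ItemDto(long from, long to) {
            this.from = from;
            this.to = to;
        }

        long getFrom() { return from; }
        long getTo() { return to; }
    }

    /** Splits [0, fileLength) into consecutive inclusive ranges of at most chunkSize bytes. */
    static List<ItemDto> createRangeData(long fileLength, long chunkSize) {
        if (fileLength < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("fileLength must be >= 0 and chunkSize > 0");
        }
        List<ItemDto> ranges = new ArrayList<>();
        for (long start = 0; start < fileLength; start += chunkSize) {
            // The last range is clipped to the final byte of the file.
            long end = Math.min(start + chunkSize, fileLength) - 1;
            ranges.add(new ItemDto(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (ItemDto r : createRangeData(8_192_000L, 2_048_000L)) {
            // Same form as an HTTP Range header value, e.g. "bytes=0-2047999"
            System.out.println("bytes=" + r.getFrom() + "-" + r.getTo());
        }
    }
}
```

Note that a fixed-size split like this can cut a record across two ranges; the sketch does not handle that, so the chunk size should be aligned with your record layout where that matters.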

So far we have covered the basic steps. Now let's create the route for this use case.

Camel Route Definition

from(direct("start"))
    .noStreamCaching()
    // Runs once all ranges have been processed: upload the collected output file
    .onCompletion()
        .process(postProcessor)
        .choice()
            .when(header(AWS2S3Constants.CONTENT_LENGTH).isGreaterThan(MULTIPART_LIMIT))
                .to("aws2-s3://test-bucket?s3Client=#client&multiPartUpload=true&partSize=10485760")
            .otherwise()
                .to("aws2-s3://test-bucket?s3Client=#client")
        .end()
    .end()
    // One exchange per range object in the body
    .split(body())
        .streaming()
        .process(exchange -> {
            ItemDto item = (ItemDto) exchange.getIn().getBody();
            exchange.getIn().setHeader(AWS2S3Constants.RANGE_START, item.getFrom());
            exchange.getIn().setHeader(AWS2S3Constants.RANGE_END, item.getTo());
            exchange.getIn().setHeader(AWS2S3Constants.KEY, "testfile.dat");
        })
        // Fetch only this byte range from S3
        .to("aws2-s3://test-bucket?s3Client=#client&repeatCount=1&deleteAfterRead=false&fileName=testfile.dat&operation=getObjectRange")
        .process(fileProcessor)
        .marshal(bindy)
        // Append each processed range to the same temporary file
        .to(file(tempFilePath).fileExist("Append").fileName(TEMP_FILE_NAME))
    .end();

Route Explanation

  1. Execution starts at a direct endpoint named start, to which we pass the range list we created.

To trigger the route, you have two options:

  • Using a ProducerTemplate:

ProducerTemplate pt = context.createProducerTemplate();
// The endpoint URI needs the component prefix
pt.sendBody("direct:start", createRangeData());

The createRangeData method returns a List<ItemDto>.

  • Calling it from another route:

from(timer("startTimer").repeatCount(1)).noStreamCaching()
.process(e -> e.getIn().setBody(createRangeData()))
.to(direct("start")).end();

2. To iterate over the range list, we use split(body()), so that each range object is processed as its own exchange.

3. A processor then sets the required S3 headers: the key, the range start, and the range end.

4. Once the headers are set, we fetch that range of data from S3 using the getObjectRange operation.

5. After receiving the range data, we process it. The processing logic depends on your business scenario. The getObjectRange operation returns a stream of bytes in the Camel exchange body, so to read that stream inside the processor, add the code below:

ResponseInputStream<?> res = (ResponseInputStream<?>) exchange.getIn().getBody();
// Reads the whole range into memory, so the range size bounds the memory used per exchange
byte[] messageBytes = res.readAllBytes();

These message bytes are then used as your logic requires.

Camel also lets you process the data in parallel, in two ways:
  • Using the splitter's parallel processing: add .parallelProcessing() to the route. You can also override the default ThreadPoolProfile in the Camel context with context.getExecutorServiceManager().setDefaultThreadPoolProfile(MyThreadPoolProfile);
  • Using an executor or another framework: parallelize the work inside your processor with a multithreading framework such as ExecutorService or Akka.

See the Camel documentation for more details on parallelProcessing.
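As an illustration of the second option, here is a minimal sketch that uses a plain ExecutorService to process records in parallel while keeping their input order. It is not taken from the original project: ParallelChunkProcessor, processRecord, and the upper-casing "business logic" are placeholders, and in a real processor the records would come from the bytes of the range you fetched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch only; names and logic are placeholders.
final class ParallelChunkProcessor {

    /** Placeholder for the real per-record business logic. */
    static String processRecord(String record) {
        return record.toUpperCase(Locale.ROOT);
    }

    /** Processes all records on a fixed thread pool and returns results in input order. */
    static List<String> processAll(List<String> records, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String record : records) {
                Callable<String> task = () -> processRecord(record);
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // Futures are read in submission order, so output order matches input order.
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing records", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Record processing failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a,1", "b,2", "c,3"), 2));
        // prints "[A,1, B,2, C,3]"
    }
}
```

Collecting the futures in submission order is a deliberate choice here: the work runs concurrently, but the results are written out in the same order as the input.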

6. Once processing is done, we marshal the data using Camel Bindy to generate a CSV file. You can also marshal it into other formats, such as fixed-length files, as your business needs require.

7. We then send that data to the file component, which appends it to a temporary file.

Note: each write stores only one range of processed data on the local file system. In other words, we collect the data range by range, and once the data for all ranges has been collected, we upload that file to S3.

8. To start the upload, we added an onCompletion hook with a post-processor that runs after all ranges are processed. Inside it, we only set the file object as the exchange body:

File file = new File(tempFilePath + "/" + TEMP_FILE_NAME);
// message is the exchange's message, for example exchange.getIn()
message.setBody(file);

9. Finally, we send the upload request to S3 with the multiPartUpload option.
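Steps 6 to 8 rely on the file component appending every processed range to one local file before the upload. The plain-Java sketch below is not the Camel file component; it only imitates that append-then-read-back behaviour with java.nio so the collecting step is easier to picture. ChunkCollector and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for "append each processed range to a temp file".
final class ChunkCollector {

    /** Appends one processed chunk to the target file, creating the file if needed. */
    static void appendChunk(Path target, String chunk) {
        try {
            Files.write(target, chunk.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends all chunks to a temporary file, returns its content, then deletes it. */
    static String collect(List<String> chunks) {
        try {
            Path temp = Files.createTempFile("collected-", ".csv");
            try {
                for (String chunk : chunks) {
                    appendChunk(temp, chunk);
                }
                // In the real route this file would be handed to the S3 upload instead.
                return new String(Files.readAllBytes(temp), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(temp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(collect(Arrays.asList("1,alpha\n", "2,beta\n")));
    }
}
```

Because the chunks are appended in the order they arrive, the order of the output file depends on the order in which ranges finish processing.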

Conclusion

That is how we process large files with Camel and AWS S3. We are able to process a file of almost 1.5 GB in less than 15 minutes on our current AWS configuration (CPU: 1024 units, RAM: 2048 MiB).

Hope you liked the write-up. Please leave me a comment for any questions/comments you have!
