Running Spring Integration as AWS Lambda Function — Serverless
In this blog post I will demonstrate how a Spring Integration application running within Spring Boot can be run as an AWS Lambda function (serverless). The purpose of this article is to share how AWS Lambda, Spring Boot and Spring Integration fit together.
In a normal application we would use the poller of an InboundChannelAdapter to fetch the file by polling at regular intervals, which spins up an asynchronous process to work on the file. If we use the same technique in AWS Lambda, the poller detaches from the Lambda invocation to process the file. The function then assumes its work is complete and exits while Spring Integration is still processing the file in the background.
This article focuses on developing a function that processes the file in a way that solves the above problem.
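The problem described above can be reproduced without AWS or Spring. The following is a minimal sketch using only the JDK: `DetachedWorkDemo`, `handleAsync` and `handleSync` are hypothetical names, and a plain `ExecutorService` stands in for the poller's background thread. It only illustrates that a handler which hands work to another thread returns before that work is finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Lambda handler; no AWS or Spring classes are used.
final class DetachedWorkDemo {

    // Simulates file processing that takes a little while.
    static void processFile(AtomicBoolean done) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        done.set(true);
    }

    // Poller style: work is handed to another thread and the handler returns at once.
    static boolean handleAsync(ExecutorService pool, AtomicBoolean done) {
        pool.submit(() -> processFile(done));
        return done.get(); // what the caller sees at the moment the handler returns
    }

    // Same-thread style: work runs on the calling thread, so the handler returns afterwards.
    static boolean handleSync(AtomicBoolean done) {
        processFile(done);
        return done.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println("async finished at return: " + handleAsync(pool, new AtomicBoolean()));
        System.out.println("sync finished at return:  " + handleSync(new AtomicBoolean()));
        pool.shutdown();
    }
}
```

In a Lambda, the asynchronous variant is the risky one: once the handler returns, the platform treats the invocation as complete even though the background thread still has work to do.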
Prerequisites
- A working Spring Integration application that runs within a Spring Boot container.
- Knowledge of AWS.
- Access to AWS.
- IAM role to execute Lambda.
Introduction
This article demonstrates how, when a vendor or client delivers a file to S3, an event is fired that executes a Lambda function. Consider the simple flow below:
File retrieval is external to our exercise. Once the file is delivered to S3, an event is triggered to invoke the Lambda function. The Lambda function exposes a RequestHandler to process the event.
The RequestHandler reads the file from S3 and calls the Spring Boot application as below:
// Download the S3 object to a local file (on Lambda this must be under /tmp).
GetObjectRequest request = new GetObjectRequest(new S3ObjectId(bucket, key));
s3Client.getObject(request, localFile);
// Proceed only if the download produced a readable local file.
boolean success = localFile.exists() && localFile.canRead();
if (success) {
    // Metadata passed along with the file path as message headers.
    Map<String, String> headers = new HashMap<>();
    headers.put(Constants.FILE_NAME, getFileName());
    headers.put(Constants.BUCKET_NAME, getBucketName());
    headers.put(Constants.BUCKET_PREFIX, getPrefix());
    headers.put(Constants.S3_KEY, key);
    Application application = new Application();
    application.processFileMessage(localFile.getAbsolutePath(), headers);
}
The interesting parts of the above code are:
boolean success = localFile.exists() && localFile.canRead();
This lets the application know that the file has been downloaded locally (Lambda only allows writing files under /tmp) and is ready to read.
Application application = new Application();
application.processFileMessage(localFile.getAbsolutePath(), headers);
This calls the Spring Boot application with the parameters needed to process the file.
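The download check can be tried in isolation. The sketch below uses only the JDK; `LocalFileCheck`, `localFileFor` and `isReady` are hypothetical names, a temporary directory stands in for /tmp, and a plain file write stands in for the S3 download. Keeping only the last segment of the key is an assumption made here so that the local file always lands directly in the target directory.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the original application.
final class LocalFileCheck {

    // Maps an S3 object key to a file directly under the given directory (/tmp on Lambda).
    // Only the part after the last '/' is kept.
    static File localFileFor(Path tmpDir, String s3Key) {
        String name = s3Key.substring(s3Key.lastIndexOf('/') + 1);
        return tmpDir.resolve(name).toFile();
    }

    // Same condition as in the handler: the file must exist and be readable.
    static boolean isReady(File localFile) {
        return localFile.exists() && localFile.canRead();
    }

    public static void main(String[] args) throws IOException {
        Path tmpDir = Files.createTempDirectory("lambda-tmp"); // stand-in for /tmp
        File local = localFileFor(tmpDir, "incoming/2017/data.csv");
        System.out.println("before download: " + isReady(local));
        // Simulated download; the real handler would call s3Client.getObject(request, local).
        Files.write(local.toPath(), "a,b,c\n".getBytes(StandardCharsets.UTF_8));
        System.out.println("after download:  " + isReady(local));
    }
}
```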
The Spring Boot application contains the integration flow depicted above. The integration point between Spring Boot and Spring Integration is the MessagingGateway, as follows:
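public void processFileMessage(String localFilePath, Map<String, String> headers) {
    // Start the Spring Boot application and register a JVM shutdown hook for its context.
    SpringApplication.run(Application.class, new String[0]).registerShutdownHook();
    // Look up the gateway bean by name (getSpringBean is assumed to be a lookup helper defined elsewhere).
    IS3EventGateway s3EventGW = (IS3EventGateway) getSpringBean("s3Gateway");
    // Build the message and send it through the gateway on the calling thread.
    s3EventGW.processFile(MessageBuilder
            .withPayload(localFilePath).copyHeaders(headers).build());
}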
The snippet above takes the local file path and the headers as parameters. The first line starts the Spring application. The second line retrieves the bean instance of the MessagingGateway. To run the file processing on the same thread, we build a message from the path and header information and hand it to the MessagingGateway, which delivers it to the inputFiles channel.
MessagingGateway lets us abstract the underlying messaging platform. Unlike a SourcePollingChannelAdapter (poller), which starts an asynchronous process, a gateway call over direct channels runs on the main thread and blocks the AWS Lambda function until processing completes. To enable the MessagingGateway, add @IntegrationComponentScan to the root configuration class. A sample IS3EventGateway:
@MessagingGateway(name = "s3Gateway", defaultRequestChannel = "inputFiles")
public interface IS3EventGateway {
    @Gateway
    void processFile(Message<String> message);
}
From here the Spring Integration flow takes over. We decided not to send the file contents through the channel in memory, since the file can be huge. Instead, a Splitter reads the file line by line using FileUtils.lineIterator. The only downside is that the LineIterator has to be closed explicitly in a finally block.
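The line-by-line idea can be sketched with the JDK alone. Note that this is a swapped-in technique: instead of Commons IO's FileUtils.lineIterator with a finally block, the sketch uses a BufferedReader inside try-with-resources, which closes the reader automatically. `LineByLineReader` and `forEachLine` are hypothetical names, and the consumer stands in for whatever the flow does with each line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Consumer;

// Hypothetical helper; a JDK-only analogue of reading with a LineIterator.
final class LineByLineReader {

    // Streams the file one line at a time and hands each line to the handler.
    // Returns the number of lines read; the reader is closed even if the handler throws.
    static long forEachLine(Path file, Consumer<String> lineHandler) throws IOException {
        long count = 0;
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineHandler.accept(line);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("sample", ".txt");
        Files.write(file, Arrays.asList("first", "second", "third"), StandardCharsets.UTF_8);
        long n = forEachLine(file, line -> System.out.println("line: " + line));
        System.out.println("lines read: " + n);
        Files.delete(file);
    }
}
```

Only one line is held in memory at a time, which is the property that matters for huge files.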
Lambda may reuse the same container for subsequent invocations if the previous execution completed well within the 5-minute limit (the Lambda timeout at the time of writing). In that case the application throws javax.management.InstanceAlreadyExistsException, because its MBeans are already registered in the JVM from the previous invocation. To avoid the exception, and to make the Spring Boot application start faster given the nature of Lambda, configure the following exclude list:
@SpringBootApplication(exclude = {
        AopAutoConfiguration.class,
        DataSourceAutoConfiguration.class,
        EmbeddedServletContainerAutoConfiguration.class,
        WebMvcAutoConfiguration.class,
        JmxAutoConfiguration.class,
        PersistenceExceptionTranslationAutoConfiguration.class,
        RedisAutoConfiguration.class,
        WebClientAutoConfiguration.class })
@IntegrationComponentScan("com.test.file.integration")
public class Application implements ApplicationContextAware {
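Since the exception comes from starting the application a second time in a reused container, a complementary option is to start it only once per JVM and reuse the result. This is not what the article's code does; it is a hedged alternative sketched with the JDK only. `OncePerContainer` is a hypothetical helper, and in a real handler the supplier would presumably wrap the SpringApplication.run call and be held in a static field.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: runs an expensive start-up step once per JVM and reuses the result.
final class OncePerContainer<T> {

    private final Supplier<T> starter;
    private T instance;

    OncePerContainer(Supplier<T> starter) {
        this.starter = starter;
    }

    // Starts on first use; later calls in the same (warm) container return the cached instance.
    synchronized T get() {
        if (instance == null) {
            instance = starter.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger starts = new AtomicInteger();
        // The string stands in for an application context.
        OncePerContainer<String> context =
                new OncePerContainer<>(() -> "context#" + starts.incrementAndGet());
        System.out.println(context.get()); // first invocation starts the "application"
        System.out.println(context.get()); // second invocation reuses it
        System.out.println("starts: " + starts.get());
    }
}
```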
Based on the above flow, we can integrate with the backend using any outbound adapter to send the final message to a message broker, a database or an API.
To conclude, running Spring Integration within a Spring Boot container as an AWS Lambda function needs a MessagingGateway as the entry point, so that processing runs on the same thread and control returns to the Lambda function to complete the process gracefully. If processing takes more than 5 minutes, we can implement a MapReduce-style algorithm that splits the huge file into smaller files, each processed within 5 minutes, and aggregates their results into the final result.
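The split-and-aggregate idea can be sketched in memory with the JDK only. `SplitAndAggregate` and its methods are hypothetical names, and counting non-blank lines is a placeholder for the real per-file processing. In a Lambda setup each chunk would presumably be written out as a smaller file and handled by its own invocation, with the partial results combined afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of splitting work into chunks and aggregating partial results.
final class SplitAndAggregate {

    // Split step: cuts the line list into consecutive chunks of at most chunkSize lines.
    static List<List<String>> split(List<String> lines, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<String>> chunks = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, lines.size());
            chunks.add(new ArrayList<>(lines.subList(start, end)));
        }
        return chunks;
    }

    // Map step (placeholder work): counts the non-blank lines in one chunk.
    static int processChunk(List<String> chunk) {
        int count = 0;
        for (String line : chunk) {
            if (!line.trim().isEmpty()) {
                count++;
            }
        }
        return count;
    }

    // Reduce step: sums the partial results of all chunks.
    static int aggregate(List<Integer> partials) {
        int total = 0;
        for (int partial : partials) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "", "b", "c", "d");
        List<Integer> partials = new ArrayList<>();
        for (List<String> chunk : split(lines, 2)) {
            partials.add(processChunk(chunk));
        }
        System.out.println("partials: " + partials + ", total: " + aggregate(partials));
    }
}
```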