Using AWS S3 Batch Operations with Lambda in Scala

Kirsey Fam
Published in disney-streaming
Oct 1, 2020

S3 Batch Operations is a relatively recent feature of Amazon Simple Storage Service (S3) that enables processing of hundreds, thousands, millions, or even billions of S3 objects in a simple and straightforward fashion. It allows for copying objects from one bucket to another, setting tags or access control lists (ACLs), initiating a restore from Glacier, or invoking an AWS Lambda function on each object. AWS Lambda is an event-driven, serverless computing platform that runs pieces of code called Lambda functions.

This post will focus on how to use S3 Batch Operations to invoke a Lambda function written in Scala, with sbt as our build tool. Note: the same result is achievable with other Scala build tools.

Request and Response

When configured correctly, an S3 Batch Operations job sends a JSON request in a particular format to the Lambda function to be invoked on the S3 objects. In turn, it expects the Lambda function to return a JSON response in a specific format indicating the success or failure of the invocation.

Request

The following is an example of a JSON request that is sent from S3 Batch Operations to the Lambda function:

{
  "invocationSchemaVersion": "1.0",
  "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
  "job": {
    "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"
  },
  "tasks": [
    {
      "taskId": "dGFza2lkZ29lc2hlcmUK",
      "s3Key": "mediaImage1.png",
      "s3VersionId": "1",
      "s3BucketArn": "arn:aws:s3:us-east-1:0123456789:sample-bucket"
    }
  ]
}

The most important fields for processing S3 objects in the Lambda function are s3Key, s3VersionId, and s3BucketArn, which are wrapped in a task object. Each task references an S3 object that the Lambda function should process. The other fields are metadata about the S3 Batch Operations job.

At the time of writing, S3 Batch Operations only sends one task per request.
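One practical detail worth checking before using these fields: AWS's own sample Lambda functions for S3 Batch Operations URL-decode s3Key before use, which suggests that keys containing spaces or special characters arrive encoded. The following is a minimal sketch of normalizing both fields with the Java standard library. The names TaskFields, decodeKey, and bucketNameFromArn are hypothetical helpers for illustration, not part of any AWS API.

```scala
import java.net.URLDecoder
import java.nio.charset.StandardCharsets

object TaskFields {
  // Assumption: s3Key arrives URL-encoded, as AWS's sample functions imply.
  // URLDecoder also turns '+' into a space (form-style decoding).
  def decodeKey(rawKey: String): String =
    URLDecoder.decode(rawKey, StandardCharsets.UTF_8.name())

  // The bucket name is the last colon-separated segment of the ARN.
  def bucketNameFromArn(arn: String): String =
    arn.split(":").last
}
```

For example, TaskFields.decodeKey("media%20Image1.png") yields "media Image1.png", and TaskFields.bucketNameFromArn works for both the ARN form shown in the sample request and the more common arn:aws:s3:::sample-bucket form.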

Response

The following is an example of a JSON response that the Lambda function must return to the S3 Batch Operations job:

{
  "invocationSchemaVersion": "1.0",
  "treatMissingKeysAs": "PermanentFailure",
  "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
  "results": [
    {
      "taskId": "dGFza2lkZ29lc2hlcmUK",
      "resultCode": "Succeeded",
      "resultString": "Processed mediaImage1.png"
    }
  ]
}

Response and Result Codes

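Amazon S3 Batch Operations expects two levels of status codes in the response payload. The first is the response code for the entire request, and the second is a per-task result code. The result codes and their associated behaviors are as follows:

Succeeded: The task completed normally. If a job completion report was requested, the task's result string is included in the report.
TemporaryFailure: The task suffered a temporary failure and will be retried before the job completes. The result string is ignored unless this is the final retry, in which case the error message is included in the final report.
PermanentFailure: The task suffered a permanent failure. If a job completion report was requested, the task is marked as failed and its error message is included in the report.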
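Because the three result codes used in this post (Succeeded, TemporaryFailure, and PermanentFailure) are a closed set, they can be modelled as a small algebraic data type rather than as free-form strings. The following is only a sketch of that idea; the ResultCode type and its members are hypothetical names and are not used by the code later in this post, which keeps plain strings for simplicity.

```scala
// A closed set of result codes; `value` is the exact string sent back in JSON.
sealed abstract class ResultCode(val value: String)

object ResultCode {
  case object Succeeded extends ResultCode("Succeeded")
  case object TemporaryFailure extends ResultCode("TemporaryFailure")
  case object PermanentFailure extends ResultCode("PermanentFailure")

  val all: List[ResultCode] = List(Succeeded, TemporaryFailure, PermanentFailure)

  // Returns None for any string that is not a known result code.
  def fromString(s: String): Option[ResultCode] = all.find(_.value == s)
}
```

With this in place, a typo such as "Suceeded" becomes a compile error at the call site instead of a silently rejected response.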

Lambda Request Handler

Lambda functions must implement a handler interface that consumes and processes data. The details vary by language, but for a Lambda function written in Java or Scala, the aws-lambda-java-core library provides two handler interfaces, RequestHandler and RequestStreamHandler, both documented in the AWS Lambda Developer Guide.

To handle the S3 Batch Operations request/response format, the RequestStreamHandler will be implemented in this Lambda function. In addition, circe will be used for serialization and deserialization of the request and response.

Dependencies

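To use these libraries, the respective dependencies should be added to the build.sbt file. The handler shown later also references S3Exception from the AWS SDK for Java 2.x, so that SDK's s3 module is needed on the classpath as well.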

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-lambda-java-core" % "1.2.1",
  "io.circe" %% "circe-core" % "0.12.3",
  "io.circe" %% "circe-generic" % "0.12.3",
  "io.circe" %% "circe-parser" % "0.12.3"
)

Model

First, the models for the request need to be defined. These models capture the structure of the data and make deserialization with circe straightforward. Circe is a popular JSON library for Scala whose circe-generic module can derive JSON codecs for case classes. Here, the @JsonCodec macro annotation derives the encoder and decoder for each model defined below. Note that macro annotations must be enabled in the build: on Scala 2.13 via the -Ymacro-annotations compiler option, and on Scala 2.12 via the macro paradise compiler plugin.

import io.circe.generic.JsonCodec

// Input from S3 Batch Operations to Lambda
@JsonCodec
case class S3BatchRequest(
  invocationSchemaVersion: String,
  invocationId: String,
  job: S3BatchJob,
  tasks: List[S3BatchTask]
)

// S3 Batch Job details
@JsonCodec
case class S3BatchJob(id: String)

// S3 Batch Task model
@JsonCodec
case class S3BatchTask(
  taskId: String,
  s3Key: String,
  // Optional on the assumption that the version ID can be null
  // when the manifest does not specify one.
  s3VersionId: Option[String],
  s3BucketArn: String
) {
  // The bucket name is the last colon-separated segment of the ARN.
  lazy val s3BucketName: String = s3BucketArn.split(":").last
}

And similarly for the response:

// Output from Lambda to S3 Batch Operations
@JsonCodec
case class S3BatchLambdaResponse(
  invocationSchemaVersion: String,
  treatMissingKeysAs: String,
  invocationId: String,
  results: List[S3BatchResult]
)

// Output Result for each S3 Batch task
@JsonCodec
case class S3BatchResult(
  taskId: String,
  resultCode: String,
  resultString: String
)
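To see how models like these behave at the JSON boundary, the following sketch decodes a single task and encodes a single result. It is an illustration under stated assumptions rather than the code used in this post: Task and Result are trimmed-down, hypothetical stand-ins for the models above, s3VersionId is typed as Option[String] on the assumption that it may be null, and semi-automatic derivation (deriveCodec from circe-generic) is used in place of @JsonCodec so that no macro-annotation compiler setting is needed.

```scala
import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec
import io.circe.parser.decode
import io.circe.syntax._

// Hypothetical stand-ins for S3BatchTask and S3BatchResult.
final case class Task(
  taskId: String,
  s3Key: String,
  s3VersionId: Option[String],
  s3BucketArn: String
)
object Task {
  implicit val codec: Codec[Task] = deriveCodec[Task]
}

final case class Result(taskId: String, resultCode: String, resultString: String)
object Result {
  implicit val codec: Codec[Result] = deriveCodec[Result]
}

object RoundTrip {
  val taskJson: String =
    """{"taskId":"dGFza2lkZ29lc2hlcmUK","s3Key":"mediaImage1.png","s3VersionId":null,"s3BucketArn":"arn:aws:s3:::sample-bucket"}"""

  // Decoding returns Either: Left on malformed input, Right on success.
  val task: Either[io.circe.Error, Task] = decode[Task](taskJson)

  // Build a result for the decoded task and render it as compact JSON.
  val encoded: Either[io.circe.Error, String] =
    task.map(t => Result(t.taskId, "Succeeded", s"Processed ${t.s3Key}").asJson.noSpaces)
}
```

Here the null version ID decodes to None instead of causing a decoding failure, and the encoded result keeps its fields in case-class order.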

Handler

With the RequestStreamHandler interface, Lambda passes an InputStream and OutputStream to the handleRequest method, which needs to be overridden to implement the interface. The handler method should read bytes from the input stream, do some processing, then write the expected result to the output stream.

import java.io.{InputStream, OutputStream}

import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler}

class S3BatchOperationsLambdaHandler extends RequestStreamHandler {
  override def handleRequest(
    is: InputStream,
    os: OutputStream,
    context: Context
  ): Unit = {
    // processing logic here
  }
}

This class extends the RequestStreamHandler interface and overrides the handleRequest method as required. In addition, the implementation of this method must handle the request and response format expected from S3 Batch Operations.

import java.io.{InputStream, OutputStream}

import io.circe.parser.decode
import io.circe.syntax._
import software.amazon.awssdk.services.s3.model.S3Exception

import scala.io.Source

override def handleRequest(
  is: InputStream,
  os: OutputStream,
  context: Context
): Unit = {
  // Read the whole request body as a UTF-8 string.
  val reqString = Source.fromInputStream(is, "UTF-8").mkString
  val s3BatchRequest = decode[S3BatchRequest](reqString) match {
    case Right(batchReq) =>
      batchReq
    case Left(ex) =>
      // if an exception is thrown during the Lambda invocation,
      // then the ResponseCode is automatically set to
      // "PermanentFailure".
      throw ex
  }
  val results = processTasks(s3BatchRequest.tasks)
  returnResults(os, s3BatchRequest, results)
}

Here, the handleRequest method reads in the bytes from the input stream as a string, then decodes that JSON string into the S3BatchRequest model defined earlier. Having deserialized the data, the S3 objects that appear in the task list from the request can then be retrieved and processed.

Additionally, helper methods are defined which contain logic for processing the objects and communicating the results back to the job.

def processTasks(
  tasks: List[S3BatchTask]
): List[S3BatchResult] =
  for {
    task <- tasks
  } yield {
    val res = processObject(task.s3BucketName, task.s3Key)
    val (resultCode, resultString) = res match {
      case Right(res) => ("Succeeded", res)
      case Left(ex: S3Exception)
          if ex.awsErrorDetails().errorCode() == "RequestTimeout" =>
        ("TemporaryFailure", ex.getMessage)
      case Left(ex) => ("PermanentFailure", ex.getMessage)
    }
    S3BatchResult(
      taskId = task.taskId,
      resultCode = resultCode,
      resultString = resultString
    )
  }

def processObject(
  s3BucketName: String,
  s3Key: String
): Either[Exception, String] = {
  val path = s"$s3BucketName/$s3Key"
  println(path)
  Right(path)
}

The processObject method is called for each task, receiving the name of the S3 bucket and the key of the target object as parameters. In a real use-case, the object could be retrieved from S3 to perform further processing, such as updating JSON fields or removing sensitive data. To keep things simple in this post, the object’s path is simply printed and returned as the result string.
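For a sense of what a real use-case might look like, here is a minimal sketch of a processObject variant that actually reads the object. It assumes the AWS SDK for Java 2.x s3 module is on the classpath. ObjectProcessor, Fetch, and s3Fetch are hypothetical names introduced for illustration; the fetch step is passed in as a function so that the logic can be exercised without touching S3.

```scala
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.GetObjectRequest

object ObjectProcessor {
  // (bucket, key) => object bytes
  type Fetch = (String, String) => Array[Byte]

  // Fetcher backed by a real S3 client; downloads the whole object into memory.
  def s3Fetch(client: S3Client): Fetch = (bucket, key) =>
    client
      .getObjectAsBytes(GetObjectRequest.builder().bucket(bucket).key(key).build())
      .asByteArray()

  // Same Either-based contract as processObject above: Right on success,
  // Left with the exception on failure.
  def processObject(fetch: Fetch)(bucket: String, key: String): Either[Exception, String] =
    try Right(s"Read ${fetch(bucket, key).length} bytes from $bucket/$key")
    catch { case ex: Exception => Left(ex) }
}
```

In the Lambda itself this could be wired up as ObjectProcessor.processObject(ObjectProcessor.s3Fetch(S3Client.create())) _, with the client created once and reused across invocations. Reading whole objects into memory is only reasonable for small objects; larger ones would call for streaming.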

After the object has been processed, an indication as to whether the operation succeeded or failed is required. Earlier in the post, three possible response codes were listed: Succeeded, TemporaryFailure, and PermanentFailure. If a task is marked as TemporaryFailure, S3 Batch Operations automatically retries that task before the job completes.

The results of the processing are then wrapped in an S3BatchResult object, as defined earlier. The result of each task is then returned in the response to S3 Batch Operations.

def returnResults(
  os: OutputStream,
  req: S3BatchRequest,
  results: List[S3BatchResult]
): Unit = {
  val output = S3BatchLambdaResponse(
    invocationSchemaVersion = req.invocationSchemaVersion,
    treatMissingKeysAs = "PermanentFailure",
    invocationId = req.invocationId,
    results = results
  )
  // Serialize to compact JSON and write it as UTF-8 bytes.
  val outputBytes = output.asJson.noSpaces.getBytes("UTF-8")
  os.write(outputBytes)
}

In this method, the results from processing, as well as the metadata relevant to the job, are included in the S3BatchLambdaResponse model. The response is then serialized as a JSON string using circe, the bytes of which are written to the output stream.

At this point, the implementation of the Lambda function is complete.
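Before deploying, it can be useful to exercise a stream handler locally. Since RequestStreamHandler only deals in plain Java streams, a handler can be driven with in-memory byte arrays. The sketch below assumes the handler never touches the Context argument (as is the case in this post), so null is passed for it. LocalInvoke and EchoHandler are hypothetical names; EchoHandler is a stand-in used only to keep the example self-contained.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import com.amazonaws.services.lambda.runtime.{Context, RequestStreamHandler}

object LocalInvoke {
  // Feeds requestJson to the handler and returns whatever it wrote.
  // Assumption: the handler does not use the Context, so null is passed.
  def invoke(handler: RequestStreamHandler, requestJson: String): String = {
    val in = new ByteArrayInputStream(requestJson.getBytes(UTF_8))
    val out = new ByteArrayOutputStream()
    handler.handleRequest(in, out, null)
    new String(out.toByteArray, UTF_8)
  }
}

// Stand-in handler that writes its input back unchanged.
class EchoHandler extends RequestStreamHandler {
  override def handleRequest(is: InputStream, os: OutputStream, context: Context): Unit = {
    val body = scala.io.Source.fromInputStream(is, "UTF-8").mkString
    os.write(body.getBytes(UTF_8))
  }
}
```

Replacing new EchoHandler with new S3BatchOperationsLambdaHandler and passing the sample request from earlier in the post should return a response containing one result per task.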

Deployment

To use the function with S3 Batch Operations, it must be packaged and deployed. To package the code into a JAR, the sbt-assembly plugin can be used. Following setup of the plugin, the project can be packaged by running the following command:

sbt assembly
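For reference, enabling the plugin is typically a one-line addition to project/plugins.sbt. The version shown here is an assumption for illustration; the sbt-assembly README lists the current release.

```scala
// project/plugins.sbt
// Version is an assumption; check the sbt-assembly README for the latest.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
```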

Now that the project has been assembled into a JAR file, it can be uploaded as a Lambda function via the AWS console or the AWS CLI, as outlined in the documentation.

Creating a Job

Manifest

To create an S3 Batch Operations job, the objects upon which to invoke the Lambda function need to be specified. This is done by defining a manifest file, which can be in the form of either an S3 Inventory report or a CSV file. For more details on how to specify a manifest, as well as the difference between the two types, see the AWS docs.

IAM Policies

After specifying the manifest, the IAM roles of the Lambda function, as well as that of the S3 Batch Operations job, need to be defined.

Example Lambda IAM policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:PutObject",
        "lambda:InvokeFunction"
      ],
      "Resource": "*"
    }
  ]
}

The s3:GetObject permission is used for retrieving the manifest, while the s3:PutObject permission is needed in order to write the report for the S3 Batch Operations job. The lambda:InvokeFunction permission is necessary for invoking the Lambda function from the job.

Example S3 Batch Operations job IAM policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-east-1:012345678910:function:myS3BatchOperationsLambdaFunction"
    },
    {
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::sample-bucket",
        "arn:aws:s3:::sample-bucket/*"
      ]
    }
  ]
}

With these IAM roles in place, an S3 Batch Operations job is ready to be created.

Using the AWS CLI

One way to create an S3 Batch Operations job is to use the create-job command in the CLI. The following example creates an Amazon S3 Batch Operations job that invokes a Lambda function using the AWS CLI.

aws s3control create-job \
  --account-id <AccountID> \
  --no-confirmation-required \
  --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' \
  --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' \
  --report '{"Bucket":"arn:aws:s3:::sample-bucket", "Format":"Report_CSV_20180820", "Enabled":true, "Prefix":"ReportPrefix", "ReportScope":"AllTasks"}' \
  --priority 2 \
  --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole \
  --region Region \
  --description "S3 Batch Lambda Invoke"

Unless the --no-confirmation-required flag is passed, the newly created job must be run and confirmed using the console.

For more on the create-job CLI command, see the AWS CLI Command Reference. It is important to note that the manifest type is specified in the --manifest option. For manifests created from S3 Inventory Reports, this parameter should be "S3InventoryReport_CSV_20161130". For manifests created using regular CSV format, "S3BatchOperations_CSV_20180820" should be used, and the Fields value should be set.

Using the AWS Console

Creating an S3 Batch Operations job through the console is straightforward and has become more streamlined of late, so the AWS docs on Creating an Amazon S3 Batch Operations Job and Operations should suffice for that method.

Viewing the Results

While an S3 Batch Operations job is running, the success rate of the objects processed so far is displayed in real time. If more than 50% of a job's operations have failed after more than 1,000 operations have been attempted, the job automatically fails and ceases further operations.
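The failure threshold can be expressed as a small predicate, which may help when reasoning about how many failures a job can tolerate. This is only an illustration of the rule as worded in the AWS documentation ("more than 50 percent" after "more than 1,000" attempts), not AWS's actual implementation; FailureThreshold and its members are hypothetical names.

```scala
object FailureThreshold {
  // The rule only applies once more than this many operations were attempted.
  val MinAttempts: Long = 1000L

  // True when more than half of the attempted operations have failed
  // and the minimum number of attempts has been passed.
  def exceeded(attempted: Long, failed: Long): Boolean =
    attempted > MinAttempts && failed * 2 > attempted
}
```

Under this reading, a job with 2,000 attempted operations keeps running with 1,000 failures but would be stopped at 1,001, while a job with only 900 attempted operations is never stopped by this rule regardless of its failure count.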

Once the job has completed its run, results will be presented on a per-object basis in a CSV file saved in the location specified for the report.

Conclusion

S3 Batch Operations is a powerful feature of S3 for processing massive amounts of data, made more versatile through a Lambda integration. This post detailed how to write a Lambda function for use with S3 Batch Operations in Scala.

S3 Batch Operations shines most when processing millions or billions of objects, or large amounts of data in general. For example, if one needs to remove certain data from all S3 objects, or if a multitude of JSON files need to be flattened and output as Parquet files, an S3 Batch Operations job integrated with Lambda would handle the task quite well while scaling readily, processing millions of objects within hours.
