Copying TB’s of data between s3 buckets

Hari Ohm Prasath
Javarevisited
3 min readSep 12, 2019

--

Problem statement:

Part of our regular production upgrade we were trying backup data in s3 bucket with

Item Count: 1,000,344 and Size: ~130 GB

We were basically initiating a backup using the regular s3 commands like:

aws s3 cp --recursive s3://<bucket>>
aws s3 sync s3://<bucket> s3://<bucket>>

During execution we noticed it took hours and hours to perform the copy there is no way to make it faster, only workaround we found is to run these aws commands in parallel in multiple terminals so they all can operate on different s3 partitions at the same time and perform copy faster, which is neither a elegant solution nor scalable.

Other options:

We tried couple other options mentioned in stack overflow and AWS forums like

S3 Batch operations

S3 batch operations seems to be solve this problem but at this point of time it doesn’t support it on objects encrypted based on KMS key. When I created a job to copy the contents of the bucket with KMS key encryption enabled got the following error:

Unsupported encryption type used: SSE_KMS

When I read more about this AWS docs it stated under “Specifying a Manifest” section → Manifests that use server-side encryption with customer-provided keys (SSE-C) and server-side encryption with AWS KMS managed keys (SSE-KMS) are not supported

https://docs.aws.amazon.com/AmazonS3/latest/dev/batch-ops-basics.html#specify-batchjob-manifest

s3-dist-cp

s3-dist-cp seems to be promising but when I ran it against a bucket with had closer to 6 TB of data the job failed while running “reduce” task after 40 minutes without any clear indication why it failed

Custom approach:

Unfortunately none of the above mentioned approaches solved our problem so we came up with this approach. This approach can be further optimized, so think as a first step to solve this problem.

Its a 2 step process, which is a combination of shell script and spark code. First we need to generate the record file (with object keys), then running a spark code to copy the files in parallel across nodes in multiple tasks

Generating the record file:

We need to generate a text file containing object keys of the items inside the source s3 bucket (that will be copied), this can be done by running this command on any EC2 instances:

aws s3 ls s3://test_bucket --recursive | awk '{print $4}' > /tmp/output.txt

Output: (just object keys one in each line)

data/solution=33/test1.mov
data/solution=33/test2.mov, etc

Spark code:

sql.read()
.textFile(file)
.repartition(2000)
.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split("\n")).iterator(), Encoders.STRING())
.map((MapFunction<String, String>) s -> String.format("aws s3 cp %s s3://%s/%s", String.format("s3://%s/%s", source, s), target, s), Encoders.STRING())
.foreachPartition((ForeachPartitionFunction<String>) iterator -> {
while (iterator.hasNext())
Runtime.getRuntime().exec(iterator.next()).waitFor();
});

Spark Submit:

spark-submit — class com.s3.S3Copy s3://test_bucket/copier.jar test_bucket back_up_bucket s3://test_bucket/output.txt

args[0] → Source bucket

args[1] → Target bucket

args[3] → s3 record file generated in previous step

This code will read the “output.txt” file and splits into multiple partitions and runs them in parallel across multiple nodes.

Performance Test

With 15 EMR core nodes each of m4.xlarge instance type we were able to copy 5.5 TB of data in less than 40 minutes. Since we pay EMR only for the time we use its cost effective (cost can be further reduced by going with SPOT or EC2 fleet configuration) and much scalable compared to the previous approach.

Spark submit:

spark-submit — conf spark.network.timeout=420000s — conf spark.executor.heartbeatInterval=410000s — conf spark.yarn.scheduler.mode=FAIR — conf spark.shuffle.service.enabled=true — conf spark.serializer=org.apache.spark.serializer.KryoSerializer — conf spark.executor.memoryOverhead=1024 — conf spark.driver.memoryOverhead=1024 — conf spark.executor.instances=74 — conf spark.executor.cores=6 — conf spark.driver.cores=6 — conf spark.driver.memory=10g — conf spark.executor.memory=10g — conf spark.default.parallelism=888 — deploy-mode cluster — master yarn — conf spark.sql.broadcastTimeout=360000 — class com.s3.S3Copy s3://dmp-dms-k8s-dev-fico-pto-tenant/copier.jar test_bucket back_up_bucket s3://test_bucket/output.txt

--

--