Migration of our video encoder to AWS

As a streaming site, the videos we receive every day are the core of our business. Without them we would have no traffic, so we need to give a lot of love to our video encoding platform.

We have been running our own video encoder for quite a while now. Over time, we released many different versions of the encoder, and the encoding team spent quite a few weeks tweaking ffmpeg parameters to get the best quality-to-file-size ratio.

At first we were using a simple PHP application and 40 dedicated servers to execute the jobs. Then we migrated to a Docker container running on Mesos, with the jobs orchestrated by Chronos on a cluster of 80 dedicated servers.
Instead of using Chronos as a scheduler, we were using it as a queue, and it did not work very well for our usage: it did not cope with the thousands of jobs we receive every day.

The encoding team therefore created its own queuing tool, called Aeon: a simple Python application, using Redis as its database, that receives resource offers from Mesos and finds the job matching each offer. It also has a priority system, so we can process important videos faster.
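
Aeon's internals are not shown here, but the matching idea can be sketched in a few lines. This is a toy in-memory model with hypothetical field names ("cpus", "id"); the real Aeon is backed by Redis, not a Python heap:

```python
import heapq

class PriorityJobQueue:
    """Toy model of priority-aware job/offer matching (hypothetical sketch)."""

    def __init__(self):
        self._heap = []      # entries: (-priority, insertion order, job)
        self._counter = 0

    def push(self, job, priority=0):
        # heapq is a min-heap, so negate the priority to pop high-priority first
        heapq.heappush(self._heap, (-priority, self._counter, job))
        self._counter += 1

    def match(self, offer_cpus):
        """Return the highest-priority job that fits the Mesos offer, or None."""
        skipped = []
        found = None
        while self._heap:
            entry = heapq.heappop(self._heap)
            if entry[2]["cpus"] <= offer_cpus:
                found = entry[2]
                break
            skipped.append(entry)
        for entry in skipped:  # put back the jobs that did not fit this offer
            heapq.heappush(self._heap, entry)
        return found
```

A high-priority job is handed out first, but only if the offer actually has the resources it needs; otherwise the matcher falls through to the next job in line.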

The main issue is that our Mesos cluster has a fixed size of 80 servers, and it doesn't scale well. New videos come in higher resolutions with higher encoding complexity and require more resources, while our servers get older every day. And just as the cluster doesn't scale up, it doesn't scale down either: we pay for 80 servers even when we don't need them 24/7.

The total resources of the Mesos cluster

We needed modern servers with a modern CPU architecture, we needed to be able to get as many of them as possible, and we needed to stop paying for them the moment we no longer needed them.

That’s when we decided to migrate to AWS.


AWS Batch or EC2/ECS/SQS/CloudWatch

People have been using AWS for batch processing for years, and AWS has been generous enough to create AWS Batch. Released in December 2016, Batch handles everything for you if you want to do batch processing.

It handles scaling your processing cluster up and down, your queues, retries, etc. And it is free: you only pay for the resources you use, not for Batch itself. Pretty cool.
We use it a lot for other projects, but for this one we decided to build our own system. (The real reason is that we thought Batch compute environments were limited to 256 CPUs, with a maximum of 18 compute environments total, which was just not enough for us. In fact a compute environment can have far more than 256 CPUs; we only realised that once our own setup was finished.)

We made a list of all the AWS services we needed:
* EC2 for the servers (Spot fleet, for reduced costs)
* ECS for the docker orchestration system
* S3 as temporary storage
* SQS for the message queuing
* CloudWatch for the monitoring and the auto-scaling logic
* IAM obviously for the permissions and roles
* CloudFormation to create the entire stack (CF template provided at the end of this article)

It worked well at first sight, but when we looked closer, we hit a few issues here and there.

First, CloudWatch only offers basic alarms, which were not enough for good auto-scaling logic on our cluster. We were using the ApproximateNumberOfMessagesVisible metric from SQS to scale up, and NumberOfEmptyReceives, also from SQS, to scale down. Scaling down worked great, but scaling up was either too slow or too fast: you don't want to boot the same number of servers when 50 messages are waiting in the queue and 1 server is running as when 100 servers are already running.
So we created a new, simple metric:
ApproximateNumberOfMessagesPerServer = ApproximateNumberOfMessagesVisible/SpotFleetTargetCapacity
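
In code, the derived metric is just a guarded division (the Lambda that actually computes and publishes it to CloudWatch is in the template at the end of this article):

```python
def messages_per_server(visible_messages, fleet_target_capacity):
    """ApproximateNumberOfMessagesVisible / SpotFleetTargetCapacity, guarded
    so the metric stays defined when the fleet is scaled down to zero."""
    if fleet_target_capacity <= 0:
        return 0.0
    return visible_messages / fleet_target_capacity
```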

Auto-scaling values

We decided that the alarm threshold would be 0.2 on our new metric, with the following steps:

{
    "AdjustmentType": "PercentChangeInCapacity",
    "Cooldown": 60,
    "Threshold": 0.2,
    "StepAdjustments": [
        {
            "MetricIntervalUpperBound": 0.4,
            "ScalingAdjustment": 10
        },
        {
            "MetricIntervalLowerBound": 0.4,
            "MetricIntervalUpperBound": 0.6,
            "ScalingAdjustment": 15
        },
        {
            "MetricIntervalLowerBound": 0.6,
            "MetricIntervalUpperBound": 0.8,
            "ScalingAdjustment": 20
        },
        {
            "MetricIntervalLowerBound": 0.8,
            "MetricIntervalUpperBound": 1,
            "ScalingAdjustment": 30
        },
        {
            "MetricIntervalLowerBound": 1,
            "MetricIntervalUpperBound": 1.5,
            "ScalingAdjustment": 40
        },
        {
            "MetricIntervalLowerBound": 1.5,
            "MetricIntervalUpperBound": 3,
            "ScalingAdjustment": 150
        },
        {
            "MetricIntervalLowerBound": 3,
            "ScalingAdjustment": 300
        }
    ]
}
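
A quick sketch of the arithmetic behind those steps may help: the interval bounds are offsets from the 0.2 alarm threshold, and each step adds a percentage of the current fleet size. This simplifies away cooldowns, evaluation periods, and the exact rounding rules Application Auto Scaling applies:

```python
import math

THRESHOLD = 0.2  # alarm threshold on the messages-per-server metric

# (lower, upper, percent) -- bounds are offsets from the alarm threshold
STEPS = [
    (None, 0.4, 10),
    (0.4, 0.6, 15),
    (0.6, 0.8, 20),
    (0.8, 1.0, 30),
    (1.0, 1.5, 40),
    (1.5, 3.0, 150),
    (3.0, None, 300),
]

def scaled_up_capacity(metric_value, current_capacity):
    """Fleet size after one PercentChangeInCapacity step (simplified model)."""
    diff = metric_value - THRESHOLD
    if diff < 0:
        return current_capacity  # alarm not breached, nothing to do
    for lower, upper, percent in STEPS:
        if (lower is None or diff >= lower) and (upper is None or diff < upper):
            # MinAdjustmentMagnitude guarantees we add at least one instance
            adjustment = max(1, math.ceil(current_capacity * percent / 100))
            return current_capacity + adjustment
    return current_capacity
```

With 100 servers running and a metric of 0.5 (50 waiting messages per server above the threshold band), the fleet grows by 10% to 110; at 3.5 it grows by 300% to 400.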

The second issue: workers that were scaled down needed to send their in-flight message back to the queue so it could be picked up by another worker. And because we use spot instances, our servers can also be shut down by AWS at any time, in which case those messages need to go back to the queue too.

We added a bit of Python code to our worker to detect when the instance is about to go down and push the message back to the queue. Each EC2 instance exposes an HTTP metadata endpoint that returns a 200 when the instance is scheduled to be shut down soon. We run this monitoring logic in a background thread so it does not block the main encoding worker.

import logging
import os
import threading
import time

import boto3
import requests

log = logging.getLogger(__name__)


class InstanceActionThread(object):
    """Polls the EC2 instance metadata endpoint in a daemon thread and, on a
    spot interruption notice, pushes the in-flight SQS message back to the
    queue before exiting the process."""

    def __init__(self, interval=3, receipt_handle=None,
                 queue_url=None, sqs_client=None):
        self.interval = interval
        self.sqs_client = sqs_client
        self.receipt_handle = receipt_handle
        self.queue_url = queue_url
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True
        thread.start()

    def set_receipt_handle(self, receipt_handle):
        self.receipt_handle = receipt_handle

    def run(self):
        while True:
            r = requests.get('http://169.254.169.254/latest/meta-data/spot/instance-action')
            if r.status_code == 200:
                # The instance is about to be reclaimed: make the message
                # visible again so another worker picks it up, then exit.
                log.warning("Scaling down EC2 instance...")
                self.sqs_client.change_message_visibility(
                    QueueUrl=self.queue_url,
                    ReceiptHandle=self.receipt_handle,
                    VisibilityTimeout=0
                )
                log.warning("Message visibility set to 0...")
                os._exit(2)
            time.sleep(self.interval)

Scaled-down jobs represent ~3% of all our jobs, meaning 3% of our jobs might be processed twice. Right now we don't check anything about a job when we push it back to the queue. A better approach would be to check the job's progress and let it finish if it is at 75% or more, or if its ETA is less than 3 minutes.
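
That heuristic would be a few lines in the worker. A sketch of what we could add (not something we run today):

```python
def should_requeue(progress_pct, eta_seconds):
    """Decide whether an interrupted job goes back to SQS or is allowed to
    finish on the draining instance. Thresholds match the heuristic above."""
    nearly_done = progress_pct >= 75 or eta_seconds < 3 * 60
    return not nearly_done
```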

We have already processed about 15,000 videos on our new cluster and haven't noticed any errors, so I think we can call this a success.

Our new CloudWatch dashboard

###
### The CloudFormation template
###
{
"AWSTemplateFormatVersion": "2010-09-09",
"Metadata": {
"AWS::CloudFormation::Interface": {
"ParameterGroups": [
{
"Label": {
"default": "Network Configuration"
},
"Parameters": [
"SubnetId1",
"SubnetId2"
]
},
{
"Label": {
"default": "EC2 Spot Fleet"
},
"Parameters": [
"InstanceType",
"IamInstanceProfile",
"SSHKeyName",
"VolumeSize",
"SecurityGroupIds",
"SpotPrice",
"TargetCapacity",
"SpotFleetMaxCapacity",
"ApplicationAutoScalingRoleEC2",
"IAMSpotFleetRole"
]
},
{
"Label": {
"default": "Elastic Container Service"
},
"Parameters": [
"ApplicationAutoScalingRoleECS",
"ECSContainerImage",
"ECSExecutionRoleArn",
"ECSTaskRoleArn"
]
},
{
"Label": {
"default": "Extra Configuration"
},
"Parameters": [
"ScratchBucketName",
"LambdaExecutionRole"
]
}
]
}
},
"Parameters": {
"SubnetId1": {
"Description": "Subnet 1",
"Type": "AWS::EC2::Subnet::Id"
},
"SubnetId2": {
"Description": "Subnet 2",
"Type": "AWS::EC2::Subnet::Id"
},
"VolumeSize": {
"Description": "Volume size in GB",
"Type": "Number"
},
"SpotFleetMaxCapacity": {
"Description": "Maximum number of instances in the spot fleet",
"Type": "Number",
"Default": 250,
"MinValue": 1,
"MaxValue": 1000
},
"ECSContainerImage": {
"Description": "The docker image",
"Type": "String"
},
"ECSExecutionRoleArn": {
"Description": "The ECS execution role ARN",
"Type": "String"
},
"LambdaExecutionRole": {
"Description": "The Lambda execution role ARN",
"Type": "String"
},
"IamInstanceProfile": {
"Description": "The ARN of the IAM role for the EC2 instance",
"Type": "String"
},
"IAMSpotFleetRole": {
"Description": "The IAM Spot Fleet role ARN",
"Type": "String"
},
"ECSTaskRoleArn": {
"Description": "The ECS task role ARN",
"Type": "String"
},
"ApplicationAutoScalingRoleEC2": {
"Description": "IAM role for EC2 fleet auto scaling",
"Type": "String"
},
"ApplicationAutoScalingRoleECS": {
"Description": "IAM role for ECS service auto scaling",
"Type": "String"
},
"SSHKeyName": {
"Description": "Name of an existing EC2 key pair for SSH access to the EC2 instance.",
"Type": "AWS::EC2::KeyPair::KeyName"
},
"ScratchBucketName": {
"Description": "The name of the S3 scratch bucket",
"Type": "String"
},
"SpotPrice": {
"Description": "Maximum spot price",
"Type": "Number"
},
"TargetCapacity": {
"Description": "Target capacity for the spot fleet",
"Type": "Number"
},
"InstanceType": {
"Description": "Type of EC2 Instance for the web nodes. Default: t2.small",
"Type": "String",
"AllowedValues": [
"t2.nano",
"t2.micro",
"t2.small",
"t2.medium",
"t2.large",
"t2.xlarge",
"t2.2xlarge",
"m3.medium",
"m3.large",
"m3.xlarge",
"m3.2xlarge",
"m4.large",
"m4.xlarge",
"m4.2xlarge",
"m4.4xlarge",
"m4.10xlarge",
"m4.16xlarge",
"c3.large",
"c3.xlarge",
"c3.2xlarge",
"c3.4xlarge",
"c3.8xlarge",
"c4.large",
"c4.xlarge",
"c4.2xlarge",
"c4.4xlarge",
"c4.8xlarge",
"c5.large",
"c5.xlarge",
"c5.2xlarge",
"c5.4xlarge",
"c5.9xlarge",
"c5.18xlarge",
"r3.large",
"r3.xlarge",
"r3.2xlarge",
"r3.4xlarge",
"r3.8xlarge",
"r4.large",
"r4.xlarge",
"r4.2xlarge",
"r4.4xlarge",
"r4.8xlarge",
"r4.16xlarge",
"x1.16xlarge",
"x1.32xlarge",
"x1e.xlarge",
"x1e.2xlarge",
"x1e.4xlarge",
"x1e.8xlarge",
"x1e.16xlarge",
"x1e.32xlarge",
"d2.xlarge",
"d2.2xlarge",
"d2.4xlarge",
"d2.8xlarge",
"i2.xlarge",
"i2.2xlarge",
"i2.4xlarge",
"i2.8xlarge",
"i3.large",
"i3.xlarge",
"i3.2xlarge",
"i3.4xlarge",
"i3.8xlarge",
"i3.16xlarge",
"f1.2xlarge",
"f1.16xlarge",
"g2.2xlarge",
"g2.8xlarge",
"g3.4xlarge",
"g3.8xlarge",
"g3.16xlarge",
"p2.xlarge",
"p2.8xlarge",
"p2.16xlarge",
"p3.2xlarge",
"p3.8xlarge",
"p3.16xlarge"
]
},
"SecurityGroupIds": {
"Type": "List<AWS::EC2::SecurityGroup::Id>",
"Description": "Security group for the EC2 instance"
}
},
"Mappings": {
"ECSTaskResources": {
"c5.2xlarge": {
"cpu": 8,
"ram": 13000,
"downscalingthreshold": 100,
"SpotFleetMinCapacity": 2
},
"c5.4xlarge": {
"cpu": 16,
"ram": 28000,
"downscalingthreshold": 100,
"SpotFleetMinCapacity": 2
},
"c5.9xlarge": {
"cpu": 36,
"ram": 58000,
"downscalingthreshold": 50,
"SpotFleetMinCapacity": 1
}
},
"RegionOSMapping": {
"us-east-2": {
"ecs201709l": "ami-64300001"
},
"us-east-1": {
"ecs201709l": "ami-aff65ad2"
},
"us-west-2": {
"ecs201709l": "ami-40ddb938"
},
"us-west-1": {
"ecs201709l": "ami-69677709"
},
"eu-west-3": {
"ecs201709l": "ami-250eb858"
},
"eu-west-2": {
"ecs201709l": "ami-2218f945"
},
"eu-west-1": {
"ecs201709l": "ami-2d386654"
},
"eu-central-1": {
"ecs201709l": "ami-9fc39c74"
},
"ap-northeast-2": {
"ecs201709l": "ami-9d56f9f3"
},
"ap-northeast-1": {
"ecs201709l": "ami-a99d8ad5"
},
"ap-southeast-2": {
"ecs201709l": "ami-efda148d"
},
"ap-southeast-1": {
"ecs201709l": "ami-846144f8"
},
"ca-central-1": {
"ecs201709l": "ami-897ff9ed"
},
"ap-south-1": {
"ecs201709l": "ami-72edc81d"
},
"sa-east-1": {
"ecs201709l": "ami-4a7e2826"
}
}
},
"Resources": {
"SpotFleet": {
"Type": "AWS::EC2::SpotFleet",
"Properties": {
"SpotFleetRequestConfigData": {
"IamFleetRole": {
"Ref": "IAMSpotFleetRole"
},
"AllocationStrategy": "lowestPrice",
"TargetCapacity": {
"Ref": "TargetCapacity"
},
"SpotPrice": {
"Ref": "SpotPrice"
},
"TerminateInstancesWithExpiration": true,
"LaunchSpecifications": [
{
"EbsOptimized": "false",
"ImageId": {
"Fn::FindInMap": [
"RegionOSMapping",
{
"Ref": "AWS::Region"
},
"ecs201709l"
]
},
"InstanceType": {
"Ref": "InstanceType"
},
"KeyName": {
"Ref": "SSHKeyName"
},
"IamInstanceProfile": {
"Arn": {
"Ref": "IamInstanceProfile"
}
},
"BlockDeviceMappings": [
{
"DeviceName": "/dev/xvda",
"Ebs": {
"DeleteOnTermination": true,
"VolumeType": "gp2",
"VolumeSize": 8
}
},
{
"DeviceName": "/dev/xvdcz",
"Ebs": {
"DeleteOnTermination": true,
"VolumeType": "gp2",
"VolumeSize": {
"Ref": "VolumeSize"
}
}
}
],
"UserData": {
"Fn::Base64": {
"Fn::Join": [
"\n",
[
"Content-Type: multipart/mixed; boundary=\"===============BOUNDARY==\"",
"MIME-Version: 1.0",
"",
"--===============BOUNDARY==",
"Content-Type: text/cloud-boothook; charset=\"us-ascii\"",
"",
"#cloud-boothook",
{
"Fn::Sub": [
"echo 'OPTIONS=\"${!OPTIONS} --storage-opt dm.basesize=${volume}G\"' >> /etc/sysconfig/docker",
{
"volume": {
"Ref": "VolumeSize"
}
}
]
},
"",
"",
"--===============BOUNDARY==",
"Content-Type: text/x-shellscript; charset=\"us-ascii\"",
"",
"#!/bin/bash",
{
"Fn::Sub": [
"echo ECS_CLUSTER=${cluster} >> /etc/ecs/ecs.config;",
{
"cluster": {
"Ref": "ECSCluster"
}
}
]
},
"echo ECS_BACKEND_HOST= >> /etc/ecs/ecs.config;",
"echo ECS_ENABLE_CONTAINER_METADATA=true >> /etc/ecs/ecs.config;",
"export PATH=/usr/local/bin:$PATH",
"yum -y install jq",
"easy_install pip",
"pip install awscli",
{
"Fn::Sub": [
"aws configure set default.region ${region}",
{
"region": {
"Ref": "AWS::Region"
}
}
]
},
"cat <<EOF > /etc/init/spot-instance-termination-notice-handler.conf",
"description \"Start spot instance termination handler monitoring script\"",
"author \"Amazon Web Services\"",
"start on started ecs",
"script",
"echo \\$\\$ > /var/run/spot-instance-termination-notice-handler.pid",
"exec /usr/local/bin/spot-instance-termination-notice-handler.sh",
"end script",
"pre-start script",
"logger \"[spot-instance-termination-notice-handler.sh]: spot instance termination",
"notice handler started\"",
"end script",
"EOF",
"cat <<EOF > /usr/local/bin/spot-instance-termination-notice-handler.sh",
"#!/bin/bash",
"while sleep 5; do",
"if [ -z \\$(curl -Isf http://169.254.169.254/latest/meta-data/spot/termination-time) ]; then",
"/bin/false",
"else",
"logger \"[spot-instance-termination-notice-handler.sh]: spot instance termination notice detected\"",
"STATUS=DRAINING",
"ECS_CLUSTER=\\$(curl -s http://localhost:51678/v1/metadata | jq .Cluster | tr -d \\\\\")",
"CONTAINER_INSTANCE=\\$(curl -s http://localhost:51678/v1/metadata | jq .ContainerInstanceArn | tr -d \\\\\")",
"logger \"[spot-instance-termination-notice-handler.sh]: putting instance in state \\$STATUS\"",
"/usr/local/bin/aws ecs update-container-instances-state --cluster \\$ECS_CLUSTER --container-instances \\$CONTAINER_INSTANCE --status \\$STATUS",
"logger \"[spot-instance-termination-notice-handler.sh]: putting myself to sleep...\"",
"sleep 120 # exit loop as instance expires in 120 secs after terminating notification",
"fi",
"done",
"EOF",
"chmod +x /usr/local/bin/spot-instance-termination-notice-handler.sh",
"",
"--===============BOUNDARY==--"
]
]
}
},
"NetworkInterfaces": [
{
"DeviceIndex": 0,
"SubnetId": {
"Ref": "SubnetId1"
},
"DeleteOnTermination": true,
"Groups": {
"Ref": "SecurityGroupIds"
},
"AssociatePublicIpAddress": true
}
]
},
{
"EbsOptimized": "false",
"ImageId": {
"Fn::FindInMap": [
"RegionOSMapping",
{
"Ref": "AWS::Region"
},
"ecs201709l"
]
},
"InstanceType": {
"Ref": "InstanceType"
},
"KeyName": {
"Ref": "SSHKeyName"
},
"IamInstanceProfile": {
"Arn": {
"Ref": "IamInstanceProfile"
}
},
"BlockDeviceMappings": [
{
"DeviceName": "/dev/xvda",
"Ebs": {
"DeleteOnTermination": true,
"VolumeType": "gp2",
"VolumeSize": 8
}
},
{
"DeviceName": "/dev/xvdcz",
"Ebs": {
"DeleteOnTermination": true,
"VolumeType": "gp2",
"VolumeSize": {
"Ref": "VolumeSize"
}
}
}
],
"UserData": {
"Fn::Base64": {
"Fn::Join": [
"\n",
[
"Content-Type: multipart/mixed; boundary=\"===============BOUNDARY==\"",
"MIME-Version: 1.0",
"",
"--===============BOUNDARY==",
"Content-Type: text/cloud-boothook; charset=\"us-ascii\"",
"",
"#cloud-boothook",
{
"Fn::Sub": [
"echo 'OPTIONS=\"${!OPTIONS} --storage-opt dm.basesize=${volume}G\"' >> /etc/sysconfig/docker",
{
"volume": {
"Ref": "VolumeSize"
}
}
]
},
"",
"",
"--===============BOUNDARY==",
"Content-Type: text/x-shellscript; charset=\"us-ascii\"",
"",
"#!/bin/bash",
{
"Fn::Sub": [
"echo ECS_CLUSTER=${cluster} >> /etc/ecs/ecs.config;",
{
"cluster": {
"Ref": "ECSCluster"
}
}
]
},
"echo ECS_BACKEND_HOST= >> /etc/ecs/ecs.config;",
"echo ECS_ENABLE_CONTAINER_METADATA=true >> /etc/ecs/ecs.config;",
"export PATH=/usr/local/bin:$PATH",
"yum -y install jq",
"easy_install pip",
"pip install awscli",
{
"Fn::Sub": [
"aws configure set default.region ${region}",
{
"region": {
"Ref": "AWS::Region"
}
}
]
},
"cat <<EOF > /etc/init/spot-instance-termination-notice-handler.conf",
"description \"Start spot instance termination handler monitoring script\"",
"author \"Amazon Web Services\"",
"start on started ecs",
"script",
"echo \\$\\$ > /var/run/spot-instance-termination-notice-handler.pid",
"exec /usr/local/bin/spot-instance-termination-notice-handler.sh",
"end script",
"pre-start script",
"logger \"[spot-instance-termination-notice-handler.sh]: spot instance termination",
"notice handler started\"",
"end script",
"EOF",
"cat <<EOF > /usr/local/bin/spot-instance-termination-notice-handler.sh",
"#!/bin/bash",
"while sleep 5; do",
"if [ -z \\$(curl -Isf http://169.254.169.254/latest/meta-data/spot/termination-time) ]; then",
"/bin/false",
"else",
"logger \"[spot-instance-termination-notice-handler.sh]: spot instance termination notice detected\"",
"STATUS=DRAINING",
"ECS_CLUSTER=\\$(curl -s http://localhost:51678/v1/metadata | jq .Cluster | tr -d \\\\\")",
"CONTAINER_INSTANCE=\\$(curl -s http://localhost:51678/v1/metadata | jq .ContainerInstanceArn | tr -d \\\\\")",
"logger \"[spot-instance-termination-notice-handler.sh]: putting instance in state \\$STATUS\"",
"/usr/local/bin/aws ecs update-container-instances-state --cluster \\$ECS_CLUSTER --container-instances \\$CONTAINER_INSTANCE --status \\$STATUS",
"logger \"[spot-instance-termination-notice-handler.sh]: putting myself to sleep...\"",
"sleep 120 # exit loop as instance expires in 120 secs after terminating notification",
"fi",
"done",
"EOF",
"chmod +x /usr/local/bin/spot-instance-termination-notice-handler.sh",
"",
"--===============BOUNDARY==--"
]
]
}
},
"NetworkInterfaces": [
{
"DeviceIndex": 0,
"SubnetId": {
"Ref": "SubnetId2"
},
"DeleteOnTermination": true,
"Groups": {
"Ref": "SecurityGroupIds"
},
"AssociatePublicIpAddress": true
}
]
}
],
"Type": "maintain"
}
}
},
"ECSCluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {
"ClusterName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"ECSCluster"
]
]
}
}
},
"ECSService": {
"Type": "AWS::ECS::Service",
"Properties": {
"ServiceName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"ECSService"
]
]
},
"Cluster": {
"Ref": "ECSCluster"
},
"DeploymentConfiguration": {
"MaximumPercent": 500,
"MinimumHealthyPercent": 0
},
"DesiredCount": {
"Ref": "SpotFleetMaxCapacity"
},
"TaskDefinition": {
"Ref": "ECSTaskDefinition"
},
"LaunchType": "EC2",
"PlacementConstraints": [
{
"Type": "distinctInstance"
}
]
}
},
"ECSTaskDefinition": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"ECSTaskDefinition"
]
]
},
"NetworkMode": "bridge",
"ExecutionRoleArn": {
"Ref": "ECSExecutionRoleArn"
},
"TaskRoleArn": {
"Ref": "ECSTaskRoleArn"
},
"RequiresCompatibilities": [
"EC2"
],
"ContainerDefinitions": [
{
"Name": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"Container"
]
]
},
"Image": {
"Ref": "ECSContainerImage"
},
"Cpu": {
"Fn::FindInMap": [
"ECSTaskResources",
{
"Ref": "InstanceType"
},
"cpu"
]
},
"Memory": {
"Fn::FindInMap": [
"ECSTaskResources",
{
"Ref": "InstanceType"
},
"ram"
]
},
"Essential": true,
"LogConfiguration": {
"LogDriver": "awslogs",
"Options": {
"awslogs-group": {
"Ref": "LogGroup"
},
"awslogs-region": {
"Ref": "AWS::Region"
},
"awslogs-stream-prefix": "ecs"
}
}
}
]
}
},
"LogGroup": {
"Type": "AWS::Logs::LogGroup",
"Properties": {
"LogGroupName": {
"Fn::Join": [
"/",
[
"ecs",
{
"Ref": "AWS::StackName"
}
]
]
},
"RetentionInDays": 3
}
},
"queuefifo": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": {
"Fn::Join": [
"",
[
{
"Ref": "AWS::StackName"
},
"_FifoQueue",
".fifo"
]
]
},
"DelaySeconds": 0,
"FifoQueue": true
}
},
"ec2alarmscaledown": {
"Type": "AWS::CloudWatch::Alarm",
"Properties": {
"AlarmName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"ec2alarmscaledown"
]
]
},
"ActionsEnabled": "true",
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"EvaluationPeriods": 5,
"MetricName": "NumberOfEmptyReceives",
"Namespace": "AWS/SQS",
"Period": 60,
"Statistic": "SampleCount",
"Threshold": {
"Fn::FindInMap": [
"ECSTaskResources",
{
"Ref": "InstanceType"
},
"downscalingthreshold"
]
},
"AlarmActions": [
{
"Ref": "EC2ScaleDown"
}
],
"Dimensions": [
{
"Name": "QueueName",
"Value": {
"Fn::GetAtt": [
"queuefifo",
"QueueName"
]
}
}
]
}
},
"ec2alarmscaleup": {
"Type": "AWS::CloudWatch::Alarm",
"DependsOn": [
"AnalyseQueueMetrics"
],
"Properties": {
"AlarmName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"ec2alarmscaleup"
]
]
},
"ActionsEnabled": "true",
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"EvaluationPeriods": 4,
"MetricName": "AverageNumberOfMessagesPerServer",
"Namespace": "Encoding",
"Period": 60,
"Statistic": "Average",
"Threshold": 0.2,
"AlarmActions": [
{
"Ref": "EC2ScaleUp"
}
],
"Dimensions": [
{
"Name": "QueueName",
"Value": {
"Fn::GetAtt": [
"queuefifo",
"QueueName"
]
}
}
]
}
},
"EC2ScalableTarget": {
"Type": "AWS::ApplicationAutoScaling::ScalableTarget",
"Properties": {
"MaxCapacity": {
"Ref": "SpotFleetMaxCapacity"
},
"MinCapacity": {
"Fn::FindInMap": [
"ECSTaskResources",
{
"Ref": "InstanceType"
},
"SpotFleetMinCapacity"
]
},
"RoleARN": {
"Ref": "ApplicationAutoScalingRoleEC2"
},
"ResourceId": {
"Fn::Join": [
"/",
[
"spot-fleet-request",
{
"Ref": "SpotFleet"
}
]
]
},
"ServiceNamespace": "ec2",
"ScalableDimension": "ec2:spot-fleet-request:TargetCapacity"
}
},
"EC2ScaleUp": {
"Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
"DependsOn": [
"SpotFleet",
"EC2ScalableTarget"
],
"Properties": {
"PolicyName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"EC2ScaleUp"
]
]
},
"PolicyType": "StepScaling",
"ScalingTargetId": {
"Ref": "EC2ScalableTarget"
},
"StepScalingPolicyConfiguration": {
"AdjustmentType": "PercentChangeInCapacity",
"Cooldown": 60,
"MetricAggregationType": "Average",
"MinAdjustmentMagnitude": 1,
"StepAdjustments": [
{
"MetricIntervalUpperBound": 0.4,
"ScalingAdjustment": 10
},
{
"MetricIntervalLowerBound": 0.4,
"MetricIntervalUpperBound": 0.6,
"ScalingAdjustment": 15
},
{
"MetricIntervalLowerBound": 0.6,
"MetricIntervalUpperBound": 0.8,
"ScalingAdjustment": 20
},
{
"MetricIntervalLowerBound": 0.8,
"MetricIntervalUpperBound": 1,
"ScalingAdjustment": 30
},
{
"MetricIntervalLowerBound": 1,
"MetricIntervalUpperBound": 1.5,
"ScalingAdjustment": 40
},
{
"MetricIntervalLowerBound": 1.5,
"MetricIntervalUpperBound": 3,
"ScalingAdjustment": 150
},
{
"MetricIntervalLowerBound": 3,
"ScalingAdjustment": 300
}
]
}
}
},
"EC2ScaleDown": {
"Type": "AWS::ApplicationAutoScaling::ScalingPolicy",
"DependsOn": [
"SpotFleet",
"EC2ScalableTarget"
],
"Properties": {
"PolicyName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"EC2ScaleDown"
]
]
},
"PolicyType": "StepScaling",
"ScalingTargetId": {
"Ref": "EC2ScalableTarget"
},
"StepScalingPolicyConfiguration": {
"AdjustmentType": "PercentChangeInCapacity",
"Cooldown": 600,
"MetricAggregationType": "Average",
"MinAdjustmentMagnitude": 1,
"StepAdjustments": [
{
"MetricIntervalUpperBound": 150,
"ScalingAdjustment": -5
},
{
"MetricIntervalLowerBound": 150,
"MetricIntervalUpperBound": 250,
"ScalingAdjustment": -10
},
{
"MetricIntervalLowerBound": 250,
"MetricIntervalUpperBound": 500,
"ScalingAdjustment": -20
},
{
"MetricIntervalLowerBound": 500,
"ScalingAdjustment": -30
}
]
}
}
},
"AnalyseQueueMetrics": {
"Type": "AWS::Lambda::Function",
"Properties": {
"FunctionName": {
"Fn::Join": [
"-",
[
{
"Ref": "AWS::StackName"
},
"AnalyseQueueMetrics"
]
]
},
"Handler": "index.lambda_handler",
"MemorySize": 128,
"Role": {
"Ref": "LambdaExecutionRole"
},
"Code": {
"ZipFile": {
"Fn::Join": [
"\n",
[
"import boto3",
"import json",
"from datetime import datetime, timedelta",
"",
"CLOUDWATCH_PERIOD=300",
"",
"AWS_OBJECTS = [",
" {",
{
"Fn::Sub": [
" 'queue': '${queuename}',",
{
"queuename": {
"Fn::GetAtt": [
"queuefifo",
"QueueName"
]
}
}
]
},
{
"Fn::Sub": [
" 'spotfleet_request': '${spotfleet}',",
{
"spotfleet": {
"Ref": "SpotFleet"
}
}
]
},
" }",
"]",
"",
"def lambda_handler(event, context):",
" main()",
"",
"def retrieve_metric(c, metric, stat, results):",
" response = c.get_metric_statistics(",
" Namespace=metric['namespace'],",
" MetricName=metric['metricname'],",
" Dimensions=metric['dimensions'],",
" StartTime=datetime.utcnow() - timedelta(seconds=CLOUDWATCH_PERIOD),",
" EndTime=datetime.utcnow(),",
" Period=CLOUDWATCH_PERIOD,",
" Statistics=[stat],",
" Unit='Count'",
" )",
" dp = response['Datapoints']",
" p = dp[0]",
" results[metric['metricname']] = p[stat]",
"",
"def analyse_queue(c, aws_objects):",
" queue_name = aws_objects['queue']",
" fleet_id = aws_objects['spotfleet_request']",
"",
" metrics = [",
" {",
" 'namespace': 'AWS/EC2Spot',",
" 'metricname': 'TargetCapacity',",
" 'dimensions': [",
" {",
" 'Name': 'FleetRequestId',",
" 'Value': fleet_id",
" }",
" ]",
" },",
" {",
" 'namespace': 'AWS/SQS',",
" 'metricname': 'ApproximateNumberOfMessagesVisible',",
" 'dimensions': [",
" {",
" 'Name': 'QueueName',",
" 'Value': queue_name",
" }",
" ]",
" }",
" ]",
" cw_stats = ['Average']",
" for stat in cw_stats:",
" results = {}",
" for metric in metrics:",
" retrieve_metric(c, metric, stat, results)",
"",
" p1 = results.get('ApproximateNumberOfMessagesVisible', 0)",
" p2 = results.get('TargetCapacity', 0)",
" if p2:",
" results['ApproximateNumberOfMessagesPerServer'] = p1/p2",
" else:",
" results['ApproximateNumberOfMessagesPerServer'] = 0",
"",
" c.put_metric_data(",
" MetricData=[",
" {",
" 'MetricName': '{}NumberOfMessagesPerServer'.format(stat),",
" 'Dimensions': [",
" {",
" 'Name': 'QueueName',",
" 'Value': queue_name",
" },",
" ],",
" 'Unit': 'Count',",
" 'Value': results['ApproximateNumberOfMessagesPerServer']",
" },",
" ],",
" Namespace='Encoding'",
" )",
"",
"def main():",
" print ('INFO: Starting task')",
" c = boto3.client('cloudwatch')",
" print ('INFO: Getting metrics from CloudWatch')",
" for obj in AWS_OBJECTS:",
" analyse_queue(c, obj)",
"",
"if __name__ == \"__main__\":",
" main()"
]
]
}
},
"Runtime": "python3.6",
"Timeout": 12
}
},
"ScheduledRule": {
"Type": "AWS::Events::Rule",
"Properties": {
"Description": "ScheduledRule",
"ScheduleExpression": "rate(1 minute)",
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"AnalyseQueueMetrics",
"Arn"
]
},
"Id": "TargetFunctionV1"
}
]
}
},
"PermissionForEventsToInvokeLambda": {
"Type": "AWS::Lambda::Permission",
"Properties": {
"FunctionName": {
"Ref": "AnalyseQueueMetrics"
},
"Action": "lambda:InvokeFunction",
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"ScheduledRule",
"Arn"
]
}
}
}
}
}