Cree un Pipeline de ingesta de datos serverless con CDK, AWS S3 y API Gateway

JugueDev
7 min readMay 20, 2023

--

… y un bonus para poder disparar un State Function con EventBridge.

Primero:

Para poder agilizar el proceso de desarrollo de este proyecto, así como mantener una consistencia en el despliegue y mejorar la seguridad se utilizará IaC a través del Cloud Development Kit (CDK) provisto por AWS. Específicamente se usará Typescript para el despliegue de la infraestructura.

DESPLEGAMOS LOS ARCHIVOS BASE DE CDK

Creamos el esqueleto base de nuestra aplicación en CDK, para eso tenemos que instalar el toolkit de AWS e inicializar nuestra aplicación.

#Creamos una carpeta de trabajo
mkdir cdkcode
cd cdkcode

#Instalar el toolkit de AWS CDK
npm install -g aws-cdk

# create a typescript cdk project (sample-app is a predefined template)
cdk init sample-app --language typescript

Al ejecutar este comando se nos generará el siguiente set de archivos. Dado que no es un tutorial enfocado totalmente en CDK, no vamos entrar al detalle de el funcionamiento de cada archivo. solo nos vamos a enfocar en aquellos archivos que se encuentren en la carpeta lib.

Inicialmente vamos a borrar todo el contenido de el archivo cdkcode-stack.ts y lo reemplazamos por el siguiente código.

import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class CdkcodeStack extends Stack {

constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);


}
}

ARQUITECTURA

El siguiente diagrama define a grandes razgos el despliegue que se usará en esta prueba:

CREAMOS UN S3

Para añadir nuestro primer recurso, un bucket de S3, vamos a crear un nuevo stack al cual llamaremos ingestion-stack, para esto basta con crear un nuevo archivo llamado ingestion-stack.ts en en la carpeta lib y añadir el siguiente código en el archivo cdkcode-stack:

import { Duration, Stack, StackProps } from 'aws-cdk-lib';
import { IngestStack } from './ingestion-stack';
import { Construct } from 'constructs';

export class CdkcodeStack extends Stack {

constructor(scope: Construct, id: string) {
super(scope, id, props);

const IngestionStack = new IngestStack(this, 'ingestionStack', {
});

}
}

Y el siguiente código en el ingestion-stack-code.ts:

import { Construct,  } from 'constructs';

export class IngestStack extends Construct {

constructor(scope: Construct, id: string, props: IngestProps) {
super(scope, id);

}
}

Le pasamos un objeto (en este caso el nombre del bucket) del cdkcode-stack al ingestion-stack a través de los props:

    const IngestionStack = new IngestStack(this, 'ingestionStack', {
bucketname: 'ingestion-bucket-data-jg',
});

Y desde el ingestion-stack lo usamos para poder crear el bucket con ese nombre:


export interface IngestProps {
// props que usa el ingestion-stack
bucketname: string,
}

export class IngestStack extends Construct {

constructor(scope: Construct, id: string, props: IngestProps) {
super(scope, id);

// Bucket donde se almacenarán los archivos
const dataBucket = new s3.Bucket(this, "ingestion-bucket-id", {
bucketName: props.bucketname,
eventBridgeEnabled: true,
});


}
}

De este modo ya se tiene definido el código necesario para desplegar un bucket de S3. Para poder compilar y crear realmente el recurso en nuestra cuenta de AWS. Es necesario que configuremos de algún modo las credenciales del usuario IAM, para ello podemos usar el comando:

aws configure

Una vez configuradas nuestras credenciales ejecutamos los siguientes comandos para poder crear los recursos:

# Primero sintetizamos el stack en un template de Cloudformation
cdk synth

# Instalamos un stack predefinido que incluya un S3 para almacenar plantillas
cdk bootstrap

# Muestra los cambios en nuestra aplicación de CDK y que acciones ejecutara (destroy, create)
cdk diff

# Desplegamos stack (es necesario configurar credenciales con cdk configure)
cdk deploy

De este modo ya habríamos desplegado el bucket en nuestra cuenta.

CREAMOS EL API GATEWAY

Una vez creado el bucket podemos crear un API Gateway para ello primero debemos definir un rol de IAM para que el API GW pueda guardar objetos en el bucket de S3:

    // Rol de IAM a asignar al Api Gateway
const apiGWRole = new iam.Role(this, "api-gateway-role-id", {
assumedBy: new iam.ServicePrincipal("apigateway.amazonaws.com"),
roleName: "API-Gateway-S3-Integration-Role",
description: "Rol de IAM para que un AP GW pueda guardar archivos en S3",
});

// Añademos un Policy al rol de IAM
apiGWRole.addToPolicy(
new iam.PolicyStatement({
resources: [this.dataBucket.bucketArn + "/*"],
actions: ["s3:PutObject"],
})
);

// API Gateway para ingestar los archivos
this.apiGateway = new api_gw.RestApi(this, "data-api-id", {
restApiName: "Files receiver",
description: "Recibe archivos para poder almacenarlos en S3.",
binaryMediaTypes: ["*/*"],
endpointTypes: [api_gw.EndpointType.REGIONAL],
defaultMethodOptions: {
apiKeyRequired: true
}
});

Luego creamos un API REST con API Gateway:

    // API Gateway para ingestar los archivos
this.apiGateway = new api_gw.RestApi(this, "data-api-id", {
restApiName: "Files receiver",
description: "Recibe archivos para poder almacenarlos en S3.",
binaryMediaTypes: ["*/*"],
endpointTypes: [api_gw.EndpointType.REGIONAL],
defaultMethodOptions: {
apiKeyRequired: true
}
});

A este API REST le asignamos un plan de uso, que sirve principalmente para definir un control en el uso de sus recursos:

    // Añadimos un usage plan para la API
const basicUsagePlan = this.apiGateway.addUsagePlan('UsagePlan', {
name: "MyUsagePlan",
apiStages: [{
api: this.apiGateway,
stage: this.apiGateway.deploymentStage
}],
throttle: {
burstLimit: 100, // Límite máximo de uso en simultáneo
rateLimit: 200
},
quota: {
limit: 10000, // Cantidad máxima de peticiones
period: api_gw.Period.MONTH
},
description: "Ingestion API GW usage plan."
})

Y adicionalmente configuramos un API KEY para este Usage Plan:

// Añadimos un API Key autogenerado
const basicApikey = this.apiGateway.addApiKey( `MyAPIkey`, {
//apiKeyName, // this can be autogenerated
description: `APIKey para asegurar la API de ingesta.`,
})

basicUsagePlan.addApiKey(basicApikey);

CREAMOS LOS RECURSOS

Para los recursos del API Gateway es necesario que creemos primero una integración con el servicio de S3. En esta integración se definen el path, el método, las credenciales del role de IAM que creamos anteriormente y el formato de las respuestas a los request de la API.

    // Añadir una integración para el API Gateway con S3
const s3Integration = new api_gw.AwsIntegration({
service: "s3",
integrationHttpMethod: "PUT",
path: `${this.dataBucket.bucketName}/ingestion/{key}`,
options: {
credentialsRole: apiGWRole,
integrationResponses: [
{
statusCode: "200",
responseParameters: {
"method.response.header.Content-Type": "integration.response.header.Content-Type",
},
},
],
requestParameters: {
"integration.request.path.key": "method.request.path.key",
},
},
});

Esta integración de S3 se añade a el API Gatwway a través de un método:

    //Añadir los recursos al API Gateway y un método ligado a la integración de S3
this.apiGateway.root
.addResource("upload")
.addResource("{key}")
.addMethod("PUT", s3Integration, {
methodResponses: [
{
statusCode: "200",
responseParameters: {
"method.response.header.Content-Type": true,
},
},
],
requestParameters: {
"method.request.path.key": true,
},

}
);

}

Procedemos a deployar nuevamente los recursos con cdk deploy y podremos visualizar nuestro API Gateway creado:

Junto con sus recursos:

Para poder probar su funcionamiento requerimos el API key que se puede obtener en esa misma interfaz asi como …

… El URL de invocación:

Probamos la ingesta archivos con curl:

# Para la ingesta de archivos
curl -i -H "x-api-key: {api-key}" -X PUT 'https://{api_id}.execute-api.{region}.amazonaws.com/prod/upload/file_destino.csv' --data-binary '@file_origen.csv'

De ese modo el archivo habrá sido cargado a s3:

BONUS

Adicionalmente se puede utilizar las notificaciones de S3 para disparar algún otro evento con S3, para ello solo hace falta habilitar esta funcionalidad al momento de crear el bucket:

    // Bucket donde se almacenarán los archivos 
this.dataBucket = new s3.Bucket(this, "ingestion-bucket-id", {
bucketName: props.bucketname,
eventBridgeEnabled: true, // ESTA LINEA
});

Y crear una regla en eventbridge que dispare el recurso que tu quieras.

  // Regla de eventbridge que detecta objetos creados en S3
const eventRule = new events.Rule(this, "s3-object-created", {
ruleName: "s3-object-created",
eventPattern: {
source: ["aws.s3"],
detailType: ["Object Created"],
detail: {
bucket: {
name: [props.dataBucket.bucketName]
},
object: {
key: [{
prefix: "ingestion"
}]
}
}
}
});

Por ejemplo para disparar un step function se puede utilizar el siguiente código:


// Creamos un rol para ejecutar un step function
const invokeStepFunctionRole = new iam.Role(this, "invoke-step-function-role-id", {
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
roleName: "Invoke-Step-Function-Role",
description: "Rol de IAM para invocar el Step Function.",
});

// Añademos un Policy al rol de IAM
invokeStepFunctionRole.addToPolicy(
new iam.PolicyStatement({
resources: [SFMachine.stateMachineArn],
actions: ["states:StartExecution"],
})
);

// Añadimos un target a la regla del evento
eventRule.addTarget(new targets.SfnStateMachine(SFMachine, {
role: invokeStepFunctionRole
}));

Tips

  1. Es importante mantener el orden en nuestro repositorio de CDK, cada vez que necesitemos crear recursos adicionales para una aplicación distinta (pipeline de procesamiento, ML) se deben crear los recursos en un stack diferente.
  2. Para poder asegurar aún más los recursos de nuestra API se pueden añadir metodos adicionales autentificación y validación, sigue la guía de AWS.
  3. Se puede encontrar el código de esta prueba en el siguiente repo de Github.

Referencias

--

--

JugueDev

Solutions Architect | AWS x5 | Databricks x2 | Azure x1 | IoT | Machine Learning | Electronic Enginee