Amazon EventBridge Pipes: stitch AWS services together

Guido Nebiolo
5 min read · Feb 1, 2023


TL;DR: thanks to Amazon EventBridge Pipes, cloud developers can now create direct connections between AWS services as event producers and consumers, with the added flexibility of filtering, transforming, and enriching events along the way.

Among the announcements made at AWS re:Invent 2022 (here is a recap of the 8 most impactful for cloud developers), one in particular caught my attention: Amazon EventBridge Pipes.

In cloud-native architectures, multiple services generate events and different consumers receive them, often discarding the ones they do not need based on information in the event payload.

The idea behind this service is very simple, but quite effective: Amazon EventBridge Pipes standardizes the way event producers and consumers are connected, optionally filtering, enriching, and/or transforming the events before delivering them to the target.

point-to-point integrations between event producers and consumers

Let’s take a deep dive into which situations would benefit the most from this new service.

Ditch the glue code that connects different services.

Think about a scenario in which events coming from a DynamoDB stream need to be sent to a FIFO SQS queue to scale the processing across multiple workers while maintaining message order.

Lambda architecture
const { unmarshall } = require("@aws-sdk/util-dynamodb");
const { SQSClient, SendMessageCommand } = require("@aws-sdk/client-sqs");

const QUEUE_URL = process.env.QUEUE_URL;

const sqsClient = new SQSClient();

exports.lambdaHandler = async (event) => {
  // Only propagate newly inserted items
  const records = event.Records
    .filter(record => record.eventName === "INSERT");
  const promises = records.map(record => {
    const dynamoEvent = unmarshall(record.dynamodb.NewImage);
    const params = {
      MessageBody: JSON.stringify(dynamoEvent),
      QueueUrl: QUEUE_URL,
      // FIFO queues require a MessageGroupId: group by item key to preserve
      // per-item ordering (assumes content-based deduplication is enabled)
      MessageGroupId: JSON.stringify(record.dynamodb.Keys)
    };
    return sqsClient.send(new SendMessageCommand(params));
  });
  const results = await Promise.allSettled(promises);
  // Partial batch failure handling: report only the failed records
  return {
    batchItemFailures: records
      .filter((record, i) => results[i].status === "rejected")
      .map(record => ({
        itemIdentifier: record.eventID
      }))
  };
};

Developers know that every line of code adds a hidden cost to their projects, known as technical debt.

In this scenario, a developer can now replace the code above with an EventBridge Pipes integration, lowering the technical debt of the solution.

EventBridge Pipes architecture
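As a sketch of what this replacement could look like, the following snippet creates the equivalent pipe with the AWS SDK for JavaScript (@aws-sdk/client-pipes). The account, the ARNs, the role, and the pk key attribute are hypothetical placeholders; adapt them to your own resources.

const { PipesClient, CreatePipeCommand } = require("@aws-sdk/client-pipes");

const client = new PipesClient();

async function main() {
  await client.send(new CreatePipeCommand({
    Name: "ddb-inserts-to-fifo-queue",
    RoleArn: "arn:aws:iam::123456789012:role/pipe-execution-role", // hypothetical
    // Source: the DynamoDB stream (hypothetical ARN)
    Source: "arn:aws:dynamodb:eu-west-1:123456789012:table/orders/stream/2023-01-01T00:00:00.000",
    SourceParameters: {
      DynamoDBStreamParameters: { StartingPosition: "LATEST", BatchSize: 10 },
      // Filter stage: only INSERT events are forwarded (and billed)
      FilterCriteria: {
        Filters: [{ Pattern: JSON.stringify({ eventName: ["INSERT"] }) }]
      }
    },
    // Target: the FIFO queue (hypothetical ARN)
    Target: "arn:aws:sqs:eu-west-1:123456789012:orders.fifo",
    TargetParameters: {
      SqsQueueParameters: {
        // Dynamic path parameter: group messages by item key to preserve
        // per-item ordering ("pk" is a hypothetical partition key name)
        MessageGroupId: "$.dynamodb.Keys.pk.S"
      }
    }
  }));
}

main().catch(console.error);

The same definition can of course be expressed in CloudFormation or the CDK; the point is that filtering, ordering, and delivery now live in configuration instead of code.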

Lower costs by avoiding unneeded event handling.

Looking at the code above, only the INSERT events are propagated to the queue, but the Lambda function is invoked for every event coming from the stream.

In a scenario in which millions of events are generated by record updates in the table and only a few new records are created, EventBridge Pipes can lower the running cost of the solution by dropping unneeded events before any compute is involved. In terms of pricing, EventBridge Pipes bills only the events that pass the filter stage.

Imagine having 100,000,000 events with a 5% rate of INSERTs: processing them with a Lambda function can cost around $6.68 (assuming batches of 10 records from the stream and roughly 100 ms of processing time per batch with 256 MB of memory), while with EventBridge Pipes the cost can be lowered to about $2.
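As a rough sanity check, here is the back-of-the-envelope arithmetic behind those figures, using us-east-1 list prices from early 2023 (the exact result varies with region and the real average duration):

const EVENTS = 100_000_000;
const INSERT_RATE = 0.05;

// Lambda path: 10-record batches, ~100 ms per invocation at 256 MB
const invocations = EVENTS / 10;                      // 10,000,000
const requestCost = invocations * 0.20 / 1_000_000;   // $0.20 per 1M requests -> $2.00
const gbSeconds = invocations * 0.1 * (256 / 1024);   // 250,000 GB-s
const computeCost = gbSeconds * 0.0000166667;         // ~$4.17
console.log((requestCost + computeCost).toFixed(2));  // ~6.17, in the ballpark of $6.68

// Pipes path: $0.40 per million events that PASS the filter
const pipesCost = (EVENTS * INSERT_RATE / 1_000_000) * 0.40;
console.log(pipesCost.toFixed(2));                    // 2.00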

The benefits are even greater when the enrich/transform step is performed by an external API: we can leverage the direct integration between EventBridge Pipes and EventBridge API destinations instead of having a Lambda function actively waiting for the API response.

Simplify integrations with automatic delivery retries.

Keeping API destinations in mind, here is another code snippet, this time sending the content to a custom API instead of an SQS queue.

Lambda architecture
const { unmarshall } = require("@aws-sdk/util-dynamodb");
const axios = require("axios");

const API_ENDPOINT = process.env.API_ENDPOINT;

// Sleep helper used for the back-off between retries
async function wait(millis) {
  await new Promise(resolve => setTimeout(resolve, millis));
}

// POST to the API with exponential back-off: waits 2s, 4s, 8s
// before giving up after the configured number of retries
async function sendToApi(payload, retries = 3, attempt = 0) {
  try {
    return await axios.post(API_ENDPOINT, payload);
  } catch (error) {
    if (attempt >= retries) {
      throw new Error("Maximum attempts reached");
    }
    await wait(2 ** (attempt + 1) * 1000);
    return sendToApi(payload, retries, attempt + 1);
  }
}

exports.lambdaHandler = async (event) => {
  // Only propagate newly inserted items
  const records = event.Records
    .filter(record => record.eventName === "INSERT");
  const promises = records.map(record => {
    const payload = unmarshall(record.dynamodb.NewImage);
    return sendToApi(payload);
  });
  const results = await Promise.allSettled(promises);
  // Partial batch failure handling: report only the failed records
  return {
    batchItemFailures: records
      .filter((record, i) => results[i].status === "rejected")
      .map(record => ({
        itemIdentifier: record.eventID
      }))
  };
};

In this scenario, the worst case keeps the function busy for almost 15 seconds (2 + 4 + 8 seconds of back-off, plus the calls themselves) before failing the processing of a record, inflating the Lambda execution time and therefore the cost paid for compute.

EventBridge Pipes ensures reliable event processing by automatically retrying the enrichment and target invocations when calls to AWS services or custom target APIs fail. These retries use exponential back-off, gradually decreasing the calling rate, without adding to the cost of the solution.

EventBridge Pipes architecture
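As a sketch of this variant, the pipe below targets an EventBridge API destination, which wraps the endpoint URL, its authorization, and an invocation rate limit, while Pipes takes care of retries. All ARNs and the pk attribute are hypothetical, and the input transformer simply reshapes the raw stream record rather than fully unmarshalling it.

const { PipesClient, CreatePipeCommand } = require("@aws-sdk/client-pipes");

const client = new PipesClient();

async function main() {
  await client.send(new CreatePipeCommand({
    Name: "ddb-inserts-to-api",
    RoleArn: "arn:aws:iam::123456789012:role/pipe-execution-role", // hypothetical
    Source: "arn:aws:dynamodb:eu-west-1:123456789012:table/orders/stream/2023-01-01T00:00:00.000",
    SourceParameters: {
      DynamoDBStreamParameters: { StartingPosition: "LATEST" },
      FilterCriteria: {
        Filters: [{ Pattern: JSON.stringify({ eventName: ["INSERT"] }) }]
      }
    },
    // API destination created beforehand in EventBridge (hypothetical ARN)
    Target: "arn:aws:events:eu-west-1:123456789012:api-destination/orders-api/aaaa-bbbb",
    TargetParameters: {
      // Shape the HTTP body from the stream record
      // ("pk" is a hypothetical partition key name)
      InputTemplate: '{"id": "<$.dynamodb.Keys.pk.S>", "item": <$.dynamodb.NewImage>}'
    }
  }));
}

main().catch(console.error);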

This approach brings the same benefits as before, reducing the cost of running custom code and providing a reliable and scalable way to integrate different services.

Right now, EventBridge Pipes supports the following services as event sources: Amazon DynamoDB Streams, Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK) alongside self-managed Apache Kafka, Amazon SQS (standard and FIFO), and Amazon MQ (ActiveMQ and RabbitMQ).

Moreover, it supports 15 Amazon EventBridge targets, including AWS Lambda, Amazon API Gateway, Amazon SNS, Amazon SQS, and AWS Step Functions.

You can start creating a pipe in EventBridge today from the AWS console or with the AWS CLI, AWS CloudFormation, and the AWS CDK.


Guido Nebiolo

Innovation Manager and AWS Ambassador @ Reply | AWS Community Builder and 13x AWS Certified | g.nebiolo@reply.it