Filter and Enrich all your events with EventBridge Pipes

Eric Bach
AMA Technology Blog
4 min read · Nov 6, 2023

One of the most useful features of Amazon EventBridge is EventBridge Pipes, which simplifies setting up point-to-point messaging and communication between services. That integration work is typically done with “glue” code, which adds noise to the existing application code.

Photo in collaboration with Ave Calvar on Unsplash

EventBridge Pipes is a feature of Amazon EventBridge that provides a way to build the integrations between these services, removing the need to write this “glue” code. It achieves this through 4 steps:

  1. Source: Accepting events from a source such as a DynamoDB stream, a Kinesis stream, or an SQS queue.
  2. Filtering: Optionally, filtering down to the events the target is interested in by defining an event pattern.
  3. Enrichment: Optionally, enriching the event by transforming it or pulling in additional data through a Lambda function, a Step Functions state machine, or an API.
  4. Target: Delivering the event to an AWS service, another event bus, or an API destination.
EventBridge Pipes
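
To make the four steps concrete, here is a small conceptual model in plain TypeScript. It is only a sketch of the flow: EventBridge Pipes is a managed service, and names like `runPipe` and `PipeStages` are made up for illustration and are not part of any AWS API.

```typescript
// Conceptual model only; these types and functions are hypothetical.
interface PipeStages<E, R> {
  filter?: (event: E) => boolean;    // step 2 (optional)
  enrich?: (event: E) => R;          // step 3 (optional)
  target: (payload: E | R) => void;  // step 4
}

// Step 1: events arrive from the source, in order.
function runPipe<E, R>(source: E[], stages: PipeStages<E, R>): void {
  for (const event of source) {
    // Events that do not match the filter never reach the target.
    if (stages.filter && !stages.filter(event)) continue;
    // Without an enrichment step, the original event is delivered as-is.
    const payload = stages.enrich ? stages.enrich(event) : event;
    stages.target(payload);
  }
}

// Usage: only the invoice event is enriched and delivered.
const delivered: string[] = [];
runPipe(
  [
    { type: 'invoice', id: '1' },
    { type: 'note', id: '2' },
  ],
  {
    filter: (e) => e.type === 'invoice',
    enrich: (e) => `Invoice ${e.id} created`,
    target: (p) => delivered.push(String(p)),
  },
);
```

The point of the model is that filtering and enrichment are optional stages sitting between exactly one source and one target.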

Use Case — Email Delivery

One common event-driven use case is sending email notifications to signal that an event has happened. For example, when an order is created in a DynamoDB table we would want to email the invoice to the customer, or when a user deletes some important information in the same table we would want to be notified.

In an event-driven approach, we can make use of patterns like EventBridge to SNS, Kinesis to SNS, or Lambda to SNS to asynchronously deliver the notification to the email subscription.

We can enable DynamoDB Streams to send events directly to EventBridge Pipes, where events can further be filtered and enriched before delivery to SNS with an email subscription. The image below illustrates this use case.

Steps to send an email notification using Pipes

In the EventBridge Pipes console, we can see how the filtering and enrichment steps fit in before our notification is sent.

EventBridge Console
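
If we did want an enrichment step, it could be a small Lambda function that reshapes each record before it reaches the target. The sketch below shows only the transformation logic as a plain function. It assumes the enrichment receives a batch (array) of DynamoDB stream records, and the record shape, the `total` attribute, and the `InvoiceNotification` payload are all hypothetical and chosen for illustration.

```typescript
// Minimal, assumed shape of a DynamoDB stream record passed to the enrichment.
interface AttributeValue { S?: string; N?: string }
interface StreamRecord {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  dynamodb: { NewImage?: Record<string, AttributeValue> };
}

// Hypothetical payload handed on to the target (for example, the email text).
interface InvoiceNotification { orderId: string; message: string }

// Maps each record in the batch to a smaller, human-readable payload.
function enrichInvoices(records: StreamRecord[]): InvoiceNotification[] {
  return records.map((record) => {
    const image = record.dynamodb.NewImage ?? {};
    const orderId = image.pk?.S ?? 'unknown';
    const total = image.total?.N ?? '0';
    return { orderId, message: `Invoice for order ${orderId}, total ${total}` };
  });
}
```

A Lambda handler would simply wrap this function and return its result, and the Pipe would then reference the function through its enrichment setting.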

All of this has the significant advantage of requiring none of the “glue” code in our application, only the CDK infrastructure code to set up the EventBridge Pipe. Not only is our integration much simpler, but there is also no integration code left in the application for us to test and maintain.

Getting Started

To set this up, we first create a DynamoDB table with streams enabled, then our EventBridge Pipe, then the SNS topic with an email subscription, and finally configure our permissions.

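1. In our CDK stack we first define our DynamoDB table and enable streams.

    import { RemovalPolicy } from 'aws-cdk-lib';
    import { AttributeType, BillingMode, StreamViewType, Table } from 'aws-cdk-lib/aws-dynamodb';

    const dataTable = new Table(this, 'DataTable', {
      tableName: 'DataTable',
      billingMode: BillingMode.PAY_PER_REQUEST,
      partitionKey: {
        name: 'pk',
        type: AttributeType.STRING,
      },
      sortKey: {
        name: 'sk',
        type: AttributeType.STRING,
      },
      removalPolicy: RemovalPolicy.DESTROY,
      // Enable the stream; each record carries the item as it looks after the change
      stream: StreamViewType.NEW_IMAGE,
    });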

2. Next, in the same CDK stack, we define our EventBridge Pipe and role. The filterCriteria includes our pattern that tells the Pipe to only process DynamoDB INSERT events whose type attribute is invoice.

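    import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
    import { CfnPipe } from 'aws-cdk-lib/aws-pipes';

    // Role that EventBridge Pipes assumes to read the stream and publish to the target
    const pipeRole = new Role(this, 'PipesRole', {
      assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
    });

    const pipe = new CfnPipe(this, 'Pipe', {
      roleArn: pipeRole.roleArn,
      // tableStreamArn is defined here because streams are enabled on the table
      source: dataTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          startingPosition: 'LATEST',
          batchSize: 1,
        },
        filterCriteria: {
          filters: [
            {
              // Only INSERT events whose new item has type = "invoice"
              pattern: JSON.stringify({
                eventName: ['INSERT'],
                dynamodb: { NewImage: { type: { S: ['invoice'] } } },
              }),
            },
          ],
        },
      },
      // Deliver to the SNS topic from step 3 (declare snsTopic above the Pipe in the stack file)
      target: snsTopic.topicArn,
    });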
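
To get a feel for which events this filter lets through, we can sketch the matching logic in a few lines of TypeScript. This is a simplified approximation written for illustration, not the actual EventBridge implementation: the `matches` function is hypothetical and only handles exact-value matching on nested fields, which is all our pattern uses.

```typescript
// A pattern is a nested object whose leaves are arrays of accepted values.
type Pattern = { [key: string]: Pattern | Array<string | number | boolean | null> };

// Returns true when every field named in the pattern matches the event.
function matches(pattern: Pattern, event: unknown): boolean {
  if (typeof event !== 'object' || event === null) return false;
  const obj = event as Record<string, unknown>;
  return Object.entries(pattern).every(([key, expected]) => {
    const actual = obj[key];
    if (Array.isArray(expected)) {
      // Leaf: the event value must equal one of the listed values.
      return expected.some((value) => value === actual);
    }
    // Nested object: recurse into the same key of the event.
    return matches(expected, actual);
  });
}

// The same pattern we pass to filterCriteria.
const invoicePattern: Pattern = {
  eventName: ['INSERT'],
  dynamodb: { NewImage: { type: { S: ['invoice'] } } },
};

const insertedInvoice = {
  eventName: 'INSERT',
  dynamodb: { NewImage: { pk: { S: 'order-1' }, type: { S: 'invoice' } } },
};
const modifiedInvoice = { ...insertedInvoice, eventName: 'MODIFY' };

const insertPasses = matches(invoicePattern, insertedInvoice);
const modifyPasses = matches(invoicePattern, modifiedInvoice);
```

Fields that the pattern does not mention, such as `pk` above, are ignored, so the pattern only needs to list the attributes we care about.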

3. Next we define the SNS Topic, with an email subscription, that serves as the target of our Pipe.

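    import { Topic } from 'aws-cdk-lib/aws-sns';
    import { EmailSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';

    const snsTopic = new Topic(this, 'SNSTopic', {
      displayName: 'SNSTopic',
    });

    // The recipient has to confirm the subscription before emails are delivered
    snsTopic.addSubscription(new EmailSubscription('hello@email.com'));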

4. Lastly, we give the Pipe access to read the DynamoDB Stream and to publish a message to the SNS Topic.

dataTable.grantStreamRead(pipeRole);
snsTopic.grantPublish(pipeRole);

With all of this living entirely in our CDK infrastructure code, we can set up email notifications for events in our system without writing any application code.

Summary

We have seen how to make use of EventBridge Pipes to filter and enrich our events for further asynchronous processing. The benefit of using this architecture is that the glue code is no longer necessary, which takes responsibility away from your application code and can even reduce costs.

Using EventBridge Pipes also ensures that event ordering is maintained. However, there are a few limitations to take note of:

  • There are a limited number of sources today, such as Kinesis, DynamoDB streams, Amazon MQ, Kafka (Amazon MSK or self-managed), and SQS
  • A pipe may only have one source and one target
  • EventBridge Pipe rules are limited to one pattern if you choose to filter the event (Enhanced Filtering is now supported to include multiple conditions)
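
As a rough illustration of that last point, a single pattern can still combine several conditions. The sketch below builds such a pattern as a string for filterCriteria. The `customerEmail` attribute is hypothetical, and while `prefix` and `exists` are standard EventBridge pattern operators, it is worth confirming against the Pipes filtering documentation which operators apply to your source.

```typescript
// One pattern, several conditions (attribute names are illustrative only).
const enhancedPattern = {
  // Match either newly created or updated items
  eventName: ['INSERT', 'MODIFY'],
  dynamodb: {
    NewImage: {
      // type must start with "inv", e.g. "invoice"
      type: { S: [{ prefix: 'inv' }] },
      // the item must carry a customerEmail attribute
      customerEmail: { S: [{ exists: true }] },
    },
  },
};

// filterCriteria expects the pattern serialized as a JSON string.
const enhancedPatternJson: string = JSON.stringify(enhancedPattern);
```

All conditions inside one pattern must hold for an event to pass, so this keeps the pipe to one pattern while still narrowing the events considerably.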

I hope you found this helpful and a useful pattern to incorporate in your application!

Eric Bach is a Senior Software Developer @Alberta Motor Association who enjoys learning, reading, and writing about leadership principles, event-driven microservices, and all things AWS.
