Creating Stream Processors with AWS Lambda Functions
There Can Be More Than One
If you follow my posts and books then you know that I think the most critical piece of the puzzle for increasing team confidence and accelerating continuous delivery is controlling the blast radius when teams make honest human errors. This involves fortifying the boundaries between autonomous services with bulkheads on both sides. We accomplish this by replacing all inter-service synchronous communication with an event-first approach, leveraging the System Wide Event Sourcing and CQRS patterns. I discuss how streaming plays an important role in treating events as first-class citizens, here, and how to map events to the physical world, here. I have also discussed how I manage the complexity of deploying many serverless functions, here.
Stream processing is a core part of this approach. In this post I introduce my stream processing framework, aws-lambda-stream, and demonstrate how to efficiently and effectively process multiple event types with a single function containing multiple pipelines.
aws-lambda-stream
The event signature for many Lambda functions is an array containing a micro-batch of `event.Records`. Functional Reactive Programming (FRP) is the cleanest approach for processing these streams. This library provides a lightweight framework for creating these stream processors. The underlying streaming library is Highland.js, replete with features like filter, map, reduce, backpressure, batching, parallel processing and more.
Basic Usage
The following examples show how to implement basic handler functions for consuming events from a Kinesis stream and a DynamoDB Stream. A key thing to note is that the code you see here is responsible for assembling the steps in the stream pipeline. The final step, `toPromise`, returns a Promise from the handler function. The promise then starts consuming from the stream and the data starts flowing through the steps. The data is pulled through the steps, which provides natural backpressure. The promise will resolve once all the data has passed through all the stream steps, or reject when an unhandled error is encountered.
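As a mental model for this pull-based behavior, here is a sketch using plain generators; this is an illustration only and assumes nothing about Highland.js internals — the point is that nothing moves until the terminal step starts pulling.

```javascript
// Conceptual sketch of pull-based streaming -- NOT Highland.js internals.
// Each step is lazy: records only move when the final step pulls them.
function* fromRecords(records) {
  for (const record of records) {
    yield { record }; // wrap each record in a unit of work
  }
}

function* map(iterable, fn) {
  for (const uow of iterable) yield fn(uow);
}

// The terminal step drives the whole chain by pulling values through.
const drain = (iterable) => [...iterable];

// usage sketch
const results = drain(
  map(fromRecords(['a', 'b']), (uow) => ({ ...uow, upper: uow.record.toUpperCase() }))
);
// results: [{ record: 'a', upper: 'A' }, { record: 'b', upper: 'B' }]
```

Because each step pulls from the previous one, a slow downstream step naturally throttles the upstream source.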
Example: Listener Function
This example processes a Kinesis stream and materializes the data in a single DynamoDB table. The details are explained below.
import { fromKinesis, toPromise } from 'aws-lambda-stream';

export const handler = async (event) =>
  fromKinesis(event)
    .filter(onEventType)
    .map(toUpdateRequest)
    .through(update({ parallel: 4 }))
    .through(toPromise);
Example: Trigger Function
This example processes a DynamoDB Stream and publishes CUD events to a Kinesis stream. The details are explained below.
import { fromDynamodb, toPromise } from 'aws-lambda-stream';

export const handler = async (event) =>
  fromDynamodb(event)
    .map(toEvent)
    .through(publish({ batchSize: 25 }))
    .through(toPromise);
Creating a stream from a lambda event
The first step of a stream processor transforms the incoming Records into a stream, like so: `_(event.Records)`. The various `from` functions, such as `fromKinesis` and `fromDynamodb`, normalize the records into a standard `Event` format. The output is a stream of `UnitOfWork` objects.
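For illustration, a `from`-style normalizer for Kinesis might look roughly like the following sketch; the real `fromKinesis` in the library handles more cases, so the exact field handling here is an assumption.

```javascript
// Sketch of what a `from`-style normalizer does -- the real fromKinesis
// in aws-lambda-stream does more; treat this as illustrative only.
// Kinesis delivers the payload base64-encoded in record.kinesis.data.
const toUow = (record) => ({
  record, // keep the original record
  event: JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8')),
});

const normalize = (event) => event.Records.map(toUow);

// usage sketch with a hand-built Kinesis-shaped event
const payload = Buffer.from(JSON.stringify({ id: '1', type: 'thing-created' })).toString('base64');
const uows = normalize({ Records: [{ kinesis: { data: payload } }] });
// uows[0].event.type is 'thing-created'; uows[0].record is untouched
```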
UnitOfWork Type (aka uow)
Think of a `uow` as an immutable object that represents the scope of a set of variables passing through the stream. More so than ever, we should not use global variables in stream processors. Your processor steps will add new variables to the `uow` for use by downstream steps (see Mapping below). This scoping is crucial when we leverage the parallel processing and pipeline features.
interface UnitOfWork {
record: any;
event?: Event;
batch?: UnitOfWork[];
}
- record — the original record
- event — the standardized event
- batch — an array of uow that should succeed or fail together
Event Type
The various streaming and messaging channels each have their own formats. We want to decouple the processing logic from the choice of these technologies. Thus all published events conform to the following `Event` format. This also provides for polymorphic-like processing. This standard format is also leveraged in the event-lake and micro-event-store features.
interface Event {
id: string;
type: string;
timestamp: number;
partitionKey?: string;
tags: { [key: string]: string | number };
<entity>: any;
raw?: any;
}
- id — a unique deterministic value
- type — generally the namespace, domain entity and action performed
- timestamp — epoch value when the action was performed
- partitionKey — generally the entity id or correlation id, to ensure related events can be processed together
- tags — a generic place for routing information. A standard set of values is always included, such as `account`, `region`, `stage`, `source`, `functionname` and `pipeline`.
- <entity> — a canonical entity that is specific to the event type. This is the contract that must be kept backwards compatible. The name of this field is usually the lowerCamelCase name of the entity type, such as `thing` for `Thing`.
- raw — the raw data and format produced by the source of the event. This is included so that the event-lake can form a complete audit with no lost information. This is not guaranteed to be backwards compatible, so use at your own risk.
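To make the shape concrete, here is an illustrative event in this format; all the field values below are invented for the example.

```javascript
// An illustrative Event in the standard format described above.
// Every value here is made up for the example.
const event = {
  id: 'e9b8f3a0-1',                        // unique deterministic value
  type: 'thing-created',                   // namespace, entity and action
  timestamp: 1572832690000,                // epoch millis when the action occurred
  partitionKey: 'thing-id-1',              // groups related events together
  tags: { stage: 'dev', source: 'example' }, // routing information
  thing: { id: 'thing-id-1', name: 'Thing One' }, // canonical <entity> field
  raw: {}, // source-specific format, kept for the event lake
};
```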
Filters
For a variety of reasons, I generally multiplex many event types through the same stream. I discuss this in detail in the following post: Stream Inversion & Topology. Thus, we use `filter` steps with functions like `onEventType` to focus on the event types of interest and perform content-based routing in general.
// all event types starting with `thing-`
const onEventType = uow => uow.event.type.match(/^thing-/);
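A quick check of how such a filter behaves, using an anchored regex so that only types that actually start with `thing-` pass (anchoring with `^` is my reading of the intent; an unanchored pattern would also match `something-created`):

```javascript
// Content-based routing predicate: pass only uows whose event type
// starts with `thing-`. The `^` anchor prevents accidental matches
// like 'something-created'.
const onEventType = (uow) => /^thing-/.test(uow.event.type);

const matched = onEventType({ event: { type: 'thing-created' } });      // true
const unmatched = onEventType({ event: { type: 'something-created' } }); // false
```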
Mapping
Many stream processor steps map the incoming data to the format needed downstream. The results of the mapping are adorned to the `uow` as a new variable. The `uow` must be immutable, so we return a new `uow` by cloning the original `uow` with the spread operator and adorning the additional variable. There are various utils provided to assist.
.map((uow) => ({
...uow,
variableName: {
// mapping logic here
}
}))
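To make the immutability point concrete, here is a tiny self-contained check showing that the spread produces a new `uow` while leaving the original untouched. Note that the spread is shallow, so nested objects are shared between the old and new `uow` and should be treated as read-only.

```javascript
// The spread operator clones the uow; the original is never mutated.
const uow = { record: { id: 1 }, event: { type: 'thing-created' } };

const next = {
  ...uow,
  variableName: { mapped: true }, // adorn a new variable for downstream steps
};

// uow.variableName is still undefined; next.variableName.mapped is true.
// Caveat: the clone is shallow -- next.record is the SAME object as
// uow.record, so steps must not mutate nested values.
```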
This is the function used in the Listener Function example above.
const toUpdateRequest = (uow) => ({
  ...uow,
  updateRequest: { // variable expected by `update` util
    Key: {
      pk: uow.event.thing.id,
      sk: 'thing',
    },
    ...updateExpression({
      ...uow.event.thing,
      discriminator: 'thing',
      timestamp: uow.event.timestamp,
    }),
    ...timestampCondition(),
  },
});
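As a rough idea of what a helper like `updateExpression` produces, here is a simplified sketch; the library's actual output format may differ, so treat the details as an assumption.

```javascript
// Simplified sketch of what an `updateExpression`-style helper produces --
// the real util in aws-lambda-stream may differ in detail.
// It turns a flat object into the pieces of a DynamoDB UpdateItem request.
const updateExpression = (item) => {
  const names = {};
  const values = {};
  const sets = [];
  Object.entries(item).forEach(([key, value]) => {
    names[`#${key}`] = key;      // attribute name placeholder
    values[`:${key}`] = value;   // attribute value placeholder
    sets.push(`#${key} = :${key}`);
  });
  return {
    ExpressionAttributeNames: names,
    ExpressionAttributeValues: values,
    UpdateExpression: `SET ${sets.join(', ')}`,
  };
};

const req = updateExpression({ name: 'Thing One', discriminator: 'thing' });
// req.UpdateExpression: 'SET #name = :name, #discriminator = :discriminator'
```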
This is the function used in the Trigger Function example above.
const toEvent = (uow) => ({
  ...uow,
  event: { // variable expected by the `publish` util
    ...uow.event,
    thing: uow.event.raw.new, // canonical
  },
});
Connectors
At the end of a stream processor there is usually a sink step that persists the results to a datastore or another stream. These external calls are wrapped in thin `Connector` classes so that they can be easily mocked for unit testing.
These connectors are then wrapped with utility functions, such as `update` and `publish`, to integrate them into the streaming framework. For example, the promise returned from the connector is normalized to a stream, fault handling is provided, and features such as parallel and batch are utilized.
These utility functions leverage currying to override default configuration settings, such as the batchSize and the number of parallel async non-blocking I/O executions.
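The pattern can be sketched as follows; the `Connector` shape, the curried `update`, and the mock client are all illustrative stand-ins, not the library's actual classes.

```javascript
// Sketch of a thin connector plus a curried utility -- illustrative only;
// the real `update` util also integrates with the Highland stream.
class Connector {
  constructor(client) {
    this.client = client; // e.g. a DynamoDB DocumentClient; mockable in tests
  }
  update(request) {
    return this.client.update(request);
  }
}

// Currying: configuration first, then the unit of work.
const update = ({ connector } = {}) => (uow) => ({
  ...uow,
  updateResponse: connector.update(uow.updateRequest),
});

// usage with a mock client, as you would in a unit test
const mockClient = { update: (req) => ({ ok: true, key: req.Key }) };
const step = update({ connector: new Connector(mockClient) });
const out = step({ updateRequest: { Key: { pk: '1', sk: 'thing' } } });
// out.updateResponse.ok is true; out.updateRequest is untouched
```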
Here is an example of using the `update` function.
import { update, toPromise } from 'aws-lambda-stream';

  ...
  .through(update({ parallel: 4 }))
  .through(toPromise);
Here is an example of using the `publish` function.
import { publish, toPromise } from 'aws-lambda-stream';

  ...
  .through(publish({ batchSize: 25 }))
  .through(toPromise);
Pipelines
As mentioned above, we are multiplexing many event types through a single stream for a variety of good reasons. Therefore, we want to maximize the utilization of each function invocation by acting on as many events as possible. However, we also want to maintain good clean separation of the processing logic for these different event types.
The Highland.js library allows us to fork streams, passing each fork/observer through a pipeline, and merge the streams back together where they can share common tail logic like `fault` handling.
Each pipeline is implemented and tested separately. Each is usually defined in its own module/file.
Here is an example of a pipeline. Pipelines are curried functions that first receive `options` during `initialize` and then the forked `stream` during `assemble` (see below). During `assemble` they add the desired steps to the stream. Pipelines typically start with one or more `filter` steps to indicate which events the steps apply to.
const pipeline1 = (options) => (stream) => stream
  .filter(onEventType)
  .tap(uow => options.debug('%j', uow))
  .map(toUpdateRequest)
  .through(update({ parallel: 4 }));

export default pipeline1;
Here is an example of a handler function that uses pipelines.
import { initialize, fromKinesis, toPromise } from 'aws-lambda-stream';
import pipeline1 from './pipeline1';
import pipeline2 from './pipeline2';

const PIPELINES = {
  pipeline1,
  pipeline2,
};

const OPTIONS = { ... };

export const handler = async (event) =>
  initialize(PIPELINES, OPTIONS)
    .assemble(fromKinesis(event))
    .through(toPromise);
- First we `initialize` the pipelines with any options.
- Then we `assemble` all pipelines into a forked stream.
- And finally the processing of the events through the pipelines is started by `toPromise`.
- The data fans out through all the pipelines and the processing concludes when all the units of work have flowed through and merged back together.
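The fork/merge idea can be sketched with plain arrays; this is a drastic simplification, since the real `initialize`/`assemble` operate on Highland.js streams, but it makes the fan-out easy to see.

```javascript
// Greatly simplified sketch of the fork/merge idea -- illustrative only.
// Here a "pipeline" just filters/maps a plain array of uows.
const initialize = (pipelines, options) =>
  Object.values(pipelines).map((pipeline) => pipeline(options));

const assemble = (initialized, uows) =>
  initialized.flatMap((pipeline) => pipeline(uows)); // merge results back together

// two trivial pipelines, each focused on one event type
const pipeline1 = () => (uows) => uows.filter((u) => u.event.type === 'thing-created');
const pipeline2 = () => (uows) => uows.filter((u) => u.event.type === 'thing-deleted');

const uows = [
  { event: { type: 'thing-created' } },
  { event: { type: 'thing-deleted' } },
];
const merged = assemble(initialize({ pipeline1, pipeline2 }, {}), uows);
// every uow fanned out to both pipelines, was handled by exactly one,
// and the results merged back into a single collection
```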
But take care to assemble a cohesive set of pipelines into a single function. For example, a listener function in a BFF service will typically consume events from Kinesis, and the various pipelines will materialize different entities from the events into a single DynamoDB table to implement the CQRS pattern. Then the trigger function of the BFF service will consume events from the DynamoDB table, as mutations are invoked in the `graphql` function, and these pipelines will publish events to the Kinesis stream to implement the Event Sourcing pattern. (See Flavors next.)
Flavors
Many of the pipelines we write follow the exact same steps and only the filters and data mapping details are different. We can package these pipeline flavors into reusable pipelines that can be configured with `rules`.
The following flavors are included, and you can package your own into libraries.
- materialize — used in `listener` functions to materialize an `entity` from an `event` into a DynamoDB single table
- crud — used in `trigger` functions to `publish` events to Kinesis as entities are maintained in a DynamoDB single table
- more to be ported soon
Here is an example of initializing pipelines from rules. Note that you can initialize one-off pipelines alongside rule-driven pipelines.
import { initializeFrom } from 'aws-lambda-stream';

const PIPELINES = {
  pipeline1,
  pipeline2,
  ...initializeFrom(RULES),
};
Here are some example rules. The `id`, `flavor`, and `eventType` fields are required. The remaining fields are defined by the specified pipeline flavor. You can define functions inline, but it is best to implement and unit test them separately.
import materialize from 'aws-lambda-stream/flavors/materialize';

const RULES = [
  {
    id: 'p1',
    flavor: materialize,
    eventType: /thing-(created|updated)/,
    toUpdateRequest,
  },
  {
    id: 'p2',
    flavor: materialize,
    eventType: 'thing-deleted',
    toUpdateRequest: toSoftDeleteUpdateRequest,
  },
  {
    id: 'p3',
    flavor: materialize,
    eventType: ['something-created', 'something-updated'],
    toUpdateRequest: (uow) => ({ ... }),
  },
];
- id — a unique string
- flavor — the function that implements the pipeline flavor
- eventType — a regex, string or array of strings used to filter on event type
- toUpdateRequest — a mapping function expected by the `materialize` pipeline flavor
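A possible shape for an `initializeFrom`-style helper, shown with a stand-in flavor and rule; this is an assumption about how rules become keyed pipelines, not the library's exact behavior.

```javascript
// Sketch of an `initializeFrom`-style helper -- illustrative only.
// Each rule becomes a pipeline keyed by its id, created by handing the
// rule (merged with the runtime options) to its flavor.
const initializeFrom = (rules) =>
  rules.reduce((pipelines, rule) => ({
    ...pipelines,
    [rule.id]: (options) => rule.flavor({ ...options, ...rule }),
  }), {});

// usage with a stand-in flavor that filters uows by the rule's eventType
const materialize = (opts) => (uows) =>
  uows.filter((u) => u.event.type.match(opts.eventType));

const PIPELINES = initializeFrom([
  { id: 'p1', flavor: materialize, eventType: /thing-/ },
]);

const step = PIPELINES.p1({});
// step([{ event: { type: 'thing-created' } }]) keeps the uow;
// a non-matching type is filtered out
```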
More Details
I have released this framework as open source. For more detail, see the documentation and tests of the library: aws-lambda-stream
For more thoughts on serverless and cloud-native, check out the other posts in this series and my books: Software Architecture Patterns for Serverless Systems, Cloud Native Development Patterns and Best Practices, and JavaScript Cloud Native Development Cookbook.