Creating Stream Processors with AWS Lambda Functions

There Can Be More Than One

John Gilbert
Feb 28, 2020 · 8 min read

If you follow my posts and books, then you know that I think the most critical piece of the puzzle for increasing team confidence and accelerating continuous delivery is controlling the blast radius when teams make honest human errors. This involves fortifying the boundaries between autonomous services with bulkheads on both sides. We accomplish this by replacing all inter-service synchronous communication with an event-first approach and leveraging the System Wide Event Sourcing and CQRS patterns. I discuss how streaming plays an important role in treating events as first-class citizens, here, and how to map events to the physical world, here. I have also discussed how I manage the complexity of deploying many serverless functions, here.

Stream processing is a core part of this approach. In this post I introduce my stream processing framework, aws-lambda-stream, and demonstrate how to efficiently and effectively process multiple event types with a single function containing multiple pipelines.

aws-lambda-stream

The event signature for many Lambda functions is an array containing a micro-batch of event.Records. Functional Reactive Programming (FRP) is the cleanest approach for processing these streams. This library provides a light-weight framework for creating these stream processors. The underlying streaming library is Highland.js, replete with features like filter, map, reduce, backpressure, batching, parallel processing and more.

Basic Usage

The following examples show how to implement basic handler functions for consuming events from a Kinesis stream and a DynamoDB Stream. A key thing to note is that the code you see here is responsible for assembling the steps of the stream pipeline. The final step, toPromise, returns a Promise from the handler function. The promise then starts consuming from the stream and the data begins flowing through the steps. The data is pulled through the steps, which provides natural backpressure. The promise resolves once all the data has passed through all the stream steps, or rejects when an unhandled error is encountered.
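
For intuition, the spirit of such a terminal step can be captured in a few lines of Highland.js. This is only a sketch, not the library's actual toPromise implementation.

// A sketch in the spirit of a toPromise terminal step: drain the stream and
// settle the promise once everything has flowed through.
const toPromiseSketch = (stream) => new Promise((resolve, reject) =>
  stream
    .stopOnError(reject) // reject on an unhandled error
    .done(resolve));     // resolve once all records have been consumed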

Example: Listener Function

This example processes a Kinesis stream and materializes the data in a single DynamoDB table. The details are explained below.

import { fromKinesis, update, toPromise } from 'aws-lambda-stream';

export const handler = async (event) =>
  fromKinesis(event)
    .filter(onEventType)
    .map(toUpdateRequest)
    .through(update({ parallel: 4 }))
    .through(toPromise);

Example: Trigger Function

This example processes a DynamoDB Stream and publishes CUD events to a Kinesis stream. The details are explained below.

import { fromDynamodb, publish, toPromise } from 'aws-lambda-stream';

export const handler = async (event) =>
  fromDynamodb(event)
    .map(toEvent)
    .through(publish({ batchSize: 25 }))
    .through(toPromise);

Creating a Stream from a Lambda Event

The first step of a stream processor transforms the incoming Records into a stream, like so: _(event.Records). The various from functions, such as fromKinesis and fromDynamodb, normalize the records into a standard Event format. The output is a stream of UnitOfWork objects.
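
As an illustration, a minimal fromKinesis-style function might look like the sketch below, assuming the Kinesis records carry base64-encoded JSON events; the library's actual implementation handles more cases.

import _ from 'highland';

// A sketch of normalizing a Kinesis micro-batch into a stream of uow
// (not the library's actual fromKinesis).
const fromKinesisSketch = (event) =>
  _(event.Records) // wrap the micro-batch in a Highland stream
    .map((record) => ({
      record, // keep the original record
      // the data is assumed to be base64-encoded JSON in the standard Event format
      event: JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8')),
    }));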

UnitOfWork Type (aka uow)

Think of a uow as an immutable object that represents the scope of a set of variables passing through the stream. More so than ever, we should not use global variables in stream processors. Your processor steps will add new variables to the uow for use by downstream steps (see Mapping below). This scoping is crucial when we leverage the parallel processing and pipeline features.

interface UnitOfWork {
  record: any;
  event?: Event;
  batch?: UnitOfWork[];
}
  • record — the original record
  • event — the standardized event
  • batch — an array of uow that should succeed or fail together

Event Type

The various streaming and messaging channels each have their own formats. We want to decouple the processing logic from the choice of these technologies. Thus all published events conform to the following Event format. This also provides for polymorphic-like processing. This standard format is also leveraged in the event-lake and micro-event-store features.

interface Event {
  id: string;
  type: string;
  timestamp: number;
  partitionKey?: string;
  tags: { [key: string]: string | number };
  <entity>: any;
  raw?: any;
}
  • id — a unique deterministic value
  • type — generally the namespace, domain entity and action performed
  • timestamp — epoch value when the action was performed
  • partitionKey — generally the entity id or correlation id to ensure related events can be processed together
  • tags — a generic place for routing information. A standard set of values is always included, such as account, region, stage, source, functionname and pipeline.
  • <entity> — a canonical entity that is specific to the event type. This is the contract that must be held backwards compatible. The name of this field is usually the lowerCamelCase name of the entity type, such as thing for Thing.
  • raw — this is the raw data and format produced by the source of the event. This is included so that the event-lake can form a complete audit with no lost information. This is not guaranteed to be backwards compatible, so use at your own risk.
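
Putting these fields together, a hypothetical thing-created event might look like the following; every value is made up purely for illustration.

const exampleEvent = {
  id: 'f4a1c2d3-7a39-4d5c-9f3b-8c2d1e4f5a6b',           // unique and deterministic
  type: 'thing-created',                                // namespace, entity and action
  timestamp: 1582873200000,                             // epoch millis when the action was performed
  partitionKey: '00000000-0000-0000-0000-000000000001', // the entity id
  tags: {
    account: 'dev',
    region: 'us-east-1',
    stage: 'stg',
    source: 'thing-service',
    functionname: 'thing-service-trigger',
    pipeline: 'p1',
  },
  thing: {                                              // the canonical entity
    id: '00000000-0000-0000-0000-000000000001',
    name: 'Thing One',
  },
  raw: { /* the original source format; not backwards compatible */ },
};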

Filters

For a variety of reasons, I generally multiplex many event types through the same stream. I discuss this in detail in the following post: Stream Inversion & Topology. Thus, we use filter steps with functions like onEventType to focus in on the event types of interest and perform content based routing in general.

// all event types starting with `thing-`
const onEventType = uow => uow.event.type.match(/^thing-/);
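
Filters are not limited to the event type; any field on the standardized event, such as the tags, can drive content-based routing. The following predicates are purely illustrative and not part of the library.

// Hypothetical content-based routing predicates.
const onSpecificTypes = (uow) =>
  ['thing-created', 'thing-updated'].includes(uow.event.type);

const onMySource = (uow) => uow.event.tags.source === 'thing-service';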

Mapping

Many stream processor steps map the incoming data to the format needed downstream. The results of the mapping are adorned to the uow as a new variable. The uow must be immutable, so we return a new uow by cloning the original uow with the spread operator and adorning the additional variable. There are various utils provided to assist.

.map((uow) => ({
  ...uow,
  variableName: {
    // mapping logic here
  },
}))

This is the function used in the Listener Function example above.

import { updateExpression, timestampCondition } from 'aws-lambda-stream';

const toUpdateRequest = (uow) => ({
  ...uow,
  updateRequest: { // variable expected by the `update` util
    Key: {
      pk: uow.event.thing.id,
      sk: 'thing',
    },
    ...updateExpression({
      ...uow.event.thing,
      discriminator: 'thing',
      timestamp: uow.event.timestamp,
    }),
    ...timestampCondition(),
  },
});

This is the function used in the Trigger Function example above.

const toEvent = (uow) => ({
  ...uow,
  event: { // variable expected by the `publish` util
    ...uow.event,
    thing: uow.event.raw.new, // canonical
  },
});

Connectors

At the end of a stream processor there is usually a sink step that persists the results to a datastore or another stream. These external calls are wrapped in thin Connector classes so that they can be easily mocked for unit testing.

These connectors are then wrapped with utility functions, such as update and publish, to integrate them into the streaming framework. For example, the promise returned from the connector is normalized to a stream, fault handling is provided and features such as parallel and batch are utilized.

These utility functions leverage currying to override default configuration settings, such as the batchSize and the number of parallel async non-blocking IO executions.
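
To make the shape concrete, here is a rough sketch of a thin connector and the curried utility step that wraps it, assuming the AWS SDK v2 DocumentClient. It is only a sketch; the library's actual update connector and util add fault handling and other features.

import _ from 'highland';
import { DynamoDB } from 'aws-sdk';

// A thin connector: the only code that touches the AWS SDK, so it is easy to mock.
class DynamoConnector {
  constructor({ tableName }) {
    this.tableName = tableName;
    this.db = new DynamoDB.DocumentClient();
  }
  update(updateRequest) {
    return this.db.update({ TableName: this.tableName, ...updateRequest }).promise();
  }
}

// A curried utility step: receives options first, then each uow as it flows through.
const updateSketch = ({ parallel = 4, tableName = process.env.ENTITY_TABLE_NAME } = {}) => {
  const connector = new DynamoConnector({ tableName });
  return (stream) => stream
    .map((uow) => _(connector.update(uow.updateRequest) // normalize the promise to a stream
      .then(() => uow)))                                // pass the uow downstream
    .parallel(parallel);                                // async non-blocking IO
};

A step like this is then dropped into a pipeline with .through(updateSketch({ parallel: 4 })).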

Here is an example of using the update function.

import { update, toPromise } from 'aws-lambda-stream';

  ...
  .through(update({ parallel: 4 }))
  .through(toPromise);

Here is an example of using the publish function.

import { publish, toPromise } from 'aws-lambda-stream';

  ...
  .through(publish({ batchSize: 25 }))
  .through(toPromise);

Pipelines

As mentioned above, we are multiplexing many event types through a single stream for a variety of good reasons. Therefore, we want to maximize the utilization of each function invocation by acting on as many events as possible. However, we also want to maintain good clean separation of the processing logic for these different event types.

The Highland.js library allows us to fork streams, passing each fork/observer through a pipeline, and then merge the streams back together where they can share common tail logic, such as fault handling.

Each pipeline is implemented and tested separately. Each is usually defined in its own module/file.

Here is an example of a pipeline. They are curried functions that first receive options during initialize and then the forked stream during assemble (see below). During assemble they add the desired steps to the stream. Pipelines typically start with one or more filter steps to indicate which events the steps apply to.

const pipeline1 = (options) => (stream) => stream
  .filter(onEventType)
  .tap(uow => options.debug('%j', uow))
  .map(toUpdateRequest)
  .through(update({ parallel: 4 }));

export default pipeline1;

Here is an example of a handler function that uses pipelines.

import { initialize, fromKinesis, toPromise } from 'aws-lambda-stream';
import pipeline1 from './pipeline1';
import pipeline2 from './pipeline2';

const PIPELINES = {
  pipeline1,
  pipeline2,
};

const OPTIONS = { ... };

export const handler = async (event) =>
  initialize(PIPELINES, OPTIONS)
    .assemble(fromKinesis(event))
    .through(toPromise);
  1. First we initialize the pipelines with any options.
  2. Then we assemble all pipelines into a forked stream.
  3. And finally the processing of the events through the pipelines is started by toPromise.
  4. The data fans out through all the pipelines and the processing concludes when all the units of work have flowed through and merged back together.
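
Under the hood, the fork-and-merge presumably looks something like the following simplified sketch built on Highland's fork and merge; the library's actual initialize and assemble also wire in the pipeline ids, fault handling and other shared tail logic.

import _ from 'highland';

// A simplified sketch of initialize/assemble (not the library's actual code).
const initializeSketch = (pipelines, options = {}) => {
  // initialize: hand each pipeline its options up front (currying)
  const initialized = Object.keys(pipelines)
    .map((id) => pipelines[id]({ ...options, id }));

  return {
    // assemble: pass a fork of the source stream through each pipeline,
    // then merge the forks back into a single stream for the shared tail steps
    assemble: (stream) =>
      _(initialized.map((pipeline) => pipeline(stream.fork())))
        .merge(),
  };
};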

But take care to assemble a cohesive set of pipelines into a single function. For example, a listener function in a BFF service will typically consume events from Kinesis and the various pipelines will materialize different entities from the events into a single DynamoDB table to implement the CQRS pattern. Then the trigger function of the BFF service will consume events from the DynamoDB table, as mutations are invoked in the graphql function, and these pipelines will publish events to the Kinesis stream to implement the Event Sourcing pattern. (see Flavors next)

Flavors

Many of the pipelines we write follow the exact same steps and only the filters and data mapping details are different. We can package these pipeline flavors into reusable pipelines that can be configured with rules.

The following flavors are included and you can package your own into libraries.

  • materialize — used in listener functions to materialize an entity from an event into a DynamoDB single table
  • crud — used in trigger functions to publish events to Kinesis as entities are maintained in a DynamoDB single table
  • more to be ported soon
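
Since flavors are just curried, rule-driven pipeline factories, packaging your own is straightforward. Here is a purely hypothetical flavor (not one of the library's) that simply logs matching events; a rule would reference it via its flavor field, just like materialize or crud.

// A hypothetical custom flavor: a curried function that receives the rule
// and returns a pipeline, exactly like a hand-written pipeline would.
const logEvents = (rule) => (stream) => stream
  .filter((uow) => uow.event.type.match(rule.eventType)) // regex or string; arrays need normalizing
  .tap((uow) => console.log('%s matched %s', rule.id, uow.event.type))
  .map((uow) => ({ ...uow, logged: true })); // adorn, never mutate

export default logEvents;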

Here is an example of initializing pipelines from rules. Note that you can initialize one-off pipelines alongside rule-driven pipelines.

import { initializeFrom } from 'aws-lambda-stream';

const PIPELINES = {
  pipeline1,
  pipeline2,
  ...initializeFrom(RULES),
};

Here are some example rules. The id, flavor, and eventType fields are required. The remaining fields are defined by the specified pipeline flavor. You can define functions inline, but it is best to implement and unit test them separately.

import materialize from 'aws-lambda-stream/flavors/materialize';

const RULES = [
  {
    id: 'p1',
    flavor: materialize,
    eventType: /thing-(created|updated)/,
    toUpdateRequest,
  },
  {
    id: 'p2',
    flavor: materialize,
    eventType: 'thing-deleted',
    toUpdateRequest: toSoftDeleteUpdateRequest,
  },
  {
    id: 'p3',
    flavor: materialize,
    eventType: ['something-created', 'something-updated'],
    toUpdateRequest: (uow) => ({ ... }),
  },
];
  • id — a unique string
  • flavor — the function that implements the pipeline flavor
  • eventType — a regex, string or array of strings used to filter on event type
  • toUpdateRequest — a mapping function expected by the materialize pipeline flavor
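
Since eventType can be a regex, a string, or an array of strings, a flavor presumably normalizes it into a single filter predicate. A hypothetical helper along these lines (not necessarily how the library implements it) might look like:

// Hypothetical helper that turns a rule's eventType field into a filter predicate.
const onEventTypeFor = (rule) => (uow) => {
  const { eventType } = rule;
  if (eventType instanceof RegExp) return eventType.test(uow.event.type);
  if (Array.isArray(eventType)) return eventType.includes(uow.event.type);
  return uow.event.type === eventType;
};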

More Details

I have released this framework as open source. For more detail, see the documentation and tests of the library: aws-lambda-stream

For more thoughts on serverless and cloud-native, check out the other posts in this series and my books: Software Architecture Patterns for Serverless Systems, Cloud Native Development Patterns and Best Practices and JavaScript Cloud Native Development Cookbook.

John Gilbert

Author, CTO, Full-Stack Cloud-Native Architect, Serverless-First Advocate