RxJS5 + PouchDB — persistent data flows

In this post, I plan to mix two fundamentally different ways of looking at data flows: static vs. dynamic. We can think of static data as data that is durable, that survives permanently; dynamic data, in contrast, is always in flux, moving through the system: it's transient. In reactive systems, data is never stored but rather propagated through the application in the form of streams or flows. Integrating both forms of data, then, requires treating static data (whether it lives in a file or a database) as the source, the originator of an event that gets pushed onto a stream and processed through your business logic accordingly.

Before we dive in deeper, here’s a little about me. I’m the author of Functional Programming in JavaScript and co-author of RxJS in Action. So, for many years now it’s been my passion to study the implementation of systems using these paradigms.

In this article, the persistence technology I'll use is a library called PouchDB, which is inspired by the very popular JSON document store CouchDB. PouchDB provides a nice, intuitive, cross-browser wrapper API over local storage technologies like IndexedDB. The reactive toolkit I'll use is RxJS, a functional library for composing asynchronous and event-based programs using observables.

To run this code, you can load both libraries into your browser like so:

<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.8/dist/global/Rx.umd.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/pouchdb/5.3.2/pouchdb.min.js"></script>

Intro

A common problem with sophisticated client-side applications is loading all of the data from the backend into the browser, an environment with a limited amount of memory. Some architectures load the data as needed; this is called progressive loading. However, this doesn't work well if an application has high performance demands or needs to work offline, and most modern applications are expected to do both. Another approach is to bypass the browser's memory and load the data into persistent storage.

The benefit of using PouchDB, like most modern asynchronous JavaScript APIs, is that it models all of its asynchronous operations with Promises. This means we can use Rx.Observable.fromPromise() to adapt any of its API calls, which is exactly what I'll do, because observables are a more powerful and flexible type than regular promises. For instance, the Promise returned by PouchDB's put() method can be converted to an Observable like this:

Rx.Observable.fromPromise(db.put(obj))
  .map(processResponse)
  ...

We can use RxJS to move this static, persistent data into flows of asynchronous operations that compose, cascading the outcome of one into the next seamlessly. Hence, RxJS acts as our query language, treating data as constantly moving and changing. Keep in mind that PouchDB is a schema-less document store, which means we don't need to define and create a schema before writing documents to it. We'll start with a simplified domain of a banking application that loads a set of transactions into the document store. First, constructing an instance of the database is as simple as:

const txDb = new PouchDB('transactions');
R.isNil(txDb.then);       //-> false      
R.is(Function, txDb.then) //-> true

In this code, I'm using Ramda's type-check functions to prove that the API actually exposes a then() chain of functions, showing that its calls behave like promises.
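If you don't have Ramda at hand, the same duck-typing check can be sketched in plain JavaScript; any object that exposes a then() function is a "thenable" and can be chained or awaited like a Promise:

```javascript
// Minimal "thenable" check, mirroring the Ramda assertions above:
// a promise-like object is anything that exposes a then() function.
const isThenable = obj =>
  obj != null && typeof obj.then === 'function';

console.log(isThenable(Promise.resolve(42)));   // true
console.log(isThenable({ then: cb => cb(1) })); // true: a custom thenable
console.log(isThenable({}));                    // false
```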

This database stores transaction documents in JSON form. A Transaction has the following structure (mind you, we’re keeping our domain model extremely simple):

// Plain data holder; properties are read directly (e.g. tx.name).
// Note: defining accessor methods with the same names as the instance
// properties would be dead code, since the properties shadow them.
class Transaction {
  constructor(name, type, amount, from, to = null) {
    this.name = name;
    this.type = type;
    this.from = from;
    this.to = to;
    this.amount = amount;
  }
}

Populating a database reactively

The code to create and store several transactions involves looping through transaction objects (whether they come from a locally stored array or from a remote HTTP call), date stamping each transaction with an RxJS timestamp(), and posting it to the database. Here’s a sketch of what I’m going to do:

Figure 1 Steps to populate data into local storage using streams

We’ll start by adding this set from memory:

function getTransactionsArray() {
  return [
    new Transaction('Marty McFly', 'withdraw', 500, 'checking'),
    new Transaction('Biff Tannen', 'deposit', 800, 'savings'),
    new Transaction('Emmet Brown', 'transfer', 2000, 'checking', 'savings'),
    new Transaction('Jennifer Parker', 'transfer', 1000, 'savings', 'CD'),
  ];
}

(With RxJS, it doesn't really matter whether the data comes from a local array or a remote HTTP call; it's handled in exactly the same way.)

Now, let’s see some reactive programming in action. We’ll create two streams: one in charge of performing the database operation and the other processing the input:

// Using Ramda to create a lens to access/manipulate the date
// property of an object
const dateLens = R.lensProp('date');

// Function that writes a transaction to the document store:
// creates an RxJS timestamp, then uses the lens to stamp the
// transaction object with the time it was processed
const writeTx$ = tx => Rx.Observable.of(tx)
  .timestamp()
  .map(obj => R.set(dateLens, obj.timestamp, obj.value))
  .do(tx => console.log(`Processing transaction for: ${tx.name}`))
  .mergeMap(datedTx =>
    Rx.Observable.fromPromise(txDb.post(datedTx)));

// Run through all transactions in the array
Rx.Observable.from(getTransactionsArray())
  .concatMap(writeTx$) // map writeTx$ over each transaction object
  .subscribe(
    rec => console.log(`New record created: ${rec.id}`),
    err => console.log('Error: ' + err),
    () => console.log('Database populated!')
  );

Before we get into the details of this code, it’s important to understand that I am able to process and manipulate a set of objects and store them in a database, all in an immutable manner! This is a very compelling quality of functional programs and reduces the probability of bugs. Running this code prints the following to the console:

"Processing transaction for: Marty McFly"
"New record created: 4F7404AF-10D2-8438-AEAB-CC21CDC23810"
"Processing transaction for: Biff Tannen"
"New record created: A9ACE7FE-85DB-484E-AA74-B47A7F4D32B1"
"Processing transaction for: Emmet Brown"
"New record created: DD469ACA-BC5C-A5C6-8E4A-0FB544C62231"
"Processing transaction for: Jennifer Parker"
"New record created: B5C8B8C7-127B-11C7-A90E-64D79C8315E2"
"Database populated!"

Another benefit of wrapping observables over the database API is that all side effects are pushed downstream to the observers, instead of being dealt with inside each Promise.then() block. It's nice to keep your business logic as pure as possible, with side effects isolated and contained.
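To make the contrast concrete, here is a small plain-Promise sketch (fakePost and its response are hypothetical stand-ins for txDb.post()): the transformation step stays pure, and the logging side effect is confined to the final consumer, just as the observer is in the streams above:

```javascript
// Hypothetical stand-in for txDb.post(): resolves with a PouchDB-like response
const fakePost = tx => Promise.resolve({ ok: true, id: 'ABC-123' });

// Pure step: extracts the id, performs no I/O, mutates nothing
const processResponse = res => res.id;

const result = fakePost({ name: 'Marty McFly' })
  .then(processResponse); // business logic stays pure

result.then(id => console.log(`New record created: ${id}`)); // side effect at the edge
```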

Depending on the size of the transaction objects, storing thousands of them in an array could leave us with a very large memory footprint. Of course, we'd like to avoid keeping all of that data directly in memory, which is why we leverage the browser's database: the data is stored locally but persisted out of memory. To keep this example simple, we used a small array. Most likely you'll also want to handle transactions created locally as well as data coming in remotely. We can use RxJS's merge() to plug in multiple sources of data and process them all the same way:

// Merging the output from both local and remote streams
Rx.Observable.merge(
  getTransactionsArray(),
  Rx.Observable.fromPromise(makeHttpCall('/transactions')))
  .concatMap(writeTx$) // mapping the writeTx$ stream onto this data
  .subscribe(
    rec => console.log(`New record created: ${rec.id}`),
    err => console.log('Error: ' + err),
    () => console.log('Database populated!')
  );

The rest of the code continues to work exactly the same way. Brilliant! Asynchronous code is seamless in reactive programming! And in the event that the remote HTTP response is not an array, remember that we can make the Observable conformant by pushing some logic upstream. It's typical for remote calls to return an object with a status and a payload. So if your response object looks something like:

{
  status: 'OK',
  payload: [{name: 'Brendan Eich', ...}]
}

You can make it conformant by transforming it as you inject it into the stream, before mapping writeTx$ onto it:

Rx.Observable.merge(
  getTransactionsArray(),
  Rx.Observable.fromPromise(makeHttpCall('/transactions'))
    .mergeMap(response => Rx.Observable.from(response.payload)))
  .concatMap(writeTx$)

Moreover, databases are full of optimizations to improve read and write speed. We can further help these optimizations by performing bulk operations whenever possible.

Writing bulk data

The previous code samples create bank transaction records one at a time. We can optimize this process with bulk operations, which write an entire set of records with a single post request. Naturally, the PouchDB operation bulkDocs() takes an array. We talked earlier about how much memory building this set can consume, and with RxJS buffers that is completely under your control.

The buffer() operator comes in very handy here when we’re not just processing a handful of transactions, but hundreds of them. Let’s optimize the previous code:

Rx.Observable.from(getTransactionsArray())
  .bufferCount(20)
  .timestamp()
  .map(objs => R.map(R.set(dateLens, objs.timestamp), objs.value))
  .do(txs => console.log(`Processing ${txs.length} transactions`))
  .mergeMap(datedTxs =>
    Rx.Observable.fromPromise(txDb.bulkDocs(datedTxs)))
  .subscribe(
    // bulkDocs resolves with an array of responses, one per record
    recs => recs.forEach(
      rec => console.log(`New record created: ${rec.id}`)),
    err => console.log('Error: ' + err),
    () => console.log('Database populated!')
  );

To support this optimization, we had to make a few adjustments. After collecting 20 objects with bufferCount(20), the data passing through the stream is now an array instead of a single record. This is a sketch of this process:

Processing data in bulk

Alternatively, you could buffer for a certain period of time with buffer(Rx.Observable.interval(500)); this decision depends on the amount of data your application needs to handle at a given point. In that case, each record is kept in the buffer for up to 500 milliseconds, at which point the buffer is released and all of its records are written to the database in bulk.
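To see what a time-based buffer is doing under the hood, here's a plain-JavaScript sketch (no RxJS; timedBuffer is a name invented for illustration): items accumulate in a batch that is flushed on a fixed interval, and each flushed batch is what you would hand to txDb.bulkDocs():

```javascript
// Plain-JS sketch of time-based buffering, roughly what
// buffer(Rx.Observable.interval(500)) provides: accumulate items
// and flush the whole batch every `ms` milliseconds.
function timedBuffer(ms, flush) {
  const batch = [];
  const timer = setInterval(() => {
    if (batch.length > 0) flush(batch.splice(0)); // emit a copy and clear
  }, ms);
  return {
    next: item => batch.push(item), // producer pushes records in
    stop: () => clearInterval(timer)
  };
}

// Usage: each flushed batch would go to txDb.bulkDocs(batch)
const batches = [];
const buf = timedBuffer(50, batch => batches.push(batch));
buf.next({ name: 'Marty McFly' });
buf.next({ name: 'Biff Tannen' });
setTimeout(() => buf.stop(), 120); // both records flush together on the first tick
```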

However, there is a problem with using just a count- or time-based buffer. If the user navigates away from the page while the data is being cached, we could lose anything waiting in the buffer (up to 20 transactions in this case), which would never get saved. To fix this, let's introduce another Observable to trigger the buffer's emission. Buffers also support signaling, so that emission can occur in response to some browser hook, such as closing the window. To implement this, we can use the bufferWhen() operator with an Observable that's smart enough to support both use cases: cache the results for a specific period of time, or emit before the browser closes:

Rx.Observable.from(getTransactionsArray())
  .bufferWhen(() => // buffer until either of these Observables emits
    Rx.Observable.race(
      Rx.Observable.interval(500),
      Rx.Observable.fromEvent(window, 'beforeunload'))
  )

Rx.Observable.race() mirrors whichever of the provided Observables emits a value first. In this case, the buffer is released after half a second, or when the window closes, whichever comes first.

Rather than taking an Observable that triggers each new buffer, the bufferWhen() operator accepts a closing selector function that is re-invoked every time a buffer closes; the resulting Observable determines when the next buffer should close. With this we can create a signal Observable with a whole host of possible constraint states. Now that you know how to get data into the database, let's join that with a query that counts the total number of records.

Joining related database operations

All local store operations, whether you use IndexedDB directly or through PouchDB, happen asynchronously, but with RxJS you can treat them almost as if they were synchronous, thanks to the abstraction it places over the latency of database calls. To illustrate this, we'll chain an operation that inserts a record with one that retrieves the total count.

PouchDB is a map/reduce database, so in order to query the data, you must first define how the projection, or mapping function, works. This definition is called a design document, and you include it as part of the query. For our purposes, we'll keep it simple and count the number of transactions performed. Our design document, which we'll call count, looks like this:

const count = {
  map: function (doc) {
    emit(doc.name); // project data based on the user's name in the tx
  },
  reduce: '_count'
};

Now, the next code shows how we can join two queries with a single stream declaration:

Rx.Observable.from(getTransactionsArray())
  .concatMap(writeTx$)
  .mergeMap(() => Rx.Observable.fromPromise(
    txDb.query(count, {reduce: true})))
  .subscribe(
    recs => console.log(recs.rows[0].value),
    error => console.log(`Error: ${error}`),
    () => console.log('Query completed!')
  );

PouchDB also ships with some built-in reductions of its own. Aside from _count, you can perform other reductions such as _sum and _stats. Let's go over another, slightly more complex example that really shows off joining multiple datasets: I'm going to perform a withdrawal against the accounts database and create a new transaction document for each withdraw action.
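Before moving on, it may help to see what a reduction like _sum actually computes. Here is a plain-JavaScript sketch of the map/reduce semantics, using hypothetical sample documents: each document emits a (key, value) pair, and _sum adds the values per key:

```javascript
// Hypothetical sample documents, shaped like our transaction docs
const docs = [
  { type: 'withdraw', amount: 500 },
  { type: 'deposit',  amount: 800 },
  { type: 'withdraw', amount: 200 },
];

// map phase: emit (doc.type, doc.amount); reduce '_sum': add values per key
const totals = docs.reduce((acc, doc) => {
  acc[doc.type] = (acc[doc.type] || 0) + doc.amount;
  return acc;
}, {});

console.log(totals); // { withdraw: 700, deposit: 800 }
```

With a grouped query (passing {reduce: true, group: true} to query()), PouchDB performs this same per-key summation for you.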

First we find the account by ID; if that results in a valid object, we subtract the withdrawal amount and update the account. We'll create a few different types of accounts for our user Emmet Brown:

const accounts = [
  new Account('1', 'Emmet Brown', 'savings', 1000),
  new Account('2', 'Emmet Brown', 'checking', 2000),
  new Account('3', 'Emmet Brown', 'CD', 20000),
];

The accounts data will reside in:

const accountsDb = new PouchDB('accounts');

Because you’re already familiar with creating databases and populating them, we’ll jump right into our withdraw() function, which returns an Observable responsible for creating the flow to query and update multiple databases:

// Finds the user account by id and checks for null,
// subtracts the amount from the balance and updates the record,
// then creates the transaction document for the withdrawal
function withdraw$({name, accountId, type, amount}) {
  return Rx.Observable.of(accountId)
    .mergeMap(id => Rx.Observable.fromPromise(accountsDb.get(id)))
    .filter(doc => !!doc)
    .do(doc => console.log(
      doc.balance < amount ?
        'WARN: This operation will cause an overdraft!' :
        'Sufficient funds'
    ))
    .mergeMap(doc => Rx.Observable.fromPromise(
      accountsDb.put({
        _id: doc._id,
        _rev: doc._rev,
        balance: doc.balance - amount
      }))
    )
    .filter(response => response.ok)
    .do(() => console.log(
      'Withdraw succeeded. Creating transaction document'))
    .concatMap(() => writeTx$(
      new Transaction(name, 'withdraw', amount, type)));
}

You can run this code by passing it an object literal with the pertinent information:

withdraw$({
  name: 'Emmet Brown',
  accountId: '3',
  type: 'checking',
  amount: 1000
})
.subscribe(
  tx => console.log(`Transaction number: ${tx.id}`),
  error => console.log(`Error: ${error}`),
  () => console.log('Operation completed!!')
);

Which will generate the following output:

"Sufficient funds"
"Withdraw succeeded. Creating transaction document"
"Processing transaction for: Emmet Brown"
"Transaction number: DB6FF825-C703-0F1A-B860-DA6B1138F723"
"Operation completed!!"

As you can see, because PouchDB's API uses promises, it's easy to integrate your database code with your business logic, all wrapped and coordinated via the Observable operators. While database calls are a form of side effect, they are one we're willing to accept in practice, relying on the unidirectional flow of streams to streamline the use of this shared state. But wrapping API calls is not the only thing you can do with PouchDB. Because it's an event emitter, you can also tap into parts of its lifecycle.

PouchDB as an event emitter

PouchDB is an event emitter, which means it exposes a set of events, or hooks, for you to plug logic into certain phases of its lifecycle. For instance, just as GitHub exposes hooks that fire when branches are created, you can add event listeners that fire when databases are created and destroyed.

This is very important in browser storage, where the lifespan of a database is temporary: it can be destroyed and re-created at any point in time. So before we begin adding any documents, it's a good idea to do so in the context of the database-created hook.

Using the Rx.Observable.fromEvent() operator you can transform any event emitter into an Observable sequence. So, hooking into the database creation event looks like the following:

Rx.Observable.fromEvent(txDb, 'created')
  .subscribe(
    () => console.log('Database ready to accept data!')
  );

So adding this check into our streams is very easy: all you need to do is key off that hook to perform the rest of your logic. This is somewhat similar to waiting for the document to be ready before executing any of our JavaScript code. The withdraw operation would look like this:

Rx.Observable.fromEvent(txDb, 'created')
  .switchMap(() =>
    withdraw$({
      name: 'Charlie Brown',
      accountId: '1',
      type: 'checking',
      amount: 1000
    })
  )

In _sum

Of course, we're only scratching the surface of what can be done with RxJS. I hope that with these simple examples you can begin to see its distinctive declarative, functional style, where essentially everything is represented as an immutable flow of data, regardless of whether that data started out statically.

Enjoy!