With this post, I want to share the design process behind one of our latest projects: a P2P marketplace intended to be used by one of the largest online lenders and all kinds of investors for securitization of their portfolios. It is also about an almost insane stack change that turned out to be a great idea.
Why? To attract new engineers and backers to the Elixir community. I do not consider myself an enterprise guy, but I know that the first question they tend to ask is “What has already been done in Elixir?”, and it’s really hard to find good examples on the Internet.
Disclaimer: I will try to keep the content of this post close to the original implementation from a technical point of view, but there are some business-related details that I cannot disclose.
Understanding business needs
Imagine that you have an online lending company. It means that on a daily basis you lend money to people, earn interest when they repay loans and lose some money when they don’t. If you have good marketing channels and are smart enough to balance risk against profit when deciding who should receive a loan from you, congratulations: you own a successful, growing and profitable origination business.
But at some point in time, you won’t be able to grow further, because each loan ties up your money until it is repaid, and there will be none left to issue new loans.
Now you might start looking for investors who are willing to buy batches of your loans (portfolios) and, probably, share risks with you. You will receive less interest per loan, but you can compensate for that with future growth and reduced risk.
Investors might want to tell you what kind of portfolios they want to buy, based on their own “technical analysis”, “best practice” or “gut feeling”. For example, I want 75% of my portfolio to consist of loans with a good scoring grade, issued only in Spain to people in the 28–35 age group with high income (low risk/low profit), and 25% to consist of loans issued to people with a low scoring grade, without other preferences (high risk/high profit).
What is the best place for both parties to meet? A marketplace! Originators can send information about their loans; investors can set their expectations; the marketplace can enrich originators’ data with its own scoring, handle matching and the further lifecycle, and protect the interests of both parties for a small fee.
Additional requirements and context
- From day one we should be ready to process hundreds of thousands of loans per day, plus updates for them. This is not a startup; a big partner is already waiting to integrate;
- Maintenance of the project should be handed off after active development ends, so we can keep creating other projects;
- Configuration should be flexible enough so that business guys can experiment with products without additional coding;
- Resource limits: 6 months, ~10 developers, a few analysts and a few business guys;
- We should make integration as simple as possible for originators and other parties. Changes on their side are far more expensive than our own development;
- We are in the fintech field. No data can be lost. Everything should be traceable. Even problems with floating-point calculations can be a time bomb for this kind of project. Many legal regulations apply;
- Future plans: loans are only one of many kinds of assets that can be traded, so this extensibility should be kept in mind. (In this article, “loan” and “asset” are used interchangeably.)
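The floating-point requirement above is easy to demonstrate. What follows is a minimal Python sketch, not code from the project (which is written in Elixir); the function name, the rate and the rounding rule are assumptions made up for illustration.

```python
from decimal import Decimal, ROUND_HALF_EVEN

# Binary floats cannot represent most decimal fractions exactly,
# so tiny errors appear even in trivial sums.
float_total = 0.1 + 0.2          # not equal to 0.3

# Decimal stores exact decimal values, which is what money needs.
decimal_total = Decimal("0.10") + Decimal("0.20")


def accrue_interest(principal: Decimal, annual_rate: Decimal, days: int) -> Decimal:
    """Simple (non-compounding) interest, rounded to cents.

    Hypothetical helper: the day-count convention (365) and the
    rounding mode are illustrative assumptions, not business rules
    taken from the marketplace.
    """
    interest = principal * annual_rate * days / Decimal(365)
    return interest.quantize(Decimal("0.01"), rounding=ROUND_HALF_EVEN)
```

The same idea applies in any language: keep amounts as decimals or as integer minor units (cents), and round explicitly at well-defined points.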
Insane stack change
Prior to the marketplace development, our team worked entirely with a PHP, MongoDB, Puppet and Angular.js stack, but we decided to take all the risks and migrate at once to a completely new one: Elixir, PostgreSQL, RabbitMQ, Docker, Kubernetes and React.js+Redux.js. This turned out to be a great decision, both for the skills of all our team members and for the many additional benefits that this stack brings by itself:
- Code that is much cleaner and easier to write and maintain;
- Fault tolerance;
- Concurrent processing;
- Stream data processing;
- Single released binary that is used across all environments;
- No more dealing with server setup and its differences between environments;
- Well-structured front-end applications with server-side rendering;
- Our analysis team used to produce specifications in BPMN notation, which maps neatly onto code thanks to pattern matching and lightweight processes.
How did that happen at all?
Back in summer 2016, a friend of mine (hi, Alex) told me about this new shiny thing called Elixir. I thought that it was yet another useless language, but decided to give it a try and spent some time implementing a new microservice in it.
I read all the docs on the official site, watched a few videos on YouTube and started to write code. For the first few days, I felt like “this stuff is bad, why do I even consider it?”. Later it was “ok, this stuff is great, but I’m not smart enough to use it”. Within a week functional programming had blown my mind and I was in love with Elixir.
Ecto 2.0 did not have an adapter for MongoDB, so I installed PostgreSQL and decided to keep using it. This decision was also influenced by Aphyr’s blog post on how MongoDB can lose your data, and by the support for JSONB indexes in PostgreSQL.
Writing code became easy and I felt more productive than ever before, until I tried to release my microservice. Compilation turned out to be platform-dependent, so I used to spin up a VM for this single purpose. Another option was to release a Docker container, which had a friendlier delivery and development lifecycle. Also, I was able to write acceptance tests for containers to make sure that the code would work in all environments after compiling it into an OTP release, a feature that I had been missing in my past “PHP” life.
Release configuration was another difficulty. To use one container in all environments it must be configurable at runtime, but Elixir itself does not allow you to do that (at least there is no easy way). You could set the REPLACE_OS_VARS environment variable in a container and start using bash-style variables in configs, but what if you need some value to be an integer? Confex was born to solve that problem. UPD: I’ve written an article on this topic.
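To make the “what if you need an integer?” problem concrete, here is a minimal Python sketch of typed, runtime-resolved configuration. It is not Confex and does not reproduce its API; the placeholder tuple shape, the `resolve` function and the variable names are assumptions invented for this illustration.

```python
import os


def resolve(value, environ=os.environ):
    """Replace ("system", cast, NAME, default) placeholders with real values.

    The placeholder format is hypothetical. Environment variables are
    always strings, so each placeholder carries the cast to apply.
    """
    if isinstance(value, tuple) and len(value) == 4 and value[0] == "system":
        _, cast, name, default = value
        raw = environ.get(name)
        return default if raw is None else cast(raw)
    if isinstance(value, dict):
        # Walk nested config sections recursively.
        return {key: resolve(item, environ) for key, item in value.items()}
    return value


# The same config (and therefore the same container image) is shipped
# everywhere; only the environment differs between deployments.
CONFIG = {
    "port": ("system", int, "PORT", 4000),
    "db": {
        "host": ("system", str, "DB_HOST", "localhost"),
        "pool_size": ("system", int, "DB_POOL_SIZE", 10),
    },
}
```

Calling `resolve(CONFIG)` at application start yields plain values with the right types, so the rest of the code never touches raw environment strings.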
Because I did not want to deal with server setups anymore, Kubernetes was picked to run these containers. Deploying and maintaining an application in it is a separate love story.
Breaking up the requirements
By analyzing all the requirements, the marketplace can be broken into separate components (domains?) by their purpose:
- Asset Processor, which is responsible for communication with originators, processing data from them and creating Sell Offers;
- Trader, which acts like a trading bot and places Buy Offers according to criteria set by investors;
- Matcher, which matches Sell Offers with Buy Offers and creates the data needed to make an Investment;
- Asset Management System (or a General Ledger), which takes this data and produces the actual Investment, moves money between accounts, creates reports and makes sure that there are no technical overdrafts;
- CRM as a central place for customer lifecycle management and configuration.
Asset Processor
Responsibilities:
- Communicate with originators;
- Persist and enrich the data;
- Generate Sell Offers.
To reduce the integration cost for originators, we don’t want them either to remember the last state of the data that was sent to us or to produce incremental updates. Instead, they can send only the subset that changed after their end-of-day (selected by last-updated date) and we will find the changes ourselves. This means that we need to persist everything on our side.
Because that data is not tightly coupled, scaling the database is not an issue: we can shard by customer (originator) ID and asset (loan) ID.
First of all, we should come up with a way to receive and process large datasets from originators. A simple REST JSON API won’t be the best fit, because:
- Establishing a connection for every single asset is an expensive operation. We can handle that, but for originators producing this load may be expensive because of all the legacy in their systems. So we should receive data in batches;
- On the originator side most of the changes are not real-time; rather, they happen during the end-of-day procedure. These batches can be very large;
- Intraday updates must also be supported, for example when an originator wants to call back a repaid loan before it is sold to an investor. We never know when their end-of-day actually happens;
- A large JSON document generally cannot be parsed without loading the whole document into memory (unless a streaming parser is used).
So JSON was replaced with BSON. It allows documents to be read lazily, one by one, and is easy to implement in Elixir. There are also plenty of libraries for most other languages that can be a drop-in replacement for JSON encoders, which makes integration less painful.
We ended up providing a single endpoint for multipart file upload in BSON format. The originator keeps track of what data has changed since the last upload, writes that data into a BSON file and sends it to us.
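The reason BSON can be read lazily is that every document starts with its own total size as a little-endian 32-bit integer and ends with a zero byte. The Python sketch below (not the project’s Elixir code) splits a stream of concatenated documents without ever holding more than one in memory; the function name is made up, and decoding each document is left to whatever BSON library you use.

```python
import io
import struct


def each_bson_document(stream):
    """Yield raw BSON documents one at a time from a binary stream."""
    while True:
        header = stream.read(4)
        if not header:
            return  # clean end of file
        if len(header) < 4:
            raise ValueError("truncated BSON length prefix")
        # The size is a little-endian int32 and includes the 4 prefix bytes.
        (size,) = struct.unpack("<i", header)
        if size < 5:
            raise ValueError(f"invalid BSON document size: {size}")
        body = stream.read(size - 4)
        # Every document is terminated by a single 0x00 byte.
        if len(body) != size - 4 or body[-1] != 0:
            raise ValueError("truncated or malformed BSON document")
        yield header + body


# Hand-built sample documents: {"a": 1} and the empty document {}.
DOC_A = bytes.fromhex("0c0000001061000100000000")
DOC_EMPTY = b"\x05\x00\x00\x00\x00"
sample_upload = io.BytesIO(DOC_A + DOC_EMPTY + DOC_A)
```

Because the function is a generator, a consumer that asks for documents slowly also reads the file slowly, which is what makes it a natural source for a demand-driven pipeline.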
For backward communication we agreed to do a similar thing: create a simple microservice that reads messages from a queue and persists them to a file on disk. After some timeout this file is sent to the originator and deleted, and the process starts over again.
Persistence and processing
After receiving an upload we need to back up this batch to Amazon S3 and start processing a local copy of the data. (This backup can be used for event sourcing later.)
What needs to be done:
- Find the difference for each asset and decide what to do (delete the Sell Offer; create or update the Sell Offer; notify the Asset Management System about a repayment);
- Persist changes in the database;
- Enrich the data: we need to predict cash flow and split all loans into different risk groups based on this prediction and other data from the originator.
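The first step in the list, finding the difference and deciding what to do, can be sketched as a small pure function. This is a Python illustration rather than the original Elixir code; the `status` field and its values (`"repaid"`, `"withdrawn"`) as well as the action names are hypothetical, since the real data model is not described here.

```python
def decide(previous, incoming):
    """Compare the stored and incoming states of one asset.

    Returns (action, changed_fields). `previous` is None when the asset
    has never been seen before. Field and action names are assumptions.
    """
    if previous is None:
        return ("create_sell_offer", dict(incoming))

    # Only fields whose values differ from the persisted version matter.
    changed = {key: value for key, value in incoming.items()
               if previous.get(key) != value}
    if not changed:
        return ("skip", {})

    if incoming.get("status") == "repaid":
        return ("notify_repayment", changed)
    if incoming.get("status") == "withdrawn":
        return ("delete_sell_offer", changed)
    return ("update_sell_offer", changed)
```

Keeping this decision free of I/O makes it trivial to test and lets the surrounding pipeline stage do nothing but route the result.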
Our first implementation read the file contents recursively, but the database connection pool quickly became exhausted. The best way to deal with this kind of problem is to apply back-pressure while processing the data. This is where GenStage comes in (keynote).
It both reduced memory usage (from up to a few GB for a large dataset to an almost constant 56 MB) and stopped overloading the connection pool. Eventually, we completely rewrote all parts of the Asset Processor to use GenStage and used it a few times in other places.
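Back-pressure simply means that the producer is never allowed to run further ahead than the consumer can cope with. GenStage achieves this with explicit demand sent upstream; the Python sketch below uses a different but related mechanism, a bounded queue that blocks the producer when it is full. It is an analogy under that stated assumption, not a port of GenStage, and all names are illustrative.

```python
import queue
import threading


def run_pipeline(items, handle, max_in_flight=10):
    """Process `items` with `handle`, buffering at most `max_in_flight`.

    Returns (results, peak) where `peak` is the largest buffer size seen.
    """
    buffer = queue.Queue(maxsize=max_in_flight)
    done = object()  # sentinel that marks the end of the input

    def produce():
        for item in items:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(done)

    producer = threading.Thread(target=produce, daemon=True)
    producer.start()

    results, peak = [], 0
    while True:
        peak = max(peak, buffer.qsize())
        item = buffer.get()
        if item is done:
            break
        # The slow part (e.g. a database call) happens here, and the
        # producer cannot get more than `max_in_flight` items ahead of it.
        results.append(handle(item))
    producer.join()
    return results, peak
```

Memory use is bounded by `max_in_flight` regardless of how large the input is, which is the same property that kept the connection pool and the memory footprint stable in the pipeline described above.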
The Asset Processor can be split into a few subparts:
- Integration Layer (IL): the core component that receives these batches, decides how to process the data and routes it to the appropriate queues. You can think of it as the business process owner;
- Assessor: predicts cash flow and classifies assets by risk groups;
- Merger: efficiently persists enriched data back to the database.
This is how the data pipeline may look for new assets:
- IL/File Watcher: spin up a new GenStage producer and a new process that uploads the backup, then wait for the producer to reach EOF. When the data is both uploaded and processed, delete the file;
- IL/Producer (spawned dynamically): Lazily read the contents of the file (via BSONEach) and notify the file watcher when EOF is reached;
- IL/Producer-Consumer: Query the database for the latest persisted version of an asset;
- IL/Producer-Consumer: Calculate the diff between the two states and route it to the appropriate consumer;
- IL/Consumer: Publish a message to the Assessor.In RabbitMQ queue;
- Assessor/Consumer: Read messages from the Assessor.In queue;
- Assessor/Producer-Consumer: Send a REST API call to the cash-flow prediction service (our other project) and enrich the data with the response;
- Assessor/Producer-Consumer: Send a REST API call to the classifier (also our project) and enrich the data once more;
- Assessor/Producer: Publish the message to the Assessor.Out RabbitMQ queue and acknowledge its processing in Assessor.In;
- IL/Consumer-Producer: Shovel messages from Assessor.Out to the Merger.In queue (simply to keep control over the overall business process in the IL);
- Merger: Read a message from the Merger.In queue, write the data to the database and publish a message to the Merger.Out queue;
- IL/Router: Read a message from the Merger.Out queue, insert the data into a Sell Offer structure and publish it in a message that will be read later by the Matcher.
All this complexity may look like overkill, but it may be a good selling point when you are building a project in collaboration with enterprise guys. They can “replace and reuse components” of the system, and the actual business logic is located in only one part of the system. We spent a few man-days discussing it.
Handling external end-of-days
With originators that are not able to be “always online” and have downtime during their end-of-day, we may face an issue when the EOD terminates an asset that we are selling at the same time.
To solve it we added an additional call that literally says “we are going to sell this, are you okay with that?” before creating the actual financial transactions. The callback URL can optionally be sent by the originator along with the data. This is inspired by two-phase commit in distributed systems.
If we receive either an error response or a timeout, we return all affected offers to the marketplace.
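The confirmation step can be sketched as follows. This is a Python illustration, not the original Elixir code: `confirm`, `create_transactions` and `return_to_market` are hypothetical callables injected by the caller, with `confirm` standing in for the HTTP call to the originator’s callback URL.

```python
def settle(offers, confirm, create_transactions, return_to_market):
    """Ask the originator before selling; undo on refusal, error or timeout.

    Returns True when the financial transactions were created.
    """
    try:
        # Phase one: "we are going to sell this, are you okay with that?"
        approved = confirm(offers)
    except (TimeoutError, ConnectionError):
        # No answer is treated the same as "no".
        approved = False

    if approved:
        # Phase two: only now is money actually moved.
        create_transactions(offers)
        return True

    return_to_market(offers)
    return False
```

Treating a timeout as a refusal is the conservative choice: the worst outcome is that a valid offer is matched again later, rather than selling a loan that the originator has already terminated.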
Along with backups of received batches, data in our database is immutable. We never execute update statements in the database. Instead, we increment the version of an existing record and write it as a new row.
Thanks to this we can guarantee that an asset from a single batch will be processed exactly once. This is helpful both for complete batch job restarts and for finding duplicate messages in the queue. (In practice it is hard to guarantee exactly-once delivery.)
Also, we can safely restart batch processing with or without dropping data from the database, which may save you lots of brain cells in production.
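A minimal sketch of such an append-only store, written in Python with SQLite so that it runs anywhere (the project itself uses Elixir and PostgreSQL). The table layout, the column names and the rule “at most one version per asset per batch” are assumptions chosen to illustrate the idea, not the real schema.

```python
import sqlite3


def open_store():
    db = sqlite3.connect(":memory:")
    db.execute("""
        CREATE TABLE asset_versions (
            asset_id TEXT    NOT NULL,
            version  INTEGER NOT NULL,
            batch_id TEXT    NOT NULL,
            payload  TEXT    NOT NULL,
            PRIMARY KEY (asset_id, version),
            UNIQUE (asset_id, batch_id)  -- one version per asset per batch
        )""")
    return db


def append_version(db, asset_id, batch_id, payload):
    """Write a new row instead of updating; False means 'already processed'."""
    try:
        with db:  # commits on success, rolls back on error
            db.execute(
                """INSERT INTO asset_versions (asset_id, version, batch_id, payload)
                   SELECT ?, COALESCE(MAX(version), 0) + 1, ?, ?
                   FROM asset_versions WHERE asset_id = ?""",
                (asset_id, batch_id, payload, asset_id),
            )
        return True
    except sqlite3.IntegrityError:
        # A redelivered message or a restarted batch hits the UNIQUE
        # constraint, so the duplicate is detected and ignored.
        return False


def latest(db, asset_id):
    """Return (version, payload) of the newest row, or None."""
    return db.execute(
        "SELECT version, payload FROM asset_versions "
        "WHERE asset_id = ? ORDER BY version DESC LIMIT 1",
        (asset_id,),
    ).fetchone()
```

Delivery may still happen more than once, but the constraint makes processing idempotent, which is what gives the effectively-once behaviour described above.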
There is a good white paper on immutable data: Immutability Changes Everything. I am extremely happy when we have a project with an immutable persistent store: it provides traceability in development and production, improves overall security and makes maintenance easier.
Of course, you have the problem of a fast-growing database, but you can deal with it by moving old versions to cold storage, or simply by saving database snapshots and dropping everything that is not related to the business processes.
Trader
Responsibilities:
- Provide a REST JSON API for the investors’ front-end;
- Persist portfolio criteria in the database;
- Place Buy Offers according to these criteria;
- Do not violate portfolio structure.
The Trader was one of the simplest parts of our marketplace. Along with Phoenix for the API, it also has a “Gap Analyzer” microservice that takes the current state of a portfolio and produces new Buy Offers.
When an investor creates a portfolio with different kinds of assets (let’s call them “buckets”) we need to make sure that the balance between them is not violated. Otherwise, under current market conditions, we might fill all the demand for a high-risk bucket and never balance it with low-risk assets; the portfolio would then not match the customer’s expectations, which may lead to money loss.
To balance it we decided to create initial Buy Offers (one for each bucket) for a small share of the bucket volume (10%).
After a Buy Offer is matched and the portfolio balance is updated, e.g. by filling the demand for the first bucket, a new trading cycle starts. Volume for buckets that are not filled is increased proportionally to the overall structure.
This cycle is repeated until all the volume is utilized. If some bucket cannot be filled we will wait, but never violate the structure by more than a few percent of the overall volume.
Matcher
Responsibilities:
- Efficiently match Sell Offers with Buy Offers;
- Calculate Sell and Buy prices based on configuration (investors may have their personal rates);
- Send matched offers to Asset Management System.
The Matcher reads messages from the Sell Offers and Buy Offers queues and tries to match them to create as many investments as possible. Everything can be done in RAM; we can regenerate the input from the Trader and Asset Processor if data is lost.
In most cases, an investor does not want to buy a whole asset. Usually, they want to purchase a small amount in a large number of loans, which decreases the risk for each individual investor.
It is important to note that there are assets that cannot be sold partially; for them an investment is created only when investors have bid for the whole Sell Offer volume. So, additionally, we want to match the most-filled assets first. Otherwise, investors’ money may be held for an asset that will never be bought by other investors.
Each time a Sell Offer comes in, we take its description and match it against all Buy Offers. On each match, we calculate the sell and buy prices (the delta is our fee) and place a bid in the Sell Offer. When its volume is 100% filled, or a partial sale is allowed, a message is published to the Investments queue with the Sell Offer and information about all the bids.
When a Buy Offer comes in, everything works pretty much the same way: we look for all Sell Offers that match its criteria and decrease the Buy Offer volume by the amount of the bids we placed. If all the volume is bid, the Buy Offer becomes inactive and cannot be matched anymore. Otherwise, we keep it active to match against upcoming Sell Offers.
When an offer update comes in, we need to roll back all the changes made for its previous version and run matching again. (This can be optimized later.)
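The Sell Offer side of the matching described above can be sketched like this. It is a simplified Python illustration, not the original Elixir implementation: pricing and fees are left out, criteria are reduced to exact attribute matches, amounts are integer cents, and the `max_per_asset` cap is an assumption standing in for “a small amount in a large number of loans”.

```python
from dataclasses import dataclass, field


@dataclass
class BuyOffer:
    investor: str
    criteria: dict       # attribute -> required value (simplified)
    remaining: int       # volume still to invest, in cents
    max_per_asset: int   # hypothetical cap on a bid in one asset


@dataclass
class SellOffer:
    asset_id: str
    attributes: dict
    volume: int          # in cents
    partial_sale: bool
    bids: list = field(default_factory=list)  # (investor, amount) pairs

    @property
    def bid_volume(self):
        return sum(amount for _, amount in self.bids)


def matches(buy, sell):
    return all(sell.attributes.get(k) == v for k, v in buy.criteria.items())


def match_sell_offer(sell, buy_offers):
    """Place bids on `sell`; True means it is ready for the Investments queue."""
    already_bid = {investor for investor, _ in sell.bids}
    for buy in buy_offers:
        open_volume = sell.volume - sell.bid_volume
        if open_volume == 0:
            break
        if buy.investor in already_bid or buy.remaining == 0 or not matches(buy, sell):
            continue
        amount = min(open_volume, buy.remaining, buy.max_per_asset)
        buy.remaining -= amount          # a fully used Buy Offer becomes inactive
        sell.bids.append((buy.investor, amount))
        already_bid.add(buy.investor)
    fully_bid = sell.bid_volume == sell.volume
    return fully_bid or (sell.partial_sale and bool(sell.bids))
```

For assets that cannot be sold partially, the function keeps the bids and returns `False` until later Buy Offers close the gap, which is why matching the most-filled Sell Offers first matters.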
Erlang and Elixir come with the Mnesia database, which lives in the same memory space as the application and provides very low-latency lookups.
Since we were limited in time, we decided to start development with PostgreSQL and to write an Ecto adapter for Mnesia that can be a drop-in replacement once it’s ready.
Asset Management System
- Account for investors’ money and investments;
- Comply with all regulations;
- Notify Trader about balance changes.
Instead of writing the AMS on our own, we decided to buy a SaaS product that does what we need and created a wrapper around its API.
Because the marketplace works by running frequent trading cycles, it becomes too expensive to run matching for every single asset purchase. So we delay balance change events by 30 seconds via a simple GenServer, and only the accumulated data is sent to the Trader.
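The accumulation logic is small enough to sketch. The version below is plain Python with an explicitly passed clock so that it can be tested deterministically; it is not the GenServer from the project, and the class and method names are made up. In the real service a timer message would trigger the flush.

```python
class BalanceChangeBuffer:
    """Accumulate balance deltas and release them at most once per window."""

    def __init__(self, window_seconds=30.0):
        self.window = window_seconds
        self.pending = {}              # investor_id -> summed delta in cents
        self.window_started_at = None

    def add(self, investor_id, delta_cents, now):
        if not self.pending:
            # The first event after a flush opens a new window.
            self.window_started_at = now
        self.pending[investor_id] = self.pending.get(investor_id, 0) + delta_cents

    def flush_if_due(self, now):
        """Return the accumulated deltas once the window has elapsed, else None."""
        if not self.pending or now - self.window_started_at < self.window:
            return None
        due, self.pending = self.pending, {}
        return due
```

Many small purchases within the window collapse into one net change per investor, so the Trader starts one trading cycle instead of dozens.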
The CRM is another part that wasn’t built from the ground up: we took a CRM that holds a leader position in the Gartner Magic Quadrant and built a wrapper around it.
One of the most interesting approaches here was the Legal Framework. If you run a business, you might want to change your legal agreements once in a while, but due to regulations this cannot be done without investors’ approval. Thus, if you want to roll out a new product that could not exist under the old legal conditions, you need to create a new version of the public agreement and allow investors to use this product only if they have agreed to the version that supports it.
When you develop a large project with a microservice architecture, you might start doing common things (access management, validation, logging, etc.) over and over again. Instead, we decided to build and open-source an API gateway in Elixir that does these common parts, keeping the upstream services’ codebase smaller.
Its documentation is pretty self-explanatory, so if you are interested in how it works, dig deeper here.
Don’t get too crazy with microservices
Our first Asset Processor implementation consisted of ~10 microservices, which added too much friction (for such a small team!). Every change in the data model, CI scripts or other shared parts required us to re-apply this change over and over again.
To make it simpler, we moved our Ecto-related code to a separate GitHub repo and included it as a dependency elsewhere. But business logic was spread out across all the repos, so you still needed to update it, along with the tests. Or, at least, rebuild all containers on each change in this dependency.
After the rewrite, we squashed everything into only 3 microservices. (Most of the others were replaced with GenStage/Flow producer-consumers.)
And hey, it’s Elixir! In most cases, you don’t need them at all! Take a look at OTP applications! Rule of thumb:
If you want to deploy something separately, it is a separate microservice; otherwise, keep it in separate contexts/OTP applications in an umbrella project.
Microservices are just a way to move complexity out to the communication layer of your organization.
Avoid defensive programming and architecture
This is another lesson learned during the development of the Asset Processor.
I was too defensive and expected RabbitMQ issues due to the volume of data passing through it. In some cases, a heartbeat packet between replicas may be delayed because of the large amount of other data passing over TCP between nodes, so RabbitMQ can experience a “split brain” without the network actually going down. It felt like some data might be lost.
To deal with it, we built a pretty complex persistence system. For each incoming batch, we created a new database (on a separate virtual host) that was used as temporary storage. Each change was persisted back to this database and RabbitMQ was used only to send jobs (temporary database ID and asset ID) between microservices. After processing was completed, this database could be erased.
This gave us a few additional features:
- we can know processing status for a whole batch (which is hard when everything is asynchronous and you don’t know how much data you actually received);
- we could drop the staging database and retry processing when sh*t happens.
But overall, all this complexity was not worth the gains, so we started to send all data through RabbitMQ persistent queues, and I don’t remember any actual issues that we faced with this approach.
When a worker supervisor fails it may take all its child processes with it
This part looks obvious, but we managed to screw it up.
After receiving a message we started a new supervised GenServer process that was responsible for applying business logic. This process held the acknowledgment tag along with the data. And when, for any reason, the supervisor terminated its children, the tags were irreversibly lost.
Due to the fact that we limited concurrency via prefetch count (the maximum number of unacknowledged queue jobs per worker node), without the tags RabbitMQ did not send any additional data while waiting for the old jobs to be completed. Sometimes we ended up with zombie nodes that took jobs that would never be processed until the node restarted (RabbitMQ reschedules delivery of all unacknowledged messages to another worker if the connection is lost).
You can take a look at GenTask, which addresses exactly this problem.
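The underlying idea is that the delivery tag must be owned by something that outlives the worker, so that a crashed job is always acknowledged negatively instead of silently disappearing. The Python sketch below illustrates only that idea; it does not show GenTask’s API, and `FakeChannel` is a hypothetical stand-in for an AMQP channel rather than any real client library.

```python
class FakeChannel:
    """Hypothetical stand-in for an AMQP channel; records acks and nacks."""

    def __init__(self):
        self.acked = []
        self.requeued = []

    def ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def nack(self, delivery_tag):
        # In this sketch a nack always asks the broker to requeue.
        self.requeued.append(delivery_tag)


def handle_delivery(channel, delivery_tag, payload, job):
    """Run `job` while the consumer, not the worker, keeps the delivery tag."""
    try:
        job(payload)
    except Exception:
        # The worker failed: hand the message back to the broker so the
        # prefetch slot is released and the job can be retried elsewhere.
        channel.nack(delivery_tag)
        return False
    channel.ack(delivery_tag)
    return True
```

Whatever happens inside the job, every delivery ends in exactly one ack or nack, so the prefetch window can never fill up with messages that nobody is able to acknowledge.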
Our community is not that big and you should not expect there to be a package for everything. Instead, Elixir gives you the efficiency to create one in a matter of days.
If you are trying to do commercial projects in Elixir, plan some time for this kind of work. And, please, try to spend some additional time contributing the most valuable parts back to the community.
I want to thank Alex for telling me about Elixir, my team for supporting me during this project and everybody who helped me by reviewing the article.
And special thanks to Jose Valim, who was blazingly fast in responding to GitHub issues and extremely supportive in all possible ways.