Flow-Based REST API with Flowex and Plug

Flowex is a Flow-Based Programming framework built on top of the Elixir GenStage library. It is a set of abstractions that lets you write programs in the Railway FBP paradigm. You can read more about Flowex in this post.

I’ve written a couple of posts about the library and given a few talks at the Elixir Club and Pivorak meetups. I’ve noticed that two main questions come up once people get acquainted with Flowex.

1. Is it possible to design real applications in the Railway-FBP paradigm that Flowex proposes?
2. Does GenStage add significant performance overhead to the application?

The goal of this article is to answer yes to the first question and no to the second.

I’ve prepared a simple Elixir application called plug_flowex, a minimal prototype of a REST API for an imaginary blogging platform. It has only 2 endpoints (GET user and GET post) and, for simplicity, reads data from a text file. But this is enough to understand the design principles and to measure performance.

After reading the article you will know about:

  1. Flowex application design approach
  2. Pipelines and pipes initialization process
  3. Components and pipelines reuse
  4. Visualization of pipelines
  5. Pipeline performance versus the performance of the same code evaluated in one process

Plug and Flowex

The Elixir Plug library introduces the concept of “plugs”: functions with a similar interface that accept a Plug.Conn structure and must return the same structure, possibly modified. This pushes the developer to design the application using the so-called “Railway Oriented Programming” pattern, where an application is a pipeline of functions with a similar interface.
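To make the pattern concrete, here is a minimal sketch in plain Elixir, without Plug or Flowex. The module name and the three steps are hypothetical; the only point is that every step accepts the same kind of data and returns it, modified.

```elixir
defmodule HappyPathSketch do
  # Every step has the same interface: it takes the data map and returns an updated map.
  def parse(data), do: Map.put(data, :number, String.to_integer(data.input))
  def double(data), do: Map.put(data, :number, data.number * 2)
  def render(data), do: Map.put(data, :output, "result: #{data.number}")

  # The pipeline is just an ordered list of steps folded over the input.
  def run(input) do
    [&parse/1, &double/1, &render/1]
    |> Enum.reduce(%{input: input}, fn step, data -> step.(data) end)
  end
end

IO.inspect(HappyPathSketch.run("21").output)
# → "result: 42"
```

Because all steps share one interface, they can be reordered or swapped without touching their neighbours.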

This pattern also underlies the Flowex library. The only difference is that every function call is placed into a separate process (a GenStage), which allows the code to execute in parallel.
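The idea of one process per function can be sketched with bare spawn and message passing. This is not how Flowex is implemented (it uses GenStage, which adds demand-driven back-pressure); StageSketch and the {:ip, value} message shape are assumptions made only for this illustration.

```elixir
defmodule StageSketch do
  # Starts a process that applies `fun` to every incoming value
  # and forwards the result to the `next` process.
  def start(fun, next), do: spawn(fn -> loop(fun, next) end)

  defp loop(fun, next) do
    receive do
      {:ip, value} ->
        send(next, {:ip, fun.(value)})
        loop(fun, next)
    end
  end
end

# Build the chain from the last stage back to the first.
add_one = StageSketch.start(&(&1 + 1), self())
double = StageSketch.start(&(&1 * 2), add_one)

# Feed three values into the first stage.
Enum.each(1..3, fn n -> send(double, {:ip, n}) end)

# Collect the three results that arrive from the last stage.
results =
  for _ <- 1..3 do
    receive do
      {:ip, value} -> value
    after
      1_000 -> :timeout
    end
  end

IO.inspect(results)
# → [3, 5, 7]
```

While the second stage is still working on the first value, the first stage can already process the next one, which is where the parallelism comes from.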

When you design an application this way, you only need to think about the “happy path”: how the input data (information packages, or IPs) should be transformed into output. Then you prepare “plugs” (or “pipes” in Flowex) and assemble them into a pipeline.

Application overview (code)

There are 2 endpoints in the application:

  • GET /api/:user_id
  • GET /api/:user_id/posts/:post_id

There is also a “GET /sync_api/:user_id” endpoint used for benchmarking. We will return to it later.

Take a look at the PlugFlowex module, where the action begins.

defmodule PlugFlowex do
  use Plug.Router
  # ...

  get "/api/:user_id" do
    # The pipeline was started at boot and stored in the application environment
    pipeline = Application.get_env(:plug_flowex, :get_user_pipeline)
    result = GetUserPipeline.call(pipeline, %GetUserPipeline{conn: conn})
    result.conn
  end

  get "/api/:user_id/posts/:post_id" do
    pipeline = Application.get_env(:plug_flowex, :get_user_post_pipeline)
    result = GetUserPostPipeline.call(pipeline, %GetUserPostPipeline{conn: conn})
    result.conn
  end

  # ...
end

So, for each endpoint there is a corresponding pipeline (stored in the application environment). Each pipeline is called with a Plug.Conn structure.

Let’s consider the first pipeline in detail.

GetUserPipeline

The “happy path” for retrieving a user is the following: authenticate the client, find the record, render a JSON response, and send it to the client. Below is the code of GetUserPipeline.

defmodule GetUserPipeline do
  use Flowex.Pipeline
  defstruct [:conn, :user]

  pipe FetchParams,
    opts: %{auth_data: ["token"], repo_data: ["user_id"]}
  pipe AuthClient
  pipe FindRecord,
    opts: %{finder: &__MODULE__.find_user/1, assign_to: :user}
  pipe :prepare_data
  pipe RenderResponse, opts: %{renderer: UserRenderer}
  pipe SendResponse
  error_pipe :handle_error

  def prepare_data(%{user: user}, _opts), do: %{render_data: user}

  def find_user(repo_data), do: UserRepo.find(repo_data["user_id"])

  def handle_error(error, struct, _opts) do
    pipeline = Application.get_env(:plug_flowex, :handle_error_pipeline)
    HandleErrorPipeline.call(pipeline,
      %HandleErrorPipeline{conn: struct.conn, error: error.error})
  end
end

The pipeline struct has only 2 fields, :conn and :user. An information package (IP) initialized with conn passes through 6 pipes. The names are descriptive, so it is easy to understand what is going on.

  • FetchParams — prepares data for the following components;
  • AuthClient — authenticates client by token;
  • FindRecord — fetches user from “DB”;
  • :prepare_data — prepares data for rendering;
  • RenderResponse — renders JSON response;
  • SendResponse — sends response back to the client;

Let’s take a look at the components’ implementation:

FetchParams

defmodule FetchParams do
  import Plug.Conn
  defstruct [:conn]

  def init(opts), do: opts

  def call(%{conn: conn}, opts) do
    conn = fetch_query_params(conn)
    %{auth_data: data_for(:auth_data, conn, opts),
      repo_data: data_for(:repo_data, conn, opts)}
  end

  # Builds a map of the requested param names to their values
  defp data_for(key, conn, opts) do
    opts[key]
    |> Enum.reduce(%{}, &Map.put(&2, &1, conn.params[&1]))
  end
end

This component fetches “token” and “user_id” from the params and puts the values into the :auth_data and :repo_data fields, which will be available to the subsequent components.

Note how these keys are passed to the component via the options of the pipe macro. This is the component parameterization mechanism, which allows components to be reused in different pipelines.
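To see what the options change, here is the same reduction as in data_for, pulled out into a standalone sketch that takes a params map directly instead of a Plug.Conn. The module name and the sample params are made up for the illustration.

```elixir
defmodule DataForSketch do
  # Same reduction as in FetchParams, but over a plain params map
  def data_for(keys, params) do
    Enum.reduce(keys, %{}, &Map.put(&2, &1, params[&1]))
  end
end

params = %{"token" => "abc", "user_id" => "1", "post_id" => "7"}

# Different key lists select different slices of the same params
IO.inspect(DataForSketch.data_for(["token"], params))
# → %{"token" => "abc"}
IO.inspect(DataForSketch.data_for(["user_id", "post_id"], params))
# → %{"post_id" => "7", "user_id" => "1"}
```

Note that a key missing from the params still appears in the result, with a nil value.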

AuthClient

This component just looks up the token in the DB. If there is no such token, it raises an exception.

defmodule AuthClient do
  defstruct [:auth_data]

  defmodule Error do
    defexception message: "invalid token"
  end

  def init(opts), do: opts

  def call(%{auth_data: auth_data}, _opts) do
    if token = TokenRepo.find_by_token(auth_data["token"]) do
      %{current_user_id: token["user_id"]}
    else
      raise Error
    end
  end
end

FindRecord

defmodule FindRecord do
  defstruct [:repo_data]

  defmodule Error do
    defexception message: "record not found"
  end

  def init(opts), do: opts

  def call(%{repo_data: repo_data}, opts) do
    # The finder function and the target field both come from the pipe options
    if record = opts[:finder].(repo_data) do
      %{opts[:assign_to] => record}
    else
      raise Error
    end
  end
end

Note that this component has a specific option, :finder, which defines a function to be called inside the component:

def find_user(repo_data), do: UserRepo.find(repo_data["user_id"])
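Here is a self-contained sketch of passing a function through options. The in-memory @users map stands in for the real repo, and, unlike FindRecord, this version returns tagged tuples instead of raising, purely to keep the example easy to run.

```elixir
defmodule FinderSketch do
  # Hypothetical in-memory "repo"
  @users %{"1" => %{"id" => "1", "name" => "Ann"}}

  def find_user(repo_data), do: Map.get(@users, repo_data["user_id"])

  # The component knows nothing about users: the finder and the
  # target key are both supplied by the caller through opts.
  def call(%{repo_data: repo_data}, opts) do
    case opts[:finder].(repo_data) do
      nil -> {:error, :not_found}
      record -> {:ok, %{opts[:assign_to] => record}}
    end
  end
end

opts = %{finder: &FinderSketch.find_user/1, assign_to: :user}

IO.inspect(FinderSketch.call(%{repo_data: %{"user_id" => "1"}}, opts))
# → {:ok, %{user: %{"id" => "1", "name" => "Ann"}}}
IO.inspect(FinderSketch.call(%{repo_data: %{"user_id" => "2"}}, opts))
# → {:error, :not_found}
```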

:prepare_data

This component does not do much work. It just prepares data for the renderer.

def prepare_data(%{user: user}, _opts), do: %{render_data: user}

RenderResponse

The component is parameterized with the renderer module UserRenderer, which is responsible for the presentation of user records.

defmodule RenderResponse do
  defstruct [:render_data]

  def init(opts), do: opts

  def call(%{render_data: render_data}, opts) do
    content = opts[:renderer].render(render_data)
    %{status: 200, content: content}
  end
end

SendResponse

The component just calls the Plug.Conn.send_resp function to send data to the client.

defmodule SendResponse do
  import Plug.Conn
  defstruct [:conn, :status, :content]

  def init(opts), do: opts

  def call(data, _opts) do
    %{conn: send_resp(data.conn, data.status, data.content)}
  end
end

That’s it. These 6 components form the successful part of the pipeline. We will discuss the error handling mechanism later, but for now, let’s take a look at another pipeline, GetUserPostPipeline, and see how one can reuse existing components.
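As a recap, the sketch below mimics how the fields returned by one component become available to the next ones. It is my simplified reading of the behaviour described above, not Flowex code: the stub components, their hardcoded data, and the Map.merge step are all assumptions.

```elixir
defmodule IpFlowSketch do
  # Stub components: each reads what it needs from the IP
  # and returns only the fields it produces.
  def fetch_params(ip), do: %{repo_data: %{"user_id" => ip.params["user_id"]}}
  def find_record(ip), do: %{user: %{"id" => ip.repo_data["user_id"], "name" => "Ann"}}
  def prepare_data(ip), do: %{render_data: ip.user}
  def render(ip), do: %{status: 200, content: "user " <> ip.render_data["name"]}

  # Each returned map is merged into the IP before the next component runs.
  def run(ip) do
    [&fetch_params/1, &find_record/1, &prepare_data/1, &render/1]
    |> Enum.reduce(ip, fn component, acc -> Map.merge(acc, component.(acc)) end)
  end
end

result = IpFlowSketch.run(%{params: %{"user_id" => "1"}})
IO.inspect({result.status, result.content})
# → {200, "user Ann"}
```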

Reuse of components

GetUserPostPipeline

defmodule GetUserPostPipeline do
  use Flowex.Pipeline
  defstruct [:conn, :post]

  pipe FetchParams,
    opts: %{auth_data: ["token"], repo_data: ["user_id", "post_id"]}
  pipe AuthClient
  pipe FindRecord,
    opts: %{finder: &__MODULE__.find_post/1, assign_to: :post}
  pipe :prepare_data
  pipe RenderResponse, opts: %{renderer: PostRenderer}
  pipe SendResponse
  error_pipe :handle_error

  def prepare_data(%{post: post}, _opts), do: %{render_data: post}

  def find_post(repo_data) do
    PostRepo.find_user_post(repo_data["user_id"], repo_data["post_id"])
  end

  def handle_error(error, struct, _opts) do
    pipeline = Application.get_env(:plug_flowex, :handle_error_pipeline)
    HandleErrorPipeline.call(pipeline,
      %HandleErrorPipeline{conn: struct.conn, error: error.error})
  end
end

You may notice that there are no new components in the pipeline. GetUserPostPipeline uses the same components as the previous one, but some of them are parameterized differently.

The FetchParams pipe fetches post_id in addition to user_id. FindRecord now uses the find_post function to find the user’s post. And, finally, RenderResponse uses the PostRenderer module to generate the JSON response.

Component parameterization is a very important mechanism: it allows pipes to be initialized with some specific state so they can be used in a variety of contexts. The same approach can also be applied to a whole pipeline. One can pass options to the pipeline’s start function and thereby initialize the pipeline with some specific state.

Pipeline initialization

Each pipeline is a chain of GenStage processes under one supervisor. Generally, you can start pipelines anywhere in your code. But it is good practice to build all the pipelines when the application starts and to place the pipelines’ supervisors under the application supervisor.

Below is the code of the PipelineApp module, which is the main application module:

defmodule PipelineApp do
  use Application
  import Supervisor.Spec

  def start(_type, _opts) do
    children = [worker(PlugFlowex, [])]
    {:ok, supervisor_pid} = Supervisor.start_link(children, strategy: :one_for_one)

    # Start each pipeline under the application supervisor
    get_user_pipeline = GetUserPipeline.supervised_start(supervisor_pid)
    get_user_post_pipeline = GetUserPostPipeline.supervised_start(supervisor_pid)
    handle_error_pipeline = HandleErrorPipeline.supervised_start(supervisor_pid)

    # Make the pipelines globally accessible
    Application.put_env(:plug_flowex, :get_user_pipeline, get_user_pipeline)
    Application.put_env(:plug_flowex, :get_user_post_pipeline, get_user_post_pipeline)
    Application.put_env(:plug_flowex, :handle_error_pipeline, handle_error_pipeline)

    {:ok, supervisor_pid}
  end
end

As you can see, 3 pipelines are started using the supervised_start function, which places each pipeline’s supervisor under the main application supervisor. The pipelines are then stored in the application environment to be globally accessible.
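The store-and-fetch part can be tried in isolation. In this sketch :sketch_app and the handle map are hypothetical stand-ins; in the real application the stored value is whatever supervised_start returns.

```elixir
# A stand-in for the value returned when a pipeline is started
handle = %{name: :get_user_pipeline, note: "stand-in for a started pipeline"}

# Store it once at boot...
Application.put_env(:sketch_app, :get_user_pipeline, handle)

# ...and read it from anywhere, for example inside a request handler
fetched = Application.get_env(:sketch_app, :get_user_pipeline)
IO.inspect(fetched == handle)
# → true

# A key that was never stored yields nil, so a typo in the key fails only at call time
IO.inspect(Application.get_env(:sketch_app, :unknown_pipeline))
# → nil
```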

Pipeline branching

You have surely noticed HandleErrorPipeline in the code. It is called in the handle_error function of both GetUserPipeline and GetUserPostPipeline. This is a good example of how some specific functionality can be moved into a separate pipeline:

defmodule HandleErrorPipeline do
  use Flowex.Pipeline
  defstruct [:conn, :error]

  pipe Handle401
  pipe Handle404
  pipe Handle500
  pipe RenderError
  pipe SendResponse
end

This pipeline has a few components that just prepare information about the exception. For example, here is the code of the Handle401 pipe:

defmodule Handle401 do
  defstruct [:error, :status, :content]
  use HandleErrorCommon
  @status 401

  def call(data, _opts) do
    case data.error do
      %AuthClient.Error{message: message} ->
        %{status: @status, content: message}
      # Not an auth error: pass the data on unchanged
      _ -> data
    end
  end
end
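The chain-of-handlers idea can be sketched in plain Elixir: each handler claims the error it recognizes and passes everything else through untouched. The exception modules below are hypothetical, and the 404 and 500 handlers (in particular the "anything still unhandled becomes a 500" rule) are my guess at the behaviour, since only Handle401 is shown above.

```elixir
defmodule AuthError do
  defexception message: "invalid token"
end

defmodule NotFoundError do
  defexception message: "record not found"
end

defmodule ErrorChainSketch do
  def handle_401(%{error: %AuthError{message: message}} = data),
    do: %{data | status: 401, content: message}

  def handle_401(data), do: data

  def handle_404(%{error: %NotFoundError{message: message}} = data),
    do: %{data | status: 404, content: message}

  def handle_404(data), do: data

  # Assumed fallback: an error nobody claimed gets a generic 500
  def handle_500(%{status: nil} = data),
    do: %{data | status: 500, content: "internal server error"}

  def handle_500(data), do: data

  def run(error) do
    %{error: error, status: nil, content: nil}
    |> handle_401()
    |> handle_404()
    |> handle_500()
  end
end

IO.inspect(ErrorChainSketch.run(%AuthError{}).status)
# → 401
IO.inspect(ErrorChainSketch.run(%NotFoundError{}).status)
# → 404
IO.inspect(ErrorChainSketch.run(%RuntimeError{message: "boom"}).status)
# → 500
```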

Of course, error handling is not the only case where you may call a separate pipeline. Imagine, for example, that you have a complex authentication process or some sophisticated data extraction mechanism (ETL pipelines are a good example). All these cases can easily be extracted into separate pipelines.

Flowex diagrams

This is the most interesting part of Flow-Based Programming. The ability to draw the application graph is, in my view, the strongest suit of the FBP approach.

The figure below shows the GetUserPipeline diagram. Just for demonstration, I added two additional pipes to the AuthClient and FindRecord components that do not exist in the code.

GetUserPipeline diagram

One can see all the components and pipelines that IPs pass through, from left to right and from top to bottom. And this is very cool!

Benchmarks

To compare pipeline performance with the performance of the same code evaluated synchronously, I’ve prepared a GetUserSync module with a call function that does the same as GetUserPipeline. Here is the code.

For benchmarking I chose the wrk utility. It lets you specify the number of threads (-t option) and the total number of open connections (-c option), which wrk divides among the threads. Results are given for three settings: -t1 -c1 (only one request at a time), -t10 -c10 (10 threads with one connection each, so 10 simultaneous requests), and -t100 -c100 (100 threads with one connection each, so 100 simultaneous requests).

For the benchmark I added a count: 10 option to each pipe in GetUserPipeline, so 10 copies of each component existed in the pipeline.

Below are test results for different wrk options:

|           | -t1 -c1  | -t10 -c10  | -t100 -c100 |
|-----------|----------|------------|-------------|
| pipeline | 4.6 K | 15.2 K | 14.8 K |
| sync | 12.1 K | 33.8 K | 38.4 K |

So, as you can see, there is a significant difference. Synchronous evaluation is 2 to 3 times faster than the pipeline. But remember that this demo application doesn’t do any real work: each component performs the simplest data transformation and there is no real database (all data is cached in memory).

A real application must calculate something and wait for something! Let’s simulate these calculations and pauses.

For the next benchmark, I added :timer.sleep(1) to the FindRecord pipe to simulate a 1ms database delay. I also added Enum.reduce(1..1650, 1, &(&1*&2)) to the RenderResponse component to simulate a processor-intensive task. This factorial calculation also takes ~1ms on my machine.
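If you want to check how long these two simulated tasks take on your own machine, a quick measurement with :timer.tc might look like the sketch below. The numbers will differ from mine, so no expected output is shown.

```elixir
# :timer.tc/1 returns {elapsed_microseconds, result_of_the_function}
{sleep_us, :ok} = :timer.tc(fn -> :timer.sleep(1) end)

# The product of 1..1650 is 1650!, a very large integer that is expensive to build
{cpu_us, product} = :timer.tc(fn -> Enum.reduce(1..1650, 1, &(&1 * &2)) end)

IO.puts("simulated DB delay: #{sleep_us} us")
IO.puts("factorial of 1650:  #{cpu_us} us (#{length(Integer.digits(product))} digits)")
```

The first call can be slower than later ones, so it is worth running the measurement a few times before tuning the range.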

|          | -t1 -c1 | -t10 -c10 | -t100 -c100 |
|----------|---------|-----------|-------------|
| pipeline | 0.25 K | 2.6 K | 3.6 K |
| sync | 0.25 K | 2.8 K | 4.1 K |

The situation changed significantly! The difference became negligible!

If we increase the delays tenfold, with :timer.sleep(10) and Enum.reduce(1..5000, 1, &(&1*&2)) (about 10ms of processor time), the result is:

|           | -t1 -c1  | -t10 -c10  | -t100 -c100 |
|-----------|----------|------------|-------------|
| pipeline | 42 | 0.45 K | 0.56 K |
| sync | 42 | 0.45 K | 0.58 K |

The expected result! The difference is less than 4%!
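For reference, this is the arithmetic behind that figure, applied to the -t100 -c100 column of all three tables. BenchMath is just a helper name for this sketch; the throughput values are copied from the tables above.

```elixir
defmodule BenchMath do
  # How much slower the pipeline is, as a percentage of the sync throughput
  def slowdown_pct(pipeline, sync) do
    Float.round((sync - pipeline) / sync * 100, 1)
  end
end

IO.inspect(BenchMath.slowdown_pct(14.8, 38.4))
# → 61.5  (no simulated work)
IO.inspect(BenchMath.slowdown_pct(3.6, 4.1))
# → 12.2  (~1ms delay and ~1ms CPU)
IO.inspect(BenchMath.slowdown_pct(0.56, 0.58))
# → 3.4   (~10ms delay and ~10ms CPU)
```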

Conclusion

Designing an application with the Flowex Railway-FBP paradigm is quite an easy process. You are still in the FP paradigm, where your application is a set of functions that transform data structures in some way. But you need to be more data-centric and think carefully about function interfaces.

Imagine a “happy path”: the sequence of data transformations needed to turn input into output. This defines the components of the pipeline. Then think about how to make the components reusable, so you will be able to compose other pipelines from the same pipes parameterized in a different way.

The benchmark results show that the overhead of GenStage communication is very low. One can see a performance penalty only if the components do almost nothing. In a real situation it is negligible!

I hope you are now ready to try Flowex in your projects!

Hit the 💚 if you enjoyed the article and do not hesitate to contact me if you have questions or proposals!

Have a wonderful week,
Anton