Introducing rp, a Lightweight Framework for Building Efficient Execution Chains in Go

Jeremy Huff
Aug 14, 2023


rp, the “request pipeline” framework, makes server endpoints with multiple execution steps easier to build, maintain, and optimize. It is built on top of Gin, one of Go’s most popular web frameworks.

It works by wrapping arbitrary execution steps into stages that can be linked together into execution chains. Chains, in turn, can be executed in sequence or in parallel, with a logger that automatically tracks each stage’s success or failure along with performance metrics like latency. In short, rp keeps execution cleanly organized into modular, maintainable units.

rp is general-purpose and can be adopted as much or as little as you find useful.
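
As a taste of the smallest possible adoption, here is a hedged sketch of a single-stage pipeline mounted on a Gin router r. It leans on the rp helpers shown later in this article (S, MakeChain, MakeGinHandlerFunc, Response, DefaultLogger); the /ping route is purely illustrative and not part of the example project:

ping := MakeChain(S(
    "ping()",
    func(in any, c *gin.Context, lgr Logger) (any, error) {
        // Returning a *Response from the final stage produces the HTTP reply,
        // as in the article's respond stages below.
        return &Response{Code: http.StatusOK, Obj: gin.H{"message": "pong"}}, nil
    }))

r.GET("/ping", MakeGinHandlerFunc(ping, DefaultLogger{}))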

Overview

In this tutorial, we will migrate non-rp code into the rp framework so that our router endpoint becomes a readable pipeline chain that looks like this:

pipeline := InSequence(
    parse,
    fetchCustomer,
    fetchInventory,
    checkStock,
    calculateTotal,
    runPayment,
    createShipment,
    createOrder,
    sendOrderInProgressAlert,
    successResponse,
)

We will see how the logger makes monitoring and debugging easy with detailed, stage-by-stage pipeline execution output.

And we will optimize performance, reducing latency by roughly a third, using rp’s intuitive built-in concurrency model:

pipeline := InSequence(
    parse,
    InParallel(
        fetchCustomer,
        fetchInventory),
    checkStock,
    calculateTotal,
    runPayment,
    InParallel(
        createShipment,
        createOrder,
        sendOrderInProgressAlert),
    successResponse,
)

In all, we will cover four implementations of the same endpoint that demonstrate varying degrees of framework adoption.

rp was built to make use of Go’s goroutines for maximum performance while also providing a simple interface that’s (hopefully!) easy to learn and work with. Overall, it’s meant to be unopinionated and to let you use it however you want. It’s available under the MIT license, so have at it!

Setup

This example walks a simplified ecommerce purchase endpoint through four implementations, adopting rp further with each. The full source code is located in /examples/ecommerce/ecommerce.go.

The endpoint processes a purchase with these steps for a given customer, product, and purchase quantity:

  1. Fetch the customer record from the database
  2. Fetch the product record from the database
  3. Validate that the product’s stock value is ≥ the purchase quantity
  4. Process a payment transaction for the purchase
  5. Register a new shipment with the shipping provider to deliver the order
  6. Create a database record for the new order
  7. Send an email to the customer to confirm that their order is now In Progress

We will use MongoDB as our database. So fire up a mongod instance in a separate terminal window (like this example) or connect to a remote sandbox MongoDB instance. Update the mongoDBURI value with your connection string:

mongoDBURI := "" // Update to your sandbox URI. It WILL delete data at launch.
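
If you are wiring the connection up yourself, it might be established along these lines. This is a sketch using the official go.mongodb.org/mongo-driver package; the example project’s actual setup may differ:

import (
    "context"
    "log"

    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

func connectMongo(mongoDBURI string) *mongo.Client {
    client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoDBURI))
    if err != nil {
        log.Fatal(err) // e.g., a malformed URI
    }
    // Optionally verify reachability up front.
    if err := client.Ping(context.Background(), nil); err != nil {
        log.Fatal(err)
    }
    return client
}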

We will simulate calling external APIs to satisfy steps 4, 5, and 7 by calling methods on “dummy client” object types. The dummy client methods always succeed after sleeping for tens of milliseconds to simulate typical API latency.
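
The dummy clients themselves aren’t shown in this article, but the pattern might look roughly like the hypothetical sketch below (the method name, parameters, and sleep duration are illustrative assumptions, not the example’s actual code; it uses the standard time package):

// Hypothetical sketch of a dummy client method: always succeeds after a
// simulated network delay. Not the example project's actual code.
func (p *PaymentClient) ProcessPayment(customerID string, amountCents int) error {
    time.Sleep(80 * time.Millisecond) // simulate external API latency
    return nil                        // always succeeds
}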

A) Basic implementation without rp

As a baseline, PurchaseHandler() implements the endpoint using common Go and Gin code patterns without rp. To run it, comment and uncomment the corresponding lines in the main func, then build and run:

// A) Without rp
r.POST(Path, PurchaseHandler(mongoClient, paymentClient, shippingClient, emailClient))

// B) Direct migration to rp
// r.POST(Path, PurchaseHandlerDirectMigrationToRP(mongoClient, paymentClient, shippingClient, emailClient))

// C) Tidied up implementation in rp
// r.POST(Path, MiddlewareForRPHandlers(mongoClient, paymentClient, shippingClient, emailClient), PurchaseHandlerWithRP(mongoClient, paymentClient, shippingClient, emailClient, false))

// D) With concurrency optimizations in rp
// r.POST(Path, MiddlewareForRPHandlers(mongoClient, paymentClient, shippingClient, emailClient), PurchaseHandlerWithRP(mongoClient, paymentClient, shippingClient, emailClient, true))

To build and run from the rp root directory:

go install ./examples/ecommerce
ecommerce

The endpoint’s gin.HandlerFunc implementation without rp looks like this.

func PurchaseHandler(mongoClient *mongo.Client, paymentClient *PaymentClient, shippingClient *ShippingClient, emailClient *EmailClient) gin.HandlerFunc {

    return func(c *gin.Context) {

        // Parse request body

        var body PurchaseRequestBody
        err := c.ShouldBindJSON(&body)
        if err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }

        // Fetch customer

        customer := CustomerDocument{}
        err = mongoClient.Database("rp_test").Collection("customers").FindOne(context.Background(),
            map[string]any{
                "customer_id": body.CustomerID,
            }).Decode(&customer)
        if err != nil {
            c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
            return
        }

        // Fetch inventory

        ...

        // Return response

        c.JSON(http.StatusOK, gin.H{"message": "Purchase successful"})
    }
}

The endpoint can be tested via curl in a separate terminal window with this:

curl -X POST -d '{"customer_id": "C975310", "sku": "SKU159260", "quantity": 1}' "http://localhost:8081/purchase"

And voila, Gin’s logger reports success with an endpoint latency of 309ms.

So it works, but this solution leaves a few things to be desired:

  • The single-line Gin log makes it difficult to locate points of failure for unsuccessful calls.
  • Similarly, managing latency for endpoints with many execution steps is tricky. Where should we start looking to reduce this 309ms total latency?
  • Furthermore, there’s no straightforward way to parallelize execution of independent endpoint operations, leaving one of the most powerful optimizations on the table.

Here’s how rp addresses these issues.

Anatomy of Stages and Chains

Stages, the core building blocks of rp chains, are defined by three exported function fields.

type Stage struct {
    P func() string
    F func(any, *gin.Context, Logger) (any, error)
    E func(error) *StageError
    n *Stage // next node in the chain
    l *Stage // last node in the chain
}

F is the main function that gets called when a stage is executed. It can optionally return data via its first return value, which is passed into the next stage’s first input parameter. The other input parameters provide access to the request’s Gin context and the Logger instance.

E is called when F returns an error. Its responsibility is to translate that Go error into a StageError, which defines the HTTP status code and data object to return in the response.

P produces the printed “name” identifier that we want the Logger to associate with this stage.
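
To make that anatomy concrete, here is a hedged sketch of a hand-built stage for a hypothetical “check stock” step. StageError’s fields aren’t shown in this article, so E’s body is left as a comment with a placeholder return:

stage := Stage{
    P: func() string { return "check_stock()" }, // the name the Logger prints for this stage
    F: func(in any, c *gin.Context, lgr Logger) (any, error) {
        // Do the stage's work here; whatever is returned becomes the next stage's `in`.
        // Returning a non-nil error triggers E.
        return nil, nil
    },
    E: func(err error) *StageError {
        // Translate err into a *StageError carrying the HTTP status code and response
        // object (see rp's StageError definition for the actual fields).
        return nil // placeholder
    },
}

In practice, the examples below build stages with the S() helper rather than constructing Stage values by hand.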

Chains can be built from stages by using the First / Then pattern,

pipeline := First(
    stage0).Then(
    stage1).Then(
    stage2) ...

or by simply passing a list of stages to MakeChain.

pipeline := MakeChain(stage0, stage1, stage2, ...)

Finally, chains can be arranged with other chains via the InSequence and InParallel functions, which organize how the constituent chains get executed to form a full endpoint pipeline.
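
For example, two chains can run concurrently between two sequential ones like this (the chain names are placeholders):

pipeline := InSequence(
    chainA,
    InParallel(chainB, chainC),
    chainD,
)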

B) Direct migration to rp: Build pipeline transparency with minimal work

(B) is the most direct, copy-and-paste style of migration, wrapping each step in a generic stage. The complete endpoint handler is then constructed with the InSequence() function.

func PurchaseHandlerDirectMigrationToRP(mongoClient *mongo.Client, paymentClient *PaymentClient, shippingClient *ShippingClient, emailClient *EmailClient) gin.HandlerFunc {

    parse := MakeChain(S(
        FuncStr("parse")+CtxOutStr("req.body"),
        func(in any, c *gin.Context, lgr Logger) (any, error) {

            var body PurchaseRequestBody
            err := c.ShouldBindJSON(&body)
            if err != nil {
                return nil, err
            }

            c.Set("req.body", &body)
            return nil, nil
        }))

    fetchCustomer := MakeChain(S(
        FuncStr("fetch_customer", "req.body")+CtxOutStr("mongo.document.customer"),
        func(in any, c *gin.Context, lgr Logger) (any, error) {

            body := c.MustGet("req.body").(*PurchaseRequestBody)

            customer := CustomerDocument{}
            err := mongoClient.Database("rp_test").Collection("customers").FindOne(context.Background(),
                map[string]any{
                    "customer_id": body.CustomerID,
                }).Decode(&customer)
            if err != nil {
                return nil, err
            }

            c.Set("mongo.document.customer", &customer)
            return nil, nil
        }))

    ...

    respond := MakeChain(S(
        FuncStr("respond"),
        func(in any, c *gin.Context, lgr Logger) (any, error) {
            res := Response{
                Code: http.StatusOK,
                Obj:  gin.H{"message": "Purchase successful"},
            }
            return &res, nil
        }))

    // Build the full pipeline chain
    pipeline := InSequence(
        parse,
        fetchCustomer,
        fetchInventory,
        checkStock,
        runPayment,
        createShipment,
        createOrder,
        sendEmail,
        respond,
    )

    return MakeGinHandlerFunc(pipeline, DefaultLogger{})
}

Here’s what’s going on with the stage definitions (parse, fetchCustomer, etc):

  1. The core functional code of each step remains unchanged; it is simply wrapped.
  2. Each step is wrapped at the top level into a chain by MakeChain(), which in turn contains a single stage defined by S().
  3. The first parameter defines a name for the stage, leveraging rp’s naming utilities to create a function-like string representation. The second parameter is a function literal that defines the stage’s F function.
  4. Variables are removed from the large handler function’s scope and kept scoped to the stages’ F functions. Data is handed off between stages as needed via the gin.Context’s Get/Set methods (notice that keys use a reverse-domain-name format by convention). This is an important part of rp’s structured logging and sets us up for easy parallelization (as seen in part (D)).
  5. Each stage falls back to the default E function for its error response, removing the need for repetitive c.JSON calls throughout the pipeline code. (Note: E can be overridden via the Catch methods in pipeline.go, if needed.)

While this may appear cumbersome at first, the refactoring doesn’t take long, and the modular structure creates the core of rp’s most powerful features.

Just as with part (A), comment/uncomment main() to enable only the (B) implementation, build it, run it, and send the test request via curl.

Logging should now show stage-by-stage execution with success/failure status, stage latency, and the stage names we defined in our S() calls.

Already, this gives us a lot more transparency around sources of latency and allows us to identify points of failure more precisely, at the stage level. We will take this further in (C) and (D).

C) Tidied up implementation in rp: Better precision and cleaner code

PurchaseHandlerWithRP() uses rp’s stage generator libraries to make code more concise while eliminating common, repetitive code. This has the effect of splitting our chains’ single S() stages into multiple component stages, which enables finer-grained logging and control over pipeline execution.

The tidied-up handler function is used for both parts (C) and (D). Instead of MakeChain(), it often uses the First().Then() pattern so that stages are easy to read at the beginning of each line. You may also notice that (C) and (D) add new middleware; it follows the same general pattern as (B)(4) above, keeping client objects in the gin.Context and out of the large function scope.

func PurchaseHandlerWithRP(withConcurrency bool) gin.HandlerFunc {

    // First: Parse request body
    parse := First(
        Bind(&PurchaseRequestBody{})).Then(
        CtxSet("req.body"))

    // 1) Fetch customer
    fetchCustomer := First(

        S(`fetch_customer_query(["req.body"]) =>`,
            func(in any, c *gin.Context, lgr Logger) (any, error) {

                customerID := c.MustGet("req.body").(*PurchaseRequestBody).CustomerID
                query := map[string]any{
                    "customer_id": customerID}
                return query, nil

            })).Then(

        rpmongo.MongoFindOne("mongo.client.database", "customers", rpmongo.MongoFindOneOptions{
            Result: &CustomerDocument{}})).Then(

        CtxSet("mongo.document.customer"))

    // 2) Fetch inventory
    fetchInventory := First(

        S(`fetch_inventory_query(["req.body"]) =>`,
            func(in any, c *gin.Context, lgr Logger) (any, error) {

                sku := c.MustGet("req.body").(*PurchaseRequestBody).SKU
                query := map[string]any{
                    "sku": sku}
                return query, nil

            })).Then(

        rpmongo.MongoFindOne("mongo.client.database", "inventory", rpmongo.MongoFindOneOptions{
            Result: &InventoryDocument{}})).Then(

        CtxSet("mongo.document.inventory"))

    // 3) Check inventory stock

    ...

    // Last: Return response
    successResponse := MakeChain(

        S(`success_response()`,
            func(in any, c *gin.Context, lgr Logger) (any, error) {

                res := Response{
                    Code: http.StatusOK,
                    Obj:  gin.H{"message": "Purchase successful"},
                }
                return &res, nil
            }))

    // Build the full pipeline chain
    pipeline := &Chain{}
    if !withConcurrency {
        pipeline = InSequence(
            parse,
            fetchCustomer,
            fetchInventory,
            checkStock,
            calculateTotal,
            runPayment,
            createShipment,
            createOrder,
            sendOrderInProgressAlert,
            successResponse,
        )
    } else {
        pipeline = InSequence(
            parse,
            InParallel(
                fetchCustomer,
                fetchInventory),
            checkStock,
            calculateTotal,
            runPayment,
            InParallel(
                createShipment,
                createOrder,
                sendOrderInProgressAlert),
            successResponse,
        )
    }

    return MakeGinHandlerFunc(pipeline, DefaultLogger{})
}

Some examples of stage generator functions include:

  • The parse chain’s Bind and CtxSet stages, which wrap c.ShouldBindJSON and c.Set, respectively.
  • The fetchCustomer chain’s rpmongo.MongoFindOne stage, which executes the query passed in by the preceding S() stage.

Unlike the functionally independent stages defined in the (B) implementation, these stages pass data directly stage-to-stage within the chain via F’s return value. This breaks the stage independence described in (B)(4), but that’s OK since these sub-chain stages need to remain tightly coupled.
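
Here is a stripped-down sketch of that direct handoff, with illustrative stage names: whatever the first stage’s F returns arrives as the second stage’s in parameter.

handoff := First(
    S("produce()", func(in any, c *gin.Context, lgr Logger) (any, error) {
        return map[string]any{"sku": "SKU159260"}, nil // returned value...
    })).Then(
    S("consume()", func(in any, c *gin.Context, lgr Logger) (any, error) {
        query := in.(map[string]any) // ...arrives here as `in`
        _ = query                    // e.g., hand it to a database query stage
        return nil, nil
    }))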

Enable this implementation as you did in (A) and (B), and the logging will now appear more verbose and fine-grained.

Debugging and optimization become really easy now! We can clearly see that the main source of latency within the fetchCustomer chain is the mongo.Client’s execution of FindOne. Adding an index to the customers collection for customer_id should help, and it’s easy to do, so why not?

Similarly, the three dummy clients are clearly the biggest contributors to overall latency.

To see how this benefits debugging, try out some bad-valued test requests. Sending this request with an invalid customer_id

curl -X POST -d '{"customer_id": "C999999", "sku": "SKU159260", "quantity": 1}' "http://localhost:8081/purchase"

produces a log showing the failure within the fetchCustomer chain,

while this request with an invalid sku

curl -X POST -d '{"customer_id": "C975310", "sku": "SKU999999", "quantity": 1}' "http://localhost:8081/purchase"

produces a log showing the failure within the fetchInventory chain instead.

This level of clarity in the error logs can eliminate a lot of the development time otherwise spent digging through the internals of request execution to find a problem.

D) rp with concurrency optimizations: Get big efficiency gains with quick pipeline tweaks

The latency of (C) is similar to (A) at 304ms, but it’s now clear that the three dummy clients account for 254ms of that. Meanwhile, MongoDB fetch queries account for 44ms. That leaves just 6ms for all other operations.

The good news is that we can run the customer and inventory MongoDB fetches at the same time, and we can create the shipment, create the order document, and send the email at the same time. We still want to keep the payment step sequential, in the middle, so that a failed payment stops execution.

This logical flow can be achieved with very minor changes to the pipeline through the InParallel() function. It wraps chains into a special type of stage that runs those chains internally and only returns once all of them are complete. And it does so safely by leveraging the gin.Context’s built-in goroutine-safe implementation.
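
Conceptually, the behavior is a goroutine fan-out followed by a wait: each wrapped chain runs on its own goroutine, and the parallel stage returns only after all of them finish. The snippet below is a simplified sketch of that idea using sync.WaitGroup, not rp’s actual implementation; parallelChains stands in for the chains handed to InParallel():

var wg sync.WaitGroup
for _, ch := range parallelChains {
    wg.Add(1)
    go func(ch *Chain) {
        defer wg.Done()
        // execute ch against the shared *gin.Context here
    }(ch)
}
wg.Wait() // only now does the parallel stage return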

pipeline := InSequence(
    parse,
    InParallel(
        fetchCustomer,
        fetchInventory),
    checkStock,
    calculateTotal,
    runPayment,
    InParallel(
        createShipment,
        createOrder,
        sendOrderInProgressAlert),
    successResponse,
)

The log for (D) will be a bit harder to follow, since many steps run in parallel and are printed as they finish. But it makes clear that we have reduced total pipeline latency from 304ms to 222ms, a savings of 82ms (27%).

Wrapping Up

If you’ve made it this far, I hope you can see why I built this framework and what it might be useful for. Or perhaps you’re totally unconvinced and just think I’m crazy. Either way, DM or comment to let me know your thoughts! If I can make this framework useful for you, I would love to hear how.
