Watermill Golang Message Stream Library

Steven Martin
Published in Tunaiku Tech
4 min read · Mar 23, 2021


Have you ever written an event-driven application and found that the native client libraries are complex to use? This post introduces Watermill, a Go library for working efficiently with message streams and building event-driven applications.

Here are some advantages of using Watermill:

  1. Simple. Watermill works with high-level abstractions and hides the complexity of conventional client libraries. It is easier to understand and implement, and development is faster because you write fewer lines of code.
  2. Universal. Watermill supports many pub/sub implementations, such as Go channels, Kafka, HTTP, Google Cloud Pub/Sub, NATS Streaming, SQL, RabbitMQ (AMQP), and io.Writer/io.Reader.
  3. Fast. Watermill is built to handle thousands of messages per second.
  4. Flexible. Watermill comes with built-in features such as middleware, plugins, and pub/sub configuration, and you can customize those components to fit your needs.
  5. Resilient. Watermill is built on proven technologies, is covered by unit and integration tests, and has passed stress tests.

Benchmarks

Watermill's published benchmarks use a message size of 16 bytes and were run on a single 16-CPU VM instance. The figures are available at the source below.

source: https://github.com/ThreeDotsLabs/watermill#benchmarks

Here is a quick guide to using Watermill in your application.

A. Installation

To install Watermill, run the following command:

go get -u github.com/ThreeDotsLabs/watermill

B. Publisher and Subscriber

A pub/sub architecture has two core components: the publisher and the subscriber. A conventional pub/sub library might come with complex features and implementations, but in Watermill you only need to use or implement these two interfaces:

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}
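To make the contract concrete, here is a minimal sketch of a type that satisfies both interfaces using only the standard library. Everything in it is an assumption made for illustration: Message is a local stand-in reduced to two fields, and memoryPubSub, newMemoryPubSub, and roundTrip are hypothetical names that do not come from Watermill. It is not how Watermill's own Go channel implementation is written.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is a local stand-in for Watermill's message type (assumption:
// reduced to the two fields this sketch needs).
type Message struct {
	UUID    string
	Payload []byte
}

// Publisher and Subscriber repeat the two interfaces shown above.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

var errClosed = errors.New("pub/sub is closed")

// memoryPubSub is a hypothetical in-memory type that satisfies both interfaces.
type memoryPubSub struct {
	mu     sync.Mutex
	subs   map[string][]chan *Message
	closed bool
}

func newMemoryPubSub() *memoryPubSub {
	return &memoryPubSub{subs: make(map[string][]chan *Message)}
}

// Subscribe registers a buffered channel for the topic.
// The context is ignored to keep the sketch short.
func (p *memoryPubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil, errClosed
	}
	ch := make(chan *Message, 16)
	p.subs[topic] = append(p.subs[topic], ch)
	return ch, nil
}

// Publish delivers each message to every subscriber of the topic.
// It blocks if a subscriber's buffer is full.
func (p *memoryPubSub) Publish(topic string, messages ...*Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errClosed
	}
	for _, msg := range messages {
		for _, ch := range p.subs[topic] {
			ch <- msg
		}
	}
	return nil
}

// Close closes all subscriber channels; further calls are no-ops.
func (p *memoryPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil
	}
	p.closed = true
	for _, chans := range p.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	return nil
}

// Compile-time checks that memoryPubSub satisfies both interfaces.
var (
	_ Publisher  = (*memoryPubSub)(nil)
	_ Subscriber = (*memoryPubSub)(nil)
)

// roundTrip subscribes, publishes one payload, and returns what was received.
func roundTrip(payload string) (string, error) {
	pubSub := newMemoryPubSub()
	defer pubSub.Close()

	messages, err := pubSub.Subscribe(context.Background(), "topic")
	if err != nil {
		return "", err
	}
	if err := pubSub.Publish("topic", &Message{UUID: "1", Payload: []byte(payload)}); err != nil {
		return "", err
	}
	msg := <-messages
	return string(msg.Payload), nil
}

func main() {
	got, err := roundTrip("hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // prints "hello"
}
```

Because application code only depends on the two interfaces, a type like this could be swapped for another backend without touching the code that publishes or consumes.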

C. Subscribing to Messages

To subscribe, create a subscriber instance and call its Subscribe method with a context and the topic name. The following example subscribes using the Go channel pub/sub implementation.

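// Imports used here: context, github.com/ThreeDotsLabs/watermill,
// and github.com/ThreeDotsLabs/watermill/pubsub/gochannel.
pubSub := gochannel.NewGoChannel(
	gochannel.Config{},
	watermill.NewStdLogger(false, false),
)

messages, err := pubSub.Subscribe(context.Background(), "topic")
if err != nil {
	panic(err)
}

// process is your own function that reads from the messages channel.
go process(messages)
// ...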

D. Publishing a Message

To publish a message, first create it with the message package from the Watermill library. The NewMessage function takes only a message ID and a byte slice as the payload.

msg := message.NewMessage(watermill.NewUUID(), []byte("message"))

After that, publish the message with the pub/sub instance created earlier:

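// Scoping err to the if statement avoids redeclaring the err variable
// from the Subscribe call when both snippets share a function.
if err := pubSub.Publish("topic", msg); err != nil {
	panic(err)
}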

E. Using Message Router

Sometimes you need extra features when processing messages, such as a poison queue, retries, or throttling. Watermill provides a component called message.Router that makes these easy to add. message.Router is similar to the HTTP router you would use to build a REST API, and you can attach plugins and middleware to it.

To use the router, create a router instance and add a handler to it. Watermill's handler functions sit at a similar level of abstraction to Go's HTTP handlers.

type HandlerFunc func(msg *Message) ([]*Message, error)

type NoPublishHandlerFunc func(msg *Message) error

Write a function with one of these signatures and add it to your router. Use HandlerFunc if you want to publish one or more messages after processing a message from the subscribed topic; otherwise, use NoPublishHandlerFunc.

func handler(msg *message.Message) error {
	fmt.Println("Received message:", string(msg.Payload))
	return nil
}

logger := watermill.NewStdLogger(false, false)

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
	panic(err)
}

router.AddMiddleware(middleware.CorrelationID)

router.AddNoPublisherHandler(
	"handler name",
	"topic to be subscribed",
	pubSub,
	handler,
)

// Run starts the handlers and blocks until the router is closed.
if err := router.Run(context.Background()); err != nil {
	panic(err)
}

A message acknowledgment (ack) is sent automatically when the handler returns no error, and a negative acknowledgment (nack) is sent when the handler returns an error.
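The ack/nack rule can be modelled in a few lines. The sketch below is a simplified illustration, not Watermill's router code: Message is a local stand-in whose Ack and Nack methods only record the outcome, and dispatch and errProcessing are hypothetical names introduced here.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in (assumption) that only records
// whether it was acknowledged or negatively acknowledged.
type Message struct {
	Payload []byte
	status  string // "", "ack", or "nack"
}

func (m *Message) Ack()  { m.status = "ack" }
func (m *Message) Nack() { m.status = "nack" }

// NoPublishHandlerFunc mirrors the handler signature shown earlier.
type NoPublishHandlerFunc func(msg *Message) error

// errProcessing is a hypothetical error used by the failing handler.
var errProcessing = errors.New("cannot process message")

// dispatch models the rule: nil error means ack, non-nil error means nack.
func dispatch(msg *Message, h NoPublishHandlerFunc) string {
	if err := h(msg); err != nil {
		msg.Nack()
	} else {
		msg.Ack()
	}
	return msg.status
}

func main() {
	succeed := func(msg *Message) error { return nil }
	fail := func(msg *Message) error { return errProcessing }

	fmt.Println(dispatch(&Message{Payload: []byte("a")}, succeed)) // prints "ack"
	fmt.Println(dispatch(&Message{Payload: []byte("b")}, fail))    // prints "nack"
}
```

The practical consequence is that a handler should return an error only when it wants the message treated as unprocessed; what happens to a nacked message afterwards depends on the pub/sub backend in use.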

You can also write your own middleware and plugins if the ones provided by Watermill lack a feature you need.

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

type RouterPlugin func(*Router) error
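As an illustration of the HandlerMiddleware shape, here is a small sketch of a custom middleware that records the outcome of each handler call. To keep it runnable with the standard library alone, Message, HandlerFunc, and HandlerMiddleware are redeclared locally with the signatures from this article; auditMiddleware and echoHandler are hypothetical names, not part of Watermill.

```go
package main

import "fmt"

// Local stand-ins (assumption) mirroring the signatures shown in this article.
type Message struct {
	UUID    string
	Payload []byte
}

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// auditMiddleware returns a middleware that appends one entry to log
// for every message the wrapped handler processes.
func auditMiddleware(log *[]string) HandlerMiddleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			// Call the wrapped handler first, then record what happened.
			produced, err := next(msg)
			if err != nil {
				*log = append(*log, fmt.Sprintf("%s failed: %v", msg.UUID, err))
			} else {
				*log = append(*log, fmt.Sprintf("%s ok, produced %d", msg.UUID, len(produced)))
			}
			return produced, err
		}
	}
}

// echoHandler returns one new message carrying the same payload.
func echoHandler(msg *Message) ([]*Message, error) {
	return []*Message{{UUID: msg.UUID + "-echo", Payload: msg.Payload}}, nil
}

func main() {
	var log []string
	wrapped := auditMiddleware(&log)(echoHandler)

	if _, err := wrapped(&Message{UUID: "42", Payload: []byte("hi")}); err != nil {
		panic(err)
	}
	fmt.Println(log[0]) // prints "42 ok, produced 1"
}
```

A middleware written this way wraps the next handler and can run code before or after it, which is the same pattern as HTTP middleware in Go. With the real library, the closure would use the types from Watermill's message package and be registered with router.AddMiddleware.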

Conclusion

Watermill is a pleasant library for building event-driven applications. It is worth considering for its simplicity, flexibility, and good performance.

References:
https://watermill.io/
https://threedots.tech/post/introducing-watermill/
https://github.com/ThreeDotsLabs/watermill
