Watermill Golang Message Stream Library
Have you ever written an event-driven application and found that some of the native client libraries are complex to use? I'll introduce you to Watermill, a Go library for working efficiently with message streams and building event-driven applications.
Here are some advantages of using Watermill:
- Simple: Watermill works with high-level abstractions and hides much of the complexity of conventional client libraries. That makes it easier to understand and implement, and it shortens development time because you write fewer lines of code.
- Universal: Watermill supports many Pub/Sub implementations, such as Go channels, Kafka, HTTP, Google Cloud Pub/Sub, NATS Streaming, SQL, RabbitMQ (AMQP), and io.Writer/io.Reader.
- Fast: Watermill is built to handle thousands of messages per second.
- Flexible: Watermill comes with built-in features such as middlewares, plugins, and Pub/Sub configuration, and you can customize those components for your needs.
- Resilient: Watermill is built on proven technologies, is covered by unit and integration tests, and has passed stress tests.
Benchmarks
Watermill's published benchmarks use a message size of 16 bytes and were run on a single 16-CPU VM instance.
Here is a quick guide to using Watermill in your application.
A. Installation
To install Watermill, run the following command:
go get -u github.com/ThreeDotsLabs/watermill
B. Publisher and Subscriber
A Pub/Sub architecture has two core components: the publisher and the subscriber. A conventional Pub/Sub library might come with complex features and implementations, but in Watermill you only need to use or implement these two interfaces, which you will work with often.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}
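To make the two interfaces more concrete, here is a minimal sketch of an in-memory type that satisfies both of them using only the standard library. Everything in it is an assumption made for illustration: `Message` is a simplified stand-in for Watermill's message type, and `memoryPubSub` and `roundTrip` are hypothetical names, not part of Watermill. In practice you would normally pick one of the ready-made implementations instead of writing your own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is a simplified stand-in for Watermill's message type.
type Message struct {
	UUID    string
	Payload []byte
}

// Publisher and Subscriber mirror the two interfaces shown above.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}
type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

// memoryPubSub is a hypothetical in-memory type that satisfies both interfaces.
type memoryPubSub struct {
	mu     sync.Mutex
	subs   map[string][]chan *Message
	closed bool
}

// Compile-time checks that memoryPubSub implements both interfaces.
var _ Publisher = (*memoryPubSub)(nil)
var _ Subscriber = (*memoryPubSub)(nil)

// Publish copies each message to every channel subscribed to the topic (blocks if a buffer is full).
func (p *memoryPubSub) Publish(topic string, messages ...*Message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("pub/sub is closed")
	}
	for _, msg := range messages {
		for _, ch := range p.subs[topic] {
			ch <- msg
		}
	}
	return nil
}

// Subscribe registers a buffered channel for the topic; the context is ignored in this sketch.
func (p *memoryPubSub) Subscribe(_ context.Context, topic string) (<-chan *Message, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil, errors.New("pub/sub is closed")
	}
	ch := make(chan *Message, 16)
	p.subs[topic] = append(p.subs[topic], ch)
	return ch, nil
}

// Close closes every subscriber channel; buffered messages stay readable.
func (p *memoryPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.closed {
		p.closed = true
		for _, chans := range p.subs {
			for _, ch := range chans {
				close(ch)
			}
		}
	}
	return nil
}

// roundTrip publishes up to 16 payloads and returns what one subscriber receives.
// Errors are ignored for brevity; none can occur before Close in this sketch.
func roundTrip(payloads ...string) []string {
	ps := &memoryPubSub{subs: make(map[string][]chan *Message)}
	messages, _ := ps.Subscribe(context.Background(), "topic")
	for _, p := range payloads {
		_ = ps.Publish("topic", &Message{UUID: "id-" + p, Payload: []byte(p)})
	}
	_ = ps.Close()
	var received []string
	for msg := range messages {
		received = append(received, string(msg.Payload))
	}
	return received
}

func main() {
	fmt.Println(roundTrip("hello", "world")) // [hello world]
}
```

Because the rest of an application only depends on the two small interfaces, a type like this can be swapped for a Kafka or RabbitMQ backed one without touching the calling code.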
C. Subscribing a Message
To subscribe to messages, create a subscriber instance, call the Subscribe function, and pass it your topic. Here is an example of subscribing using the Go channel Pub/Sub implementation.
pubSub := gochannel.NewGoChannel(
	gochannel.Config{},
	watermill.NewStdLogger(false, false),
)

messages, err := pubSub.Subscribe(context.Background(), "topic")
if err != nil {
	panic(err)
}

go process(messages)
...
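The snippet above hands the channel to a `process` function without showing it. A minimal sketch of what such a function might look like follows, using only the standard library. The `Message` type here is a simplified stand-in and `collect` is a hypothetical helper; with real Watermill messages you would call the message's own `Ack` method after handling it so that the Pub/Sub knows the message was processed.

```go
package main

import "fmt"

// Message is a simplified stand-in for Watermill's message type.
type Message struct {
	Payload []byte
	acked   bool
}

// Ack marks the message as processed in this stand-in type.
func (m *Message) Ack() { m.acked = true }

// process handles messages until the channel is closed, acknowledging each one.
func process(messages <-chan *Message, handle func(payload []byte)) {
	for msg := range messages {
		handle(msg.Payload)
		msg.Ack()
	}
}

// collect feeds the payloads through process and returns them in arrival order.
func collect(payloads ...string) []string {
	messages := make(chan *Message)
	done := make(chan struct{})
	var seen []string

	go func() {
		defer close(done)
		process(messages, func(payload []byte) {
			seen = append(seen, string(payload))
		})
	}()

	for _, p := range payloads {
		messages <- &Message{Payload: []byte(p)}
	}
	close(messages)
	<-done // wait for the consumer goroutine to finish
	return seen
}

func main() {
	fmt.Println(collect("first", "second")) // [first second]
}
```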
D. Publishing a Message
To publish a message, you first have to create it. This can be done using the message package within the Watermill library. The NewMessage function expects only a message ID and a slice of bytes as the payload.
msg := message.NewMessage(watermill.NewUUID(), []byte("message"))
After that, all you need to do is publish the message using the publisher that you created earlier:
err := pubSub.Publish("topic", msg)
if err != nil {
panic(err)
}
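Since the payload is just a slice of bytes, structured events have to be serialized before they are published. One common option is JSON, sketched below with the standard library only. The `OrderPlaced` type and the `encodeEvent` and `decodeEvent` helpers are hypothetical names used for illustration; the resulting bytes are what you would pass as the second argument to NewMessage.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderPlaced is a hypothetical event type used only for this illustration.
type OrderPlaced struct {
	OrderID string `json:"order_id"`
	Total   int    `json:"total"`
}

// encodeEvent turns the event into the byte slice a message payload expects.
func encodeEvent(event OrderPlaced) ([]byte, error) {
	return json.Marshal(event)
}

// decodeEvent is the subscriber-side counterpart of encodeEvent.
func decodeEvent(payload []byte) (OrderPlaced, error) {
	var event OrderPlaced
	err := json.Unmarshal(payload, &event)
	return event, err
}

func main() {
	payload, err := encodeEvent(OrderPlaced{OrderID: "A-1", Total: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload)) // {"order_id":"A-1","total":42}

	event, err := decodeEvent(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(event.OrderID, event.Total) // A-1 42
}
```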
E. Using Message Router
Sometimes you will need extra features when processing messages in your application, such as a poison queue, retries, or throttling. Watermill provides a component called message.Router so that you can easily add those features. The message.Router is similar to the HTTP router you would use to build a REST API, and you can also add plugins and middlewares to it.
To use the router, first create a router instance and then add a handler to it. Watermill's handler function has a similar level of abstraction to the Go HTTP handler.
type HandlerFunc func(msg *Message) ([]*Message, error)

type NoPublishHandlerFunc func(msg *Message) error
Create a function with one of Watermill's handler signatures and add it to your router. Use HandlerFunc if you want to publish one or more new messages after processing a message from the topic you subscribed to; otherwise, use NoPublishHandlerFunc instead.
func handler(msg *message.Message) error {
	fmt.Println("Received message: ", string(msg.Payload))
	return nil
}

logger := watermill.NewStdLogger(false, false)

router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
	panic(err)
}

router.AddMiddleware(middleware.CorrelationID)

router.AddNoPublisherHandler(
	"handler name",
	"topic to be subscribed",
	pubSub,
	handler,
)

// Run starts the router and blocks until it is closed.
if err := router.Run(context.Background()); err != nil {
	panic(err)
}
Message acknowledgment (ack) is sent automatically when the handler doesn't return an error, and negative acknowledgment (nack) is sent when the handler returns an error.
You can also write your own middleware or plugin if the ones provided by Watermill don't have the feature you need.
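The ack/nack rule can be pictured with a small standard-library sketch. This is not how the router is implemented internally; `Message`, `dispatch`, and `rejectEmpty` are hypothetical stand-ins that only show the decision being made on the handler's return value.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a simplified stand-in that records how it was settled.
type Message struct {
	Payload []byte
	status  string
}

func (m *Message) Ack()  { m.status = "ack" }
func (m *Message) Nack() { m.status = "nack" }

// NoPublishHandlerFunc mirrors the handler signature shown earlier.
type NoPublishHandlerFunc func(msg *Message) error

// dispatch runs the handler and settles the message based on the returned error.
func dispatch(msg *Message, handler NoPublishHandlerFunc) string {
	if err := handler(msg); err != nil {
		msg.Nack()
	} else {
		msg.Ack()
	}
	return msg.status
}

// rejectEmpty is a hypothetical handler that fails on empty payloads.
func rejectEmpty(msg *Message) error {
	if len(msg.Payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func main() {
	fmt.Println(dispatch(&Message{Payload: []byte("ok")}, rejectEmpty)) // ack
	fmt.Println(dispatch(&Message{}, rejectEmpty))                      // nack
}
```

The practical consequence is that a handler should return an error only when it wants the message to be treated as unprocessed.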
type HandlerMiddleware func(h HandlerFunc) HandlerFunc

type RouterPlugin func(*Router) error
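As a rough illustration of the middleware shape, the sketch below wraps a handler so that it is retried a fixed number of times. The types are local stand-ins that copy the signatures above, and `retry` and `runFlaky` are hypothetical names. Watermill ships its own middlewares, so treat this only as a demonstration of how a function that takes a handler and returns a handler can add behavior around it.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins that copy the signatures shown above.
type Message struct{ Payload []byte }

type HandlerFunc func(msg *Message) ([]*Message, error)

type HandlerMiddleware func(h HandlerFunc) HandlerFunc

// retry is a hypothetical middleware that calls the wrapped handler
// up to maxAttempts times and returns the first successful result.
func retry(maxAttempts int) HandlerMiddleware {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	return func(h HandlerFunc) HandlerFunc {
		return func(msg *Message) ([]*Message, error) {
			var err error
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				var produced []*Message
				if produced, err = h(msg); err == nil {
					return produced, nil
				}
			}
			return nil, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
		}
	}
}

// runFlaky wraps a handler that fails `failures` times before succeeding and
// reports how many times it was called along with the final error.
func runFlaky(failures, maxAttempts int) (calls int, err error) {
	flaky := func(msg *Message) ([]*Message, error) {
		calls++
		if calls <= failures {
			return nil, errors.New("temporary failure")
		}
		return nil, nil
	}
	_, err = retry(maxAttempts)(flaky)(&Message{Payload: []byte("job")})
	return calls, err
}

func main() {
	calls, err := runFlaky(2, 3)
	fmt.Println(calls, err) // 3 <nil>

	calls, err = runFlaky(5, 3)
	fmt.Println(calls, err) // 3 gave up after 3 attempts: temporary failure
}
```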
Conclusion
Watermill is a nice library for building event-driven applications. It is worth considering for its simplicity, flexibility, and good performance.
References:
https://watermill.io/
https://threedots.tech/post/introducing-watermill/
https://github.com/ThreeDotsLabs/watermill