Lagom 1.2: Supporting Apache Kafka and JDBC

Following the initial release in July and version 1.1 in September, Lagom 1.2 has arrived with several new features aimed at expanding its event sourcing and CQRS-based architecture to support message brokers and relational databases.


Lagom is an opinionated, open-source framework for building reactive microservices in Java. Its name comes from a Swedish word meaning “sufficient” or “just the right amount”. It was created after observing that too much discussion about microservices focuses on “micro” — the size of the services, when what really matters is not the size, but the architecture. Lagom provides the structure applications need to embrace asynchronous messaging, distributed persistence, and eventual consistency. This is all wrapped up in a productive development environment that makes it easy to launch your services and data stores, with automatic reloading when the code in one of your services changes.

Communicating with Events

Most real-world systems built with microservices need to communicate some state between services. One of the key principles in Lagom’s design is that scalable and resilient architectures require services to prefer communication using asynchronous message passing. Rather than having one service ask other services for their state, it can subscribe to a stream of updates from the services that it is interested in following, and keep its own copy of the data it needs to fulfill its own purpose. This gives the service two benefits: it can store the data in a format that suits its own query requirements, and if the remote services go down or are unreachable, it can continue to operate using its local store of data.
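As a rough illustration of keeping a local copy, the sketch below shows a service updating its own view from incoming updates and answering queries from that view alone. The class and method names (LocalPriceView, onBidPlaced, priceOf) are hypothetical and are not part of Lagom; how the updates arrive is left out on purpose.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local read model; not part of Lagom's API.
final class LocalPriceView {
    private final Map<UUID, Integer> currentPrice = new ConcurrentHashMap<>();

    // Called for each bid update received from the upstream service.
    void onBidPlaced(UUID itemId, int bidPrice) {
        // Keep the highest price seen, so a stale or repeated update cannot lower it.
        currentPrice.merge(itemId, bidPrice, Integer::max);
    }

    // Answered from local state only; no call to the upstream service is made.
    Optional<Integer> priceOf(UUID itemId) {
        return Optional.ofNullable(currentPrice.get(itemId));
    }
}
```

Because priceOf reads only the local map, it keeps working while the upstream service is unreachable, at the cost of possibly returning a slightly out-of-date price.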

Auction application

For example, imagine an online auction application. Amongst other services, it might contain an item service that is responsible for tracking and listing items available for auction, and a bidding service that maintains the history of bids on an item. The bidding service needs to know when the item service starts an auction, so it can begin to take bids for it, and the item service needs to know when a new bid has been placed, so it can update the current price in the item listing.

A naive approach to event communication might have each service call a REST API on the other to notify it of a state change. While this is an improvement on the synchronous approach of having each service ask the other for its current state on each request, it still comes with problems of its own, especially as the number of services consuming events increases. Each service emitting events either needs to have hard-coded knowledge of its downstream consumers, or an API for managing subscribers. This extra bookkeeping becomes even more complex in the presence of failures, network latency and other realities of distributed systems.


For these reasons, Lagom has always supported streaming communication between services using Akka Streams. This allows a service call to produce a continuous, theoretically infinite, stream of response messages or similarly accept a stream of requests from clients. For example, our hypothetical bidding service could declare a service call that streams all of the bids on a particular item as they are created.

public interface BiddingService extends Service {
    // ...
    ServiceCall<NotUsed, Source<Bid, ?>> getBidStream(UUID itemId);
    // ...
    @Override
    default Descriptor descriptor() {
        return named("bidding").withCalls(
            // ...
            pathCall("/api/item/:id/bidStream", this::getBidStream)
        ) // ...
    }
}

This can be provided by the service implementation by using Lagom’s internal publish-subscribe mechanism to retrieve a source of bids from the persistent entity that is tracking them.

@Singleton
public class BiddingServiceImpl implements BiddingService {
    private final PubSubRegistry pubSub;

    @Inject
    public BiddingServiceImpl(PubSubRegistry pubSub /* ... */) {
        this.pubSub = pubSub;
        // ...
    }
    // ...
    @Override
    public ServiceCall<NotUsed, Source<Bid, ?>> getBidStream(UUID itemId) {
        return request -> CompletableFuture.completedFuture(
            pubSub.refFor(
                TopicId.of(Bid.class, itemId.toString())
            ).subscriber()
        );
    }
    // ...
}

Lagom implements streaming services using a persistent WebSocket, allowing both inter-service streaming communication, and the ability to consume the streaming API directly from any modern web browser.


While this approach can work very well for use cases where getting a real-time feed of recent events is most important, it breaks down in cases where you need guaranteed delivery of events. It is possible to miss events, for example due to a transient network issue or a restart of one of the services. There is no built-in mechanism for tracking which events each consumer has processed, or for delivering historical events to new subscribers, and while it is possible to build those on top, it once again adds complexity to service implementations.

Returning to our online auction example, the streaming bid service call demonstrated above could be an excellent choice for a real-time ticker of bids displayed in a web page for the item under auction. It is not appropriate, however, for consumption by our item service, which needs a reliable source of bid events in order to maintain the consistency of its displayed price.

With Lagom 1.2, an alternative has arrived: the message broker API.

Introducing Message Broker Support with Apache Kafka

To decouple services that produce events from those that consume them, many systems use a message broker to intermediate communication. The broker durably stores messages as upstream services produce them and keeps track of the messages consumed by each downstream service, allowing for retries when a consumer fails. There are many options for infrastructure that can act as a message broker (open source, commercial and managed services), each with its own characteristics and guarantees. One popular open-source option that fits well with Lagom’s design principles is Apache Kafka, which models state as an ordered log of events and tracks each consumer using an offset: the position within the log up to which that consumer has processed.
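The log-and-offset model can be sketched in a few lines of plain Java. This is an in-memory illustration only: EventLog and its methods are hypothetical names, not Kafka's client API, and a real broker adds durability, partitioning and replication.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a broker topic; names are hypothetical, not Kafka's API.
final class EventLog<T> {
    private final List<T> entries = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    // Appends an event and returns its offset (its position in the log).
    synchronized int append(T event) {
        entries.add(event);
        return entries.size() - 1;
    }

    // Returns every event the consumer has not yet committed, in log order.
    synchronized List<T> poll(String consumerId) {
        int from = committed.getOrDefault(consumerId, 0);
        return new ArrayList<>(entries.subList(from, entries.size()));
    }

    // Records that the consumer has processed everything before nextOffset.
    synchronized void commit(String consumerId, int nextOffset) {
        committed.put(consumerId, nextOffset);
    }
}
```

Because offsets are stored per consumer, a consumer that subscribes late starts from offset 0 and receives the full history, while an existing consumer resumes from where it last committed.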


Lagom 1.2 represents this with a new concept: topics. A Lagom service can declare the topics it produces in its service interface.

public interface BiddingService extends Service {
    // ...
    Topic<BidEvent> bidEvents();

    @Override
    default Descriptor descriptor() {
        return named("bidding").withCalls(
            // ...
        ).publishing(
            topic("bidding-BidEvent", this::bidEvents)
        ) // ...
    }
}

Consumers can then subscribe to the topic by having a service client injected at construction time and providing an Akka Streams Flow that processes each message published through the broker.

@Singleton
public class ItemServiceImpl implements ItemService {
    // ...
    public ItemServiceImpl(BiddingService biddingService /* ... */) {
        // ...
        biddingService.bidEvents()
            .subscribe()
            .atLeastOnce(
                Flow.<BidEvent>create()
                    .mapAsync(1, this::handleBidEvent)
            );
    }

    private CompletionStage<Done> handleBidEvent(BidEvent event) {
        if (event instanceof BidEvent.BidPlaced) {
            return handleBidPlacedEvent(
                (BidEvent.BidPlaced) event
            );
        } else if (event instanceof BidEvent.BiddingFinished) {
            return handleBiddingFinishedEvent(
                (BidEvent.BiddingFinished) event
            );
        } else {
            // Ignore.
            return CompletableFuture.completedFuture(Done.getInstance());
        }
    }
}

Lagom manages the consumer offset in Kafka for subscribers, and retries automatically if there is a failure while processing a message.
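To make the at-least-once behaviour concrete, here is a simplified delivery loop in plain Java. It is a sketch under stated assumptions, not Lagom's implementation: AtLeastOnceLoop and deliver are hypothetical names, the log is an in-memory list, and the handler signals success by returning true.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical delivery loop; Lagom and Kafka handle this for real subscribers.
final class AtLeastOnceLoop {
    // Processes events in order from startOffset and returns the offset to resume from.
    // The handler returns true on success; a failing event is tried up to maxAttempts times.
    static <T> int deliver(List<T> log, int startOffset, Predicate<T> handler, int maxAttempts) {
        int offset = startOffset;
        while (offset < log.size()) {
            boolean done = false;
            for (int attempt = 0; attempt < maxAttempts && !done; attempt++) {
                done = handler.test(log.get(offset));
            }
            if (!done) {
                return offset; // stop without advancing, so the event is redelivered later
            }
            offset++; // advance only after the handler has succeeded
        }
        return offset;
    }
}
```

Since the offset moves forward only after a successful call, no event is skipped, but a handler may see the same event more than once. Handlers are therefore usually written so that processing an event twice has the same effect as processing it once.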


To implement the producer side, use Lagom’s support for retrieving a stream of events from a persistent entity, and transform each internal persistence event into the event type publicly exposed by your service API.

@Singleton
public class BiddingServiceImpl implements BiddingService {
    // ...
    private Source<Pair<BidEvent, Offset>, ?> streamForTag(
            AggregateEventTag<AuctionEvent> tag, Offset fromOffset) {
        return persistence.eventStream(tag, fromOffset)
            // Drop internal events that are not part of the public API.
            .filter(this::isPublishedEventType)
            .mapAsync(1, eventAndOffset -> {
                AuctionEvent event = eventAndOffset.first();
                Offset offset = eventAndOffset.second();
                // Keep the offset paired with the mapped public event.
                return mapEvent(event).thenApply(mappedEvent ->
                    Pair.create(mappedEvent, offset)
                );
            });
    }
    // ...
}
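The filter-then-map step can be tried in isolation with plain Java collections. Every type below (EventMapping, BidPlacedInternal, SnapshotTaken, BidPlaced) is a hypothetical stand-in rather than a class from Lagom or the auction example, and offsets are omitted to keep the sketch short.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Every type here is a hypothetical stand-in, not a Lagom or sample-app class.
final class EventMapping {
    // Internal persistence events, free to change with the entity's implementation.
    interface AuctionEvent {}

    static final class BidPlacedInternal implements AuctionEvent {
        final String bidderId;
        final int price;
        BidPlacedInternal(String bidderId, int price) {
            this.bidderId = bidderId;
            this.price = price;
        }
    }

    // Bookkeeping event that other services never need to see.
    static final class SnapshotTaken implements AuctionEvent {}

    // Public event type exposed through the service API.
    static final class BidPlaced {
        final int price;
        BidPlaced(int price) { this.price = price; }
    }

    // Maps an internal event to its public form, or empty if it is not published.
    static Optional<BidPlaced> toPublic(AuctionEvent event) {
        if (event instanceof BidPlacedInternal) {
            return Optional.of(new BidPlaced(((BidPlacedInternal) event).price));
        }
        return Optional.empty();
    }

    // Filters and maps a batch of internal events, preserving their order.
    static List<BidPlaced> publishable(List<AuctionEvent> events) {
        return events.stream()
            .map(EventMapping::toPublic)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    }
}
```

Keeping the public type separate from the internal one means the persistent entity can add or reshape its own events without changing what downstream services receive.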

True to Lagom’s commitment to an easy-to-use and productive development environment, running your services in development will automatically launch a Kafka server along with its required ZooKeeper dependency. While Lagom 1.2 provides only a Kafka implementation, its message broker API has been designed to be independent of any specific backend, and additional brokers might be added or provided by third parties in the future.

Other Changes in Lagom 1.2

While the message broker API is the headline feature for Lagom 1.2, there were other substantial additions to Lagom’s support for persistent entities and CQRS. Notably, Lagom now supports using JDBC and relational databases as the backend for both the write side and the read side of persistent entities. There were further improvements to the read-side processor API for both Cassandra and RDBMS implementations, including automatic offset tracking and support for sharding across a cluster for better scalability. See the release announcement for more details on these improvements.

Get Involved

Work on Lagom 1.3 is already well underway, with support for a native Scala API and JPA read-side processors planned. Lagom is still evolving quickly, and it’s a great time to get involved and help shape the future of the project by joining the Lagom Gitter chat or mailing list, following us on Twitter, or contributing on GitHub.

Originally published on December 6, 2016. Written by Tim Moore (Lightbend, Inc.)
