Event-driven hotel reservations with Kafka

Thắng Đỗ · Published in Altitude
10 min read · Oct 4, 2021

Reservation data is the heart of a hotel digital platform like Altitude. Almost everything revolves around reservations: mobile keys should be created when a reservation is checked in, and a service should automatically remove all guest ID documents when the guest checks out.

Our system is designed as microservices, meaning each service does one task and needs to know what has happened to reservation data in order to implement its business logic.

If we put all those flows inside a single reservation service that decides which services to call when a reservation changes state, the services become tightly coupled. The reservation service will quickly become a bottleneck as the number of services increases.

A popular way to solve this problem is the Pub/Sub pattern. The reservation service publishes an event when the state changes but doesn't know who will consume it; other services subscribe to those events and process them. We have several choices, such as RabbitMQ, Kafka, or even Redis, but before going into detail, let's define the specific challenge to solve.

Problem

A chain of properties wants RFID cards for a reservation's guests to be created automatically in the key service when the reservation is checked in. When a reservation changes room or updates its departure date, all cards should be updated automatically. All cards should be revoked when the reservation is checked out. Another service should remove all guest ID documents at checkout as well. Everything should run in near real time.

Automatically create/update/revoke cards when the reservation state changes.

The sync service is out of scope for this article. We have two problems to solve:

  • How the database notifies other services when reservations change state.
  • How the same event can be processed by multiple consumers. For example, the checkout event is processed by both the key service (to revoke cards) and the ID document service (to remove documents).

First solution

Obviously the reservation database is the publisher and the services are the subscribers. We already use a Pub/Sub system, RabbitMQ, in our microservices, so we wondered whether it could solve this problem.

RabbitMQ is a message queue that allows a bunch of subscribers to pull one or more messages from the end of the queue. Once a message has been processed, it gets removed from the queue.

When we put a checkout event for a reservation on the queue, only one service can consume it: if the key service consumes the event, the other services can't, and vice versa.

So RabbitMQ can’t solve this problem!

We need a pub/sub system that allows multiple consumers to consume the same event. It's time for Kafka.

Kafka overview

Kafka can be used as a pub/sub system, but it isn't designed as a message broker; it is a distributed event streaming platform. We will give a basic overview of Kafka here; detailed explanations can be found in the official documentation at https://kafka.apache.org or at Confluent: https://www.confluent.io/what-is-apache-kafka/

Kafka overview

Kafka producers send messages and Kafka consumers listen for them. Messages are sent to a topic, which plays the same role as a queue in RabbitMQ. However, a topic isn't a single queue: it is divided into partitions, and each partition is a single queue. Partitions help a topic scale.

All messages are persisted in the topic; they don't get removed when a consumer receives them. This is very different from RabbitMQ, and it solves our problem: multiple consumers can consume the same event.

Because messages are persisted, a consumer can choose any message in the topic to process: the first message, the last message, or the first message that has not yet been processed. This is really useful for error handling.
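For example, here is a minimal sketch with kafkajs, a popular Node.js Kafka client (the broker address and topic name are assumptions): subscribing with fromBeginning: true makes a new consumer group start from the earliest retained message instead of only new ones.

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'replay-demo', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'replay-demo-group' });

async function replay() {
  await consumer.connect();
  // fromBeginning: true => a new consumer group reads from the first retained message
  await consumer.subscribe({ topic: 'reservation_event', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => console.log(message.value.toString())
  });
}

replay().catch(console.error);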

Besides topics, producers and consumers, Kafka has other components such as Kafka Connect Source, Kafka Connect Sink, Kafka Streams and KSQL. We will talk about those components in the next sections.

Next solution with Kafka

Let's see how we use Kafka to solve our problem.

High level design

The design is pretty simple; here are the components we need:

  1. Reservation Kafka Connect: streams reservation DB changes to a Kafka topic.
  2. Event service: listens for changes in the reservation DB, detects business events and publishes them to a Kafka topic.
  3. Key service, ID document service, …: existing microservices are updated to act as Kafka consumers of the events published by the event service.

Now let's look at each component in detail.

Reservation Kafka connect

Back to our first requirement:

How databases notify other services when reservations change.

A reservation document in the collection has this format:

{
  bookingStatus: "Not Arrived" | "Arrived" | "Departure",
  roomNumber: "101",
  departureDate: "2020-01-19T00:00:00.000+00:00",
  ...
}

We need to watch those properties: when they change, an event is published to a Kafka topic (a database event), a service consumes it, and it calculates and generates a business event such as checked in or checked out.

With MongoDB, we can capture all database changes with the Change Streams API and then use the Kafka producer API to push them to a topic:

// `collection` is a MongoDB collection and `producer` is a connected Kafka producer
const changeStream = collection.watch([
  { $match: { operationType: 'insert' } }
]);
changeStream.on('change', (change) => {
  // write the change content to a Kafka topic
  producer.send({
    topic: 'reservation_event',
    messages: [ /* ... the change payload ... */ ]
  });
});

This looks easy, but in a real system there is a lot of work ahead: resuming after errors, publishing data in batches to reduce network overhead, and more. Fortunately, we don't need to reinvent the wheel: Kafka provides the Kafka Connect Source API, a framework built on top of the Producer API.

Kafka Connect consists of connectors, many developed by the open-source community. Each connector either connects to a database and streams data events to a Kafka topic (Kafka Connect Source) or reads a stream and stores it in a target database (Kafka Connect Sink). For this problem, we only use the source.

We will use Confluent Kafka Connect, an implementation of such a connector framework. To make Kafka Connect work with MongoDB, we need to install the MongoDB Kafka Connector on this server. Details can be found at https://docs.mongodb.com/kafka-connector/current/kafka-installation/

After setting up the Kafka Connect server, we can start listening to the reservation database by registering a config. This config is the most important part of a Kafka Connect source to pay attention to.

For example, this config pushes an event to Kafka only when a reservation is created or when selected properties change, not on every change:

{
  'tasks.max': 1,
  'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
  'connection.uri': 'mongodb://localhost:27017/test',
  'topic.prefix': 'res-event-engine',
  'change.stream.full.document': 'updateLookup',
  'pipeline': '[{"$match":{"$or":[{"operationType":"insert"},{"$and":[{"operationType":"update"},{"$or":[{"updateDescription.updatedFields.bookingStatus":{"$exists":true}},{"updateDescription.updatedFields.arrivalDate":{"$exists":true}},{"updateDescription.updatedFields.departureDate":{"$exists":true}},{"updateDescription.updatedFields.roomNumber":{"$exists":true}}]}]}]}}]',
  'database': 'property',
  'collection': 'reservation'
}

Explanation of the important fields:

  • connection.uri, database, collection: the MongoDB URI, database and collection whose changes we want to listen to.
  • topic.prefix: prefix prepended to the database and collection names to generate the name of the Kafka topic to publish data to. In this example, the Kafka topic is res-event-engine.property.reservation.
  • pipeline: chooses which change stream events are published to the Kafka topic. In this example, we publish only when a new reservation is created, or when a reservation updates the field bookingStatus (to detect check-in and check-out events) or the fields departureDate and roomNumber (for reservation update events).
  • change.stream.full.document: because changes from all properties are listened to, we need to know which property an event belongs to, i.e. the propertyId field of the reservation. To read a field of the document, the whole document has to be included in the Kafka message, and setting this config to updateLookup does exactly that. Messages become bigger, but we don't have another option; this is a weakness of the MongoDB Kafka Connector that needs improvement.
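To register this config, we can POST it to the Kafka Connect REST API. A minimal sketch, assuming Connect is reachable at localhost:8083 and the connector is named reservation-source (both are assumptions), on Node.js 18+ with built-in fetch:

// inside an async function
const config = {
  'tasks.max': 1,
  'connector.class': 'com.mongodb.kafka.connect.MongoSourceConnector',
  // ... the rest of the config shown above ...
};

await fetch('http://localhost:8083/connectors', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ name: 'reservation-source', config })
});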

Event service

We already have database events in a Kafka topic. The next challenge is to generate new events (check-in, check-out, …) from those events and put them into a new topic. It's like converting raw database events into business events.

This task looks easy: if the field bookingStatus is updated to Arrived, we push a checked-in event to the Kafka topic. So why do we need this simple service at all? Couldn't we consume database events directly in the business services such as the key service and the ID document service?

Because a reservation's state shouldn't be detected from the latest database event alone; it should be detected from a sequence of events, as in the table below:

Reservation state table

Look at the two states Checked in and Accidental checkout: if we only use the latest database event, we can't tell them apart.

But back to Kafka's design: we already have a topic containing all those events, so can't we just loop through it and find all events generated for a given reservation? Unfortunately, loading events for a particular entity like this is not easy in Kafka, because the topic contains events for all reservations and has tens of thousands of events inside.

So we need a service that keeps the latest reservation state and, when an event comes in, compares it with that state to detect the right business event.

Event service high level design

Another reason to create this service is that we can choose whether or not to publish business events for a given property, instead of publishing events for all properties. Because the Kafka Connect source can't be configured with such a filter, we need to do it at the service layer.

To build this service, we can use the Kafka Consumer and Producer APIs, or use the Kafka Streams API for better performance.
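As an illustration, here is a minimal sketch using the Consumer and Producer APIs via kafkajs. The broker address, topic names and the detectBusinessEvent helper are assumptions for illustration, not our actual implementation:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'event-service', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'event-service' });
const producer = kafka.producer();

async function run() {
  await consumer.connect();
  await producer.connect();
  // Topic written by the MongoDB source connector (prefix.database.collection).
  await consumer.subscribe({ topic: 'res-event-engine.property.reservation' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const change = JSON.parse(message.value.toString());
      const reservation = change.fullDocument;
      // detectBusinessEvent is a hypothetical helper: it compares the incoming
      // change with the stored latest state and returns e.g. { type: 'checked-in' }.
      const businessEvent = await detectBusinessEvent(reservation);
      if (businessEvent) {
        await producer.send({
          topic: 'reservation_business_event',
          messages: [{
            key: String(reservation._id), // keep events of one reservation ordered
            value: JSON.stringify(businessEvent)
          }]
        });
      }
    }
  });
}

run().catch(console.error);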

Once business events are on a Kafka topic, we can use the Kafka Consumer API in other services, such as the key service and the ID document service, to consume events from the business topics.

Scalability

The first thing to consider is how to run multiple instances of a consumer. That way we can run the event service on several instances, and if one goes down, the others will handle the next events.

Kafka supports this with consumer groups. For any message sent to a consumer group, Kafka routes the message to a single instance, acting as a load balancer.

Kafka consumer group

Converting the event service from a single instance to multiple instances is easy: all consumers just need to be created with the same consumer group ID.

Another problem occurs when running the service on multiple instances: how to guarantee the order of events. For example, a reservation has two events, check-in and check-out; if the check-out event arrives before the check-in event, the service will detect the wrong reservation state.

Back to Kafka's design: each topic has multiple partitions. Partitions help a topic scale, but they also have an important property: all messages within a partition are ordered (FIFO), so an event is delivered to the consumer group only after the previous event has been delivered successfully.

Because events for different reservations are independent, we just need to keep all events of one reservation in the same partition. This is configurable.
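A minimal sketch of this, assuming we publish with the reservation ID as the message key: Kafka's default partitioner hashes the key, so every event for the same reservation lands in the same partition.

// inside an async function, with `producer` already connected;
// reservationId is assumed to be available from the event being published
await producer.send({
  topic: 'reservation_business_event',
  messages: [{
    key: reservationId, // same key => same partition => ordered per reservation
    value: JSON.stringify({ type: 'checked-out', reservationId })
  }]
});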

Save disk space

All the messages a Kafka broker receives are written to commit logs, which are essentially files on the file system. When the logs fill all the available disk space, the brokers exit immediately.

Additionally, storage isn't cheap, especially since we are using Amazon MSK for our Kafka brokers, so reducing disk usage is necessary. There are two approaches: reduce the number of messages and reduce the size of each message. This post covers the simpler one, reducing the number of messages through configuration.

In our domain, events such as check-in and check-out should be delivered within the same day: a check-in event that happens today becomes meaningless if it only arrives tomorrow. So we can delete all events older than 24 hours.

This can be done easily with the config log.retention.hours: messages can be deleted once they are older than log.retention.hours.

We can also limit the total disk usage with the config log.retention.bytes: messages can be deleted once they are more than log.retention.bytes behind the latest message.

We can configure both in Amazon MSK: messages older than 24 hours, or more than 1 GB behind the latest message, will be removed:

log.retention.hours=24
log.retention.bytes=1073741824

We can improve this further by decreasing the periodic check interval, decreasing the delete delay, and more.

Conclusion

We used Kafka for event sourcing to solve this challenge and made several improvements, but we can always find new ways to make it better:

  • Saving the latest reservation state to a database can slow the system down. Kafka offers a particularly nice feature called log compaction: in a nutshell, it lets us always keep the latest value in a topic for any given key. So we can save the latest reservation state in another topic to speed up reads (see the sketch after this list).
  • We are missing a retrying consumer: right now, if a consumer fails to process a message, it retries a few times before reporting an error and rejecting the message so it can process the next one. We need a better way; for example, we can use another topic, retry_topic, to hold a failed message and wait a predefined time before reprocessing it. If the message still fails, it is published to failed_topic for further manual handling. That gives us non-blocking retries!
  • We're using JSON as the message format. Because JSON stores the name of each field together with the data in every record, the total record size increases significantly. A better choice is Avro, which stores the schema once and produces many records based on it, removing the schema from each record. Reducing message size helps increase Kafka's bandwidth and speed.
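As a sketch of the log-compaction idea from the first point, a compacted topic keeps only the latest value per key and is created with cleanup.policy=compact. Here it is done with the kafkajs admin client; the topic name reservation_latest_state and partition count are assumptions:

const { Kafka } = require('kafkajs');

const admin = new Kafka({ clientId: 'admin', brokers: ['localhost:9092'] }).admin();

async function createCompactedTopic() {
  await admin.connect();
  await admin.createTopics({
    topics: [{
      topic: 'reservation_latest_state',
      numPartitions: 3,
      configEntries: [
        // Keep only the latest value per key instead of deleting by age or size.
        { name: 'cleanup.policy', value: 'compact' }
      ]
    }]
  });
  await admin.disconnect();
}

createCompactedTopic().catch(console.error);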

In the future, we will push Kafka deeper into our microservices: use Kafka for the CQRS pattern, build real-time analytics, and speed up search by syncing the database to Elasticsearch via Kafka.

What is Altitude?

Altitude is an all-in-one smart hotel service, enabling hotels to connect with guests like never before and empowering staff to provide more personalised services. Find out more at www.altitudehq.com
