EXPEDIA GROUP TECHNOLOGY — ENGINEERING
How Expedia Reviews Engineering Is Using Event Streams as a Source of Truth
A journey to “Event sourcing”
Travel is fun, and Expedia Group™ provides the biggest network of travel content for any type of travel seller. By collecting and sharing reviews, we help to create an engaging user experience and build trust by setting expectations for both partners and travelers. We know that there is a direct correlation between conversion and review count/ratings.
Expedia Group™ is becoming the most trusted travel marketplace in the world for reviews and insights from global and local travelers. We enable this by implementing a publishing loop with flexible review solicitation, ensuring coverage, differentiation, and insight into stay experiences for continuous improvement.
The Expedia Group™ Reviews platform is built on four foundational pillars:
- Notification: Notification focuses on “WHEN” to collect feedback from travelers.
- Collection: Collection focuses on “WHAT” to collect from the traveler.
- Moderation: After feedback is collected, reviews are moderated and validated.
- Distribution: Once reviews are validated and checked for appropriateness, they are made available through APIs via the Distribution pillar.
In the Moderation pillar, the moderation pipeline sends validated reviews to a Kafka topic, which acts as the source of truth for all moderated reviews. After moderation, reviews are ready to be cached and displayed on the website. These reviews can also be processed for sentiment analysis, aggregation and analytics. The reviews pipeline contains multiple processors, which are Kafka consumers that stream events from the source topic, process them for their particular use case, and write output to different datastores or topics. Supporting a new processor or use case involves adding a new consumer that reads events from the topic, starting at the beginning, and produces the necessary output.
The goal is to simplify integration, reduce migrations and support future use cases with agility.
The Reviews Distribution pipeline is responsible for distributing reviews data. Moderated reviews are pushed to a Kafka topic, which acts as the source for different processors:
- Aggregation: Review data is aggregated to provide a high-level summary of products.
- Caching: All reviews are cached in an in-memory datastore, which is exposed through APIs for fast retrieval.
- Searching: Review data is stored in Elasticsearch for sorting, filtering and searching use cases.
- Sentiment Analysis: Moderated reviews are sent for sentiment analysis to understand the experiences, emotions or opinions of travelers about a particular product (e.g., the different amenities of a hotel).
- Tag Extraction: Relevant tags are extracted from reviews using Natural Language Processing (NLP), and the reviews are tagged with them.
- Analytics: Reviews are sent for analytics to better understand customers’ experience with our products.
We store the complete current state of each review in the Moderated Reviews Kafka topic, with reviewId as the partition key. This topic acts as the source of truth for the different data stores.
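Here is a minimal sketch of how the Aggregation processor could benefit from full-state events. It assumes hypothetical `reviewId`, `productId` and `rating` fields. Because reviewId is the partition key, events for one review arrive in order, so the aggregator can keep only the latest event per review and recompute the summary without merging older events.

```python
from collections import defaultdict


class RatingAggregator:
    """Per-product review count and average rating from full-state events (sketch)."""

    def __init__(self) -> None:
        self.latest: dict[str, dict] = {}  # reviewId -> latest full review state

    def apply(self, event: dict) -> None:
        # Each event is the complete state, so a newer one simply replaces the old one.
        self.latest[event["reviewId"]] = event

    def summary(self) -> dict[str, tuple[int, float]]:
        ratings: dict[str, list[int]] = defaultdict(list)
        for review in self.latest.values():
            ratings[review["productId"]].append(review["rating"])
        return {product: (len(r), sum(r) / len(r)) for product, r in ratings.items()}


agg = RatingAggregator()
for e in [
    {"reviewId": "r1", "productId": "p1", "rating": 3},
    {"reviewId": "r2", "productId": "p1", "rating": 5},
    {"reviewId": "r3", "productId": "p2", "rating": 4},
    {"reviewId": "r1", "productId": "p1", "rating": 5},  # r1 edited: new full state
]:
    agg.apply(e)
```

The edit to r1 replaces its earlier rating instead of being counted twice, which is what keeps the aggregate correct.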
What to store as an event
“Events as a source of truth” is a simple but powerful idea: persist the state of a business entity as a sequence of state-changing events. A new event is atomically appended to the event store whenever the state of the entity changes. Event sourcing thus records state changes as an immutable, append-only log of events.
- An entity’s current state can be reconstructed by replaying all the changes to the application state, which are captured as a sequence of events.
- Emitting an event for every entity change can produce a large number of events. This can be optimized by periodically persisting snapshots of the entity’s current state. The current state is then reconstructed from the most recent snapshot plus the events that have occurred since that snapshot.
- An entity’s complete current state can also be stored in an event whenever there is a state change. The latest event provides the final state of the entity and doesn’t depend on any other snapshot or older events.
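The three options above can be compared with a small Python sketch over one review's history. The change events and field names are illustrative assumptions. All three strategies arrive at the same current state; they differ only in how much of the log must be read.

```python
from functools import reduce

# Hypothetical state-change events for one review: each holds only the changed fields.
changes = [
    {"rating": 3, "text": "ok"},
    {"text": "Great pool"},
    {"rating": 5},
]


def apply_change(state: dict, change: dict) -> dict:
    """Fold one state-change event into the state, returning a new dict."""
    return {**state, **change}


# 1. Replay every change from the beginning of the log.
full_replay = reduce(apply_change, changes, {})

# 2. Start from a snapshot taken after the first two changes, then replay the rest.
snapshot = {"version": 2, "state": reduce(apply_change, changes[:2], {})}
from_snapshot = reduce(apply_change, changes[snapshot["version"]:], snapshot["state"])

# 3. Full-state events: every event already is the complete state, so the latest wins.
full_state_events = [reduce(apply_change, changes[: i + 1], {}) for i in range(len(changes))]
latest_only = full_state_events[-1]
```

Option 3 moves the merge cost to whoever builds each event, which is the producer-versus-consumer trade-off described next.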
Whether to store the complete entity state or only the state changes depends on the use case and its performance trade-offs. Storing the complete state costs performance at the producer if each new change must be merged with the previous snapshot before it is pushed to the store. Storing only the changes moves that cost to consumption: performance degrades if the merging has to happen at the consumer. In a third scenario, the producer already has the complete state and emits an event that is consumed as-is, so there is no merge cost on either side.
Why Kafka for event sourcing
Kafka is a high-performance, low-latency, scalable and durable log that is used worldwide and battle-tested at scale. Kafka not only offers event subscriptions to multiple applications but can also be used as a primary data store. Kafka provides compacted topics out of the box, which allow data to be retained based on the key rather than on time. Beyond storing events, Kafka also enables rewinding and replaying events multiple times and reprocessing data at scale.
Storing events in Kafka
Storing in Topic “per-entity-type”: A separate topic is created for all events related to a particular entity type. For example, events related to all products should be present within one topic. Having a separate topic per entity type allows a logical grouping of events and enables each entity type’s stream to be scaled separately.
Storing in Partition “per-entity”: All of the events for a single entity should be assigned to a single partition. For example, events related to a single user or product should always be present in the same partition. Assigning events for a single entity to the same partition ensures that all updates are applied in order and don’t lead to an inconsistent state.
The above diagram illustrates two topics, Product and User, each storing events for one entity type. Each topic has multiple partitions, and all events for a single product or user are assigned to the same partition.
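Here is a minimal in-memory sketch of both rules, assuming a hypothetical partition count of 4. Kafka's default partitioner hashes the key bytes with murmur2. The sketch swaps in CRC32 from Python's standard library purely for illustration. That is enough to show that a fixed hash of the entity id always routes that entity to the same partition, where its events keep their order.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count per topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Illustrative stand-in for Kafka's murmur2-based default partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# topic name -> partition number -> ordered list of (entity_id, payload)
topics: dict = defaultdict(lambda: defaultdict(list))


def produce(entity_type: str, entity_id: str, payload: dict) -> int:
    """One topic per entity type; the entity id picks the partition."""
    p = partition_for(entity_id)
    topics[entity_type][p].append((entity_id, payload))
    return p


produce("product", "hotel-42", {"v": 1})
produce("user", "u-7", {"v": 1})
produce("product", "hotel-42", {"v": 2})
produce("product", "hotel-99", {"v": 1})
```

Both versions of hotel-42 end up in one partition in the order they were produced. Consumers of other partitions never see them, so they cannot apply them out of order.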
Cluster configuration and the number of brokers depend on several factors. Parameters such as CPU, RAM, disk, JVM heap size, number of Kafka instances, and replication factor can be estimated if we know the rate at which events are produced and consumed, and the number of events retained in the cluster for any given window. For now, let’s focus only on how to calculate the topic size and the number of partitions within it.
Calculating queue size
The size of the queue depends on the throughput of the producer. Given an event size of E bytes (which depends on whether we store the complete state or only the state changes) and a number of events N, we can calculate the size of the queue for a particular time window.
Size of queue (in bytes, uncompressed): S = E × N
Queue size also depends on the retention period R (which may be a few days or infinite), because N counts the events retained during R.
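As a worked example of S = E × N, the sketch below treats N as the event rate multiplied by the retention window. As an assumption beyond the formula above, it also multiplies by the replication factor, because each replica stores a full copy. The workload numbers are hypothetical.

```python
def queue_size_bytes(event_size_bytes: int, events_per_second: float,
                     retention_seconds: int, replication_factor: int = 1) -> float:
    """Uncompressed size estimate S = E x N, with N = event rate x retention R."""
    n_events = events_per_second * retention_seconds  # N: events retained during R
    # Every replica keeps a full copy of the partition data.
    return event_size_bytes * n_events * replication_factor


# Hypothetical workload: 2 KiB full-state events, 100 events/s, 7-day retention.
size = queue_size_bytes(2048, 100, 7 * 24 * 3600)
print(f"{size / 2**30:.1f} GiB per replica")  # 115.4 GiB per replica
```

Compression, index files and segment overhead are ignored here, so treat the result as a rough starting point for disk planning.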
Kafka differs from traditional message queues in that consumed messages are not deleted immediately. A message continues to live until the retention period, which is configurable for each topic, has elapsed. Increasing the number of partitions generally leads to higher throughput, because partitions can be written and read in parallel.
A rough formula for picking the number of partitions is based on throughput. Measure the throughput that can be achieved on a single partition for production (call it p) and for consumption (call it c).
To achieve a target throughput t, at least max(t/p, t/c) partitions are required.
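The formula translates directly into code. Rounding up is needed because partitions come in whole numbers. The throughput figures in the example are hypothetical measurements.

```python
import math


def min_partitions(target: float, per_partition_produce: float,
                   per_partition_consume: float) -> int:
    """At least max(t/p, t/c) partitions, rounded up to a whole partition."""
    return math.ceil(max(target / per_partition_produce,
                         target / per_partition_consume))


# Hypothetical measurements in MB/s: t = 50, p = 15, c = 20.
print(min_partitions(50, 15, 20))  # 4 (50/15 ≈ 3.33 dominates 50/20 = 2.5)
```

Whichever side is slower per partition, producing or consuming, determines the count.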
If the topic has infinite retention, other factors such as disk space also come into play.
Partitions can be increased over time. However, Kafka maps messages to partitions based on a hash of the key. This is what guarantees that messages with the same key always go to the same partition and are delivered to the consumer in order. If the number of partitions changes, this guarantee may no longer hold. To avoid this situation, a common practice is to over-partition slightly up front, but only slightly, because too many partitions can also have a negative impact.
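The sketch below shows why adding partitions breaks the per-key guarantee. It uses the same `hash(key) % partitions` scheme, with CRC32 as an illustrative stand-in for Kafka's murmur2 hash. For a key that moves, newer events are written to the new partition while older events stay in the old one, so ordering between them is no longer guaranteed.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # CRC32 stands in for Kafka's murmur2; any "hash % partitions" scheme
    # remaps keys when the partition count changes.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"review-{i}" for i in range(1000)]
before = {k: partition_for(k, 6) for k in keys}  # topic created with 6 partitions
after = {k: partition_for(k, 8) for k in keys}   # later increased to 8
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys now map to a different partition")
```

Most keys move in this scenario. That is why it pays to choose a slightly generous partition count at creation time.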
Kafka log compaction
Log compaction is a mechanism to give finer-grained per-record retention, rather than coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.
Kafka automatically removes messages from a topic when a configurable time or size limit in its retention policy is reached. However, this behaviour can be changed to key-based retention by enabling log compaction. Kafka then removes any older event for which a newer event with the same key exists in the same partition. Compacted topics therefore provide the ‘latest’ view of each entity.
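Compaction is enabled per topic with the `cleanup.policy=compact` topic configuration. The Python sketch below only simulates its effect on one partition's records (the offsets and values are made up). The latest record per key survives and keeps its original offset. In real Kafka, compaction runs in the background and does not touch the active segment, so consumers can still see some superseded records.

```python
def compact(log: list[tuple[int, str, str]]) -> list[tuple[int, str, str]]:
    """Keep only the most recent record per key, preserving offsets and order.

    log: (offset, key, value) records of a single partition, in offset order.
    """
    last_offset = {key: offset for offset, key, _ in log}  # later offsets overwrite
    return [rec for rec in log if last_offset[rec[1]] == rec[0]]


partition = [
    (0, "r1", "rating=3"),
    (1, "r2", "rating=4"),
    (2, "r1", "rating=5"),
    (3, "r3", "rating=2"),
    (4, "r2", "rating=1"),
]
compacted = compact(partition)  # offsets 2, 3 and 4 survive
latest_view = {key: value for _, key, value in compacted}
```

Reading the compacted partition from the beginning yields exactly one current value per key, which is the “latest view” described above.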
“Compacted Kafka topics can be used as a key-value (KV) datastore for storing very large datasets. Kafka is battle-tested for production workloads serving use cases over terabytes and can be used to store the state of an entity, similar to a traditional database.”
The figures below illustrate how we can migrate data from a traditional database to an event-sourced architecture in phases.
Figure existing architecture: Application writing data to database
This represents the existing architecture, in which the application writes data to a traditional SQL/NoSQL database, which is considered the source of truth.
Figure phase II: Historical data migration to Kafka topic
As the first phase of migration, the application starts writing each record to the database with a record version R(v), which is strictly increasing. A migration job then copies data from the database to the Kafka topic. It is also responsible for transforming each record into an event, which is a snapshot of the entity state with event version E(v) = R(v). AWS DMS can also be considered for migrating all the data.
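Here is a minimal sketch of the migration job's transformation step, using hypothetical `Record` and `Event` shapes. Each database row becomes a full-state snapshot event keyed by its id and carrying E(v) = R(v).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Row in the existing database (hypothetical shape)."""
    review_id: str
    version: int  # R(v): strictly increasing per record, set by the application
    data: dict


@dataclass(frozen=True)
class Event:
    """Snapshot event written to the Kafka topic (hypothetical shape)."""
    key: str
    version: int  # E(v), copied from R(v)
    state: dict


def to_event(record: Record) -> Event:
    # Emit the full record state as a snapshot with E(v) = R(v), keyed by the
    # record id so every version of a record lands in the same partition.
    return Event(key=record.review_id, version=record.version, state=dict(record.data))


event = to_event(Record("r1", 7, {"rating": 4}))
```

Carrying the version into the event is what later lets the backfill job compare E(v) against R(v).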
Figure phase III: streaming latest changes to Kafka topic
Once all the data has been loaded into the Kafka topic, the migration job starts listening for new changes in the database and streams them to the Kafka topic. A backfill job is also started, which writes events back to the database only when the event version is greater than the record version, E(v) > R(v). This ensures that older updates don’t overwrite the latest updates written by the application.
Figure phase IV: backfill for backward compatibility
The application starts writing events to Kafka, and the backfill job writes all the latest updates to the database. The E(v) > R(v) condition ensures that only newer information is written to the database. The entity’s state therefore never temporarily reverts to an older value, and all the latest updates are eventually applied to the record.
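The backfill rule can be sketched as a version-guarded upsert. Here the database is a plain dict and the `backfill` function is hypothetical. In a real database, the version comparison and the write must happen atomically, for example as a single conditional update.

```python
def backfill(db: dict[str, tuple[int, dict]], key: str, version: int, state: dict) -> bool:
    """Write an event back to the database only if E(v) > R(v); return True if written.

    db maps key -> (record version R(v), record state).
    """
    current = db.get(key)
    if current is not None and version <= current[0]:
        return False  # stale or duplicate event: keep the newer record
    db[key] = (version, state)
    return True


db = {"r1": (5, {"rating": 4})}  # the application already wrote version 5
results = [
    backfill(db, "r1", 4, {"rating": 2}),  # older event from the stream: ignored
    backfill(db, "r1", 6, {"rating": 5}),  # newer event: applied
    backfill(db, "r1", 6, {"rating": 5}),  # redelivered duplicate: ignored
]
```

The same guard makes the write idempotent: replaying an event that has already been applied changes nothing.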
Note: Not all phases may be required; this depends on the use case. Phases can be dropped as a trade-off, depending on the acceptable percentage of data loss.
Eventual consistency is a theoretical guarantee that, provided no new updates to an entity are made, all reads of the entity will eventually return the last updated value.
Achieving eventual consistency is not a trivial task, and we must be careful that information does become consistent during migration. Record versioning can be used to ensure this: only the most recent updates are applied, and events or records with an older version are dropped.
Consistency across multiple data stores
Eventual consistency across different data stores can be ensured by consuming events from Kafka and writing them to each data store with infinite retries. Adding a new data store to support different use cases and query patterns is simple: it just consumes all the events from the beginning of the topic.
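Here is a sketch of fanning the same events out to several stores with retries. It rests on two assumptions. First, `VersionedStore` is a hypothetical sink that fails a configurable number of times to simulate outages. Second, retries use capped exponential backoff. Because the sink keeps only the highest version per key, a retried or redelivered write is harmless. In a real consumer, the offset would be committed only after every store has accepted the event.

```python
import time
from typing import Callable

Event = tuple[str, int, dict]  # (key, version, state); hypothetical event shape


def write_with_retries(write: Callable[[Event], None], event: Event,
                       base_delay: float = 0.01, max_delay: float = 1.0) -> int:
    """Retry a sink write until it succeeds ("infinite retries"); return attempts used."""
    attempt, delay = 0, base_delay
    while True:
        attempt += 1
        try:
            write(event)
            return attempt
        except Exception:  # sketch only: treat every failure as transient
            time.sleep(delay)                  # back off before trying again
            delay = min(delay * 2, max_delay)  # exponential backoff, capped


class VersionedStore:
    """Idempotent sink: keeps the highest version per key, so retries are harmless."""

    def __init__(self, failures_before_success: int = 0) -> None:
        self.rows: dict[str, tuple[int, dict]] = {}
        self._failures_left = failures_before_success  # simulated outages

    def write(self, event: Event) -> None:
        if self._failures_left > 0:
            self._failures_left -= 1
            raise ConnectionError("transient failure")
        key, version, state = event
        if key not in self.rows or version > self.rows[key][0]:
            self.rows[key] = (version, state)


cache, search = VersionedStore(), VersionedStore(failures_before_success=2)
for event in [("r1", 1, {"rating": 3}), ("r1", 2, {"rating": 5})]:
    for store in (cache, search):  # every store consumes the same ordered events
        write_with_retries(store.write, event)
```

Despite the simulated outage, both stores converge on the same state once the retries succeed.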
During migration, and to ensure backward compatibility, we must also make sure that all operations are idempotent, meaning there is no impact if an operation is executed more than once. Again, by using version-based conflict resolution and storing the full object state in the history, we can restore the final state by reordering object states correctly and saving only the delta in persistent storage. The only restriction is that clients need some form of concurrency control, which is manageable by routing all updates for an object to a single partition.
As discussed in capacity planning, the right number of partitions can be identified from the producer traffic and the consumer consumption rate. The application can then be scaled until there is almost zero lag while streaming events.
Treating events as the primary source of truth not only provides decoupling but also supports extensibility for future use cases. It also allows organisations to arrive at a loosely coupled application architecture that is both resilient and scalable.
Special thanks to Lucas Pelosi and Mitul Kapoor for their reviews and comments.