Expedia Group Technology — Engineering
Explore near Real-time Streaming Data Using a Web UI
How we created a tool to query near real-time streaming data using Kafka, Postgres and WebSockets
The Clickstream team at Expedia Group™️ have developed a tool that helps users query streaming data and get a live feed of the events over a web browser
By: Ryan Lacerna, Vishnu Shankar, Istvan Megyeri, Dimple Verma, Zhiying Gao, Chris Savio, Connor Culleton, Lukasz Krawiec, Amit Kumar
Introduction
Data is critical. At Expedia Group, data enables us to understand our customers, optimise our operations, and make strategic decisions. Expedia Group is one of the world’s leading online travel platforms, and it generates vast amounts of data from sources such as customer bookings, search queries, and website interactions captured as Clickstream data.
“Clickstream is the lifeblood of Expedia Group’s intelligence” — Connor Culleton, Director of Data Engineering
Clickstream data allows us at Expedia Group to deliver personalised, tailored and satisfying experiences for our users, improve operational efficiency, derive business intelligence insights and experiment with innovative new product features.
To help ensure data quality, one of the challenges is seeing the data as soon as it is ingested into the pipeline. Traditional methods such as querying data lakes and data warehouses take time to process. However, by combining Kafka, Postgres and WebSockets into an event-driven tool that can handle high velocities and volumes of data, data can be ingested, processed and delivered to the user’s browser in near real-time. This lets users query and view streaming data quickly and efficiently, giving data producers fast feedback to act on and helping data consumers understand what data is being captured for their use cases.
Is streaming data over WebSockets anything new?
WebSockets are pretty cool because they allow for real-time communication between a web browser and a server. This means that instead of having to constantly refresh a webpage to get updated information, you can receive updates as soon as they happen. This is particularly useful for things like online gaming, chat applications and financial trading platforms, where speed and accuracy are key. Plus, because WebSockets are based on a single, long-lived connection, they reduce the amount of network overhead and improve performance compared to traditional HTTP requests. So, whether you’re a developer building a cutting-edge app or just a casual user who wants a smoother browsing experience, WebSockets can come in handy.
There are tons of great solutions that solve this problem, such as:
- Routing Kafka messages to browser over web socket
- Browser integration with WebSockets, Kafka and KSQLDB
- Angular + Spring Boot + Kafka: How to stream realtime data the reactive way.
- Kafka + WebSockets + Angular: event-driven microservices all the way to the frontend
and many, many more …
Although the previously mentioned solutions can stream Kafka messages to client browsers via WebSockets, they are mainly proofs of concept and are not production-ready. In our case, we needed to process data at scale — approximately 3 million records per minute — so we had to rethink our approach.
The challenge
We initially developed a simple proof of concept:
- On the UI view, a user wants to see a collection of events that have a specific event name, so the user enters an event name of “user.clicked_event” as the search criteria and clicks the Connect button.
- Using SockJS and STOMP, the browser establishes a full-duplex WebSocket communication to the WebSocket handler application.
- The WebSocket handler is responsible for consuming the messages from Kafka and maintaining the WebSocket routing between sessions (users).
- As soon as the WebSocket communication is established, the app subscribes to the selected Kafka topic and starts filtering messages that the user specified in their search criteria.
- The WebSocket handler consumes from Kafka and finds events that match the filter specified.
- Events matching the filter criteria are then automatically sent back to the UI via WebSocket.
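The filtering step at the heart of the proof of concept can be sketched as follows. This is a simplified Python sketch, not the actual handler (which is a WebSocket application); the payload field name `event_name` and the `matches_filter` helper are assumptions for illustration:

```python
import json

def matches_filter(raw_message, wanted_event):
    """Return True if the Kafka message payload matches the user's event-name criteria."""
    try:
        event = json.loads(raw_message)
    except ValueError:
        return False  # skip malformed payloads rather than failing the stream
    return event.get("event_name") == wanted_event

# A "user.clicked_event" message passes the filter; anything else is dropped.
msg = json.dumps({"event_name": "user.clicked_event", "page": "/home"}).encode()
assert matches_filter(msg, "user.clicked_event")
assert not matches_filter(msg, "user.search_event")
```

Each consumed record is run through this predicate, and only matches are pushed down the WebSocket to the browser.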
The above proof of concept performed relatively well against topics with low volume and rate of traffic, but as soon as we subscribed to a heavily loaded topic, that’s when we started seeing some problems.
A single WebSocket handler instance is unable to keep up with the volume and rate of messages produced by the large topic. We observed latencies of several hours between when a message was created and when it was processed by the app, which resulted in no events being shown in the UI. This was due to the sheer rate and volume of the messages, as well as consuming 400+ topic partitions from a single instance.
Basically, it’s like trying to drink a swimming pool through a straw.
How did we solve it?
One of the great things about Kafka is that it can divide and balance the workload of consuming from multiple partitions among multiple consumer nodes using consumer groups. This means that as the data and workload increase, an app can scale alongside it just by adding new nodes to the same consumer group, automatically balancing the workload across each worker.
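The rebalancing behaviour of a consumer group can be illustrated with a tiny sketch. This is a simplified round-robin model, not Kafka’s actual assignor, and the worker names are made up:

```python
def assign_partitions(partitions, consumers):
    """Round-robin partition assignment, mimicking (in simplified form) how a
    Kafka consumer group spreads topic partitions across its members."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# One node must own all 8 partitions; four nodes share the load 2 apiece.
parts = list(range(8))
solo = assign_partitions(parts, ["worker-1"])
group = assign_partitions(parts, ["worker-1", "worker-2", "worker-3", "worker-4"])
assert len(solo["worker-1"]) == 8
assert all(len(owned) == 2 for owned in group.values())
```

Adding a node to the group shrinks every member’s share, which is exactly the scaling property we wanted for our 400+ partition topics.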
With the WebSocket Handler, this was a limitation. This is because each WebSocket session can only establish a connection to a single server at a time, so even if we were to scale the handler by adding multiple nodes, the user will only see the messages that have been processed by a single server connection.
So, we separated the responsibilities …
Let the WebSocket handler deal with the WebSockets user sessions.
To consume the heavily loaded topics in a scalable way, we’ve developed a simple Filter worker application deployed in Kubernetes (K8s) that has the responsibility of consuming from multiple heavily loaded Kafka topics using the Kafka-client library and selecting/filtering events.
This improved performance and scalability, allowing the Filter worker app to scale along with an increase in data volume and workload by adding new nodes to the consumer group. The WebSocket handler can focus on its primary responsibility of handling WebSocket routing between sessions (users) and communicating with the browser via WebSocket, while the Filter worker does the heavy lifting of consuming and filtering our gigantic streams.
The output (number of messages) of each worker is also capped at a configurable limit, e.g., 100 events per node. So, with 10 nodes → 10 nodes * 100 output events = 1,000 events per user filter. This way, we can apply back-pressure to the downstream WebSocket handler.
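The per-node cap can be expressed as a small counter that refuses to forward once the limit is reached. A hypothetical helper, sketched in Python (the real workers are the Kafka-client-based Filter workers described above):

```python
class CappedEmitter:
    """Per-node output cap (hypothetical helper): once `cap` events have been
    forwarded for a filter, stop emitting to apply back-pressure downstream."""
    def __init__(self, cap):
        self.cap = cap
        self.sent = 0

    def try_emit(self, event):
        if self.sent >= self.cap:
            return False  # cap reached: drop further matches on this node
        self.sent += 1
        return True

# With a cap of 100 per node, 10 nodes bound a filter at 10 * 100 = 1,000 events.
emitter = CappedEmitter(cap=100)
forwarded = sum(1 for e in range(500) if emitter.try_emit(e))
assert forwarded == 100
```

The total per filter is simply nodes × per-node cap, so the downstream WebSocket handler always sees a bounded stream.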
But wait, how would each worker know what to filter for?
In our case, we want all the consumers within our consumer group to have the same context so that they can all work together to filter for the same event across all partitions.
This means we’ll need a shared cache that gives each worker the same context.
We want to perform the following actions on the cache:
- Add a filter to the cache when the user connects to the tool.
- Delete a filter when the user disconnects from the session.
- Delete a filter after a given time (TTL) in case of lingering user sessions.
- Delete a filter when the specified number of events have been received by the WebSocket handler app (back-pressure).
- Notify workers when a filter has been added to the cache (start filtering).
- Notify workers when a filter has been removed from the cache (stop filtering).
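The semantics of the list above can be sketched as a small cache with a TTL. This Python sketch only models the behaviour; the real shared store is Postgres, and (as described next) workers are notified of changes rather than polling:

```python
import time

class FilterCache:
    """Sketch of the filter store's semantics: add on connect, delete on
    disconnect or cap, and expire lingering entries via a TTL."""
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self._filters = {}  # filter_id -> (criteria, inserted_at)

    def add(self, filter_id, criteria):   # user connects
        self._filters[filter_id] = (criteria, time.monotonic())

    def delete(self, filter_id):          # user disconnects / event cap reached
        self._filters.pop(filter_id, None)

    def active(self):
        """Expire entries past their TTL, then return the live filters."""
        now = time.monotonic()
        self._filters = {f: (c, t) for f, (c, t) in self._filters.items()
                         if now - t < self.ttl}
        return {f: c for f, (c, t) in self._filters.items()}

cache = FilterCache(ttl_seconds=0.05)
cache.add("f1", {"event_name": "user.clicked_event"})
assert "f1" in cache.active()
time.sleep(0.1)
assert cache.active() == {}  # lingering session expired via TTL
```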
We took advantage of the LISTEN/NOTIFY feature available in Postgres, so that we only act on the cache when a new node has been created or when a filter entry is added/deleted, based on notifications — making the design event-driven. This way, we avoid the network overhead and CPU cost of polling the cache for updates.
CREATE OR REPLACE FUNCTION notify_filter_changes()
RETURNS trigger AS $$
BEGIN
  IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
    PERFORM pg_notify('filter_updated', json_build_object('filter', NEW, 'operation', TG_OP)::text);
  ELSE
    PERFORM pg_notify('filter_updated', json_build_object('filter', OLD, 'operation', TG_OP)::text);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
When a new entry is added, Postgres pushes notifications out to the client apps listening for notification events. Once our workers have received a notification of an update, that update is cached locally (in-memory). Each filter operation then only needs to access the local cache during processing, and the local cache is updated for every cache update notification.
CREATE TRIGGER filter_updated
AFTER INSERT OR UPDATE OR DELETE ON filters
FOR EACH ROW EXECUTE PROCEDURE notify_filter_changes();
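On the worker side, each notification payload carries the changed row and the operation, which the worker applies to its local in-memory copy. A Python sketch of that handler (the real workers use a Postgres client listening on the `filter_updated` channel, and the `id` column name is an assumption):

```python
import json

local_filters = {}  # each worker's in-memory copy, consulted on every message

def on_notification(payload):
    """Apply one `filter_updated` notification (as produced by pg_notify)
    to the local cache."""
    change = json.loads(payload)
    row, op = change["filter"], change["operation"]
    if op in ("INSERT", "UPDATE"):
        local_filters[row["id"]] = row        # start (or refresh) filtering
    else:  # DELETE
        local_filters.pop(row["id"], None)    # stop filtering

on_notification(json.dumps(
    {"filter": {"id": "f1", "event_name": "user.clicked_event"}, "operation": "INSERT"}))
assert "f1" in local_filters
on_notification(json.dumps({"filter": {"id": "f1"}, "operation": "DELETE"}))
assert "f1" not in local_filters
```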
We’ve consumed from Kafka and filtered the events — what’s next?
So now that we’ve read the large topics and filtered the events, where do the messages make their way back to the user?
The filtered events that the users have queried for are published to a smaller filtered Kafka topic. This topic houses all events that every user has filtered for, which gives us a significantly lower volume and rate of messages since it will only contain selected events, and back-pressure has been applied.
Our WebSocket Handler is subscribed to this new filtered topic and consumes messages from the latest offset (the newest messages). The filtered topic is partitioned by filter id, which is unique per WebSocket user session.
As the WebSocket handler consumes from this topic, the filter id is used by our Handler app to determine which message should be routed back to a user.
As a result, because we are consuming from the filtered topic partitioned by filter id, we can now scale the number of WebSocket Handler nodes as the number of tool users grows. Each new node consumes the latest messages from the topic independently, so no matter which node a user’s browser connects to, the correct messages for that user’s query are still returned.
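The keying and routing described above can be sketched as follows. This is an illustrative Python model: the partition count is made up, and a stable md5-based hash stands in for Kafka’s default murmur2 key partitioner:

```python
import hashlib

NUM_PARTITIONS = 12  # hypothetical partition count for the filtered topic

def partition_for(filter_id):
    """Deterministic partition for a filter id, so every event for one user's
    filter lands on the same partition."""
    digest = hashlib.md5(filter_id.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

def route(message, sessions):
    """Pick the WebSocket session (if any, on this node) for a filtered message."""
    return sessions.get(message["filter_id"])

# Same filter id always maps to the same partition; routing is a simple lookup.
assert partition_for("session-42") == partition_for("session-42")
sessions = {"session-42": "ws-conn-A"}
assert route({"filter_id": "session-42", "event": {}}, sessions) == "ws-conn-A"
assert route({"filter_id": "session-99", "event": {}}, sessions) is None
```

Messages for filter ids with no session on a given node are simply ignored by that node, which is what makes the handler horizontally scalable.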
Summary
In this story, we’ve shared how we resolved our challenge of streaming high-volume data over WebSockets and developed a tool that enables the users of Clickstream data within Expedia Group to view their events in near real-time.
The resulting tool is able to handle ~1.1 million more records per minute than similar tools we’ve previously developed, at a ~50% reduction in monthly running cost. And what’s cooler is that users of the tool are able to query and see their events much more quickly in the UI, enhancing the user experience.
To sum it all up…
- We’ve separated the responsibility of consuming & filtering from handling WebSockets.
- We’ve leveraged Postgres’s LISTEN/NOTIFY feature to efficiently notify our workers of new filters.
- Using the new filtered topic, we’re able to scale and load balance our WebSocket handlers.
Like any software development, we’re always striving to learn and improve our solution as new use cases come along. We hope you’ve enjoyed reading about our experience and learnings.
Thanks for reading, and happy coding!