How We Maintain Tenant Data Isolation with Kafka at Scale

See how in kafka you can safely consume batches of data isolated per tenant

Nadav Hoze
Gong Tech Blog
10 min readDec 2, 2020

--

A bit about us

Gong is a SaaS company that offers a unique revenue intelligence platform for remote sales teams. Gong live and breathe data — our AI data-driven solution collects and analyzes many data sources of a potential sales deal, including calls, emails, and CRM data.

By analyzing that data, we can offer a rich feature set in the realms of sales coaching, deal intelligence, and market intelligence.

Some background

At Gong, we love batches! We’re a truly data-driven company, so we work extensively with ML processing, execute operations on data stores, and integrate with external APIs. We’ve found that the best way to work with all of these, in terms of performance, is using batches.

In this post, I share how we developed a robust, scalable solution to use batching capabilities wherever we can without mixing in and compromising our clients’ data.

The concern: Streaming tenant data must be isolated at all times.

As you can imagine, our most valuable asset is our clients’ data, and as a SaaS company, it’s critical for us to avoid processing mixed-tenant data batches through our Kafka pipelines.

It’s not about isolating the data stored in Kafka.

The data stored in Kafka topics is, indeed, mixed from different tenants; however, it is encrypted, and access to it is secured and must be authorized. The data also has a retention period that adheres to strict compliance regulations.

Even if we wanted to, developing a solution where each tenant has its own dedicated topic is too costly in terms of scale, coding, deployment, administration, and cost-efficiency.

So what exactly are we solving?

Developers are humans, which means that they will, at some point, make mistakes.

When it comes to a Kafka consumer that operates on a mixed batch of data (i.e., from multiple tenants), the risk is that bugs could mix different data and end up exposing one company’s data to another company.

In this post, I outline the solution we came up with to eliminate this concern. The solution allows developers to implement consumers that operate only on batches that belong to one tenant at a time while maintaining the following crucial points:

  • Balanced batches (in terms of size)
  • Fair distribution of batches among consumers
  • Scale-out (horizontally) simply by adding more consumer instances

The problem: Kafka doesn’t have a built-in tenant batch-isolation solution.

For data-driven companies like ours, Kafka was the easy choice for its ability to handle and process streams of data efficiently (especially at scale), but Kafka doesn’t have a built-in solution when it comes to consuming batches; it doesn’t allow Kafka consumers to receive a batch only with the data of one tenant.

Before we take a look at why that is and tackle how to solve it, let’s go over some Kafka basics (skip this part if you’re familiar with how Kafka works):

High-level Kafka architecture

As we can see, Apache Kafka serves as a pub/sub mechanism to accept records of data by producers and let consumers fetch this data and perform their logic.

Kafka broker

A Kafka broker receives messages from producers and stores them on the disk, keyed by a unique offset; it allows consumers to fetch messages by topic, partition, and offset (more below).

Topic

A topic is a category/feed name to which records are stored and published.

Topic records are stored in partitions, with each partition holding the records in the order in which they came in.

Topic partitions, as we’ll see later, are the scale unit (i.e., the more partitions we have, the more consumers we can spin up). Topic partitions are distributed between the brokers.

Producer

A producer is an application that uses the Kafka Producer API to publish records into the Kafka cluster.

Here’s an example of how the producer writes data to Kafka:

Consumer

A consumer is an application that uses the Kafka Consumer API to consume records from a Kafka cluster.

A consumer belongs to a consumer group (only one!), which is the protocol that Kafka uses to decide on the partition assignment to a consumer’s lifecycle state.

Here’s an illustration of consumer groups and their consumers:

As we can see, the scale unit of this topic is three.

That’s why we have one consumer idle in Consumer Group A (4 consumers) and why all the consumers in Consumer Group B (2 consumers) have partitions.

Back to the problem

These basics can help us understand why Kafka doesn’t have a built-in solution to let consumers consume a batch of data that only belongs to a single tenant.

Based on the diagram above, we can see that Kafka consumers receive data from multiple partitions they were assigned to, with each partition holding events from different tenants. When a consumer polls a batch of data, it will inevitably involve a mix of multiple tenants.

At Gong, we use Spring Kafka to produce and consume with Kafka.

Spring Kafka offers two modes of consumption:

1. Single record consumption (tenant isolated by default)

Different colors represent different company events.

We can see that in this case, the tenant is isolated by default because the consumer receives a single event each time.

2. Batch record consumption (tenant not isolated)

Different colors represent different company events.

We can see that in this case tenants are not isolated, and the consumer will receive a mixed-tenant batch of data.

So why not go with the single record consumer?

We could… but this is less of a fit for common-batch use cases such as: calling an external API, executing ML algorithms, or updating/querying data stores.
Applying one record at a time can have quite an impact on performance. Without batching, there’s an overhead of time and network traffic for every single record, and Kafka has to work much harder as it performs many network round-trips to fetch data. So that’s a no-go.

What about doing the isolation on the consumer side?

We can create a consumer behind our real consumer, one that polls batches of data, splits each batch into smaller batches by tenant ID, and then forwards these batches to our real consumer one by one.

But there are some significant flaws here, too:

While we can spread the batches in parallel, concurrency will still be bound to the threads of the same machine. And because the data is mixed and we don’t control what comes into a partition, we will have unbalanced batches of data (size-wise), and we’re back to not maximizing the advantage of passing batches. No-go again.

How about still isolating on the consumer side, but routing records by tenant ID?

Routing data using the tenant (company) as the key, set events of the same company in the same partition. That indeed gives us larger batches, but there’s a major flaw in this approach, too: We lose the fair distribution.

Consider the following scenario:

1. A small company’s events (shown in green below) with a low rate are sent to partition 1.

2. A large company’s events (shown in pink below) with a higher rate are sent to the same partition 1.

Because events in a partition are entered in the order in which they were sent, there will be unfair distribution:

As we can see, this distribution gives precedence to the large company’s events. No-go!

So we need something else: a scalable solution that offers both balanced batches and fair distribution. In the next section, we’ll see exactly how to do that.

The Gong solution

First, let’s recall our concerns:

  • As a SaaS solution with demanding SLAs, we can’t give precedence to one company over another (i.e., distribution must be fair).
  • We still need to maximize the advantage of batches, even when we isolate tenant data (to have a performant solution that better utilizes calls of external APIs and data store queries/updates).
  • Since we’re pretty successful (knock on wood), we keep scaling and scaling (i.e., the solution must be scalable to meet the ongoing demand).

This diagram shows how we solve it:

Different colors represent different companies.

As you can see, in the end topic, events are grouped in balanced batches which are distributed evenly across all the partitions.

Here’s how we achieved this:

  1. Events are sent in a round-robin manner to topic A, our source topic.
  2. We consume these events and repartition them by sending them to a new intermediate topic with the company ID key
  3. Now that the same company events are stored in the same partition, we simply collect them by their company ID and group them into a single batch event that will hold them.
  4. These batch events are then sent in a round-robin manner to the final destination topic to be consumed by our Service.

As a result, Service C consumes tenant-isolated batch data that is now evenly balanced!

Wait… why do we need a dedicated repartition topic?

Why not let the producer send events to the source topic with the tenant ID as the key? The answer is simple — the need for the tenant ID as a key is a requirement for our problem; other consumers that use this source topic, such as a single consumer, don’t face this problem since, as we saw, they’re isolated by design.

Consuming single events from a topic partitioned by tenant ID will break down the fairness and scalability as explained earlier.

As a rule of thumb, it’s always good practice to decouple producers from consumers’ concerns. Forcing producers to fulfill consumer needs will cause code stickiness and always break down other consumers.

OK, but what makes this solution scalable?

As you probably recall, now our batched events are evenly balanced and distributed fairly among all partitions, meaning that we met the requirements to scale over Kafka. Now the more consumers’ threads/instances you spin up, the more tenant batches will be consumed concurrently (the limit is the number of partitions), all without giving any precedence to one company over another.

The implementation

We could have implemented this with plain native Kafka consumers and producers, but then our code would be more complex.

  • We need to implement our own solution for grouping data by size or time.
  • We need dedicated persistent storage to recover in cases of faults and disaster.
  • We need to create two dedicated consumers:
  1. One for routing from source topic events to the new repartition topic with tenant ID as the key.
  2. One to consume from the repartition topic, use the grouping mechanism above, and send it to the destination topic.2.

So how can we code this fast and efficiently?

Kafka Streams to the rescue!

Kafka Streams is a library above Kafka native that abstracts consumer and producer, letting you receive incoming events from source topics as a stream and write a DSL to create rich pipelines that transform, aggregate, join, and forward these events to other topics.

Stream pipelines are local to the JVM they were created in, not between applications.

Here’s a more detailed diagram that outlines the solution via Kafka Streams:

Now for some code…

We were able to achieve this with Kafka StreamsBuilder, using five simple operations. The fourth operation (transform) uses our unique transformer.

Before we break down the stream builder code, some clarifications:

  • The first two lines are for creating the stream builder.
  • The GROUPED_EVENT_STORE is a KTable where we keep events with the same company ID to perform the aggregation by company ID.
  • The JsonSerDe stands for the JSON Serializer and Deserializer class of our event. It is needed for consuming and sending operations in a stream.
  • The GroupedGongEventSerde is also a JSON Serializer and Deserializer class but for the grouped Going events.

Stream builder code breakdown:

  1. Stream will consume events from the source topic.
  2. Key selector will take each event and choose companyId as the key.
  3. Each event with the new key will be sent to a companyIdRepartitioning topic.
  4. The GongBulkEventsBuffer.bufferBy(maxDuration,maxBatchSize) returns transformer that consumes the events from the companyIdRepartitioning topic and aggregates them by the companyId till size reaches the maximumBatchSize or time has reached maxDuration.

    This transformer is our own implementation cause Kafka streams doesn’t have an out-of-the-box solution for grouping by batch size or time.
    Kafka Streams has several different types of windowing operations, none of which are relevant.

    Also transformer aggregate is a stateful operation, for this, we use a dedicated Kafka streams state store (TopologyNaming.GROUPED_EVENT_STORE)
  5. Once grouped is sealed (as mentioned above), it is sent in a round-robin manner to the output topic.

Ta-da! And that’s how we achieved tenant batch isolation at scale at Gong, ensuring that our clients’ data stays safe and that our developers don’t have to worry about accidentally introducing bugs that expose data to other clients.

Thanks for reading! Have questions or thoughts? Drop them in the comments.

--

--