A dream of scalable and enriched GraphQL subscriptions

Artjom Kurapov
Pipedrive R&D Blog
Published in
11 min readNov 23, 2022
A stylised photo of Jagala juga (Estonia), original photo by Aleksandr Abrosimov, Wikimedia Commons

In my last article, I wrote about our five-year journey with GraphQL at Pipedrive. Now, I’d like to tell you about a ten-year journey of delivering websocket events to the frontend. Hopefully, it’ll be of some help to you, too.

The “why?”

The product need for asynchronous events comes up any time the server needs to notify the user of something. This could be because

  • You’ve got an instant message
  • The uploaded image has finished resizing
  • Your colleague has changed their avatar
  • Your bulk change of 10,000 deals has reached 99% completion

As such, async events enrich UX with details and interactivity. But most importantly, they solve the problem of inconsistent data displayed or stored on the browser side.

Without these updates, users won’t see Pipedrive deal renames; another browser tab won’t receive activities deleted. Even in the same browser tab, your views may not work well with one other, lack single storage and be out of sync. Typically, websites solve this with pusher, liveblocks, firebase or a custom websocket service.

A little history

Luckily, Pipedrive “solved” this issue ten years ago. In 2012, Andris, Kapp and Tajur developed a custom socketqueue service, which to this day transports API events of 100,000+ companies to the frontend, leveraging the sockjs library.

Tajur’s talk in 2016 was one of the reasons I recreated a similar setup. I was puzzled about how to manage user connections with socket.io and scale it beyond one server — and ultimately joined Pipedrive to find out. Event streaming, and the possibilities it brings, fascinated me.

As you can see, Tajur’s talk was more about RabbitMQ — a message broker between PHP monolith and socketqueue.

Pros and cons

In-memory queues have allowed Pipedrive to withstand bursts of events triggered by external integrations or in-house things, like importing deals from an .xls file or a bulk edit.

An API event that’s pushed to the RabbitMQ and the frontend is denormalized, having different entities. This is good, because you have a consistent snapshot of multiple things at the same time and your recipients don’t need to re-query anything.

However, it’s also hugely inefficient, as all browser tabs receive all possible changes, whether the user wants them or not. For one customer, the volume of internet traffic reached 80 GB per day. Also, bear in mind that the web app on the customer end has to do all the heavy filtering when it receives those changes. Therefore, we can’t keep infinitely bloating the event schema, making it even heavier.

Cell logic as a number in a web-socket URL

The second problem is a noisy neighbor. As I’ve already mentioned, Scalability is solved with “cell logic” (in a good scenario — aka tenant isolation) which means sharding socketqueue application servers by company ID. This also means we have to have a fixed number of containers and queues to ensure predictable routing.

So, a problem arises whenever you have one company generating thousands of events that aren’t distributed across all servers. Instead, they spike the CPU of a single node to the point of a health check failure. There isn’t any room to vertically grow a single-core CPU, only to double the number of servers, which means wasted infrastructure resources.

A “noisy neighbor” CPU spike for a particular pod

The third issue is visibility and tracing. As with a REST API, without proper documentation (whether it’s swagger or type definitions), it’s very hard to understand what can be inside the event and who uses what in case you want to do a breaking change. This leads to a vicious cycle of nobody removing anything from the event and bloating it even more.

Finally, if the web-socket connection is down because the user went into a tunnel or closed their laptop, they may lose events. Could this cause important data inconsistency for the user?

Over the years, we’ve tried hard to optimize the socketqueue so that it

  • Is multi-threaded as it listens queues and handles WS connections
  • Compresses traffic
  • Checks permissions and visibility

But what if we could do even better? 🤔

The “how?”

The idea behind GraphQL subscriptions is pretty simple — you declare only what you want to receive, after specifying some filtering arguments. And it’s the server’s job to intelligently filter events.

I can’t do a better job of explaining the basic setup than Ben does here. He demonstrates a single server instance where pubsub is just in-memory event routing triggered by the mutation. But in our case, there are no mutations — real changes are generated by the PHP legacy far, far away.

The scope of the mission

So, what I wanted to achieve was:

  • Demo GraphQL subscriptions in production, having explicit schema
  • Test horizontal pod scaling
  • Better reliability by using kafka instead of rabbitMQ
  • Small, normalized domain events to keep things simple
  • Event delivery latency under two seconds (so it’s not slower than the socketqueue we’re trying to replace)

Risks

What I didn’t know:

  • How we authenticate
  • What protocol to use. SSE? WS? What’s this mercure thing, and does some of it fall back to pooling?
  • Whether or not WS/TCP connections would be bound to the same pod
  • Whether we could do multiple subscriptions at once
  • What storage or transfer medium to use? DB? Redis streams? Kafka? Redis pubsub? KTable?
  • Whether we could rewind events in time, in case the user got disconnected?
  • Whether we would need to store cursor ID per entity. Also, how live query worked?
  • How to filter events by company, user, session and entity. Was there a standard for subscription filter fields?
  • Whether we could re-use federated GraphQL schema? Could we enrich events? What should the QoS or retry logic be if the gateway was down? (Lee Byron went into this in-depth in his talk over five years ago)
  • How many items we could subscribe to?
  • How to unsubscribe or disconnect users when they log out?

Proof of the concept

Before starting the mission, I made a simple node service that connected to kafka and proxied events without any filtering. This was already promising. But I wanted to try go, which allows you to utilize all available CPUs, maximizing efficiency. PoC was a fallback that actually proved itself very useful.

Lift-off

Four of us (myself, Pavel, Kristjan and Hiro) decided to try and explore the unknown 🛸 — and bring value back to the launchpad.

Gophers

We looked into graph-gophers/graphql-go first, and within a couple of days, re-created the PoC (Proof of Concept) of subscribing to kafka events.

Here’s what happened:

  • We were faced with the dilemma of the GraphQL integer spec not matching go’s int32
  • We got the per-property filtering and per-argument filtering working
  • We hit the need of SSL for WSS to work. Otherwise, CORS blocked requests
  • We got nginx to proxy web socket requests!
  • We found that there are two transport protocols. Websocket is a loose transport, so any library can implement whatever it wants. Gophers implemented an older version that was compliant with Apollo and GraphQL but not with graphql-ws.
  • We had the issue that the Apollo federation lib did not like the Subscription in schema registration.

The basic filtering was pretty simple. We just needed to connect two streams (channels) — kafka and web sockets (that gophers need as schema resolvers) — while having a goroutine in the middle, transforming data.

We hit a major roadblock with gophers and JWT authentication. We weren’t very experienced with go, and didn’t have much time to change a major framework.

We also figured that enrichment wasn’t a priority, and that we needed to bring code into production (🚀) and test scaling. That’s the agile way.

The second big roadblock was entity visibility checks. Reading domain events from kafka meant we didn’t have any extra information, like who could see a deal. We needed to query a microservice about this. Doing it for every event of every company can easily lead to DoS. 🤦‍♂️

Gqlgen

We scratched gophers and switched to gqlgen. The channel exchange logic remained the same and we got the JWT token authentication working. The difference was that subscription events were now autogenerated based on schema.graphql.

Next, I discovered that the exchange (in-memory pub-sub in go) was somewhat flawed. We needed at least one subscription for streaming to take place (the newer version is a bit better and allows one channel per company).

We also started brainstorming how the schema should look to have it scalable across different entities — to have filters and multiple entity actions and support multiple IDs.

After one month, we had the UI working with a hacked but functional permission check. We took a break to let things cool off a bit. 🌴

(It’s only then that I saw this 🤯 presentation by Mandi Wise from Apollo, even though I’d already gone through most of the available YouTube videos by that point.)

Final architecture

So, we came back and scratched gqlgen again. It was painful, but, well…agile. We split the service in two, adding redis pub-sub with replication and sentinel in between.

  • graphql-subscription-workers remained in go, but became way simpler — it was able to scale well to consume all kafka partitions. The workers filtered out events by doing ten parallel permissions checks and transforming the event schema if needed.
  • graphql-subscriptions dealt with connections and scaled horizontally as much as needed. It used the great graphql-ws library that has all sorts of hooks, so kudos to Denis Badurina.
  • Redis pub-sub served as an exchange broker, doing most of the filtering and sharding by company, user and entity IDs.

Redis routing

Redis pub-sub acts as a router, very similar to RabbitMQ. It consumes very little memory and provides regex wildcard matching.

For example, graphql-subscriptions-worker pushes events to Redis channel with the key “<companyId>.deal.<dealId>”. This allows graphql-subscriptions to selectively subscribe to specific channels based on what the deal ID was in the subscription.

Subscription schema types

So, as you can see from the diagram, we followed Mandi’s advice and wrote event enrichment by querying the gateway. This required polling the federated schema and hard-coding resolvers for every entity.

Schema choices we had to make:

  • We didn’t merge original events with enriched ones, although we could have. This was because we didn’t have any guarantee that the graphql-service would respond in a timely manner. Kafka “raw” event data was much more reliable. There were also minor differences in schema when using referencing (raw deal.stageId vs. enriched deal.stage.id).
  • We added “delta” as JSON, mostly because frontend really wanted to use that in case frontend storage wanted to only update specific keys. However, this required a specific shape of kafka event.
  • We had a generic event type (dealEvent) alongside action-specific events (dealAdded). Generic events take into account the correct order of events for the same entity (added > changed > deleted), which isn’t guaranteed chronologically if you subscribe to separate types.

Live queries

This is a cool concept that elegantly fixes a problem I was really fixated on — connection loss.

From Laurin’s article by The Guild, we had the same fetchOrSubscribe function which first performed an enrichment request and then subscribed to kafka events using async iterator magic. The difference, however, was in the schema ​​– we used “liveQuery” as an argument, not as a root property.

This alone met 99% of the consistency requirements for web apps, without the need to rewind events (perhaps by using redis streams). So, the “cursor-based query revalidation and diffing” that Ben Newman suggested at the last GraphQL Summit seems like a very rare use case where you want to rewind events as a niche product feature (perhaps for gaming).

Performance and security testing

Finally, by the end of the mission, we tested how well our solution scaled in production, gradually rolling it out to real customers.

Performance testing in production

This revealed some interesting things. For example, if you subscribe to deals by IDs and do this in the pipeline view where users can scroll very quickly, you can have waves of subscriptions. So, it’s better to avoid blocking permission checks on the subscribe feature and have some throttling logic on the frontend instead.

We also observed random pods suddenly losing all their connections. This has led us to revise memory limits and the max_old_space_size bug.

We’ve implemented various security limitations on the amount of connections, subscriptions and timeouts, taking into account logouts, permission changes etc. This isn’t only because of external risks, but also because we found that Redis requires a lot of processing power (CPU) to match publishers to subscribers.

Future steps

Theoretically, we could shard Redis instances by entity types if we face performance problems.

We could force socketqueue deprecation by temporarily adding API events as a universal subscription proxying all data to the frontend without any filtering. This would sacrifice efficiency for the sake of a single transport layer.

This service has been operational for over a year now and, so far, we have around ten entities supported, tied to different kafka topics.

In defense of GraphQL

Adoption of GraphQL and subscriptions in particular is seen in Pipedrive as slow and complex. Here’s why:

  1. We use GraphQL only in new microfrontends. Replacing views that depend on backbone.js is risky and requires refactoring.
  2. It’s voluntary. We don’t have a dedicated team that owns supergraph and forces every service to expose GraphQL schema.

On the backend, codegen intended to ease adoption and wrap REST API can be seen as complex. Adopting GraphQL natively in a microservice alongside a REST API can be seen as redundant.

The Apollo federation gateway can be seen as another layer of complexity, even though it plans and executes multiple requests.

A return on investment is not seen if value isn’t measured.

GraphQL has taken on the huge responsibility of making the web more transparent, predictable, reusable and efficient. I urge developers to have professional patience — educate your colleagues, compare it to RPC (gRPC, tRPC, xmlRPC, JSON-RPC) and REST (json:api, odata), measure hard metrics and prove GraphQL’s superiority.

My dream is that there will come a day when developers will replace schema-less REST APIs and WS events with GraphQL and subscriptions for everybody. The clients are the future. This is the way.

--

--

Artjom Kurapov
Pipedrive R&D Blog

Software Engineer. I like bees, cats and complex spaghetti-systems