Stay Connected: How to Implement Apollo GraphQL Subscriptions with WebSockets

Dilara Bayar
Insider Engineering
7 min readJul 22, 2024

Introduction

GraphQL subscriptions are a useful feature that allows to build applications where the server send updates to clients in real time. Different than usual request-response cycle where client requests data and server responds, subscriptions provide steady connection between server and client. This connection most of the time maintained by using WebSockets.

In this article, we will demonstrate how to build a reliable server using Apollo GraphQL, where users will be authenticated before connecting to WebSocket. We will store user IDs with company ID as a key in Redis to efficiently track registered users to specific company. This way, all users will be able to be aware of each other in same company.

On the client side, we will implement a React application that listens for subscription events, enabling users to view information about other registered users in real-time.

Let’s get started!

Setting Up WebSocket The Server

  1. Install required packages.
npm install graphql-ws ws @graphql-tools/schema

2. Add the necessary imports to your class, and set up Express and Http, which will be used later to build the server.

const GQLSchema = require("../config/ql-schema");
const { GraphQLObjectType, GraphQLSchema } = require("graphql");
const { useServer } = require('graphql-ws/lib/use/ws');
const { ApolloServerPluginDrainHttpServer } = require('@apollo/server/plugin/drainHttpServer');
const { ApolloServer } = require("@apollo/server");
const { WebSocketServer } = require('ws');
const express = require("express");
const http = require("http");

3. Create an Express app and an Http server to host it, then define the GraphQLSchema, and set up a WebSocket server for handling subscriptions.

const graphqlUri =  "/graphql";
const app = express();
const httpServer = http.createServer(app);

const schema = new GraphQLSchema({
query: new GraphQLObjectType({
name: "RootQuery",
fields: () => queries
}),
mutation: new GraphQLObjectType({
name: "RootMutation",
fields: () => mutations
}),
subscription: new GraphQLObjectType({
name: "RootSubscription",
fields: () => subscriptions
})
});

const wsServer = new WebSocketServer({
server: httpServer,
path: graphqlUri,
});

Note: If you haven’t defined your own queries, mutations, and subscriptions, you can create a basic GraphQLSchema using the makeExecutableSchema function.

const { makeExecutableSchema } = require('@graphql-tools/schema');

const schema = makeExecutableSchema({ typeDefs, resolvers });

4. To manage our WebSocket server efficiently, we’ll create a serverCleanup function, and integrate our existing WebSocket server within it to handle various events. The serverCleanup function will encompass four key parts: OnConnect, OnDisconnect, OnError, and OnClose.

OnConnect: During this event, we will verify the userID, and authenticate the user by calling the authorization method. (Optional) we can store the userIDs in Redis by calling storeUserIdInRedisto keep a record of registered users. In this example, we will be using companyID as key and storing userIDs as an array.

OnDisconnect: This will handle the logic required when a user disconnects from the server. (Optional) So, we need to remove userID, which is disconnected from WebSocket by calling deleteUserIdFromRedis.

OnError: This part will capture and handle any errors that occur during the WebSocket connection.

OnClose: This part will manage the cleanup process when the WebSocket connection is closed.

By structuring our WebSocket server management in this way, we will be able to handle of connections, disconnections, errors, and cleanups, while also maintaining a track of active users through Redis.

Note: raiseUserSubscribedEvent and raiseUserUnSubscribedEvent functions will be explained on Adding Subscriptions part.

const serverCleanup = useServer(
{
schema,
onConnect: async (ctx) => {
if (ctx && ctx.connectionParams.authorization && ctx.connectionParams.userID && ctx.connectionParams.companyID) {
/* Authenticate user with authorization key */

/* Call SETEX method inside to store userID with key */
storeUserIdOnRedis({ userID: ctx.connectionParams.userID, companyID: ctx.connectionParams.companyID});

/* Publish an event during connection so that other subscribers can detect your presence */
raiseUserSubscribedEvent(ctx.connectionParams.userID, ctx.connectionParams.companyID);
}
throw new Error("Authorization failed on subscription connection.");
},
onDisconnect: async (ctx, code, reason) => {
if (!(ctx.connectionParams && ctx.connectionParams.userID && ctx.connectionParams.companyID)) {
return;
}
try {
/* Call GET and SETEX method inside to delete userID with key */
deleteUserIdFromRedis(ctx.connectionParams.userID, ctx.connectionParams.companyID);

/* Publish an event during disconnecting so that other subscribers can understand you are leaving */
raiseUserUnSubscribedEvent({ userID: ctx.connectionParams.userID, companyID: ctx.connectionParams.companyID});

} catch (err) {
throw new Error(err.message);
}
},
onError: async (ctx, errors) => {
console.log('Error on close:', errors);
},
onClose: async (ctx, code, reason) => {
console.log('On close:', reason);
}
},
wsServer
);

5. Adjust your ApolloServer constructor to include plugins that will gracefully shut down both Http server and the WebSocket server. Start apolloServer, and our setup for WebSocket is ready.

const apolloServer = new ApolloServer({
schema,
plugins: [
ApolloServerPluginDrainHttpServer({ httpServer }),
{
async serverWillStart() {
return {
async drainServer() {
await serverCleanup.dispose();
}
}
},
},
],
stopOnTerminationSignals: true,
introspection: false
});

await apolloServer.start();

app.use(
graphqlUri,
expressMiddleware(apolloServer, {
context: async ({ req }) => ({
req,
}),
})
);

Adding Subscriptions Into The Server

  1. Install graphql-subscriptions and graphql-redis-subscriptions
npm install graphql-subscriptions graphql-redis-subscriptions

2. Create a RedisPubSub instance using your host, port, and password credentials.

const { RedisPubSub } = require("graphql-redis-subscriptions");
const Redis = require("ioredis");

const SubscriptionPubSub = "PubSub";

const options = {
host: REDIS_HOST,
port: Number(REDIS_PORT),
password: REDIS_PASSWORD,
retryStrategy: (times) => {
return Math.min(times * 50, 2000);
}
};

const pubsub = new RedisPubSub({
publisher: new Redis(options),
subscriber: new Redis(options)
});

3. To broadcast a user’s arrival in the raiseUserSubscribedEvent, you need to publish an event using the PubSub instance that we have created before. In this example, userSubscribed is name of our subscription function. The same approach can be applied to create the raiseUserUnSubscribedEvent for broadcasting when a user leaves.

raiseUserSubscribedEvent(data) {
this.pubSub.publish('USER_SUBSCRIBED', { userSubscribed: { ...data } });
}

raiseUserUnSubscribedEvent(data) {
this.pubSub.publish('USER_UNSUBSCRIBED', { userUnSubscribed: { ...data } });
}

4. To be able to listen events that are coming, you need to create a Subscription. This involves creating an AsyncIterator by calling the asyncIterator method of PubSub and passing an array of event names that the AsyncIterator should listen for. All Subscriptions must return an AsyncIterator object. If you need to filter subscriptions based on specific criteria from the user, you can use the withFilter function to compare the incoming data with your expectations. In the args, define the values you expect from the client during subscription, and in the type, specify the subscription data that will be returned to the client.

userSubscribed: {
type: User,
args: {
userID: {
description: "User id",
type: new GraphQLNonNull(GraphQLString)
},
companyID: {
description: "Company id",
type: new GraphQLNonNull(GraphQLString)
}
},
subscribe: withFilter(
() => {
return pubsub.asyncIterator(['USER_SUBSCRIBED']);
},
(payload, variables) => {
return payload.userSubscribed.companyID === variables.companyID;
}
)
},
userUnSubscribed: {
type: User,
args: {
companyID: {
description: "Company id",
type: new GraphQLNonNull(GraphQLString)
},
userID: {
description: "User id",
type: new GraphQLNonNull(GraphQLString)
}
},
subscribe: withFilter(
() => {
return pubsub.asyncIterator(['USER_UNSUBSCRIBED']);
},
(payload, variables) => {
return payload.userUnSubscribed.companyID === variables.companyID;
}
),
}

Setting Up The Client

  1. Install graphql-ws, @apollo/client and apollo-link-rest.
npm install graphql-ws @apollo/client

2. To create a WebSocket link between server and client using GraphQLWsLink, you need to define:

Server URL: Use your local server endpoint as the URL.

Set Connection Parameters: Provide the connectionParams object with the values expected by the server. In our example, include userID and companyID to be able to connect.

(Optional) Retry Logic: Add waitForServerHealthyBeforeRetry to attempt connecting to the WebSocket server up to 5 times before failing. You can also define connectionAckWaitTimeout to wait before retrying connection.

import { ApolloClient, createHttpLink, from, split } from "@apollo/client";
import { getMainDefinition } from '@apollo/client/utilities';
import { GraphQLWsLink } from "@apollo/client/link/subscriptions";
import { createClient } from "graphql-ws";

const serverSocketLink = new GraphQLWsLink(
createClient({
url: `ws://localhost:3000/graphql`,
retryAttempts: 5,
connectionAckWaitTimeout: 2000,
retryWait: async function waitForServerHealthyBeforeRetry(retryAttemps) {
if (retryAttemps >= 4) {
console.debug(`socket:DISCONNECTED ${new Date()}`);
}
await new Promise((resolve) => setTimeout(resolve, 5000));
},
shouldRetry: (errOrCloseEvent) => {
console.log("errOrCloseEvent", errOrCloseEvent);
return true;
},
lazy: true,
keepAlive: 1000,
connectionParams: async () => ({
authorization: `Bearer token`,
userID: getUserId(),
companyID: getCompanyId(),
}),
on: {
error: (errors) => {
console.error(`ERROR: ${JSON.stringify(errors)}`);
},
connected: () => {
console.log(`socket:CONNECTED ${new Date()}`);
},
closed: (reason) => {
console.log(`socket:CLOSED ${new Date()}`);
},
connecting: () => {
console.log(`socket:CONNECTING ${new Date()}`);
},
ping: async () => {},
pong: async () => {}
}
})
);

3. If you plan to use your service for both GraphQL requests and WebSocket subscriptions, you need to split the Apollo Server link between Http and WebSocket links. So, here httplink will be used for to handle standart GraphQL requests and serverSocketLink will be used for GraphQL subscriptions.

const serverSplitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
serverSocketLink,
httplink,
);

4. Now, we can create an ApolloClient instance by combining the authentication link and the server split link. This client will be used for all our GraphQL requests.

const serverLink = from([authLink, serverSplitLink]);

const client = new ApolloClient({
link: serverLink,
cache,
connectToDevTools: false,
queryDeduplication: true,
cors: true
});

5. To define GraphQL subscription queries, you can use the gql tag from the graphql-tag package. Below are for the subscriptions we have created on previous steps: one for subscribing to user events userSubscribed and another for unsubscribing from user events userUnSubscribed

import gql from "graphql-tag";
import queryString from "query-string";

export const userSubscriptionQuery = gql`
subscription ($companyID: String!, $userID: String!) {
userSubscribed(companyID: $companyID, userID: $userID) {
companyID
userID
}
}
`;
export const userUnSubscriptionQuery = gql`
subscription ($companyID: String!, $userID: String!) {
userUnSubscribed(companyID: $companyID, userID: $userID) {
companyID
userID
}
}
`;

6. To listen for user subscription and unsubscription events we need to use the useSubscription hook, set up the subscriptions and use the useEffect hook to ensure they are active and listening for changes continuously.

import { userSubscriptionQuery, userUnSubscriptionQuery } from ".";
import { useSubscription, useQuery } from "@apollo/client";
import React, { useEffect} from "react";

const { data: subscriptionData } = useSubscription(userSubscriptionQuery, {
client: client,
fetchPolicy: "no-cache",
variables: { companyID: companyId, userID: currentUserID }
});

const { data: unSubscriptionData } = useSubscription(userUnSubscriptionQuery, {
client: client,
fetchPolicy: "no-cache",
variables: { companyID: companyId, userID: currentUserID }
});

useEffect(() => {
if (subscriptionData && subscriptionData.userSubscribed.userID !== currentUserID) {
/* ... */
}
}, [subscriptionData]);

useEffect(() => {
if (unSubscriptionData) {
/* ... */
}
}, [unSubscriptionData]);

That’s it for all! Thank you so much for reading this far. If you are interested in topics like this, don’t forget to follow Insider Engineering and I also recommend check this blog as well!

--

--