GraphQL Subscriptions with Sangria and Monix Quickstart

Eric Rodriguez
6 min read · Feb 20, 2017

Introduction

Sangria is a Scala GraphQL implementation that supports many standard GraphQL features plus many others specific to Sangria. It supports so many features that I think of it more as a framework than an implementation. For instance, you get query parsers and renderers, projections, additional types for complex queries, macro-based type derivation, deferred value resolution, schema comparators, prepared queries and, most important for this post, stream-based subscriptions!

Sangria provides a sangria-subscriptions-example project based on akka-streams; if you are versed in Akka then by all means take a look at that. If not, then I hope this post helps you. As I already stated in my last post, while I think Akka is the most powerful and feature-rich framework out there, I also find it too complex and configuration-prone for some use cases. Last time I used Vert.x as an alternative to actors in reactive programming and used Enterprise Integration Patterns with Apache Camel as “glue” between different frameworks. Now I’ll use Monix as an alternative to akka-streams. Just as one needs to learn to crawl before learning to run, we need to learn about queries and mutations before building subscriptions. Don’t worry though, I like to keep things as brief as possible.

Warning

I’ll be using Argonaut for working with JSON. Its current stable version is 6.1, but Sangria is built against 6.2, which is not yet stable as of this writing, so it doesn’t work out of the box with 6.1. I’ll show a quick and dirty hack to fix this until version 6.2 is stable. Or you can just use version 6.2 and adjust the examples here.

Dependencies

Sbt:

Maven:

Queries

We start by defining a simple case class with its respective JSON encoder/decoder. I’m saving this in Data.scala.
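A minimal sketch of what Data.scala might contain. The field names come from the queries used later in this post; the field types and the codec name are assumptions.

```scala
import argonaut._, Argonaut._

// Field names follow the queries in this post; the types are assumed.
final case class Category(id: Int, code: String, atgCode: String)

object Category {
  // casecodec3 builds both the encoder and the decoder for a three-field case class.
  implicit val categoryCodec: CodecJson[Category] =
    casecodec3(Category.apply, Category.unapply)("id", "code", "atgCode")
}
```

With the codec in implicit scope, `Category(24, "445", "1032").asJson` encodes a value and `Parse.decodeOption[Category](text)` decodes one.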

Now we define the GraphQL types in SchemaDefinition.scala. The simplest thing we can do here is to use case classes with ObjectType derivation. We will use derivation later on, but for now let’s do it the old-fashioned way:

CategoryType is self-explanatory, except perhaps for the fields[Ctx, Val] argument. Ctx stands for context; since we are only defining our type here we don’t actually need one, hence we declare it as Unit. Val is the value we are representing: a Category.
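As a sketch of the hand-written definition described above (the description string and the field types are assumptions), CategoryType could look like this:

```scala
import sangria.schema._

object CategorySchema {
  final case class Category(id: Int, code: String, atgCode: String)

  // Ctx = Unit because these fields need no context; Val = Category.
  val CategoryType = ObjectType(
    "Category",
    "A catalog category",
    fields[Unit, Category](
      Field("id", IntType, resolve = _.value.id),
      Field("code", StringType, resolve = _.value.code),
      Field("atgCode", StringType, resolve = _.value.atgCode)
    )
  )
}
```

Each resolve function receives a Context whose value is the Category being rendered, which is why no repository is involved at this level.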

After defining Category in terms that GraphQL can understand (aka CategoryType), we define our QueryType which supports two types of queries:

  1. categories: takes no arguments and returns a list of categories
  2. category(id: Int): takes a single argument “id” and returns the Category corresponding to the given id.

Worth noting is that the “arguments” argument for our “category” field is (essentially) a List, Id :: Nil, and that the “resolve” argument expects a function of type Context[Ctx, Val] => Action[Ctx, Res]. Ctx is just that: the context we need to resolve the query. It is sometimes called the “repository”, which is why I’ve defined it in CatalogRepo.scala.
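The two queries listed above can be sketched as follows. CatalogRepo is written here as a hypothetical in-memory class (the post uses a singleton object), and its method names are assumptions.

```scala
import sangria.schema._

object QuerySchema {
  final case class Category(id: Int, code: String, atgCode: String)

  // Hypothetical repository; a real one could wrap a DB or a REST service.
  class CatalogRepo(initial: List[Category]) {
    def allCategories: List[Category] = initial
    def category(id: Int): Option[Category] = initial.find(_.id == id)
  }

  val CategoryType = ObjectType("Category", fields[Unit, Category](
    Field("id", IntType, resolve = _.value.id),
    Field("code", StringType, resolve = _.value.code),
    Field("atgCode", StringType, resolve = _.value.atgCode)
  ))

  // The single argument accepted by the "category" field.
  val Id = Argument("id", IntType)

  // Ctx = CatalogRepo: resolve functions reach the repository through c.ctx.
  val QueryType = ObjectType("Query", fields[CatalogRepo, Unit](
    Field("categories", ListType(CategoryType), resolve = _.ctx.allCategories),
    Field("category", OptionType(CategoryType),
      arguments = Id :: Nil,
      resolve = c => c.ctx.category(c.arg(Id)))
  ))

  val schema = Schema(QueryType)
}
```

Returning an Option from the repository pairs naturally with OptionType, so an unknown id resolves to null instead of an error.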

Your service can be a DB service, a REST service, whatever you need. We’ll need the CatalogRepo singleton object later, but leave it empty for now and go on to QueryProcessor.scala.

The execution context might just as well be scala.concurrent.ExecutionContext.Implicits.global, but since we will use Monix for subscriptions we might as well use its scheduler instead. We then define a process function that takes the query as a String; I’ve named it json since it is, for all intents and purposes, JSON. Next we define some lenses to access the main fields (query, operationName and variables), parse the JSON, apply the lenses and, if successful, call the executeGraphQLQuery function. In case you’re wondering, the query looks like this:

{
  category(id: 24) {
    id
    code
    atgCode
  }
}
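The request-parsing step can be sketched with Argonaut alone. To keep it short, this sketch uses plain field lookups instead of the lenses mentioned above; GraphQLRequest and parseRequest are hypothetical names, not part of any library.

```scala
import argonaut._, Argonaut._

object RequestParsing {
  // Hypothetical holder for the three fields of a GraphQL request body.
  final case class GraphQLRequest(query: String, operationName: Option[String], variables: Json)

  def parseRequest(body: String): Option[GraphQLRequest] =
    for {
      json  <- Parse.parseOption(body)                 // None if the body is not valid JSON
      query <- json.field("query").flatMap(_.string)   // None if "query" is missing or not a string
    } yield GraphQLRequest(
      query,
      json.field("operationName").flatMap(_.string),
      // Fall back to an empty object when no variables were sent.
      json.field("variables").filter(_.isObject).getOrElse(jEmptyObject)
    )
}
```

A caller would typically answer with an HTTP 400 when parseRequest returns None and hand the GraphQLRequest to the executor otherwise.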

Query Parsing and Execution

This part deserves special mention. QueryParser.parse turns the query string into a document, and the executor relies on Sangria’s marshalling layer, which supports different serialization mechanisms including, of course, Argonaut. As stated before, however, a little hack is needed in order to use Sangria with Argonaut’s stable version (currently 6.1). The issue occurs with Sangria’s ArgonautInputUnmarshaller, which expects an argonaut.Json object but gets confused because argonaut.Json is a trait in 6.1, while it’s an abstract class in 6.2. We solve this by creating our own InputUnmarshaller which takes a JsonObject, or rather by copying ArgonautInputUnmarshaller, providing a conversion from Json to JsonObject and putting it somewhere visible:

Finally, we just call the executor. When successful, we pass the result of the query to an output function which uses an HTTP response to return the result to the client.
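Parsing and execution together might look like the sketch below. It assumes a sangria-argonaut build that matches your Argonaut version (see the warning above), leaves out variables and operationName for brevity, and uses a hypothetical in-memory CatalogRepo with one sample category.

```scala
import scala.concurrent.Future
import scala.util.{Failure, Success}

import argonaut.Json
import monix.execution.Scheduler.Implicits.global // a Monix Scheduler is also an ExecutionContext
import sangria.execution.Executor
import sangria.marshalling.argonaut._             // makes the executor produce argonaut.Json
import sangria.parser.QueryParser
import sangria.schema._

object ExecutionSketch {
  final case class Category(id: Int, code: String, atgCode: String)

  // Hypothetical in-memory context with a single sample category.
  class CatalogRepo {
    private val data = List(Category(24, "445", "1032"))
    def category(id: Int): Option[Category] = data.find(_.id == id)
  }

  val CategoryType = ObjectType("Category", fields[Unit, Category](
    Field("id", IntType, resolve = _.value.id),
    Field("code", StringType, resolve = _.value.code),
    Field("atgCode", StringType, resolve = _.value.atgCode)
  ))

  val Id = Argument("id", IntType)

  val QueryType = ObjectType("Query", fields[CatalogRepo, Unit](
    Field("category", OptionType(CategoryType),
      arguments = Id :: Nil,
      resolve = c => c.ctx.category(c.arg(Id)))
  ))

  val schema = Schema(QueryType)

  // Parse the query text, then execute it against the schema with the repo as context.
  def run(query: String, repo: CatalogRepo = new CatalogRepo): Future[Json] =
    QueryParser.parse(query) match {
      case Success(document) => Executor.execute(schema, document, userContext = repo)
      case Failure(error)    => Future.failed(error) // syntax errors surface as a failed Future
    }
}
```

The resulting Future[Json] can be rendered with `.nospaces` and written to whatever HTTP response object your server provides.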

The server

The last piece of the puzzle! For queries and mutations there is nothing special here. It will look something like the following:

Please note that my server is in Java, for no reason in particular; I just happened to have an existing Java server and needed to add GraphQL endpoints to it.

To test it, just add graphiql.html where your static route can find it.

Mutations

With all the previous pieces in place, adding mutations is quite easy! First we add a MutationType and its arguments to SchemaDefinition.scala:

Now we just add functions to add/edit/delete a category to our context, CatalogRepo. We will focus on addCategory:
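A sketch of both pieces, under a few assumptions: the argument names mirror the mutation shown in this post, CatalogRepo is a hypothetical in-memory class rather than the singleton used in the post, and addCategory returns the newly created Category.

```scala
import sangria.schema._

object MutationSketch {
  final case class Category(id: Int, code: String, atgCode: String)

  // Hypothetical in-memory context; synchronized keeps concurrent mutations consistent.
  class CatalogRepo {
    private var categories = Vector.empty[Category]
    def allCategories: Vector[Category] = synchronized(categories)
    def addCategory(id: Int, code: String, atgCode: String): Category = synchronized {
      val category = Category(id, code, atgCode)
      categories = categories :+ category
      category
    }
  }

  val CategoryType = ObjectType("Category", fields[Unit, Category](
    Field("id", IntType, resolve = _.value.id),
    Field("code", StringType, resolve = _.value.code),
    Field("atgCode", StringType, resolve = _.value.atgCode)
  ))

  val IdArg      = Argument("id", IntType)
  val CodeArg    = Argument("code", StringType)
  val AtgCodeArg = Argument("atgCode", StringType)

  val MutationType = ObjectType("Mutation", fields[CatalogRepo, Unit](
    Field("addCategory", CategoryType,
      arguments = IdArg :: CodeArg :: AtgCodeArg :: Nil,
      resolve = c => c.ctx.addCategory(c.arg(IdArg), c.arg(CodeArg), c.arg(AtgCodeArg)))
  ))

  val QueryType = ObjectType("Query", fields[CatalogRepo, Unit](
    Field("categories", ListType(CategoryType), resolve = _.ctx.allCategories)
  ))

  // Mutations are registered as the second argument of Schema.
  val schema = Schema(QueryType, Some(MutationType))
}
```

Because the mutation field returns CategoryType, the client can select fields of the new category in the same request.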

You can try a mutation from graphiql.html similar to this:

mutation {
  addCategory(id: 25, code: "445", atgCode: "1032") {
    id
    code
  }
}

Subscriptions

Though not strictly necessary, it is convenient to define an Event trait that all subscription events should implement. We’ll add it, with its respective implementations, to Data.scala.
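One possible shape for the trait and its implementations, as a sketch. CategoryAdded mirrors the fields selected by the subscription query in this post; the type of parentCategory, the CategoryDeleted case and the describe helper are assumptions added for illustration.

```scala
object Events {
  // Sealing the trait lets the compiler check that every event type is handled.
  sealed trait Event { def id: Int }

  final case class CategoryAdded(
      id: Int,
      code: String,
      atgCode: String,
      parentCategory: Option[Int] // assumed type; the post only shows the field name
  ) extends Event

  // Hypothetical second event, only to show why a shared trait is convenient.
  final case class CategoryDeleted(id: Int) extends Event

  def describe(event: Event): String = event match {
    case CategoryAdded(id, code, _, _) => s"category $id ($code) added"
    case CategoryDeleted(id)           => s"category $id deleted"
  }
}
```

A common supertype lets a single stream carry every kind of event, with each subscription field filtering for the subtype it cares about.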

Now we define the type in SchemaDefinition.scala

The key piece here is the Observable (not exactly, more on this later), which is basically the Observer pattern on steroids. source is an Observable defined in CatalogRepo (our context). We make use of functional reactive programming (FRP) here by applying map to our source; if you are new to FRP, then “The introduction to Reactive Programming you’ve been missing” is a good place to start. You can guess by its name that source will be the origin of our events and what allows us to deal with them asynchronously in a reactive manner. Keep in mind, however, that here we’re just letting our SubscriptionType know where our event stream is coming from.
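A sketch of such a SubscriptionType, assuming sangria-monix is on the classpath. The event type, the placeholder ping query and the shape of CatalogRepo are assumptions; each event is wrapped in Sangria's Value action explicitly so the stream has the element type Field.subs expects.

```scala
import _root_.monix.execution.Scheduler.Implicits.global // required by the Monix stream integration
import _root_.monix.reactive.Observable
import sangria.schema._
import sangria.streaming.monix._                         // lets Sangria treat Observable as a stream

object SubscriptionSchema {
  final case class CategoryAdded(id: Int, code: String, atgCode: String, parentCategory: Option[Int])

  // Hypothetical context: anything that exposes the event stream would do.
  class CatalogRepo(val source: Observable[CategoryAdded])

  val CategoryAddedType = ObjectType("CategoryAdded", fields[Unit, CategoryAdded](
    Field("id", IntType, resolve = _.value.id),
    Field("code", StringType, resolve = _.value.code),
    Field("atgCode", StringType, resolve = _.value.atgCode),
    Field("parentCategory", OptionType(IntType), resolve = _.value.parentCategory)
  ))

  val SubscriptionType = ObjectType("Subscription", fields[CatalogRepo, Unit](
    // Field.subs resolves to a stream of actions instead of a single value.
    Field.subs("categoryEvents", CategoryAddedType,
      resolve = (c: Context[CatalogRepo, Unit]) =>
        c.ctx.source.map(event =>
          Value[CatalogRepo, CategoryAdded](event): Action[CatalogRepo, CategoryAdded]))
  ))

  // A schema always needs a query type; this one is a placeholder.
  val QueryType = ObjectType("Query", fields[CatalogRepo, Unit](
    Field("ping", StringType, resolve = _ => "pong")
  ))

  val schema = Schema(QueryType, subscription = Some(SubscriptionType))
}
```

Nothing is subscribed at this point: the schema only records how to obtain the stream once a subscription query is executed.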

source is in fact a ConcurrentSubject, which is basically a shared “hot” data source, i.e. it can have different subscribers who all see the same data. We subscribe an observer here just to print messages in case of onError, but it’s not needed by the subscription. We define it in our CatalogRepo singleton and produce an event on it every time a category is successfully added; the event will eventually be processed by SubscriptionType’s resolve function. First we’ll have to subscribe to trigger the stream of the Observable. We do this in QueryProcessor:
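The hot source can be sketched with Monix alone. The event type and the body of addCategory are assumptions, and persistence is left out.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject

object HotSource {
  final case class CategoryAdded(id: Int, code: String, atgCode: String)

  object CatalogRepo {
    // Publish semantics: a subscriber only sees events pushed after it subscribed.
    private val subject = ConcurrentSubject.publish[CategoryAdded]

    // Expose only the read side to the schema.
    val source: Observable[CategoryAdded] = subject

    def addCategory(id: Int, code: String, atgCode: String): CategoryAdded = {
      val event = CategoryAdded(id, code, atgCode)
      // ... store the category here, then notify every subscriber ...
      subject.onNext(event)
      event
    }
  }
}
```

ConcurrentSubject is meant to tolerate onNext calls from several threads, which is a good fit when mutations arrive on different request threads.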

Pay special attention to the comment. This is one of those occasions when implicit values can be dangerous. You may notice these two imports are the same ones needed by the resolve function in our schema definition. However, while in SchemaDefinition we got a compiler error if these imports were missing, here that’s not the case. Without the imports Executor.execute is still valid, but it will never produce a stream, just a single element, or more precisely: ObservableSubscriptionStream.single.

Two other points are important here. First, here is where we subscribe to the Observable: execute returns the obs val we defined in the resolve function of SubscriptionType. Second, we are also passing an EventTarget. The trick to having real-time subscriptions is to use either WebSockets or Server-Sent Events. EventTarget is just the implementation for what I’m using as a server, but there are alternatives.
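Executing a subscription and forwarding each result could look like the sketch below. It assumes sangria-monix and a compatible sangria-argonaut; the emit callback is a hypothetical stand-in for the EventTarget, and the schema is a trimmed-down version with a single event field.

```scala
import scala.util.Try

import argonaut.Json
import _root_.monix.execution.CancelableFuture
import _root_.monix.execution.Scheduler.Implicits.global
import _root_.monix.reactive.Observable
import _root_.monix.reactive.subjects.ConcurrentSubject
import sangria.execution.Executor
import sangria.execution.ExecutionScheme.Stream // selects the streaming execution scheme
import sangria.streaming.monix._                // Observable support for that scheme
import sangria.marshalling.argonaut._
import sangria.parser.QueryParser
import sangria.schema._

object SubscriptionRun {
  final case class CategoryAdded(id: Int, code: String)

  // Hypothetical context exposing a hot stream of events.
  class CatalogRepo {
    private val subject = ConcurrentSubject.publish[CategoryAdded]
    val source: Observable[CategoryAdded] = subject
    def publish(event: CategoryAdded): Unit = { subject.onNext(event); () }
  }

  val EventType = ObjectType("CategoryAdded", fields[Unit, CategoryAdded](
    Field("id", IntType, resolve = _.value.id),
    Field("code", StringType, resolve = _.value.code)
  ))

  val SubscriptionType = ObjectType("Subscription", fields[CatalogRepo, Unit](
    Field.subs("categoryEvents", EventType,
      resolve = (c: Context[CatalogRepo, Unit]) =>
        c.ctx.source.map(e => Value[CatalogRepo, CategoryAdded](e): Action[CatalogRepo, CategoryAdded]))
  ))

  val QueryType = ObjectType("Query", fields[CatalogRepo, Unit](
    Field("ping", StringType, resolve = _ => "pong")
  ))

  val schema = Schema(QueryType, subscription = Some(SubscriptionType))

  // emit stands in for whatever pushes data to the client (SSE, WebSocket, ...).
  def subscribe(query: String, repo: CatalogRepo)(emit: String => Unit): Try[CancelableFuture[Unit]] =
    QueryParser.parse(query).map { document =>
      // The type annotation makes the expected streaming result explicit.
      val results: Observable[Json] = Executor.execute(schema, document, userContext = repo)
      results.foreach(json => emit(json.nospaces)) // subscribing here starts the stream
    }
}
```

Annotating the result as Observable[Json] is a deliberate choice: if the two streaming imports are missing, the call should no longer type-check instead of silently falling back to a single result. The returned CancelableFuture can be cancelled when the client disconnects.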

In the subscriptionsEndpoint from my Server class I initialize the EventTarget and pass it to the query processor.

QueryProcessor.process(json, routingContext.response(), target);

You can try the subscription by placing client.html where your static route can find it and using a subscription similar to this one:

subscription {
  categoryEvents {
    id
    code
    atgCode
    parentCategory
  }
}

Conclusion

This just scratches the surface of all you can do with Sangria/Monix/Argonaut. It’s by no means the only or the best way to work with any of these frameworks, but hopefully it will get your feet wet. The only example I could find that uses subscriptions was based on akka-streams. This post just tries to be a hands-on approach to using Sangria with alternative technologies, namely Monix and Argonaut.
