GraphQL Subscriptions with Sangria and Monix Quickstart
Introduction
Sangria is a Scala GraphQL implementation which supports many standard GraphQL features plus many other specific to Sangria. It supports so many features I think of it more as a framework than an implementation, for instance, you get: query parsers and renderers, projections, additional types for complex queries, macro-based type derivation, deferred value resolution, schema comparators, prepared queries and most important for this post: Stream-based Subscriptions!
Sangria provides a sangria-subscriptions-example project based on akka-streams, if you are versed in akka then by all means take a look at that. If not then I hope this post helps you. As I already stated on my last post, while I think Akka is the most powerful and feature-full framework out there, I also find it too complex and configuration prone for some usecases. Last time I used Vertx as an alternative for actors in reactive programming and used Enterprise Integration Patterns with Apache Camel as “glue” between different frameworks. Now I’ll use Monix as an alternative to akka-streams. Just like before learning to run one needs to learn to crawl, before building a subscriptions we need to learn about queries and mutations. Don’t worry though, I like to keep things as brief as possible.
Warning
I’ll be using Argonaut for working with Json. Its current version is 6.1, Sangria is built based on the non-stable (as of this writing) 6.2 so there it doesn’t work out of the box with 6.1. I’ll show a quick and dirty hack to fix this until version6.2 is stable… Or you can just use version 6.2 and adjust the examples here.
Dependencies
Sbt:
Maven:
Queries
We start defining a simple case-class with its respective json encoder/decoder. I’m saving this in Data.scala.
Now we define the GraphQL types in SchemaDefinition.scala. The simplest thing we can do here is to use case-classes with ObjectType Derivation, we will use derivation later on but for now let’s do it the old/fashioned way:
CategoryType
is self-explanatory, except perhaps for the fields[Ctx,Val]
argument. Ctx
stands for context, since we are defining ur type we don’t actually need one, hence we declare it as Unit. Val
is the value we are representing: a Category
.
After defining Category in terms that GraphQL can understand (aka CategoryType), we define our QueryType which supports two types of queries:
- categories — which takes no aruments and returns a list of categories
- category(id: Int) — which takes a single argument “Id” and returns the Category corresponding to the given id.
Worth of note is that the “arguments” argument for our “category” field is (essentially) a List: Id :: Nil and that the “resolve” argument expects a function of type Context[Ctx, Val] => Action[Ctx, Res]. Context is just that, the context we need to resolve the query, it is sometimes called the “repository” which is why I’ve named it CatalogRepo.scala.
Your service can be a DB service, a REST service, whatever you need. We’ll need the CatalogRepo
singleton object later but leave it empty for now and go on to QueryProcessor.scala
The execution context might just as well be scala.concurrent.ExecutionContext.Implicits.global
but since we will use monix for subscriptions we might as well use its scheduler instead. We then define a process
function that takes the query as a String, I’ve named it json since it is for all intent and purposes… JSON. Next we define some lenses to access the main fields (query, operationName and variables), we parse the json, apply the lenses and if successful call the executeGraphQLQuery
function. In case you’re wondering, the query looks like this:
{
category(id: 24) {
id
code
atgCode
}
}
Query Parsing and Execution
This part deserves special mention, it is accomplished by QueryParser.parse
which supports different serialization mechanisms including, of course, argonaut. As stated before, however, there is little hack in order to use Sangria with Argonaut’s stable version (currently 6.1). The issue occurs with Sangria’s ArgonautInputUnmarshaller
which expects an argonaut.Json
object but gets confused with argonaut.Json
being a trait in 6.1, while its an abstract class in 6.2. We solve this by creating our own InputUnmarshaller which takes a JsonObject or rather by copying ArgonautInputUnmarshaller
and providing a conversion from Json to JsonObject and put it somewher visible:
Finally, we just call the executor, when successful we pass the result of the query to an output
function which uses an Httpresponse
to return the result to the client.
The server
The last piece of the puzzle! For queries and mutations there is nothing special here. It will look something like the following:
Please note that my server is in Java, no reason in particular, I just happened to have an existing Java server and needed to add GraphQL endpoints to it.
To test it just add graphiql.html where your static route can find it.
Mutations
With all the previous pieces in place, adding mutations is quite easy! First we define a MutationType
and its arguments to SchemaDefinition.scala
:
Now we just add a function to add/edit/delete a category to our contextCatalogRepo
, we will focus on addCategory
:
You can try a mutation from graphiql.html
similar to this:
mutation {
addCategory(id:25, code: "445", atgCode: "1032") {
id
code
}
}
Subscriptions
Though not strictly necessary, it is convenient to define an Event
trait that all Subscriptions should implement. We’ll add it with its respective implementations to Data.scala
Now we define the type in SchemaDefinition.scala
The key piece here is the Observable (not exactly, more on this later) which is basically the Observer pattern on steroids. source
is an Observable defined in CatalogRepo
(our context) we make use of functional reactive programming (frp)here by applying map to our source, if you are new to FRP then this is the introduction to Reactive Programming you’ve been missing. You can guess by its name that this will be the source of our events and what allows us to deal with them asynchronously in a reactive manner. Keep in mind, however, here we’re just letting our SubscriptionType
know where our event stream is coming from.
Source
is in fact a ConcurrentSubject
which is basically a shared “hot” data source, i.e. can have different subscribers who see the same data source. We subscribe an observer here just to print messages in case of onError
but its not needed by the subscription. We define it in our CatalogRepo
singleton, produce an event on it every time a category is successfully added and it will, eventually, processed by SusbcrionType
’s resolve
function. First we’ll have to subscribe to trigger the stream
of Observable
, we do this on QueryProcessor:
Place special care with the comment. This is one of those occasions when implicit values can be dangerous. You may notice these two imports are the same needed by our resolve
function on our schema definitions. However, while on SchemaDefinition we got a compiler error if these imports were missing, here its not the case. Without the imports Executor.execute is still valid, but it will never call a stream just the single element, or more precisely: ObservableSubscriptionStream.single
.
Two other points are important here, the first, hee is we subscribe to the Observable
, execute
is returning the obs
val we defined in the resolve
function from SubscriptionType
. Second we are also passing an EventTarget
, the trick to having real-time subscriptions is to have either Websockets or Server-Sent Events. EventTarget
is just the implementation for what I’m using as a server but there are alternatives.
In the subscriptionsEndpoint
from my Server class I initialize the EventTarget
and pass it to the query processor.
QueryProcessor.process(json, routingContext.response(), target);
You can try the subscription by placing client.html where your static route can find it and using a subscription similar to this one:
subscription {
categoryEvents {
id
code
atgCode
parentCategory
}
}
Conclusion
This is just scratching all you can do with Sangria/Monix/Argonaut. Its by no means the only nor best way to work with any of this frameworks but hopefully it will get your feet wet. The only example I could find to use subscriptions was based on akka-streams. This post just tries to be a hands-on approach to using Sangria with alternative technologies, namely Monix and Argonaut.