Understanding RxPHP Schedulers (part 1)

Jakub Tapuć
The Startup
Published in
5 min readSep 29, 2019

Why you should care

If you’ve ever found yourself wondering how RxPHP works under the hood then this is the right place for you to start learning about one of the most basic concepts used in this library — schedulers. Even if you think you’re satisfied with your current level of knowledge of RxPHP, grokking the concept of schedulers may prove beneficial to your overall understanding of how RxPHP is built and why it works the way it works.

Without further ado let us dig into the internals of schedulers!

The big picture

Similarly to other ReactiveX libraries every time you create and subscribe to an observable in RxPHP you are implicitly using schedulers. That’s right. Schedulers are the bread and butter of RxPHP.

Observable::of

Even in the short snippet above there is a lot going on. First, we create an observable with of that emits one value and then we subscribe to it. In return, we get a Disposable as a handle than can be used to cancel the subscription. Internally, the subscribe method asks the scheduler to enqueue a piece of work (in this case our callback). As you can see there is no explicit call to the scheduler and still we’re using one because our callback needs to be scheduled somehow. It all happens through the scheduler even though we’re not dealing with a typical asynchronous use case with operators such as delay.

What is a scheduler anyway?

Scheduler is an object with an internal clock, used to enqueue a piece of work. Let that sink in. There are no constraints on this definition, no “sync”, “async” or “threading”. In its most general form the scheduler maintains a logical queue of actions (functions) to be executed. Every time you tell the scheduler to add an an action to the queue it gives you back a Disposable instance that you can use to cancel it and clean up the resources.

Schedulers are an intermediary between observables and the actual environment where the actions are executed. Since observables are decoupled from schedulers, the scheduler gives observables an execution context meaning that they govern the when (immediately, or in the future) and where (current thread, in the event loop, on another thread) of observables. To sum it up:

  • Schedulers enqueue actions,
  • schedulers know when and where to execute actions,
  • schedulers maintain their own internal clock.

Types of schedulers in RxPHP

SchedulerInterface is where it all begins

Before we can discuss the individual schedulers used in RxPHP we need to talk a bit about the abstractions behind them. These abstractions could also be used to build your own schedulers (implementing a threading scheduler not present in the default distribution of RxPHP sounds like a pretty good idea! ⚡). Every scheduler in RxPHP must implement SchedulerInterface:

SchedulerInterface

In this post we’re going to cover its first two methods. The schedule method takes a callable that will be executed with a delay (in milliseconds). Now returns the current value of the internal clock.

There is another, empty interface that simply extends SchedulerInterface:

AsyncSchedulerInterface

Its only purpose is to signal that its implementors are dealing with asynchronous scheduling of actions.

ImmediateScheduler hates when things are delayed

Probably (and hopefully) the easiest scheduler to master is the immediate scheduler. It directly implements SchedulerInterface. The cool thing about schedulers in general is that they can be used separately without the need to use observables at all; this will prove very useful later on when we acquaint ourselves with async schedulers. Let’s get our hands dirty and look at some actual code.

ImmediateScheduler

This was quite a lot to achieve something so simple so let’s get to it. First, we need to set a default (global, in fact) scheduler for our actions. We use ImmediateScheduler and pass it to the factory function. Then, upon subscribing to the observable its value is output to the console immediately.

If you’re still there you’re probably asking yourself now — but what actually happened? Let’s try to delve into the internals.

There are many static factory methods in the Observable class and of happens to be one of them. It returns an object of a funny class ReturnObservable:

Observable::of

When you call the subscribe method on an observable instance, the protected _subscribe method will eventually be called that asks the scheduler to enqueue some work associated with this particular kind of observable:

subscribe and _subscribe

So what the scheduler essentially does when schedule is invoked is just firing the callback and returning an empty Disposable. The method ensures that no delay has been set and throws an exception if one attempts to do so:

ImmediateScheduler does not accept delays

An astute reader might wonder — what about its clock? Well, the answer is pretty straightforward, ImmediateScheduler uses the system clock so nothing fancy happens underneath:

Immediate scheduler’s clock

Sorry, I lied to you

Somewhere in the beginning of the article I listed three things that characterize a scheduler. One of them was:

Schedulers enqueue actions.

Normally this is a perfectly valid statement but for the immediate scheduler it’s a big fat lie. Why? ImmediateScheduler does not need a queue at all because all it does when it receives a piece of work is invoking it. Thus, every time an observable instructs the scheduler to enqueue something on its behalf it just runs whatever action it’s fed. When the work is done it returns an EmptyDisposable that acts as a kind of identity element since its dispose method does nothing it all.

Other types of schedulers

There are three other schedulers available for use in RxPHP. VirtualTimeScheduler and its two descendants: EventLoopScheduler and TestScheduler. I am going to present virtual time scheduler and event loop scheduler in the next part of this series.

Summary

By now you should have an idea of what schedulers are and what part they play in the RxPHP environment. You should be able to name different schedulers and know the inner workings of the immediate scheduler. You should be able to explain in detail how subscribing to an observable works and how actions in RxPHP are scheduled and then invoked.

--

--

Jakub Tapuć
The Startup

Hi! I’m a developer based in Kraków, Poland. I’m passionate about programming, programming languages and their pragmatics.