Taking control of your Apache Storm cluster with tag-aware scheduling

Panos Katseas
Inside EDITED
Published in
10 min readMar 15, 2017

You know your use case is fairly atypical when you have to implement your own custom logic for an open-source distributed data processing system, and plug it into the system to be used instead of internal logic.

And thus, the adventure begins…

Inserting your own logic: easy, right?

The case of Apache Storm

We recently started using Apache Storm for our product pipeline at EDITED and so far we’ve have been nothing but impressed, especially when it comes to scaling. Our data processing tasks have been steadily moving to Storm topologies over the last few months and we now have a variety of use cases for our Storm cluster, each with its own characteristics and requirements.

The problem, however, is that…

Due to those requirements, we need to make sure that specific topology components are assigned to particular supervisor nodes, based on hardware (or other, arbitrary) attributes.

The use case that set us down this path, was when our data scientists needed GPU resources to crunch image data at breakneck speeds, as part of their ongoing efforts to improve EDITED data. There was no way for us to make sure that image-processing components were placed on supervisor nodes equipped with a GPU. We also felt that engineering around the problem, to allow such components to run both on CPU and GPU, was too hacky and would eventually lead to issues.

So we arrived at a point where the default scheduling logic in Storm can no longer fit the bill. And since it can’t, we need to find a way to get around that. Right… but how exactly?

Like this.

We decided to implement a tag-aware scheduler that allowed us to explicitly assign components to nodes based on user-defined tags in their configuration.

We began by looking at the existing resources describing custom scheduling and discovered some good information around locations and node groups. This gave us a stronger grip on the relevant Storm APIs and how other people are using them, however nothing quite matched our set of requirements.

By building it we gained a really comprehensive understanding of the subject, so here’s what we have learned.

Basic Storm concepts

Before we dive into scheduling logic specifics, we need to clearly define what it is we were trying to do and the Storm concepts behind it.

Storm Cluster

A Storm cluster is made of one or more nimbus nodes, and one or more supervisor nodes.

Anatomy of a Storm cluster

A supervisor node is responsible for executing computation tasks as defined in a Storm topology’s spout and bolt components.

Each supervisor has a configurable number of worker slots which can be assigned (via the scheduler) an arbitrary amount of executors, which in turn correspond to topology component threads.

A worker slot can only house executors of a single topology.

Default vs. custom scheduling

The default scheduler in Storm assigns component executors as evenly as possible across the cluster’s supervisor slots. That works great for typical usage, but what we want to do instead is assign specific executors on particular supervisors, based on scheduling metadata configured on both component and supervisor levels.

If we take an example topology that looks as below and assume a cluster with four supervisors, the default scheduler will place a single component executor on each supervisor, taking up one slot on each.

Default scheduling

Let’s assume for example that the last bolt in our topology, Bolt3, needs to process some data using a GPU rather than on the CPU, and there’s only one of the supervisors with a GPU.

Under the default scheduling logic, there’s no way of identifying that particular supervisor to make sure that Bolt3 is assigned to it so it can take advantage of its GPU resources. Leveraging the extra scheduling metadata that we can rely on, in the form of tags, our custom scheduling logic could then assign the executors more smartly.

Custom scheduling

You will notice three things highlighted in the image above, which are the necessary pieces to any custom scheduling:

  • config at component level
  • config at supervisor level
  • custom scheduling logic

Configuration

To start off easy with the technical stuff, let’s have a look at what configuration is involved here.

First, we need to tag our supervisor nodes with their relevant attributes, which we do with a single line of config in their $STORM_HOME/conf/storm.yaml file (remember that the supervisor process needs to be restarted for config changes to be picked up). For example, our tagged supervisor above would have the following in its config:

supervisor.scheduler.meta:
tags: GPU

We also have to tag our component, so we’d use something like the following for a Java topology:

...TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new ExampleSpout(), 1); builder.setBolt("bolt1", new ExampleBolt1(), 1).shuffleGrouping("spout");
builder.setBolt("bolt2", new ExampleBolt2(), 1).shuffleGrouping("bolt1");
builder.setBolt("bolt3", new ExampleBolt3(), 1).shuffleGrouping("bolt2").addConfiguration("tags", "GPU");
...

As an aside, we also use the excellent streamparse library to manage our Python-based topologies at EDITED. We found that with the current version of streamparse (3.4.0 at the time of writing), component config works great when declared in the component class definition:

class Bolt3(streamparse.Bolt):                                                                               
config = {"tags": "GPU"}

def process(self, tup):
pass

Custom scheduler classes

And now comes the final part of the equation: the actual scheduling logic.

As far as Storm is concerned, the only thing a custom scheduler needs to do is implement the Ischeduler Java interface, which is made up of only two methods:

  • prepare(Map conf) which only initializes the scheduler
  • schedule(Topologies topologies, Cluster cluster) which is responsible for the assignment of topologies to worker slots on the cluster’s supervisors. (Notice how the two arguments contain the two very things whose configuration we set up just above: topologies, containing tagged components, and cluster, part of which are tagged supervisors.)

Using that tag config

Speaking of supervisors, let’s have a look at how we can take advantage of their tags.

Any scheduling metadata is accessible through the getSchedulerMeta method of SupervisorDetails objects, so very straightforward to use. To make scheduling components on them easier, we group SupervisorDetails objects by tag in a Map; we handle any untagged supervisors by placing them under an "untagged" tag group.

And then we follow the same logic for tagged components too. A few notes at this point:

  • As we will see further along, we only need component IDs for scheduling, not the actual component objects, so we just store the IDs for later referencing.
  • This method was written to be compatible with both types of components, bolts and spouts. Their respective Java classes, Bolt and SpoutSpec, share a get_common method that returns a ComponentCommon object we can use to access a component’s config and therefore its tags. We have used Java reflection to treat objects of either class the same and extract the information necessary to group components by tag.
  • Contrary to the supervisor logic above, here we are using a single Map object that we pass in as the first argument to keep track of all components by tag groups. This makes it easier to reuse the method for bolts and spouts alike.
  • The component config is returned in JSON format. We have used the com.googlecode.json-simple library for parsing it.
  • We use an "untagged" tag here too to group components with no tags. The purpose of that is to effectively handle these untagged components the same way the default scheduler performs its assigning. That means that a topology with no tagged components will be successfully scheduled all the same, with no issue, across untagged supervisors.

Apart from bolts and spouts though, there’s a third kind of component as well: internal Storm components that are used for managing a topology’s data flow, such as __acker for example.

To make sure that we account for that kind of component too, we populate our componentsByTag map with those components’ IDs via a separate method.

So far we’ve got to the point where we hold maps of supervisors by tag and components by tag, which is the high-level information we’ll need to get on with assigning components to worker slots.

Convert component IDs to executors

Now let’s make the jump from component IDs to actual executors, as that’s the level at which the Storm cluster deals with assignments.

The process is quite straightforward:

  1. We get a Map of executors by component from the cluster.
  2. Check which components’ executors need scheduling, according to the cluster.
  3. Create a Map of tag to executors, populating only those executors that are awaiting scheduling.

Converting from supervisors to slots

And now for the final conversion we have to perform, jumping from supervisors down to slots. As before with components and their executors, we need this because the cluster assigns executors at the slot level, not the supervisor level.

There are a few things to do at this point, which have been broken down to smaller methods to preserve readability. The main steps we need to perform are:

  1. Find which slots we can assign to, given a list of supervisors for a tag. This is simply the case of a for loop that collects all supervisors’ slots, and then returning as many of the slots as are requested by the topology.
  2. Divide the executors awaiting scheduling for the tag into even groups across the slots.
  3. Populate a Map with entries of slot to executors.

The idea here is to call the populateComponentExecutorsToSlotsMap method once per tag, and result in a single Map holding all the assignments we need to perform.

As is explained in the code’s comments too, we had previously found that sometimes we would eagerly assign a tag’s executors to a slot, only to have a successive tag fail to assign its executors, leading to partial scheduling. We have since made sure that the flow of scheduling ensures that no partial scheduling is ever performed (either all is scheduled, or nothing is), at the cost of an extra for loop, as we believe that’s a cleaner state for a topology to be in.

Aaaand that bring us to the completion of our scheduling logic; you can find the full TagAwareScheduler class here.

We’ve tried to keep the code as documented as possible, so that every part of it is pretty clear and readable. It may look like a lot (and it certainly was while developing it!), but paired with this blog post, we hope it will be easy to understand.

Highlights

A couple of points we want to highlight are areas that we built more on, in comparison to the resources we found online:

Group supervisors per tag

Instead of simplifying the scheduling logic to only take a single supervisor or slot into account, our logic was based from its beginning on arbitrary-length groups of supervisors per tag, and handles any edge cases accordingly.

No partial scheduling

Leaning in a bit more on that handling, we have made use of the Cluster.setStatus method, to provide handy feedback to the user about the outcome of a topology’s scheduling.

Paired with our previous point about a binary outcome of scheduling (either everything is scheduled, or nothing is), this has proved very useful in terms of understanding what is happening during scheduling, as well as avoiding topologies being in a weird state (partial scheduling).

Example of successful scheduling feedback on Storm UI
Example of unsuccessful scheduling feedback on Storm UI: no slots available

That’s all great, but how do I use it?

To wrap things up, we should also cover how the class is meant to be actually used by Storm. There are two things we need to do for that:

  • Instruct storm to use our scheduler instead of its default scheduler, via config.
  • Package our class in a .jar file and add it to the $STORM_HOME/lib directory for Storm to be able to access it.

The config necessary is again very simple; similarly to the tag scheduling metadata, we just have to add this line to $STORM_HOME/conf/storm.yaml(and make sure the nimbus process is restarted):

storm.scheduler: com.edited.tagawarescheduler.TagAwareScheduler

As for the .jar file, we have opted to rely on the Java maven tool. We have created a pom.xml file for our scheduler project, that takes care of dependency management, compilation and packaging.

Executing maven package will create a .jar file containing both the class and its dependencies (namely the JSON library), ready to be added to Storm’s lib directory and be used.

Further work and extensions

While this is a fairly complete custom scheduling solution, it wouldn’t be proper software if we couldn’t further iterate on it! A couple of our thoughts about next steps and potential improvements include:

  • We routinely use Sentry to monitor our services at EDITED. Integrating Sentry into the scheduler would make dealing with any issues that arise during scheduling a breeze.
  • We’re also thinking about using further config per topology, to explicitly define the number of workers a topology requires per tag. While we’ve not quite reached the point where that’s necessary for our topologies, we can anticipate that being needed at a later time as our Storm cluster and number of topologies grow.

Panos Katseas is a collaborative individual and a software engineer at EDITED. If you’d like to work with him and the rest of the team to help push things forward, let us know using hello@edited.com or applying at https://edited.com/jobs/engineering/.

--

--