Towards achieving a 10x compile-time improvement in a Flink codebase

Aris Koliopoulos
DriveTribe Engineering
6 min readJan 10, 2019
All Flink related articles must start with a squirrel.

This article aims to outline how we managed to achieve a 10x improvement in compile-time performance of our Apache Flink codebase in Scala. More specifically, it presents an approach to managing TypeInformation instances in large Scala codebases.

Apache Flink processes all data ever generated at DRIVETRIBE. A couple of years in, the number of execution graphs currently running in production is over 100. This entails tens of thousands of lines of Scala and a very heavy load for the compiler.

More information on the hows and the whys of our architecture can be found here and here.

On top of that, we have introduced a fair amount of type-level trickery using Shapeless, we handle our JSON serialisation/deserialisation using Circe and we also use a fair amount of the functional programming abstractions in Cats. All of the above have contributed a large number of type classes and quite a heavy implicit resolution workload for the compiler.

Adding up all of the above:

ariskoliopoulos@Ariss-MacBook-Pro ~/D/backend> sbt -J-Xss20m -mem 4000 data-processing/clean data-processing/compile[info] Loading global plugins from /Users/ariskoliopoulos/.sbt/0.13/plugins[info] Loading project definition from /Users/ariskoliopoulos/Desktop/backend/project[info] Set current project to backend (in build file:/Users/ariskoliopoulos/Desktop/backend/)[success] Total time: 0 s, completed 23-Dec-2018 16:36:12[info] Compiling 110 Scala sources to /Users/ariskoliopoulos/Desktop/backend/data-processing/target/classes...[success] Total time: 879s, completed 23-Dec-2018 16:50:51

We found ourselves in the rather unfortunate situation of having to wait up to 15 minutes for a clean compilation. Given the lack of tooling in Scala 2.11, debugging wasn’t exactly a trivial task. Intuitively, implicit searches and macro expansions are to blame. Given however our heavy use of type classes, how can this be pinpointed to a small, fixable subset of the codebase?

To make things worse, while trying to upgrade from Flink 1.4.2 to 1.6.0, the compiler hung in what looked like a infinite recursion:

The end result was a StackOverflow exception. Increasing the stack size only delayed the inevitable for a little while. This stack trace provides a valuable clue: Flink’s TypeAnalyzer’s macros are stuck in an infinite loop. Looking at how TypeInformation instances are generated may be a good candidate for resolving the issue (or a red herring).

Flink requires a TypeInformation instance implicitly available for every type it processes. Flink uses those instances to generate serialisers and comparators, which in turn enable passing data from one operator to the next, storing data to the various state backends and so on and so forth. More info on this can be found here.

One of the most annoying compile time errors Flink developers run into:

scala> case class User(id: String, name: String, age: Int)defined class Userscala> environment.fromElements(User("asd", "Joe", 32))<console>:16: error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[User]environment.fromElements(User("asd", "Joe", 32))

Which can be trivially resolved with a magic import:

scala> import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.scala._scala> environment.fromElements(User("asd", "Joe", 32))res1: org.apache.flink.streaming.api.scala.DataStream[User] = org.apache.flink.streaming.api.scala.DataStream@711823c0

Inspecting the contents of the package object reveals the following line:

This is an implicit macro. Given the lack of context bounds, this macro can generate TypeInformation[T] for any type T. This makes it eligible for any implicit search for a TypeInformation[T] instance. If no other non-ambiguous implicit is found before createTypeInformation is matched, then the compiler will expand the macro inline.

This is fantastic when bootstrapping a project as it uses Scala macro voodoo magic to hide internal complexities related to Flink’s type handling system. A file may contain tens of transformations DataStream[T] => KeyedStream[K, T] => DataStream[R]and so on without the boilerplate of defining TypeInformation instances for T,K or R. There is however a caveat.

The macro-generated instances cannot be shared between different implicit searches. createTypeInformation doesn’t have access to previous expansions and thus it has to recompute the same TypeInformation[T] every time Flink needs it. For example:

scala> compileTime("""| import org.apache.flink.streaming.api.scala._| val env = StreamExecutionEnvironment.getExecutionEnvironment| case class SimpleType(a: String, b: String, c: Int)| env.fromElements(SimpleType("foo","bar", 100))| """).toMillisres43: Long = 160

Note: shapeless.test.compileTime isn’t exactly deterministic. Those are medians out of 5-9 runs.

Let’s duplicate the DataStream generator a few times:

scala> compileTime("""| import org.apache.flink.streaming.api.scala._| val env = StreamExecutionEnvironment.getExecutionEnvironment|| case class SimpleType(a: String, b: String, c: Int)| env.fromElements(SimpleType("foo","bar", 100))20 more times| env.fromElements(SimpleType("foo","bar", 100))| """).toMillisres44: Long = 353

And a bit more:

scala> compileTime("""| import org.apache.flink.streaming.api.scala._| val env = StreamExecutionEnvironment.getExecutionEnvironment|| case class SimpleType(a: String, b: String, c: Int)| env.fromElements(SimpleType("foo","bar", 100))50 more times| env.fromElements(SimpleType("foo","bar", 100))| """).toMillisres47: Long = 593

The compiler needs to expand the macro every time a TypeInformation[SimpleType] is needed. 50+2=52 times in the example above. Hence 593 milliseconds.

This is quite a lot of time for 50 lines of Scala. Macro expansions can be slow. Unnecessarily duplicated macro expansions over complex nested types can be very slow. Around ~300 types and a few tens of thousands of lines later one can end up in a situation where it is not clear whether the compiler is stuck or just very slow. To make things worse changes introduced after 1.4.2 made it impossible to successfully terminate a compilation due to some infinite recursion.

The solution?

Similar to what people in the Scala world people call semi-automatic derivation. There is a way to compute TypeInformation[T] only once for every T and use it in every instance Flink needs to process T. In fact, we don’t even need to invoke the macro if we use the TypeInformation’s Java interface as follows:

This has the benefit of being a lot faster on it own:

scala> compileTime("""| import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment| import org.apache.flink.api.common.typeinfo.TypeInformation| val env = StreamExecutionEnvironment.getExecutionEnvironment|| case class SimpleType(a: String, b: String, c: Int)| implicit val typeInfo = TypeInformation.of(classOf[SimpleType])| env.fromElements(SimpleType("foo","bar", 100))| """).toMillisres8: Long = 26

It also gets computed exactly once:

scala> compileTime("""| import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment| import org.apache.flink.api.common.typeinfo.TypeInformation| val env = StreamExecutionEnvironment.getExecutionEnvironment|| case class SimpleType(a: String, b: String, c: Int)| implicit val typeInfo = TypeInformation.of(classOf[SimpleType])| env.fromElements(SimpleType("foo","bar", 100))50 more times| env.fromElements(SimpleType("foo","bar", 100))| """).toMillisres34: Long = 42

And thus it is an order of magnitude faster.

The downside of this approach is that it creates quite a bit of boilerplate. One has to manually define TypeInformation instances for all types. This is normally achieved using one of the three following methods:

  • In the companion object. The compiler will always look in the companion object of T for an implicit F[T] . This pattern is quite popular amongst adopters of JSON libraries that use a type class pattern such as Circe. Small subtle difference here is that the example below will compile even if there is no instance for Bar available.
  • A trait containing all the implicits. This can be converted to an object and imported or mixed in a package object. This solution is suitable if the model is in a different module which doesn’t need to depend on Flink.
  • In current scope, right where it is needed. This is the fastest as this is where the compiler looks first. However, if a type is used in multiple files this definition will need to be repeated.

We decided to go for the second approach. The results:

ariskoliopoulos@Ariss-MacBook-Pro ~/D/backend> sbt -J-Xss20m -mem 4000 data-processing/clean data-processing/compile[info] Loading global plugins from /Users/ariskoliopoulos/.sbt/0.13/plugins[info] Loading project definition from /Users/ariskoliopoulos/Desktop/backend/project[info] Set current project to backend (in build file:/Users/ariskoliopoulos/Desktop/backend/)[success] Total time: 0 s, completed 23-Dec-2018 18:56:12[info] Compiling 110 Scala sources to /Users/ariskoliopoulos/Desktop/backend/data-processing/target/classes...[success] Total time: 89s, completed 23-Dec-2018 18:57:42

This is a 9.87x speedup 🎉🎉🎉

In Scala 2.11, compile-time inefficiencies required decent knowledge of compiler internals combined with loads of experiments and quite some patience. And a hunch.

In Scala 2.12, scalac-profiling provides a nice toolchain to debug implicit searches and macro expansions. Introducing the plugin:

addCompilerPlugin(“ch.epfl.scala” %% “scalac-profiling” % “1.0.0”)

and adding the following

"-Ystatistics",
"-P:scalac-profiling:no-profiledb",
"-P:scalac-profiling:show-profiles",
"-P:scalac-profiling:print-failed-implicit-macro-candidates",
"-P:scalac-profiling:generate-macro-flamegraph"

will reveal a plethora of stats and debugging info. A lot more on this here. It’s a long read but very well worth it.

If you reached that far and for some bizarre reason you would like to solve difficult Flink and Scala related problems for a living, we are hiring!

--

--

Aris Koliopoulos
DriveTribe Engineering

Distributed systems and functional programming nerd, geeking out @Drivetribes