On Hive UDAFs, or why Java’s type system sucks

Matthew Smedberg
9 min read · May 10, 2020


Apache Hive is one of the most ubiquitous big data technologies out there; its job is to enable all kinds of data operations on top of Hadoop. In particular, it compiles SQL-like queries into map-reduce jobs.

I’m focusing on the “reduce” part of “map-reduce” today. Conceptually, “reduce” takes a set of “rows” and returns a single value based on those rows. There are a number of built-in reducers in Hive (things you’d expect like max, but also things you wouldn’t necessarily expect, like histogram_numeric), but you can also provide your own, using what they call a User-Defined Aggregation Function or UDAF.

The thing is, though, the interface that Hive provides for writing a UDAF is bad. Like, bananas bad. And most of the blame can be laid at the feet of Java’s type system. (Some of the blame is due to the people who wrote Hive, but that’s mostly stuff like “not making guarantees about how the interface will be called” and “choosing to do too much via Java reflection”.)

I could write a better UDAF interface on the back of a cocktail napkin. In fact, I’ve done exactly that, because I was so frustrated with the experience of implementing a (conceptually very simple!) UDAF in my day job.

There are four interfaces that I want to destroy and rebuild here: org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector, org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver, org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator, and org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer.

I’ll be building these interfaces in Scala, because it has an actual type system, not the hot garbage that is Java’s.

[Image caption: Not the type of Hive we’re dealing with]

The State Of The Art

Let’s start by describing at a high level how the interface works; a rough code sketch of the same lifecycle follows the list.

  • First, an array of ObjectInspectors is passed to your class’s init function. This is supposed to let your class see what input types it is supposed to aggregate, so it can either build the appropriate Evaluator or throw an exception.
  • Then rows of serialized data are passed in as arrays of Object. Your class is supposed to have saved those ObjectInspectors in private member variables and should now use them to unpack the objects. However, you have to unpack in stages: when you unpack a map, its keys and values are still packed and must go through a further unpacking.
  • Now you pass the unpacked row to your aggregator, which is responsible for updating its state in response.
  • The same group of data may be processed in parallel on many nodes, so your aggregator must be able to communicate a partial result out. However, you (the developer) have no control over how the partial result is serialized and communicated, so the advice is to only return partial results that are built up of primitive Java objects, strings, Hive primitives, and Lists and Maps of these.
  • When we receive a partial result, we must use a (potentially different) ObjectInspector to unpack it and merge into our local partial aggregation state.
  • Finally, when requested, we return the final aggregation result. Again we have no control over how it is serialized.
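
Here is that sketch: a rough, hypothetical “count the non-null inputs” evaluator written against the real interface. The class, buffer, and field names (CountNonNullEvaluator, CountBuffer, partialOI) are mine, the whole thing is heavily simplified, and a complete UDAF would also need a resolver class, which I’m omitting.

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{PrimitiveObjectInspectorFactory, PrimitiveObjectInspectorUtils}

class CountNonNullEvaluator extends GenericUDAFEvaluator {
  // The inspector for the partial result; only needed when merging.
  private var partialOI: PrimitiveObjectInspector = _

  // Called once per mode (PARTIAL1, PARTIAL2, FINAL, COMPLETE); we stash the
  // inspector we care about and tell Hive what we will hand back (a plain Long).
  override def init(mode: Mode, parameters: Array[ObjectInspector]): ObjectInspector = {
    super.init(mode, parameters)
    if (mode == Mode.PARTIAL2 || mode == Mode.FINAL)
      partialOI = parameters(0).asInstanceOf[PrimitiveObjectInspector]
    PrimitiveObjectInspectorFactory.javaLongObjectInspector
  }

  // The mutable per-group state; Hive only ever sees it as an opaque AggregationBuffer.
  class CountBuffer extends AggregationBuffer { var count: Long = 0L }

  override def getNewAggregationBuffer(): AggregationBuffer = new CountBuffer

  override def reset(agg: AggregationBuffer): Unit =
    agg.asInstanceOf[CountBuffer].count = 0L

  override def iterate(agg: AggregationBuffer, parameters: Array[AnyRef]): Unit =
    if (parameters(0) != null) agg.asInstanceOf[CountBuffer].count += 1

  // Partial results cross the wire, so we keep them to a "safe" type (a boxed Long).
  override def terminatePartial(agg: AggregationBuffer): AnyRef =
    Long.box(agg.asInstanceOf[CountBuffer].count)

  override def merge(agg: AggregationBuffer, partial: AnyRef): Unit =
    if (partial != null)
      agg.asInstanceOf[CountBuffer].count += PrimitiveObjectInspectorUtils.getLong(partial, partialOI)

  override def terminate(agg: AggregationBuffer): AnyRef =
    Long.box(agg.asInstanceOf[CountBuffer].count)
}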

ObjectInspector

We’ll look at ObjectInspector first, since it’s used in more places than just UDAF code. As mentioned above, an ObjectInspector is used to deserialize objects from the (possibly metaphorical) wire. Kinda.

The weirdness starts with the interface declaration:

public interface ObjectInspector {
  String getTypeName();
  Category getCategory();
}

Um… what? A deserializer that doesn’t offer any deserialization methods? That’s sure odd.

It turns out that all the deserialization methods are provided in intermediate interfaces — an ObjectInspector for Java primitives will have one set of them, one for maps another. Here’s the map inspector interface:

public interface MapObjectInspector extends ObjectInspector {
  ObjectInspector getMapKeyObjectInspector();
  ObjectInspector getMapValueObjectInspector();
  Object getMapValueElement(Object data, Object key);
  Map<?, ?> getMap(Object data);
  int getMapSize(Object data);
}

Furthermore, a MapObjectInspector doesn’t do a full deserialization; its getMap(...) method returns a Map whose keys and values are still-wrapped objects, which need to be unwrapped by further calls to the key and value inspectors. This is, however, not explicitly stated anywhere.
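
For illustration, here is a hypothetical helper (the names MapUnwrapping and unwrapPrimitiveMap are mine) that does the full two-stage unwrapping for a map column whose keys and values are primitives:

import scala.jdk.CollectionConverters._
import org.apache.hadoop.hive.serde2.objectinspector.{MapObjectInspector, PrimitiveObjectInspector}

object MapUnwrapping {
  // `data` is whatever opaque Object Hive handed us for this column.
  def unwrapPrimitiveMap(data: AnyRef, moi: MapObjectInspector): Map[AnyRef, AnyRef] = {
    val keyOI = moi.getMapKeyObjectInspector.asInstanceOf[PrimitiveObjectInspector]
    val valueOI = moi.getMapValueObjectInspector.asInstanceOf[PrimitiveObjectInspector]
    val stillWrapped = moi.getMap(data).asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala
    stillWrapped.map { case (k, v) =>
      // Each key and value is itself still packed; a second round of inspector calls unwraps it.
      keyOI.getPrimitiveJavaObject(k) -> valueOI.getPrimitiveJavaObject(v)
    }.toMap
  }
}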

This design has a few obvious problems (and one redeeming feature):

  1. It operates on Objects. This is a huge red flag for interface design, because it gives no hints as to what values these methods might be reasonably or unreasonably passed.
  2. Even though any particular MapObjectInspector knows what its keys and values are supposed to be, the interface does not carry type information, instead returning a Map<?, ?>. As mentioned, the keys and values must be further passed to key and value inspectors.
  3. It handles deserialization opaquely and serialization not at all. That makes testing a nightmare.
  4. The inclusion of getMapSize and getMapValueElement, on the other hand, is nice. By “nice” here I mean that they give the user a hint about what they should expect from the behavior of such an object: it will probably have some binary store that it can query if you don’t want it to read and parse the whole map.

How could we make this better?

Well, first we could improve the Category handling. Category is a Java enum, whose elements are PRIMITIVE, LIST, MAP, STRUCT, and UNION. We can do better:

sealed trait Category

case object PRIMITIVE extends Category
case class LIST(c: Category) extends Category
case class MAP(k: Category, v: Category) extends Category
case class STRUCT(members: List[(String, Category)]) extends Category
case class UNION(variants: List[Category]) extends Category
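
For instance, in this first cut a Hive column typed map<string, array<int>> would (hypothetically, since both string and int collapse into PRIMITIVE here) come out as:

val mapOfLists: Category = MAP(PRIMITIVE, LIST(PRIMITIVE))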

We could even get fancier and instead of having just PRIMITIVE, have case objects encoding all the specific primitives that we know about:

sealed trait Primitive extends Category
case object INT extends Primitive
case object DOUBLE extends Primitive
...

These types are supposed to encode an assertion: any type represented by a Category can be used as a column type in a Hive table. Well, we can encode that assertion in our type system:

sealed trait Category {
  type JVMType
}
sealed trait Primitive extends Category
case object INT extends Primitive {
  override type JVMType = Int
}
case object DOUBLE extends Primitive {
  override type JVMType = Double
}
...
case class LIST(c: Category) extends Category {
  override type JVMType = List[c.JVMType]
}
case class MAP(k: Category, v: Category) extends Category {
  override type JVMType = Map[k.JVMType, v.JVMType]
}
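
As a quick sanity check of that assertion (hypothetical, REPL-style), the compiler can already see straight through the primitive case objects:

val answer: INT.JVMType = 42      // INT.JVMType is just Int
val ratio: DOUBLE.JVMType = 0.5   // DOUBLE.JVMType is just Double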

Structs and unions are a bit trickier:

sealed trait Struct extends Category {
  override type JVMType <: Struct.StructCell
}
case object EMPTYSTRUCT extends Struct {
  override type JVMType = Struct.EmptyCell.type
}
case class CONSSTRUCT(name: String, category: Category, tail: Struct) extends Struct {
  override type JVMType = Struct.ConsCell[category.JVMType, tail.JVMType]
}
object Struct {
  sealed trait StructCell
  case object EmptyCell extends StructCell
  case class ConsCell[H, T <: StructCell](head: H, tail: T, fieldName: String) extends StructCell

  def getCategory(members: List[(String, Category)]): Struct = members match {
    case Nil => EMPTYSTRUCT
    case (name0, category0) :: rest => CONSSTRUCT(name0, category0, getCategory(rest))
  }
}

The getCategory method is like a static factory.
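
A hypothetical use of that factory, again REPL-style: a two-field struct category comes out as a chain of cons cells.

val pointCategory: Struct = Struct.getCategory(List("x" -> DOUBLE, "y" -> DOUBLE))
// pointCategory is CONSSTRUCT("x", DOUBLE, CONSSTRUCT("y", DOUBLE, EMPTYSTRUCT))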

sealed trait Union extends Category
case object EMPTYUNION extends Union {
  override type JVMType = Nothing
}
case class EITHER(c: Category, or: Union) extends Union {
  override type JVMType = Either[c.JVMType, or.JVMType]
}

object Union {
  def getCategory(variants: List[Category]): Union = variants match {
    case Nil => EMPTYUNION
    case c :: rest => EITHER(c, getCategory(rest))
  }
}
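
And similarly for unions (hypothetical): values of the resulting category’s JVMType shape would look like Left(42) or Right(Left(0.5)).

val intOrDouble: Union = Union.getCategory(List(INT, DOUBLE))
// intOrDouble is EITHER(INT, EITHER(DOUBLE, EMPTYUNION))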

This is not the most efficient way to do this, of course, but it’s type-safe and readable.

What we have achieved here is a complete description of the input and output types of a UDAF. Moreover, we have an isomorphic mapping between the concrete subclasses of Category and value classes encoding data that fits into the category.

This isomorphism is the reason I chose to make Category sealed — to emphasize that the correspondence is exact. However, there might be good reasons to unseal this trait to allow for alternate value classes that have extra properties (I think this is one of the main reasons for the various XYZWritable classes in org.apache.hadoop.io). A real implementation of Hadoop that included something like this design would probably bow to the Open-Closed principle and allow other instances of Category than these.

ObjectInspector

Believe it or not, that business with Category was the hardest part.

Instead of duplicating the Hadoop ObjectInspector interface, we’re going to give the developer more control over what happens at the serialization boundary:

import scala.util.Try

trait Serializer {
  type Source

  def encode(t: Source): Array[Byte]
  def encodeTo(t: Source, byteSlice: ByteSlice): Try[Unit]
}

trait Deserializer {
  type Target

  def decode(bytes: Array[Byte]): Option[Target]
  def decodeFrom(slice: ByteSlice): Option[Target]
}

The second form of encode/decode points to another thing that Hadoop does well: it doesn’t always allocate new memory for a serialization, instead taking a (possibly large, possibly off-heap) region of memory and letting lots of parallel threads write to their own little slice of it. (The actual class that enables this is called ByteArrayRef but I thought Slice made a more informative name, especially to people coming from a language like Go.)

Instead of a PrimitiveObjectInspector, we now want to be able to assert that some Serializer or Deserializer works for one of the Category types we defined above — this will allow us to say “this Serializer is appropriate for returning the final aggregation result in a format that can go into a Hive table”.

trait CategorySerializer extends Serializer {
  type C <: Category
  override type Source = C#JVMType
  val category: C
}
trait CategoryDeserializer extends Deserializer {
  type C <: Category
  override type Target = C#JVMType
  val category: C
}

The C#JVMType expression means “The JVMType type member of the type C” — in other words # is to types as . is to objects.
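
As a hypothetical compile-time check (inside any enclosing object), both spellings name the same type for a concrete category like INT:

object ProjectionCheck {
  // Both lines compile only because INT.type#JVMType and INT.JVMType each dealias to Int.
  implicitly[INT.type#JVMType =:= Int]
  implicitly[INT.JVMType =:= Int]
}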

It is also possible to compress these three-line traits into two, at the cost of some readability:

trait CategorySerializer extends Serializer {
  val category: Category
  override type Source = category.type#JVMType
}

…but that makes even me start going cross-eyed. (These CategorySerializer and CategoryDeserializer aren’t technically necessary, but they provide a good illustration of using the type system to enforce invariants.)

UDAF

And now for the moment we’ve all been waiting for: the actual UDAF class.

trait UDAF {
  def analyze(parameterCategories: Seq[Category]): Category

  // contract: the Result category of newEvaluator(categories) must match analyze(categories)
  def newEvaluator(parameterCategories: Seq[Category]): UDAFEvaluator
}

trait UDAFEvaluator {
  type Result <: Category
  type Aggregator

  val aggregatorSerializer: Serializer { type Source = Aggregator }
  val aggregatorDeserializer: Deserializer { type Target = Aggregator }

  def newAggregator(parameterCategories: Seq[Category]): Aggregator

  // The next three methods are free to return one of the parameters if stateful/mutable, or to build a new one if immutable
  def reset(aggregator: Aggregator): Aggregator
  // Invariant: this method should only ever be called with parameters matching those to newEvaluator
  def iterate(aggregator: Aggregator, row: Seq[Category#JVMType]): Aggregator

  def merge(aggregator: Aggregator, other: Aggregator): Aggregator

  def result(aggregator: Aggregator): Result#JVMType
}

Note that the serializer and deserializer are public; the aggregation driver is responsible for calling them and passing the parsed results in. We could change this around or add methods enabling more control over deserialization of rows and aggregators (e.g. to support a case where we only want one key out of lots of large maps) but I’ll save that for another time.
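
To see how the pieces are meant to fit together, here is a hypothetical single-process driver; the object and method names (LocalDriverSketch, runLocally) and the argument layout are mine, not part of the design. In production, the iterate calls would happen on many nodes and the encoded partials would travel over the network between them.

object LocalDriverSketch {
  def runLocally(udaf: UDAF,
                 categories: Seq[Category],
                 partitions: Seq[Seq[Seq[Category#JVMType]]]): Unit = {
    val evaluator = udaf.newEvaluator(categories)

    // One partial aggregation per partition, as would happen on separate mapper nodes.
    val encodedPartials: Seq[Array[Byte]] = partitions.map { rows =>
      val partial = rows.foldLeft(evaluator.newAggregator(categories))(evaluator.iterate)
      evaluator.aggregatorSerializer.encode(partial)
    }

    // Decode the partials, merge them into a single aggregator, and ask for the result.
    val merged = encodedPartials.foldLeft(evaluator.newAggregator(categories)) { (acc, bytes) =>
      val partial = evaluator.aggregatorDeserializer
        .decode(bytes)
        .getOrElse(sys.error("could not decode a partial aggregation"))
      evaluator.merge(acc, partial)
    }
    println(evaluator.result(merged))
  }
}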

I toyed with the idea of building separate Evaluator traits for different numbers of parameters, but that seemed like overkill. This way isn’t even that type-unsafe, since your concrete evaluator class will know how many parameters it expects and can pattern match accordingly:

import org.apache.hadoop.hive.ql.metadata.HiveException

// The remaining members (newAggregator, reset, merge, and the two serializers) are elided here,
// which is why the class is left abstract.
abstract class ConcatEvaluator extends UDAFEvaluator {
  private type P0 = String
  private type P1 = Int
  override type Result = STRING.type
  override type Aggregator = String

  override def iterate(aggregator: Aggregator, row: Seq[Category#JVMType]): Aggregator = {
    (row(0), row(1)) match {
      case (r0: P0, r1: P1) => s"${aggregator}${r0}${r1}"
      case _ => throw new HiveException("This should never happen")
    }
  }

  override def result(aggregator: Aggregator): STRING.JVMType = aggregator
}

Conclusion

We hear it all the time, but I’ll say it again: code is meant to be written infrequently and read often. This is orders of magnitude more true for public interfaces meant to be extended by third parties.

The existing UDAF interface fails this principle at every level. It is not only impossible to read the type signatures of the methods to ascertain what they should do — it’s impossible to delve into the source code of Hive itself to see how the methods are actually called! (Or rather: you can search, and I did, for places those methods are called, but a lot of good it did me.) And since Hive is a massive distributed system, you can’t step through instances of your code in a debugger running against anything like production.

The design sketched here, of course, is anything but production-ready. It does, however, compile, and it is optimized for clarity rather than runtime efficiency. A team interested in improving said efficiency could probably find many ways to do so without making the types much harder to read and understand.

I submit to you also that aside from mistakes in aggregation logic (which are absolutely easy to make even when not aggregating data over hundreds of machines, and not something the framework can really protect you against), it is much harder to write a UDAF with this interface that would compile but not run, or run and produce grossly incorrect results. (For example: when writing the aforementioned UDAF at my day job this week, at one point the aggregation ran without throwing any exceptions, but returned empty maps for all inputs. It was only days later that I realized that LazyBinaryMapObjectInspector caches and reuses a map object which I was holding onto as well even after it had been cleared.)

If I’m down on Java in my writing and speaking, it’s because Java makes things hard that are conceptually easy, and produces lots of failures at runtime that could have been failures at compile time. Furthermore, while considerate and conscientious library authors can produce interfaces that communicate well to the library user, doing so is at best unidiomatic and at worst actively frowned upon: the prevailing idiom would rather you define a class with one fewer method than use the type signature to communicate intent.
