Review of Apache Apex (a Hadoop Streaming Engine)

Mateusz Zakarczemny
ONZO Technology
Published in
9 min readFeb 12, 2019

One of the keys to success for modern companies is to be data-driven. Nonetheless, customers expect not only a relevant product. It also has to be fast. Batch processing that offers high throughput at the cost of latency is no longer an acceptable trade-off. Those expectations were recognized by the open source community. The number of available streaming libraries and engines is overwhelming. Most of the discussions concern projects like Spark, Flink, Storm or Samza. However, today I’m going to review a less-known, but valuable project — Apache Apex.

Computational Model

Conceptually, Apex’s job is a direct acyclic graph (DAG). It is built from operators encapsulating business logic, that are connected using input/output ports. Elements traveling through DAG are called “tuples”.

Streaming windows

Streams are unbounded, they neither have a beginning nor an end. However, to support checkpoints and recovery, Apex needs some reference points — streaming windows. Each emitted tuple belongs to some window. To detect the window’s start and end, Apex emits control tuples along with data tuples. Control tuples are handled by a framework core, and the developer doesn’t need to take care of them. For one, it may look like micro-batching, and even the Apex documentation calls it that. Nevertheless, propagation of tuples is not blocked until a whole streaming window is completed. It is worth mentioning that the operations are atomic on the streaming windows’ level, not on the single tuple’s level.

Application windows

As mentioned previously, streams are unbounded data sets. We cannot calculate the sum or the max value of a stream. However, we could do that for a given time frame. In Apex, such frames are called “application windows”.
Application windows allow the framework to keep track of the operators that are keeping states over multiple streaming windows. Thus, an application window is a multiple of a streaming window. The start and end of a window are handled by beginWindow, endWindow callbacks. Apex supports both tumbling (non-overlapping) and sliding (overlapping) windows.

Backpressure

Backpressure is a mechanism of slowing down the stream if the amount of incoming events is too high for downstream operators. It is handled by buffer servers that prepend every operator. Buffers are in memory, but in the case of an overflow, data is spilled to a disk. The maximum buffer size is configurable. If the buffer size is exceeded, then all upstream operators will be blocked. Buffers are responsible for handling load spikes, but also play an important role during operator recovery. Upstream buffers allow a failed operator to replay all messages since the last snapshot.

Partitioning

Partitioning is the means by which Apex implements stream scalability. It allows work distribution and adaptation to load spikes. Apex supports two types of partitioning. Fixed — defined at startup, and dynamic — based on a stream load. Moreover, dynamic partitioning has two subgroups — stateful and stateless. The difference is based on how internal state is handled when the partition count changes. For stateless repartitioning, the application doesn’t care about distributing the internal state. Stateful repartitioning requires splitting or merging the operator state. It’s carried out by a partitioner that has to be implemented by the developer. Apex Malhar brings great generic stateless partitioners. However, a managed state, which would allow automatic redistribution of a state, is not supported yet. To decide when to trigger the partitioning, a StatsListner interface is used.

Number of partitions is defined per operator. Thus, it might be necessary to map higher number of partitions from upstream to downstream operators. This is carried out by a physical graph operator called “unifier”. Apex offers a variety of ways for applying a unifier. The choice depends on operator partitioning and node locality.

State management

For fault tolerance and recovery, Apache Apex employs checkpointing. By default, it is done every 30 s (60 streaming windows). Checkpointing is carried out on operator level. It is not a very sophisticated process. Apex uses Kryo (https://github.com/EsotericSoftware/kryo) to serialize the whole operator object to HDFS. It has some significant implications during the development of your operators.

The developer needs to be aware that every non-transient field will be serialized. Of course, it is possible to implement a custom Kryo serializer to avoid such behaviour, but it’s an additional overhead. What’s more, application evolution could be quite tricky due to this. New class definitions need to be backward compatible with previous versions to allow deserialization.

Distribution mode

Apex was built on top of the Hadoop infrastructure. It leverages Apache Yarn for resource allocation, and HDFS for state persistence. The core component is an application master. It is responsible for operator allocation based on an execution plan. Operators could be allocated in many different ways: in a separate container, separate container within the same node, within the same container but separate thread, or even within the same thread. Such a variety of options allows developers to find the right balance between networking and serialization cost, resource allocation difficulty, and isolation. Moreover, anti-affinity constraints are supported, so we could precisely define which operators cannot be deployed in the same node, container or a thread.

Apex tries to get the most of the Hadoop platform. Nevertheless, it is done at the expense of flexibility. Unlike other streaming engines, it cannot be deployed in standalone mode, or using a resource manager other than yarn. It might be a key factor for companies that don’t have a Hadoop cluster yet.

Fault tolerance

As mentioned before, Apex’s fault tolerance is based on Yarn. There are couple of failure scenarios that could happen:

  • container failure — Detected by the Yarn node manager. It is a per-node yarn agent which controls containers (in collaboration with global resource manager). Node manager could detect a failure, but could also kill the container itself, e.g. when the container will overuse the provided resources. In such a scenario, Apex’s application master will ask the resource manager for new containers, and reallocate the killed operators. After the allocation of new containers, operator state will be recovered from checkpoints and upstream buffer servers.
  • machine or node manager process failure — The resource manager will stop receiving heartbeats from the node manager. It will notify the application master, which will ask for new containers. The recovery process is similar to the container failure scenario.
  • failure of application master or machine on which the application master is runningResource manager will relaunch the application master.
  • resource manager fails — If the resource manager will fail, then it will be a failover to the secondary resource manager (only if the cluster is configured in HA mode).
  • yarn cluster down — In such a case, Apex offers an option to relaunch the application from the previous state.

As we can see, Apex’s application life cycle relies on standard Yarn mechanisms. The above scenarios are not very detailed, and are valid for Yarn prior to 2.2. However, new Yarn versions brought a lot of optimizations in case of node manager failures (https://hortonworks.com/blog/resilience-of-yarn-applications-across-nodemanager-restarts/) What’s more, future releases may bring a way to reconnect the running container after an application master restart.

Source: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/yarn_architecture.gif

Delivery guarantees

It is commonly known there are three possible delivery guarantees. At most once, at least once, and exactly once. One could argue that exactly once is not possible. However, it is a much broader topic, and it depends on the definition. We won’t discuss that here (if you are interested, I would advise following the thread https://news.ycombinator.com/item?id=9266725). Instead, I will try to elaborate on the guarantees provided by Apex.

At least once

It is the default delivery strategy. Achieved by using checkpointing and an upstream buffer server. After the failure, the stream engine recovers the operator and its downstream dependencies to the latest checkpoint. Then, the upstream buffer server replays all tuples since the last checkpoint. In terms of latency, it is a quite expensive strategy. However, it guarantees no data loss.

At most once

As far as latency is concerned, this strategy has the fastest recovery mode. After the failure operator is restarted, it subscribes to the next streaming window in the upstream buffer server. There is no need for restarting downstream operators. It is a very valuable strategy when we are interested in high latency, and we don’t care about data loss.

End to end exactly once

As mentioned before, you cannot have the exactly once delivery. Anyway, Apex provides some way to emulate it. You could configure the stream engine in a way that will make the internal state look like each message was processed exactly once. It requires checkpointing after each application window, which could be very expensive if the application window resolution is low. Nevertheless, it guarantees only the internal state. All operators performing side effects need a special care (usually input / output at the edge of applications). They have to be idempotent or support rollbacks. Most of I/O operators provided by the Malhar library support exactly once by idempotence. Other strategies, like two phase commits, are not supported.

Ease of use

Additional libraries

As mentioned before, Apache Apax comes with the Malhar library. It seems to be a quite good decision not to introduce additional features to the core of the platform, but to ship it separately. Malhar provides a lot of solid and useful operators. The vast majority of them supports all 3 delivery guarantees, partitioning, and extensibility. They are a good starting point for new applications. The main provided functionalities could be grouped as follows:

  • IO operators — support connections to RDBM and NoSQL databases, messaging systems and external systems
  • Parsers — enable reading some external data formats like XML or JSON
  • Stream manipulation — supplies the operators, which allows for carrying out operations like group by, limit, join etc
  • Computation — statistical calculations, filtering, data generation
  • Language support — executing non-Java programs. Supported languages are: JavaScript, R, Ruby, Python

Programming interface

Apex provides a programming interface only for Java. Of course, you could use it with any JVM based languages, but it will not be very convenient. The interface is designed in terms of operators, ports, and its configuration. Fluent API (map, flatMap, filter, etc.) is not natively supported. However, there are ongoing efforts to provide it via the Malhar library.

Example of connecting operators using ports. Part of yahoo financial app from https://apex.apache.org/docs/apex/application_development/

Runtime interface

Apex provides a powerful CLI for its applications. It allows for application lifecycle management, logical plan modification at runtime, monitoring and configuration. What’s more, there is a REST API that provides information about the application state. I also came across the Arato project (https://github.com/atrato) which brings the support of Grafana and REST management console. It doesn’t look actively supported though.

Unfortunately, there is no dedicated web interface for management and monitoring of Apex applications. There was one proprietary software provided by Datatorrent, but it’s no longer maintained.

Future perspective

Apex is a top-level Apache foundation project. It graduated the incubation process two years ago (https://blogs.apache.org/foundation/entry/the_apache_software_foundation_announces90). GitHub statistics (https://github.com/apache/apex-core/graphs/commit-activity) shows that it is being actively, but not rapidly developed. The latest release of Apex core is from April 2018. Malhar library was released in November 2017.

The company that released Apache Apex as an open source project was DataTorrent — a big data analytics company. Unfortunately, it has been closed in May this year. Therefore, the future of this project is unclear.

--

--