Rayvens event streams

Olivier Tardieu, published in CodeFlare, Jul 6, 2021

Jointly written by Gheorghe-Teodor Bercea and Olivier Tardieu.

In a previous blog post we introduced Rayvens. With Rayvens, Ray programs can subscribe to event streams, process events, and produce events. In this post we dive into the details of the Rayvens streams API. We will cover:

  1. Rayvens initialization
  2. Stream construction
  3. Producing and consuming events
  4. Stream subscriber types
  5. Stream operators

Rayvens initialization

Rayvens must be initialized before use:

import rayvens
rayvens.init()

The init method takes optional configuration parameters we will cover in a separate post.

Stream construction

The Rayvens API is built around streams. A stream is an abstraction of an ordered, potentially infinite sequence of events. Events are just items of data with no prescribed schema or semantics. Streams bridge producers and consumers of events in a Ray program with external event sources and sinks. Streams can also process events: they can filter and/or transform them.

An event stream is created by instantiating the Rayvens Stream class:

stream = rayvens.Stream('stream_name')

A stream name must be provided.

Producing and consuming events

Events can be appended to a stream one by one with the << operator:

stream << "hello" << "world"

This statement appends two events to the stream in order: first "hello", then "world".

Subscribers can be added to a stream to consume its events using the >> operator. For instance, events can be logged to the console using:

stream >> print

As seen in the previous blog post, a custom message can be constructed using a lambda function:

stream >> (lambda event: print('LOG:', event))

The << and >> operators are not symmetrical. Each << invocation appends a single event to the stream. In contrast, the right-hand side of the >> invocation is invoked for every event appended to the stream.

Events are ephemeral. A stream does not remember past events. A subscriber receives all events appended to the stream from the time of the subscription onward.
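To make the asymmetry and the "no history" rule concrete, here is a minimal sketch in plain Python. ToyStream is a hypothetical stand-in written for this illustration; it is not the Rayvens implementation and runs entirely in one process.

```python
class ToyStream:
    """Toy in-process model of a stream; not the Rayvens implementation."""

    def __init__(self, name):
        self.name = name
        self.subscribers = []  # callables invoked for every appended event

    def __lshift__(self, event):
        # Append one event: deliver it to current subscribers, keep no history.
        for subscriber in self.subscribers:
            subscriber(event)
        return self  # returning self allows chaining: stream << "a" << "b"

    def __rshift__(self, subscriber):
        # Register a subscriber; it only sees events appended from now on.
        self.subscribers.append(subscriber)
        return self


early, late = [], []
stream = ToyStream('demo')
stream >> early.append
stream << "hello"
stream >> late.append   # subscribed after "hello" was appended
stream << "world"

print(early)  # ['hello', 'world']
print(late)   # ['world']
```

The late subscriber never sees "hello" because the toy stream, like the streams described above, stores nothing once an event has been delivered.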

Stream subscriber types

A stream subscriber may be a Python callable, a Ray task, a Ray actor method, or a Rayvens stream. A subscriber is invoked for each event appended to the stream, with the event as the sole argument of the invocation. We now demonstrate these different subscriber types and explain their ordering guarantees.

Python callable

def handler(event):
    print('received', event)

stream >> handler

The handler function is invoked for each event in order in a blocking manner. Each invocation has to complete before the next one can execute.

If multiple functions are subscribed to a stream, Rayvens guarantees that the executions of these functions do not overlap. Moreover, the processing of one event has to finish for all functions before the next event is considered.
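The guarantee for multiple Python callables can be pictured as a nested loop over events and subscribers. The sketch below is a plain-Python model, not Rayvens code; the deliver helper and the subscriber names are hypothetical, and calling subscribers in subscription order for a given event is an assumption of this sketch rather than something stated above.

```python
log = []

def make_subscriber(name):
    # Each subscriber records which event it handled.
    def subscriber(event):
        log.append((name, event))
    return subscriber

subscribers = [make_subscriber('first'), make_subscriber('second')]

def deliver(events):
    # Event-major loop: every subscriber finishes an event before the
    # next event is considered, and calls never overlap.
    for event in events:
        for subscriber in subscribers:
            subscriber(event)

deliver(['e1', 'e2'])
print(log)
# [('first', 'e1'), ('second', 'e1'), ('first', 'e2'), ('second', 'e2')]
```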

Ray tasks

import ray

@ray.remote
def task(event):
    print('received', event)

stream >> task

Ray tasks subscribed to a stream execute in arbitrary order, irrespective of the order of events. Moreover, task executions may overlap.

Ray actors

@ray.remote
class SimpleActor:
    def method(self, event):
        print('received', event)

actor = SimpleActor.remote()
stream >> actor.method

Events are delivered in order to each actor subscribed to the stream. However, if multiple actors are subscribed to the same stream, the order of delivery across actors is unspecified. In particular, one actor may be ahead of another actor by multiple events.

An actor with a method named append may be subscribed with:

stream >> actor

This is simply a shorthand for:

stream >> actor.append

Rayvens streams

source_stream = rayvens.Stream('source_stream_name')
sink_stream = rayvens.Stream('sink_stream_name')
source_stream >> sink_stream

All source_stream events will be appended in order to sink_stream (after processing if an operator is added to source_stream).
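The append shorthand and stream-to-stream subscription fit together naturally if a stream itself exposes an append method. The following plain-Python sketch models that idea; ToyStream is hypothetical, and the rule "a non-callable subscriber is subscribed through its append method" is an assumption made for illustration, not a description of Rayvens internals.

```python
class ToyStream:
    """Toy model only; Rayvens streams are not implemented this way."""

    def __init__(self, name):
        self.name = name
        self.subscribers = []

    def append(self, event):
        # Forward the event to every subscriber, in subscription order.
        for subscriber in self.subscribers:
            subscriber(event)

    def __lshift__(self, event):
        self.append(event)
        return self

    def __rshift__(self, subscriber):
        # Assumed shorthand: a non-callable object exposing `append`
        # is subscribed through that method.
        if not callable(subscriber):
            subscriber = subscriber.append
        self.subscribers.append(subscriber)
        return self


received = []
source = ToyStream('source')
sink = ToyStream('sink')
source >> sink             # same as: source >> sink.append
sink >> received.append
source << 1 << 2 << 3

print(received)  # [1, 2, 3]
```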

Stream operators

A stream can specify a stream operator. A stream operator can be a Python callable, a Ray task, or a Ray actor method. It is invoked on each event before any stream subscriber. Each operator invocation may either drop the event by returning None or transform the event by returning any other value. Transformed events are then forwarded in order to the stream subscribers.

For example, an operator may be used to decorate events with a timestamp:

import time

@ray.remote
class Decorator:
    def append(self, event):
        # Wrap the original event together with the current time.
        return {'event': event, 'time': time.time()}

decorator = Decorator.remote()
stream.add_operator(decorator)
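The drop-or-transform contract of an operator can also be modeled without Ray. In this sketch, deliver and keep_even_and_double are hypothetical names chosen for illustration; the only behavior taken from the text is that a None result drops the event and any other result is forwarded.

```python
def deliver(events, operator, subscriber):
    # Apply the operator to each event before the subscriber sees it.
    for event in events:
        result = operator(event)
        if result is None:
            continue  # the operator dropped this event
        subscriber(result)

def keep_even_and_double(event):
    # Hypothetical operator: drop odd numbers, double the rest.
    return event * 2 if event % 2 == 0 else None

out = []
deliver([1, 2, 3, 4], keep_even_and_double, out.append)
print(out)  # [4, 8]
```

One consequence of this contract is that an operator cannot forward None itself as an event value, since None is reserved to mean "drop".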

While it is possible to use Ray tasks as stream operators, keep in mind that these tasks may execute in arbitrary order, even though the task outputs are then forwarded to the stream subscribers in the order of the incoming events. Ray tasks should therefore not be used for non-commutative (order-dependent) event processing.
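The combination of unordered execution and ordered outputs can be illustrated by analogy with the Python standard library. The sketch below swaps Ray tasks for a thread pool, so it says nothing about how Rayvens schedules work; it only shows that outputs can be collected in input order while the calls themselves overlap and finish in any order. The operator function here is hypothetical.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def operator(event):
    time.sleep(random.uniform(0, 0.01))  # simulate variable processing time
    return event * 10

with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map yields results in input order, even though the calls
    # run concurrently and may finish in any order.
    outputs = list(pool.map(operator, range(8)))

print(outputs)  # [0, 10, 20, 30, 40, 50, 60, 70]
```

This works because the operator is stateless. If it instead updated shared state, such as a running total, the value attached to each event would depend on how the executions interleaved, even though the outputs still arrive in order.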

Conclusion

Rayvens unleashes event-driven programming in Ray. In addition to producing and consuming events with the << and >> operators showcased in the previous sections, Rayvens leverages Apache Camel to accept events from external sources and emit events to external sinks. We will discuss these sources and sinks in a separate blog post.
