Serverless stream processing with Rayvens on Ray.io

Gheorghe-Teodor Bercea
Published in CodeFlare · 5 min read · Dec 9, 2021

Rayvens seamlessly integrates Ray.io, Apache Camel, and Kafka to empower data scientists to build scalable stream processing applications on the cloud with easy access to hundreds of data/event sources and sinks.

In previous posts we introduced Rayvens eventing for Ray.io, its programming model, and its catalog of data sources and sinks.

In this post we show how Rayvens can stream events using Kafka as the event transport while leveraging Ray’s scaling capabilities to enable the efficient and scalable parallel processing of large event quantities.

Rayvens generalizes the steps described in Anyscale's “Serverless Kafka Stream Processing with Ray” blog post, so that the source and sink in Anyscale’s example code can be replaced with any of the hundreds of sources and sinks provided by Camel. Our example application below receives real-time cryptocurrency data from Binance. It processes the data and produces messages that are pushed to a Slack channel.

The event flow

We want to process a large number of cryptocurrency price events. The user is responsible for providing a list of cryptocurrencies to be monitored. For this example we will use a list of two coins: ["BTC", "ETH"]. The real-time prices will be provided by Binance. Prices above a certain threshold will be formatted and forwarded to a Slack channel for user inspection.

Kafka will be used to stream the events between:

  • Binance and the processing tasks
  • the tasks and Slack

[Figure: Event processing flow]

The Rayvens programming model can readily express the event flow above using its concepts of:

  • event source for receiving events from Binance
  • event sink for outputting events to Slack
  • event processor for processing the events
  • event transport for the Kafka communication to/from the main task, automatic creation of Kafka topics and provisioning of Kafka event Consumers and Producers

Rayvens employs the concept of event streams, upon which all Rayvens event sources, sinks, and processors operate. A detailed description of event streams can be found here, but for the purposes of this example a single stream object is needed:

stream = rayvens.Stream('kafka-eventing')

Creating the Binance event source

With the stream in place we can configure a source and then add it to the stream:

source_config = dict(
    kind='binance-source',
    coin=["BTC", "ETH"]
)
stream.add_source(source_config)

Creating the Slack event sink

Configuring a sink and adding it to the stream follows the same pattern:

sink_config = dict(
    kind='slack-sink',
    channel=slack_channel,
    webhookUrl=slack_webhook
)
stream.add_sink(sink_config)

For setting up a channel and webhook URL for Slack please consult the Slack API.

Creating the event processor

We can use a plain Python function to express the event processor:

import json

def process_currency_price(event):
    # Each event arrives as a JSON string.
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = parsed_event['last']
    return f"{currency} : {price}"

stream.add_operator(process_currency_price)

Note: The value returned by the processor represents the processed event content and will be put back into the event stream and forwarded to all sinks attached to the event stream.
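To make the processor's behavior concrete, the sketch below runs it on a hand-written sample event, without Rayvens. The payload shape (an `instrument` such as "BTC/USDT" and a `last` price) is inferred from the fields the function reads. The sample values, the `MIN_PRICE` table, and the `process_if_above_threshold` helper are hypothetical; they only illustrate the threshold filtering mentioned in the event flow. Whether Rayvens drops an event when a processor returns None is an assumption that should be checked against the Rayvens documentation.

```python
import json

def process_currency_price(event):
    # Same processor as in the post: parse the JSON event and format it.
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = parsed_event['last']
    return f"{currency} : {price}"

# Hypothetical per-coin thresholds (illustrative values, not from the post).
MIN_PRICE = {"BTC": 40000.0, "ETH": 3000.0}

def process_if_above_threshold(event):
    # Hypothetical variant: only format prices above the coin's threshold.
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = float(parsed_event['last'])
    if price <= MIN_PRICE.get(currency, float("inf")):
        # Assumption: returning None signals "nothing to forward".
        return None
    return f"{currency} : {price}"

# Hand-written sample events standing in for real Binance data.
sample_event = json.dumps({"instrument": "BTC/USDT", "last": 48000.5})
low_event = json.dumps({"instrument": "ETH/USDT", "last": 2500.0})

message = process_currency_price(sample_event)
kept = process_if_above_threshold(sample_event)
filtered = process_if_above_threshold(low_event)
print(message, kept, filtered)
```

Keeping the processor a pure function of the event string makes it easy to test in isolation like this before attaching it to a stream.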

Event transport

By default, Rayvens uses HTTP for propagating events. To use Kafka, pass the transport="kafka" option explicitly when initializing Rayvens:

rayvens.init(transport="kafka")

Rayvens will automatically create:

  • the two topics for the source and sink communication
  • the Kafka Consumer object for ingesting incoming events from the Kafka topic responsible for handling incoming Binance events
  • the Kafka Producer object for publishing outgoing events to the Kafka topic used for communication with Slack
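The roles of the two topics, the Consumer, and the Producer can be pictured with a small in-process simulation. This is only a sketch: `queue.Queue` objects stand in for the Kafka topics, the names `source_topic`, `sink_topic`, and `consume_and_process` are hypothetical, and none of this reflects how Rayvens implements the transport internally.

```python
import json
import queue

# Stand-ins for the two Kafka topics (hypothetical names).
source_topic = queue.Queue()  # Binance -> processing
sink_topic = queue.Queue()    # processing -> Slack

def process_currency_price(event):
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = parsed_event['last']
    return f"{currency} : {price}"

def consume_and_process(in_topic, out_topic, processor):
    # Plays the role of the Kafka Consumer (reads in_topic)
    # and of the Kafka Producer (writes to out_topic).
    while True:
        event = in_topic.get()
        if event is None:  # sentinel: no more events
            break
        out_topic.put(processor(event))

# Source side: publish two hand-written sample events, then the sentinel.
for instrument, last in [("BTC/USDT", 48000.5), ("ETH/USDT", 4100.25)]:
    source_topic.put(json.dumps({"instrument": instrument, "last": last}))
source_topic.put(None)

consume_and_process(source_topic, sink_topic, process_currency_price)

# Sink side: drain the outgoing topic, as the Slack sink would.
slack_messages = []
while not sink_topic.empty():
    slack_messages.append(sink_topic.get())
print(slack_messages)
```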

With these elements in place the event flow is ready:

import json
import rayvens

# Initialize Rayvens:
rayvens.init(transport="kafka")

# Create an event stream:
stream = rayvens.Stream('kafka-eventing')

# Add sink (slack_channel and slack_webhook must be defined beforehand):
sink_config = dict(
    kind='slack-sink',
    channel=slack_channel,
    webhookUrl=slack_webhook
)
stream.add_sink(sink_config)

# Add event processor:
def process_currency_price(event):
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = parsed_event['last']
    return f"{currency} : {price}"

stream.add_operator(process_currency_price)

# Add event source:
source_config = dict(
    kind='binance-source',
    coin=["BTC", "ETH"]
)
stream.add_source(source_config)

But we are not done yet: what about scaling?

Scaling the event flow using Ray

As written above, the event flow will not scale. That might be fine for testing purposes, but not for a real application with a compute-intensive processing step.

We want the processing step to be distributed and to scale, and Rayvens can do this with two simple changes.

Rayvens is based on Ray and supports event processors that are Ray tasks, Ray actors and Ray actor methods.

Change 1: Since price events are handled independently, the best representation for our processor is a Ray task. This can be achieved by annotating the plain Python function we used as the processor before with @ray.remote:

@ray.remote
def process_currency_price(event):
    ...

Simply using a task is not enough.

Remaining issues: As written, the flow only has one Kafka Consumer. Moreover, the Kafka topic used for incoming Binance events is not partitioned by default, which means that only one Kafka event queue is created.

What we would like is to have several event queues feeding into several Kafka Consumers forwarding events to several Ray Tasks:

[Figure: Scaling event processing flow]

Change 2: Rayvens supports automatic partitioning of Kafka topics for event sources via the source kafka_transport_partitions configuration option:

source_config = dict(
    ...
    kafka_transport_partitions=3
)

Behind the scenes: Rayvens automatically wraps the Kafka Consumer into a Ray Actor with a low CPU reservation (num_cpus=0.05) and creates a pool of these Kafka Actor Consumers, one per topic partition. Each Kafka Actor Consumer then calls the user-provided processor (in this case a Ray Task) to process each event. The outcome is sent to Slack via a single Kafka Producer.
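The shape of this scaled flow can be sketched with the standard library alone. Everything here is a stand-in and an assumption: one `queue.Queue` per partition replaces the partitioned Kafka topic, one thread per partition replaces a Kafka Actor Consumer, a `ThreadPoolExecutor` replaces Ray tasks, and events are assigned to partitions round-robin, which may differ from how Kafka assigns them in a real deployment.

```python
import json
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 3  # mirrors kafka_transport_partitions=3

def process_currency_price(event):
    parsed_event = json.loads(event)
    currency = parsed_event['instrument'].split("/")[0]
    price = parsed_event['last']
    return f"{currency} : {price}"

partitions = [queue.Queue() for _ in range(NUM_PARTITIONS)]  # partitioned topic
sink_topic = queue.Queue()                     # single outgoing topic
task_pool = ThreadPoolExecutor(max_workers=4)  # stand-in for Ray tasks
handled_by = {i: 0 for i in range(NUM_PARTITIONS)}

def consumer(partition_id):
    # One consumer per partition, each handing events to the task pool.
    while True:
        event = partitions[partition_id].get()
        if event is None:  # sentinel: partition is drained
            break
        future = task_pool.submit(process_currency_price, event)
        sink_topic.put(future.result())
        handled_by[partition_id] += 1

# Twelve hand-written sample events, spread round-robin over the partitions.
events = [json.dumps({"instrument": f"{coin}/USDT", "last": 100 + i})
          for i, coin in enumerate(["BTC", "ETH"] * 6)]
for i, event in enumerate(events):
    partitions[i % NUM_PARTITIONS].put(event)
for partition in partitions:
    partition.put(None)

threads = [threading.Thread(target=consumer, args=(i,))
           for i in range(NUM_PARTITIONS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
task_pool.shutdown()

results = []
while not sink_topic.empty():
    results.append(sink_topic.get())
print(len(results), handled_by)
```

Because the consumers run concurrently, the order of messages in the outgoing topic is not guaranteed across partitions; only the order within a partition is preserved in this sketch.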

And we’re done!

Link to the full code.

Using other sources and sinks

Rayvens allows the types of sources and sinks used in the example above to be replaced. Simply replace the configuration of the Binance source or the Slack sink with the configuration for another source or sink type.

Cloud Object Storage source:

source_config = dict(kind='cloud-object-storage-source',
                     bucket_name=<bucket_name>,
                     access_key_id=<...>,
                     secret_access_key=<...>,
                     endpoint=<...>)

Cloud Object Storage sink:

sink_config = dict(kind='cloud-object-storage-sink',
                   bucket_name=<bucket_name>,
                   access_key_id=<...>,
                   secret_access_key=<...>,
                   endpoint=<...>,
                   file_name=<file_name>)
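One way to fill in these placeholders without hard-coding credentials is to read them from the environment. The helper below is a hypothetical sketch: the function name `cos_config` and the `COS_*` variable names are assumptions made for illustration, and only the configuration keys themselves come from the snippets above.

```python
import os

def cos_config(kind, env=os.environ, **extra):
    # Build a Cloud Object Storage source or sink configuration dict.
    # The COS_* variable names are hypothetical, chosen for this sketch.
    config = dict(
        kind=kind,
        bucket_name=env["COS_BUCKET"],
        access_key_id=env["COS_ACCESS_KEY_ID"],
        secret_access_key=env["COS_SECRET_ACCESS_KEY"],
        endpoint=env["COS_ENDPOINT"],
    )
    config.update(extra)  # e.g. file_name for the sink
    return config

# Demonstration with a fake environment instead of real credentials.
example_env = {
    "COS_BUCKET": "my-bucket",
    "COS_ACCESS_KEY_ID": "example-key-id",
    "COS_SECRET_ACCESS_KEY": "example-secret",
    "COS_ENDPOINT": "https://cos.example.invalid",
}
source_config = cos_config('cloud-object-storage-source', env=example_env)
sink_config = cos_config('cloud-object-storage-sink', env=example_env,
                         file_name='prices.txt')
print(sorted(sink_config))
```

The resulting dictionaries would then be passed to stream.add_source and stream.add_sink exactly as the Binance and Slack configurations were.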

In fact, any of the hundreds of sources and sinks that Rayvens supports can replace the ones used in the example code. See the Rayvens sources and sinks blog linked below.

Related blog posts

Authors: Gheorghe-Teodor Bercea and Olivier Tardieu
