Accessing hundreds of event sources and sinks with Rayvens

Gheorghe-Teodor Bercea
Published in CodeFlare
Aug 23, 2021

Jointly written by Gheorghe-Teodor Bercea and Olivier Tardieu.

In a previous blog post we introduced Rayvens, which combines event-driven programming (Apache Camel) and AI (Ray.io). We followed up with a more detailed description of Rayvens event streams. In this post we dive into the details of Rayvens event sources and sinks. We will cover:

  1. General principles: configuring, attaching, and detaching sources and sinks to/from event streams
  2. Built-in catalog of sources and sinks
  3. Programming more sources and sinks using Camel’s YAML dialect

Sources and sinks are configured by means of Python dictionaries, for example:

source_config = dict(kind='http-source',
                     url=<source_url>,
                     period=3000)

Each configuration requires a kind field; depending on the kind, other fields are mandatory or optional. For example, the http-source periodically fetches data from a REST API: the url to fetch data from is required, whereas the period is optional.
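To make the kind-dependent required and optional fields concrete, here is a minimal client-side sketch that checks a configuration dictionary before it is handed to add_source or add_sink. REQUIRED_FIELDS and validate_config are hypothetical names, not part of the Rayvens API, and the table only lists the fields this post explicitly describes as required; Rayvens performs its own validation.

```python
# Hypothetical table of fields this post states are required for some kinds.
# Illustrative only: Rayvens does its own validation, and other kinds have
# required fields that are not listed here.
REQUIRED_FIELDS = {
    'http-source': {'url'},
    'kafka-source': {'topic', 'brokers'},
    'generic-source': {'spec'},
    'generic-sink': {'spec'},
}


def validate_config(config):
    """Raise ValueError if config has no kind or lacks a field required by its kind."""
    kind = config.get('kind')
    if kind is None:
        raise ValueError('every configuration requires a kind')
    # Kinds missing from the table are only checked for the presence of kind.
    missing = REQUIRED_FIELDS.get(kind, set()) - set(config)
    if missing:
        raise ValueError(f'{kind} is missing required fields: {sorted(missing)}')
    return config


# period is optional for the http-source, so omitting it is fine.
validate_config(dict(kind='http-source', url='https://example.com/api'))
```

A kafka-source configuration without brokers, by contrast, raises a ValueError naming the missing field.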

Typical sources and sinks are configured by means of simple values (numbers, strings, Boolean flags). In contrast, the generic-source and generic-sink make it possible to use Camel’s YAML dialect to program sophisticated sources and sinks. They accept a single spec field that specifies the program for the source or sink to run.

Attaching a source or sink to a stream

A source or sink is attached to an event stream using add_source or add_sink respectively:

source = stream.add_source(source_config)
sink = stream.add_sink(sink_config)

An event source produces events and appends them to the stream it is added to. An event sink subscribes to the stream it is attached to: it accepts every event the stream emits after the sink is attached and forwards these events (typically in order) to an external data service.

Keep in mind that sources and sinks typically take a few seconds to initialize and start producing or accepting events after the invocation of add_source or add_sink. Multiple sinks attached to the same stream eventually accept the same events.

As explained in prior blog posts, by default a stream emits all the events appended to it, in order, and hence forwards all events from its sources to its sinks (after initialization). Stream operators can be used to filter or transform events between sources and sinks. A stream can also forward events to multiple other streams, for instance to partition events across multiple sinks.
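The fan-out semantics described above can be illustrated with a small in-memory model. ToyStream below is a hypothetical stand-in written for illustration, not Rayvens code: real Rayvens streams run on Ray, and real sources and sinks are Camel integrations that take time to start. It only shows that every attached sink receives the events emitted after it is attached, in order, and that an operator can filter events between sources and sinks.

```python
class ToyStream:
    """Hypothetical in-memory model of stream semantics (not the Rayvens implementation)."""

    def __init__(self, operator=None):
        self.sinks = []
        # Optional operator: returns a (possibly transformed) event, or None to drop it.
        self.operator = operator

    def add_sink(self, sink):
        # A sink only receives events emitted after it is attached.
        self.sinks.append(sink)
        return sink

    def append(self, event):
        # This is what a source does: append an event to the stream.
        if self.operator is not None:
            event = self.operator(event)
            if event is None:
                return
        for sink in self.sinks:
            sink(event)


# Two sinks modeled as lists that record what they receive.
received_a, received_b = [], []
stream = ToyStream(operator=lambda e: e if e % 2 == 0 else None)  # keep even events
stream.add_sink(received_a.append)
for event in range(4):
    if event == 2:
        stream.add_sink(received_b.append)  # this sink is attached late
    stream.append(event)
print(received_a, received_b)  # [0, 2] [2]
```

The late sink misses event 0, mirroring the fact that a sink only accepts events emitted once it is attached.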

Detaching a source or sink from a stream

The return value of add_source or add_sink makes it possible to detach a source or sink from the stream it is attached to:

stream.disconnect_source(source)
stream.disconnect_sink(sink)

To detach all sources and sinks from a stream, invoke:

stream.disconnect_all()

Disconnecting a source or sink from a stream does not preempt events already queued for processing on the stream.
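This behavior can be pictured with a queue: disconnecting a source stops new events from being appended, but events already queued are still delivered. QueuedStream below is a hypothetical illustration, not Rayvens code; its method names merely echo the ones used in this post.

```python
from collections import deque


class QueuedStream:
    """Hypothetical model: events are queued first and delivered to sinks later."""

    def __init__(self):
        self.queue = deque()
        self.sources = set()
        self.sinks = []

    def add_source(self, name):
        self.sources.add(name)
        return name

    def disconnect_source(self, name):
        # Only stops future appends; the queue is left untouched.
        self.sources.discard(name)

    def append(self, source, event):
        if source in self.sources:  # events from disconnected sources are ignored
            self.queue.append(event)

    def drain(self):
        # Deliver queued events, in order, to every attached sink.
        while self.queue:
            event = self.queue.popleft()
            for sink in self.sinks:
                sink(event)


delivered = []
stream = QueuedStream()
stream.sinks.append(delivered.append)
src = stream.add_source('ticker')
stream.append(src, 'e1')
stream.append(src, 'e2')
stream.disconnect_source(src)
stream.append(src, 'e3')  # dropped: the source is already disconnected
stream.drain()
print(delivered)  # ['e1', 'e2']
```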

Rayvens built-in sources

Rayvens offers built-in sources and sinks for popular services. A comprehensive list is available in the Rayvens documentation. These sources and sinks expose a curated set of options that the user can configure. A summary of built-in sources follows.

HTTP source

This source sends an HTTP GET request to url every period milliseconds and produces, for each request, an event consisting of the response body:

source_config = dict(kind='http-source',
                     url=<url_value>,
                     period=<optional_period_value>)
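Conceptually, the http-source behaves like the polling loop below. This is a hypothetical sketch, not how Rayvens implements it (the real source composes a Camel timer with an HTTP client): the fetch callable stands in for the HTTP GET request so the example runs without network access, and the max_events bound is added purely for illustration.

```python
import time


def poll(fetch, period_ms, max_events):
    """Call fetch() every period_ms milliseconds, yielding each response body as an event."""
    for i in range(max_events):
        if i > 0:
            time.sleep(period_ms / 1000)  # period is expressed in milliseconds
        yield fetch()


# A fake fetch function standing in for an HTTP GET request to url.
responses = iter(['{"price": 1}', '{"price": 2}', '{"price": 3}'])
events = list(poll(lambda: next(responses), period_ms=10, max_events=3))
print(events)
```

With network access, fetch could instead be something like `lambda: urllib.request.urlopen(url).read()`.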

Kafka source

This source receives messages from a Kafka topic. It requires the topic name as well as a comma-separated list of Kafka brokers. See the full documentation for additional parameters such as Kafka credentials.

source_config = dict(kind='kafka-source',
                     topic=<topic>,
                     brokers=<kafka_brokers>)
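Since brokers is a comma-separated string, a small helper can build it from a Python list. kafka_config is a hypothetical convenience function, not part of Rayvens, and the broker addresses are placeholders; applying the same format to the kafka-sink is an assumption.

```python
def kafka_config(kind, topic, brokers):
    """Build a kafka-source or kafka-sink configuration from a list of broker addresses."""
    if kind not in ('kafka-source', 'kafka-sink'):
        raise ValueError(f'unexpected kind: {kind}')
    if not brokers:
        raise ValueError('at least one broker is required')
    # The post describes brokers as a comma-separated list
    # (assumed here to apply to the kafka-sink as well).
    return dict(kind=kind, topic=topic, brokers=','.join(brokers))


config = kafka_config('kafka-source', 'events',
                      ['broker-0.example.com:9092', 'broker-1.example.com:9092'])
print(config['brokers'])
```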

Telegram source

This source receives all the incoming messages from a Telegram Bot.

source_config = dict(kind='telegram-source',
                     authorization_token=<token>)

Binance source

This source fetches cryptocurrency status events from the Binance API every period milliseconds and can monitor several cryptocurrencies at once.

source_config = dict(kind='binance-source',
                     coin=<list_of_coin_IDs>,
                     period=<optional_period_value>)

Example list of coin IDs:

['BTC', 'ETH']

Cloud Object Storage source

This source receives objects from IBM Cloud Object Storage or AWS S3. For example, it can be configured to produce an event for each object initially present in, or later added to, a bucket:

source_config = dict(kind='cloud-object-storage-source',
                     bucket_name=<bucket>,
                     access_key_id=<access_key_id>,
                     secret_access_key=<secret_access_key>,
                     endpoint=<endpoint>)

Several other configuration options exist and will be covered in a separate blog post.

The events received by the stream from this source have the following JSON format:

{ "filename": <filename>, "body": <body> }
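A subscriber consuming these events might decode them as follows. This is a hypothetical sketch: it assumes each event arrives either as a JSON string (or bytes) with the two fields shown above or as an already-decoded dictionary, and handle_object is an illustrative name, not a Rayvens function.

```python
import json


def handle_object(event):
    """Decode a cloud-object-storage-source event into (filename, body)."""
    # json.loads accepts str or bytes; a dict is passed through unchanged.
    payload = event if isinstance(event, dict) else json.loads(event)
    return payload['filename'], payload['body']


name, body = handle_object('{"filename": "report.csv", "body": "a,b\\n1,2"}')
print(name, len(body))  # report.csv 7
```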

File source

This source reads a file from the file system when the file becomes available.

source_config = dict(kind='file-source',
                     path='path/to/file',
                     keep_files=True)

By default, a file is deleted after being read. When keep_files is True, the file is instead moved into a hidden subfolder.

File watch source

This source monitors a directory for file-related events: file modification (MODIFY), file deletion (DELETE), and file creation (CREATE). The events field optionally restricts monitoring to a comma-separated subset of these event types; by default, all three are monitored.

source_config = dict(kind='file-watch-source',
                     path='path/to/directory',
                     events='DELETE,CREATE')

The event received by the stream from this source has the following JSON format:

{ "filename": <filename>, "event_type": <event_type> }
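The hypothetical helpers below (not part of Rayvens) build the comma-separated events string with a check against typos, and decode the resulting events, assuming they arrive as JSON strings in the format shown above.

```python
import json

VALID_EVENTS = ('MODIFY', 'DELETE', 'CREATE')


def file_watch_config(path, events=None):
    """Build a file-watch-source configuration; events=None monitors all event types."""
    config = dict(kind='file-watch-source', path=path)
    if events is not None:
        unknown = set(events) - set(VALID_EVENTS)
        if unknown:
            raise ValueError(f'unknown event types: {sorted(unknown)}')
        config['events'] = ','.join(events)
    return config


def on_file_event(event):
    """Return (filename, event_type) from a file-watch-source event."""
    payload = json.loads(event)
    return payload['filename'], payload['event_type']


print(file_watch_config('path/to/directory', ['DELETE', 'CREATE']))
print(on_file_event('{"filename": "data.txt", "event_type": "CREATE"}'))
```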

Rayvens built-in sinks

Slack sink

Output messages to a Slack channel.

sink_config = dict(kind='slack-sink',
                   channel=<slack_channel_name>,
                   webhook_url=<slack_webhook>)

Kafka sink

Publish events to a Kafka topic.

sink_config = dict(kind='kafka-sink',
                   topic=<topic>,
                   brokers=<list_of_brokers>)

Telegram sink

Send messages to a Telegram chat via a Telegram bot.

sink_config = dict(kind='telegram-sink',
                   authorization_token=<authorization_token>,
                   chat_id=<chat_id>)

Cloud Object Storage sink

Upload objects to IBM Cloud Object Storage or AWS S3.

sink_config = dict(kind='cloud-object-storage-sink',
                   bucket_name=<bucket_name>,
                   access_key_id=<access_key_id>,
                   secret_access_key=<secret_access_key>,
                   endpoint=<endpoint>,
                   file_name=<name_of_uploaded_file>)

Several other options for this sink will be covered in a separate blog post.

Generic sources and sinks

Rayvens leverages the breadth of components supported by the Apache Camel-K project (253+ components). Rayvens includes a growing set of built-in sources and sinks to mediate the interaction between Ray and popular services such as Twitter, Telegram, and Cloud Object Storage. Items in this catalog compose elementary Camel components to offer simple, intuitive sources and sinks; for instance, the http-source combines a periodic timer with an HTTP client. However, today this catalog barely scratches the surface of what Camel is capable of.

To allow power users to program more sophisticated sources and sinks, or to unlock the entire catalog of Camel components, Rayvens offers a generic-source and a generic-sink. They make it possible to instantiate a source or sink that runs a custom Camel YAML script. For instance, a source similar to the http-source can be programmed as follows:

source_config = dict(kind='generic-source',
                     spec="""
- from:
    uri: timer:tick?period=3000
    steps:
      - to: https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL
""")

Simply set the kind to either generic-source or generic-sink and paste the appropriate YAML script as a properly indented string under the spec field. See the full documentation for details.
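Because YAML is indentation-sensitive, generating the spec from a template can help avoid indentation mistakes. The timer_http_spec helper below is a hypothetical illustration, not part of Rayvens: it reproduces the structure of the script above for an arbitrary period and URL, and uses textwrap.dedent so the template can be indented naturally inside Python source.

```python
import textwrap


def timer_http_spec(url, period_ms):
    """Return a Camel YAML script that fetches url every period_ms milliseconds."""
    # dedent strips the common 8-space margin, keeping the nested YAML indentation.
    return textwrap.dedent(f"""\
        - from:
            uri: timer:tick?period={period_ms}
            steps:
              - to: {url}
        """)


source_config = dict(
    kind='generic-source',
    spec=timer_http_spec(
        'https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL', 3000))
print(source_config['spec'])
```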

Conclusion

Rayvens makes it easy to connect to external event sources and sinks by means of the vast component catalog of Apache Camel. To flatten the learning curve, Rayvens offers easy-to-configure, built-in sources and sinks. Generic sources and sinks enable power users to access the full catalog of Camel components and program sophisticated sources and sinks.

The Rayvens catalog of built-in sources and sinks is growing rapidly. Going forward, we plan to enrich it with the recently introduced Apache Camel Kamelets catalog.
