Event-driven data processing with IBM Cloud Object Storage and Ray.io

Gheorghe-Teodor Bercea
Published in CodeFlare · 4 min read · Oct 12, 2021

Jointly written by Gheorghe-Teodor Bercea and Olivier Tardieu.

In a previous blog we introduced Rayvens, a library for bridging Ray.io with the vast collection of event and data services supported by Apache Camel. We showcased Rayvens event streams, and configured simple event sources and sinks.

In this post we leverage Rayvens to produce and consume data (and metadata) hosted in IBM Cloud Object Storage or AWS S3 buckets.

Getting started

First, install Rayvens following the instructions provided here.

To access an IBM Cloud Object Storage or AWS S3 bucket, Rayvens requires:

  • the bucket name,
  • the access key id,
  • the secret access key,
  • the public or private endpoint.

Cloud Object Storage Source

The Cloud Object Storage source monitors a single bucket. It is configured as follows:

source_config = dict(kind='cloud-object-storage-source',
                     bucket_name=<bucket_name>,
                     access_key_id=<...>,
                     secret_access_key=<...>,
                     endpoint=<...>)

The bucket_name, access_key_id, secret_access_key, and endpoint are required configuration parameters. The source configuration supports additional optional configuration parameters discussed below.
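One way to keep credentials out of the application code is to read the four required parameters from environment variables. The sketch below is only an illustration: the helper name cos_source_config and the COS_* variable names are assumptions made for this example and are not defined by Rayvens.

```python
import os

# Hypothetical environment variable names; Rayvens does not define these.
REQUIRED = {
    'bucket_name': 'COS_BUCKET_NAME',
    'access_key_id': 'COS_ACCESS_KEY_ID',
    'secret_access_key': 'COS_SECRET_ACCESS_KEY',
    'endpoint': 'COS_ENDPOINT',
}


def cos_source_config(env=os.environ, **options):
    """Build a source configuration dict, failing early on missing values."""
    missing = [var for var in REQUIRED.values() if not env.get(var)]
    if missing:
        raise KeyError('missing environment variables: ' + ', '.join(missing))
    config = dict(kind='cloud-object-storage-source')
    # Copy the four required parameters from the environment mapping.
    config.update({key: env[var] for key, var in REQUIRED.items()})
    # Optional parameters are passed through unchanged.
    config.update(options)
    return config
```

The resulting dictionary has the same shape as the source_config shown above, so it could be used wherever a hand-written configuration is expected.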

When a file is uploaded to the bucket, the source produces an event on the Rayvens event stream attached to the source. This event has the following JSON structure:

{
  "filename": <filename_value>,
  "body": <body_value>
}

The file is automatically deleted from the bucket upon firing the event.
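A handler for these events might unpack them as in the sketch below. It assumes the event reaches the handler either as a JSON string or as an already-decoded dictionary; the function name parse_event and the sample file name are made up for illustration.

```python
import json


def parse_event(event):
    """Return (filename, body) from a Cloud Object Storage source event.

    Assumes the event is a JSON string (or bytes) or an already-decoded
    dict. body is None when the event has no "body" field.
    """
    data = json.loads(event) if isinstance(event, (str, bytes)) else event
    return data['filename'], data.get('body')


# Hypothetical event, shaped like the JSON structure above.
filename, body = parse_event('{"filename": "report.txt", "body": "hello"}')
print(filename, len(body))
```

Because the file is deleted from the bucket once the event fires, the body carried by the event is the only copy the handler sees unless the file is preserved by other means.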

region

If not specified, the region of the Cloud Object Storage instance defaults to us-east. To specify a different region, use:

source_config = dict(kind='cloud-object-storage-source',
                     ...
                     region='eu-de')

move_after_read

The move_after_read option makes it possible to preserve the files triggering the events by moving them to an alternate bucket instead of deleting them.

source_config = dict(kind='cloud-object-storage-source',
                     ...
                     move_after_read=<alternate_bucket_name>)

The alternate bucket must be created in advance in Cloud Object Storage.

meta_event_only

To propagate only the file name, use the meta_event_only option:

source_config = dict(kind='cloud-object-storage-source',
                     ...
                     meta_event_only=True)

The JSON structure of the produced event is:

{
  "filename": <filename_value>
}

Combine the meta_event_only and move_after_read options to be notified of new files but pull file contents on demand.
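A combined configuration might look like the following sketch. The bucket names, credentials, and endpoint are placeholder values chosen for this example, not real resources.

```python
# All values below are hypothetical placeholders.
source_config = dict(
    kind='cloud-object-storage-source',
    bucket_name='incoming-files',
    access_key_id='<access_key_id>',
    secret_access_key='<secret_access_key>',
    endpoint='<endpoint>',
    meta_event_only=True,               # events carry only the file name
    move_after_read='processed-files',  # files are moved here, not deleted
)
```

With this configuration each event only names a file, and the file itself remains available, presumably in the alternate bucket, for when its contents are actually needed.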

Cloud Object Storage Sink

The Cloud Object Storage sink uploads one or several files to a bucket. Each upload creates a new file entry in the bucket, unless a file with the same name already exists, in which case the existing file is overwritten.

All instances of the sink require the following configuration parameters:

sink_config = dict(kind='cloud-object-storage-sink',
                   bucket_name=<bucket_name>,
                   access_key_id=<...>,
                   secret_access_key=<...>,
                   endpoint=<...>,
                   ...)

Other parameters define what, when, and how to upload the data to Cloud Object Storage.
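Since the source and the sink take the same connection parameters, an application that uses both could define them once and derive each configuration from the shared dictionary. This is a plain Python idiom rather than a Rayvens feature, and every value below is a hypothetical placeholder.

```python
# Hypothetical placeholder connection parameters, defined once.
connection = dict(access_key_id='<access_key_id>',
                  secret_access_key='<secret_access_key>',
                  endpoint='<endpoint>')

# dict(mapping, **extra) copies the mapping and adds the extra keys.
source_config = dict(connection,
                     kind='cloud-object-storage-source',
                     bucket_name='input-bucket')
sink_config = dict(connection,
                   kind='cloud-object-storage-sink',
                   bucket_name='output-bucket')
```

The example uses two different buckets on purpose: a sink writing into the bucket watched by a source would trigger that source on every upload.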

Uploading a file from an in-memory buffer

The sink configuration may specify a file name.

sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   file_name=<file_name>)

In this configuration, the content of the file is the event payload itself.

Add the sink to an existing event stream:

stream.add_sink(sink_config)

Dump the file contents onto the stream:

stream << 'File contents.'

This works well for small to medium files.

Uploading one or several files from the file system

For larger files, multi-part uploads are recommended. Pass the upload_type='multi-part' option to the sink configuration and, optionally, set the part size by adding part_size=<number_of_bytes>. The default part size is 10 MB.

For example:

sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024)

This configuration will upload a file in 2MB chunks.
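To get a feel for how the part size affects an upload, the arithmetic can be sketched as follows. The sketch assumes parts are filled in order with only the last one shorter, and that "10 MB" means 10 * 1024 * 1024 bytes, in line with the 2 * 1024 * 1024 example above. The function part_sizes is illustrative and not part of Rayvens.

```python
MIB = 1024 * 1024


def part_sizes(file_size, part_size=10 * MIB):
    """Return the byte size of each part for a file of file_size bytes."""
    if file_size <= 0 or part_size <= 0:
        raise ValueError('sizes must be positive')
    # Integer ceiling division: number of parts needed to cover the file.
    count = (file_size + part_size - 1) // part_size
    # Every part is full except possibly the last one.
    last = file_size - (count - 1) * part_size
    return [part_size] * (count - 1) + [last]


# A 25 MiB file split into 2 MiB parts.
parts = part_sizes(25 * MIB, part_size=2 * MIB)
print(len(parts), parts[-1])
```

Here the 25 MiB file yields twelve full 2 MiB parts followed by a final 1 MiB part.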

In the case of a multi-part file upload, there are several ways to specify files to be uploaded.

Method 1: A file and its content can be specified from within the application by setting file_name=<file_name> in the sink configuration and pushing the content to the stream with stream << 'File contents'.

Method 2: Files can also be specified by adding a Cloud Object Storage sink to an event stream and then passing the following to the stream:

stream << Path('test_files/test.txt')

This requires Path to be imported: from pathlib import Path.

The event will be treated as a path to a file rather than the file contents. The file will be uploaded using its original name unless the file_name option is provided to override it.
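The difference between pushing a Path and pushing a string can be summarized with the sketch below. It mirrors the behavior described in the text and is not Rayvens' implementation; the function describe_upload is hypothetical.

```python
from pathlib import Path


def describe_upload(event, file_name=None):
    """Return (object_name, source) for an event pushed to the sink.

    Illustrative only: this restates the rules described in the text.
    """
    if isinstance(event, Path):
        # A Path refers to a file on disk; its own name is used unless
        # file_name overrides it.
        return file_name or event.name, 'file'
    # Any other payload is the file contents themselves. The text does not
    # say which name is used when file_name is omitted, so None is returned.
    return file_name, 'contents'


print(describe_upload(Path('test_files/test.txt')))
print(describe_upload('test_files/test.txt', file_name='note.txt'))
```

Note that the plain string 'test_files/test.txt' counts as file contents, not as a path, which is why the Path wrapper matters.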

Method 3: Files can be uploaded using the from_file option. For example:

sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024,
                   from_file='test_files/test3.txt')

Every time a file named test3.txt is available in the test_files directory, the file will be uploaded to Cloud Object Storage.

Method 4: Files can be uploaded using the from_directory option:

sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024,
                   from_directory='test_files/')

Any file added to the test_files directory will be uploaded to Cloud Object Storage under its own name. Existing files will be silently overwritten.

For Methods 3 and 4, the keep_file option ensures the files are kept on the file system, although in a different location: after upload, the files are moved to a hidden subdirectory of the directory they were uploaded from.

Conclusion

This post showcases practical ways in which Rayvens can bridge Cloud Object Storage and Ray.io. The Cloud Object Storage source monitors a bucket and produces an event whenever a file is added to it. The event includes the file name and, if desired, the file content. The Cloud Object Storage sink supports various methods for uploading files to a bucket. At this time, only multi-part uploads are supported, with streaming uploads coming soon.
