Event-driven data processing with IBM Cloud Object Storage and Ray.io
Jointly written by Gheorghe-Teodor Bercea and Olivier Tardieu.
In a previous blog we introduced Rayvens, a library for bridging Ray.io with the vast collection of event and data services supported by Apache Camel. We showcased Rayvens event streams, and configured simple event sources and sinks.
In this post we leverage Rayvens to produce and consume data (and metadata) hosted in IBM Cloud Object Storage or AWS S3 buckets.
Getting started
First, install Rayvens following the instructions provided here.
To access an IBM Cloud Object Storage or AWS S3 bucket, Rayvens requires:
- the bucket name,
- the access key id,
- the secret access key,
- the public or private endpoint.
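Credentials are best kept out of source code. One way to gather these four values is to read them from environment variables. This is an illustrative sketch, not part of the Rayvens API; the helper name and variable names below are our own choices:

```python
import os

# Hypothetical helper: collect the four required configuration values
# from environment variables (names chosen for this example).
def cos_credentials():
    return dict(
        bucket_name=os.environ['COS_BUCKET'],
        access_key_id=os.environ['COS_ACCESS_KEY_ID'],
        secret_access_key=os.environ['COS_SECRET_ACCESS_KEY'],
        endpoint=os.environ['COS_ENDPOINT'])
```

The resulting dictionary can then be merged into a source or sink configuration, for example with dict(kind='cloud-object-storage-source', **cos_credentials()).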
Cloud Object Storage Source
The Cloud Object Storage source monitors a single bucket. It is configured as follows:
source_config = dict(kind='cloud-object-storage-source',
                     bucket_name=<bucket_name>,
                     access_key_id=<...>,
                     secret_access_key=<...>,
                     endpoint=<...>)
The bucket_name, access_key_id, secret_access_key, and endpoint parameters are required. The source configuration supports additional optional parameters discussed below.
When a file is uploaded to the bucket, the source produces an event on the Rayvens event stream attached to the source. This event has the following JSON structure:
{
  "filename": <filename_value>,
  "body": <body_value>
}
The file is automatically deleted from the bucket upon firing the event.
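Since events arrive with this JSON structure, a subscriber typically parses them before use. The sketch below shows only the parsing step; the handler name is our own, and only the event structure comes from the source description above:

```python
import json

# Parse a Cloud Object Storage source event into its two fields.
# The event structure matches the JSON shown above.
def handle_event(event):
    data = json.loads(event)
    return data['filename'], data['body']

# A sample event shaped like one the source would deliver:
sample = json.dumps({'filename': 'report.csv', 'body': 'a,b\n1,2\n'})
name, body = handle_event(sample)
```

Such a handler would be subscribed to the event stream using the subscription mechanism introduced in the previous post.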
region
The region of the Cloud Object Storage instance defaults to us-east if not specified. To specify a different region use:
source_config = dict(kind='cloud-object-storage-source',
                     ...
                     region='eu-de')
move_after_read
The move_after_read option makes it possible to preserve the files triggering the events by moving them to an alternate bucket instead of deleting them.
source_config = dict(kind='cloud-object-storage-source',
                     ...
                     move_after_read=<alternate_bucket_name>)
The bucket needs to be created in advance in Cloud Object Storage.
meta_event_only
To propagate only the file name, use the meta_event_only option:
source_config = dict(kind='cloud-object-storage-source',
                     ...
                     meta_event_only=True)
The JSON structure of the produced event is:
{
  "filename": <filename_value>
}
Combine the meta_event_only and move_after_read options to be notified of new files but pull file contents on demand.
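For instance, a configuration combining both options might look like this (the bucket names and placeholder credentials are our own choices):

```python
# Notify-only source: events carry just the file name, and processed
# files are moved to an archive bucket rather than deleted.
source_config = dict(kind='cloud-object-storage-source',
                     bucket_name='incoming-data',
                     access_key_id='<access_key_id>',
                     secret_access_key='<secret_access_key>',
                     endpoint='<endpoint>',
                     meta_event_only=True,
                     move_after_read='processed-data')
```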
Cloud Object Storage Sink
The Cloud Object Storage sink makes it possible to upload one or several files to a bucket. Uploading a file creates a new file entry in the bucket; if a file with the same name already exists, it is overwritten.
All instances of the sink require the following configuration parameters:
sink_config = dict(kind='cloud-object-storage-sink',
                   bucket_name=<bucket_name>,
                   access_key_id=<...>,
                   secret_access_key=<...>,
                   endpoint=<...>,
                   ...)
Other parameters define what, when, and how to upload the data to Cloud Object Storage.
Uploading a file from an in-memory buffer
The sink configuration may specify a file name.
sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   file_name=<file_name>)
In this configuration, the content of the file is the event payload itself.
Add the sink to an existing event stream:
stream.add_sink(sink_config)
Dump the file contents onto the stream:
stream << 'File contents.'
This works well for small to medium files.
Uploading one or several files from the file system
For larger files, multi-part uploads are recommended. Pass the upload_type='multi-part' option to the sink configuration and optionally specify the part size by adding part_size=<number_of_bytes> to the configuration. The default part size is 10MB.
For example:
sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024)
This configuration will upload a file in 2MB chunks.
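As a sanity check on part sizing, the number of parts for a given file is simply the file size divided by the part size, rounded up. The arithmetic below is our own illustration, not a Rayvens API:

```python
import math

# Number of parts a multi-part upload splits a file into,
# given a part size in bytes (default mirrors the 10MB default above).
def part_count(file_size, part_size=10 * 1024 * 1024):
    return math.ceil(file_size / part_size)

# A 5MB file uploaded in 2MB parts needs 3 parts: 2MB + 2MB + 1MB.
parts = part_count(5 * 1024 * 1024, part_size=2 * 1024 * 1024)
```

Smaller parts trade more upload requests for lower memory use per part.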
In the case of a multi-part file upload, there are several ways to specify files to be uploaded.
Method 1: A file name and its content can be specified from within the application using the file_name=<file_name> option and the stream << 'File contents' method described above.
Method 2: Files can also be specified by adding a Cloud Object Storage sink to an event stream and then passing the following to the stream:
stream << Path('test_files/test.txt')
This requires Path to be imported: from pathlib import Path.
The event will be treated as a path to a file rather than the file contents. The file will be uploaded under its original name unless the file_name option is provided to override it.
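The default upload name in this method is just the final component of the path, which pathlib exposes directly. The snippet only illustrates extracting that name; the naming behavior itself is stated in the text above:

```python
from pathlib import Path

# The event is a path; by default the upload keeps the file's own name.
path = Path('test_files/test.txt')
upload_name = path.name  # final path component, without directories
```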
Method 3: Files can be uploaded using the from_file option. For example:
sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024,
                   from_file='test_files/test3.txt')
Every time a file named test3.txt is available in the test_files directory, the file will be uploaded to Cloud Object Storage.
Method 4: Files can be uploaded using the from_directory option:
sink_config = dict(kind='cloud-object-storage-sink',
                   ...
                   upload_type='multi-part',
                   part_size=2 * 1024 * 1024,
                   from_directory='test_files/')
Any file added to the test_files directory will be uploaded to Cloud Object Storage under its own name. Existing files will be silently overwritten.
For Methods 3 and 4, the keep_file option ensures the files are kept on the file system, but their location changes: after upload, the files are moved to a hidden subdirectory of the directory they were uploaded from.
Conclusion
This post showcases practical ways in which Rayvens can bridge Cloud Object Storage and Ray.io. The Cloud Object Storage source monitors a bucket and produces an event whenever a file is added to the bucket. The event includes the file name and, if desired, the file content. The Cloud Object Storage sink supports various methods for uploading files to a bucket. At this time only multi-part uploads are supported, with streaming uploads coming soon.