Rust RocksDB — MergeOperator: Multiple Callbacks & Data Model

Hiraq Citra M
lifefunk

--

In my last post, I wrote about my journey working with Rust and integrating it with the persistent storage, RocksDB.

When I continued working on my project, I found an interesting issue related to multiple operator callbacks including my approach to the data model I’ve used.

Multiple Operator Callbacks

From the official RocksDB Wiki, it’s possible for us to register multiple callbacks, something like this:

opts.create_if_missing(true);
opts.set_merge_operator_associative("test operator 1", concat_merge1);
opts.set_merge_operator_associative("test operator 2", concat_merge2);

But when I try to use this solution, the issue is when we merge some of the data, it will trigger all registered callbacks, and the problem is, that we have to make sure that each of the callbacks must be compatible with each other.

The callbacks will be executed sequentially ordered. From the example above it means, the current RocksDb engine will execute concat_merge1 first and then concat_merge2.

I’m thinking that rather than provide multiple different callbacks for different needs, I think it would be better to provide a single callback used to merge multiple data models.

My approach will be like this

pub const MERGE_BUCKET_ID: &str = "merge_bucket";

fn merge_bucket_builder<T: TryInto<Vec<u8>> + Serialize + DeserializeOwned>(
existing: Option<&[u8]>,
) -> Option<DbBucket<T>> {
let bucket = {
existing.map_or_else(
|| {
let bucket: DbBucket<T> = DbBucket::new();
Some(bucket)
},
|val| {
let bin_builder: Result<DbBucket<T>, DbError> = val.to_vec().try_into();
match bin_builder {
Ok(bucket) => Some(bucket),
Err(_) => None,
}
},
)
}?;

Some(bucket)
}

fn merge_bucket_credential(existing: Option<&[u8]>, operands: &MergeOperands) -> Option<Vec<u8>> {
let mut bucket: DbBucket<Credential> = merge_bucket_builder(existing)?;
for op in operands {
let op_credential = {
let credential_builder: Result<Credential, CredentialError> = op.to_vec().try_into();
match credential_builder {
Ok(credential) => Some(credential),
Err(_) => None,
}
};

op_credential.map(|cred| {
bucket.add(cred);
});
}

let output = {
let bucket_bin_builder: Result<Vec<u8>, DbError> = bucket.try_into();
match bucket_bin_builder {
Ok(bin) => Some(bin),
Err(_) => None,
}
};

output
}

fn merge_bucket_presentation(existing: Option<&[u8]>, operands: &MergeOperands) -> Option<Vec<u8>> {
let mut bucket: DbBucket<Verifier> = merge_bucket_builder(existing)?;

for op in operands {
let op_presentation = {
let presentation_builder: Result<Verifier, PresentationError> =
op.to_vec().try_into();

match presentation_builder {
Ok(presentation) => Some(presentation),
Err(_) => None,
}
};

op_presentation.map(|presentation| {
bucket.add(presentation);
});
}

let output = {
let bucket_bin_builder: Result<Vec<u8>, DbError> = bucket.try_into();
match bucket_bin_builder {
Ok(bin) => Some(bin),
Err(_) => None,
}
};

output
}

pub fn merge_bucket(
new_key: &[u8],
existing: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
let allowed_keys = vec!["merge_credential", "merge_presentation"];

let key = {
match String::from_utf8(new_key.to_vec()) {
Ok(key_val) => Some(key_val),
Err(_) => None,
}
}
.filter(|key| {
let mut selected_key = String::from("");
for allowed in allowed_keys.iter() {
if key.as_str().contains(*allowed) {
selected_key = allowed.to_string();
break;
}
}

!selected_key.is_empty()
});

if key.is_none() {
let existing_val = existing.map(|val| val.to_vec())?;
return Some(existing_val);
}

let output = {
match key {
Some(key) => {
let mut str_splitted = key.as_str().split(":");
let first_index = str_splitted.next();
match first_index {
Some("merge_credential") => {
let output = merge_bucket_credential(existing, operands);
output
}
Some("merge_presentation") => {
let output = merge_bucket_presentation(existing, operands);
output
}
_ => None,
}
}
None => None,
}
};

output
}

The operation of this merge operator callback will based on given keys. First, I define the allowed keys that I need to support, which are

  • merge_credential
  • merge_presentation

If there is a merge request operation that uses a key that is not supported, it will just return a None. If the given key has already been supported, it will call other specific operation callbacks

  • merge_bucket_credential
  • merge_bucket_presentation

Data Model

As I’ve explained above, that my approach is using a single-operator callback. To support this solution, I’ve thought that I need a single data model that supports some specific data types with some constraints. The solution is I’ve created a generic data model.

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(crate = "self::serde")]
pub struct Bucket<T>
where
T: TryInto<Vec<u8>> + Serialize,
{
collections: Vec<T>,
}

impl<T> Bucket<T>
where
T: TryInto<Vec<u8>> + Serialize + DeserializeOwned,
{
pub fn new() -> Self {
Self {
collections: Vec::new(),
}
}

pub fn add(&mut self, val: T) {
self.collections.push(val)
}

pub fn iterate(&self) -> Iter<T> {
let iterator = self.collections.iter();
iterator
}
}

It is just a simple data model called Bucket that contains a single private property, which is collections, this property is a simple of Vec<T>. The <T> itself is any data type that implements the trait (interface) of :

  • TryInto<Vec<u8>>
  • Serialize

The Bucket itself must implement those traits too, because we need to serialize the data into a bytes array.

impl<T> TryInto<Vec<u8>> for Bucket<T>
where
T: TryInto<Vec<u8>> + Serialize,
{
type Error = DbError;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
let json = serde_json::to_vec(&self)
.map_err(|err| DbError::BucketError(format!("error try into: {}", err.to_string())))?;

Ok(json)
}
}

impl<T> TryFrom<Vec<u8>> for Bucket<T>
where
T: TryInto<Vec<u8>> + Serialize + DeserializeOwned,
{
type Error = DbError;

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let bucket: Self = serde_json::from_slice(&value)
.map_err(|err| DbError::BucketError(format!("error try from: {}", err.to_string())))?;
Ok(bucket)
}
}

By using this single data model, I’ll be able to save and merge any data from other data types as long as they implement the same traits. The merge operation when need to merge will be like this:

fn merge_bucket_builder<T: TryInto<Vec<u8>> + Serialize + DeserializeOwned>(
existing: Option<&[u8]>,
) -> Option<DbBucket<T>> {
let bucket = {
existing.map_or_else(
|| {
let bucket: DbBucket<T> = DbBucket::new();
Some(bucket)
},
|val| {
let bin_builder: Result<DbBucket<T>, DbError> = val.to_vec().try_into();
match bin_builder {
Ok(bucket) => Some(bucket),
Err(_) => None,
}
},
)
}?;

Some(bucket)
}

Because this data model is able to convert to and from Vec<u8>, it means we’ll able to save and extract from existing parameters. The data saved into RocksDb itself is not the raw data types, but some data types that are wrapped inside the Bucket.

fn merge_bucket_credential(existing: Option<&[u8]>, operands: &MergeOperands) -> Option<Vec<u8>> {
let mut bucket: DbBucket<Credential> = merge_bucket_builder(existing)?;
for op in operands {
let op_credential = {
let credential_builder: Result<Credential, CredentialError> = op.to_vec().try_into();
match credential_builder {
Ok(credential) => Some(credential),
Err(_) => None,
}
};

op_credential.map(|cred| {
bucket.add(cred);
});
}

let output = {
let bucket_bin_builder: Result<Vec<u8>, DbError> = bucket.try_into();
match bucket_bin_builder {
Ok(bin) => Some(bin),
Err(_) => None,
}
};

output
}

The overall data flow diagram will be like this:

Outro

Working with Rust, especially for this case integrated with RocksDB really improved my knowledge and I’ve found many interesting challenges in it. The RocksDb itself is like simple persistent storage but really powerful as key-value storage, so powerful that I think it’s really possible for us to build a high-level database model on top of it, like CockroachDB.

--

--