Generated With Google Gemini

Rust RocksDb — MultiGet & MergeOperator

Hiraq Citra M
lifefunk

--

I have experience working with persistent storage in Rust, and I choose RocksDB as my persistent storage or database. Developing in Rust is really fun and I’ve felt really productive with that, and now I’ve got more interesting challenges and experiences when integrating it with RocksDB, through the available crate, rust-rocksdb :

Rust wrapper for RocksDB.

RocksDB

From its official documentation:

The RocksDB library provides a persistent key-value store. Keys and values are arbitrary byte arrays. The keys are ordered within the key-value store according to a user-specified comparator function.

This database was inspired by Google LevelDB :

LevelDB is a fast key-value storage library written at Google that provides an ordered mapping from string keys to string values.

The reason why I chose this key-value storage is because I like its concept for the key-value storage, it only provides basic operations, such as

  • Put
  • Get / MultiGet
  • Delete
  • Merge

The data model itself will depend on us to design and build it based on our application and business needs.

A balance is struck between customizability and self-adaptability. RocksDB features highly flexible configuration settings that may be tuned to run on a variety of production environments, including SSDs, hard disks, ramfs, or remote storage. It supports various compression algorithms and good tools for production support and debugging

Source:

There is one feature that I really like, it’s called ColumnFamily

RocksDB supports partitioning a database instance into multiple-column families. All databases are created with a column family named “default”, which is used for operations where the column family is unspecified.

Short story, I think that all of the basic operations and needs as persistent storage are already provided by this database, even I’m just been thinking that it’s really possible to create a more “specific” database on top of this persistent storage engine, like this: https://myrocks.io/ , it is a MySQL database that backed by RocksDB

Operations

MultiGet

When using Rust RocksDB crate, there already exists an API for simple Get used to get a value based on some given key

pub fn get_cf<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
key: K
) -> Result<Option<Vec<u8>>, Error>

What is MultiGet ? This API will help us to “query” our database storage something like this in SQL:

SELECT * FROM some_tables WHERE id IN(...)

To achieve the same behavior as the SQL above, the MultiGet API is like this:

pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
&'a self,
keys: I
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
W: 'b + AsColumnFamilyRef,

Please be aware that the keys parameter actually is a tuple that contains two things: a reference to the available column family and the key. This API will act like the WHERE IN in SQL, but the good thig is, through this API is possible for us to find the data from multiple available columns in one time query.

You have to be sure that your key is available in the described column family

An example is taken from my own implementation:

let cf = db_instance
.cf_handle(cf_def.as_str())
.map(|val| val.to_owned())
.ok_or(AppError::DbError("cf handler failed".to_string()))?;

let cf_keys = keys.iter().map(|val| (cf, val));
let result = db_instance.multi_get_cf(cf_keys);

What will be happened if some of keys not found? This is what I was implement

let mut creds: Vec<Credential> = Vec::new();
let _ = values
.iter()
.map(|val| {
let cred_val = val.as_ref().map_or(None, |val| {
let cred_val_bytes = val
.clone()
.map(|val| {
let cred = Credential::try_from(val)
.map_err(|err| {
CredentialError::CommonError(
VerifiableError::RepoError(err.to_string()),
)
})
.map_or(None, |cred| Some(cred));

cred
})
.flatten();

cred_val_bytes
});

cred_val
})
.for_each(|val| {
val.map(|cred| creds.push(cred));
});

Ok(creds)

If some of the given keys are not found, we can ignore them, and it depends on how we manage the results that come from this storage.

MergeOperator

Now, it’s the most interesting part for me. Let’s say we have a case where we want to update a counter, it’s a really simple case, right? Each time some method is triggered, it will update the counter in our database. By nature, we will be thinking the operations will be like this:

Get the data -> Modify -> Save

This operation is called a Read-Modify-Write or a Merge operation in RocksDB.

The Merge API from the crate is also really simple, it is almost like the Put operation:

pub fn merge_cf<K, V>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
value: V
) -> Result<(), Error>
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,

The differences are:

  • We need to register the Merge Operator Callback on the database initiation
  • Before we use the API function, we need to make sure that the Merge Keyalready been saved in the storage

The operator callback itself needs to implement this trait:

pub trait MergeFn: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static { }

For example, the callback’s signature will be like this:

fn concat_merge(new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands)
-> Option<Vec<u8>> {

let mut result: Vec<u8> = Vec::with_capacity(operands.len());
existing_val.map(|v| {
for e in v {
result.push(*e)
}
});
for op in operands {
for e in op {
result.push(*e)
}
}
Some(result)
}

It’s up to us to the detailed implementation inside the callback, in my case, it will be like this:

pub fn merge_bucket_credential(
new_key: &[u8],
existing: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
let key = {
match String::from_utf8(new_key.to_vec()) {
Ok(key_val) => Some(key_val),
Err(_) => None,
}
}
.filter(|key| {
let prefix = "merge_credential";
key.as_str().contains(prefix)
});

if key.is_none() {
let existing_val = existing.map(|val| val.to_vec())?;
return Some(existing_val);
}

let mut bucket = {
existing.map_or_else(
|| {
let bucket: Bucket<Credential> = Bucket::new();
Some(bucket)
},
|val| {
let bin_builder: Result<Bucket<Credential>, DbError> = val.to_vec().try_into();
match bin_builder {
Ok(bucket) => Some(bucket),
Err(_) => None,
}
},
)
}?;

for op in operands {
let op_credential = {
let credential_builder: Result<Credential, CredentialError> = op.to_vec().try_into();
match credential_builder {
Ok(credential) => Some(credential),
Err(_) => None,
}
};

op_credential.map(|cred| {
bucket.add(cred);
});
}

let output = {
let bucket_bin_builder: Result<Vec<u8>, DbError> = bucket.try_into();
match bucket_bin_builder {
Ok(bin) => Some(bin),
Err(_) => None,
}
};

output
}

What you need to be aware of is, you need to know at which context you want this merge operation running, because the initiation process will be different between using ColumnFamily or Common , it’s about how should we put the storage configuration.

In my case that I’m using ColumnFamily , the configuration will be like this:

        let mut db_opts = Options::new(opts_path, opts_cf_name);
db_opts
.build_default_opts()
.set_db_opts(move |opt| {
opt.create_if_missing(opts_db_main.get_create_if_missing());
opt.create_missing_column_families(opts_db_main.get_create_missing_columns());
opt.set_error_if_exists(opts_db_main.get_set_error_if_exists());
opt.set_wal_dir(opts_db_main.get_set_wal_dir());

opt
})
.set_cf_opts(|opt| {
opt.set_merge_operator_associative(
MERGE_BUCKET_CREDENTIAL_ID,
merge_bucket_credential,
);

opt
});

You need to register your merge callback function as a part of your ColumnFamily Options, but if you decide to not use a column-based, you need to configure this merge operation in the database-level configurations. This configuration is taking an important part, because if we’re wrong to place the configuration, like using column-based but the configuration in the database level, the callback will not be triggered.

What I really like about this approach is we have the freedom to maintain our merge operation, and it will depend 100% on our application needs, as long as we implement its base trait/interface.

Once we’ve configured the column/database options, now we can use the API function, in my case it will be like this:

        match check_bucket_exists {
true => {
let _ = self
.db
.exec(DbInstruction::MergeCf {
key: credential_did_merge_key.clone(),
value: credential_bytes.clone(),
})
.await
.map_err(|err| {
CredentialError::CommonError(VerifiableError::RepoError(err.to_string()))
})?;
}
false => {
let mut bucket = DbBucket::<Credential>::new();
bucket.add(data.to_owned());

let bucket_bytes: Vec<u8> = bucket
.try_into()
.map_err(|err: DbError| CredentialError::GenerateJSONError(err.to_string()))?;

let _ = self
.db
.exec(DbInstruction::SaveCf {
key: credential_did_merge_key.clone(),
value: bucket_bytes,
})
.await
.map_err(|err| {
CredentialError::CommonError(VerifiableError::RepoError(err.to_string()))
})?;
}
}

As I’ve said before, before we use the Merge API, we need to make sure that our data has already been saved successfully, This means, we need to use the basic PutCf operation, and then once it has already been saved, for the next calls, we will using MergeCf , like example above.

Outro

For those who are interested in the example of all of these implementations, you can visit my project here:

--

--