A C++ way to multi-threaded MongoDB access
Accessing a MongoDB database from multiple threads in an application can be tricky as all child objects from a single client updates the same internal structures of the client. So even if all the threads accessing the child objects are synchronized, it is not safe.
For this reason MongoDB provides
pool
, which provide methods to acquire a new client from a pool for each thread. It is thread-safe and allows the clients to be synchronized.
Let me demonstrate how we can put it to use in an real world multi-threaded application.
#include <mongocxx/uri.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/pool.hpp>
#include <iostream>
#include <memory>class MongoDBInstance {private:
mongocxx::instance m_dbInstance;
std::unique_ptr<mongocxx::pool> m_client_pool;
MongoDBInstance () { }public:static MongoDBInstance* GetInstance () {
static MongoDBInstance objMongoDBInstance;
return &objMongoDBInstance;
}/* Create a pool object only once from MongoDB URI */
void createPool(std::string uri_) {
if (!m_client_pool){
m_client_pool = (std::unique_ptr<mongocxx::pool>)
new mongocxx::pool { mongocxx::uri {uri_} };
}
}/* Acquire a client from the pool */
mongocxx::pool::entry
getClientFromPool () { return m_client_pool->acquire(); }~MongoDBInstance ();};
The MongoDBInstance
class is a singleton class used to create and provide access to pool to acquire a new client. For a single MongoDB deployment we need a single instance of pool
object, which is much more efficient as opposed to manually constructing client
objects in case it is connected to a replica set.
#include <iostream>
#include <mongocxx/database.hpp>
#include <mongocxx/pool.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/exception/bulk_write_exception.hpp>
#include <bsoncxx/json.hpp>
#include <bsoncxx/exception/exception.hpp>class MongoDBAccess {private:
mongocxx::client& m_client;
std::string m_dbName;
std::string m_collectionName;
mongocxx::database m_db;
mongocxx::collection m_collection;public: MongoDBAccess (mongocxx::client& client_, std::string dbName_,
std::string collName_) : m_client(client_), m_dbName(dbName_),
m_collectionName(collName_) { m_db = m_client[dbName_];
m_collection = m_db[collName_];
} int insert (std::string jsonDoc_) {
try { // Convert JSON data to document
auto doc_value = bsoncxx::from_json(jsonDoc_); //Insert the document
auto result = m_collection.insert_one(std::move(doc_value));
} catch(const bsoncxx::exception& e) {
std::string errInfo = std::string("Error in converting
JSONdata,Err Msg : ") + e.what(); return -1;
} catch (mongocxx::bulk_write_exception e) {
std::string errInfo = std::string("Error in inserting
document, Err Msg : ") + e.what(); return -1;
}return 0;
}};
MongoDBAccess
class provides all the necessary methods to connect to a database and insert JSON data into it using a client object for each thread acquired from pool.
class MongoDBThread {
private:
MongoDBAccess objDB_access;
std::string strJSON_doc;public:
MongoDBThread(mongocxx::client& client, std::string db_name,
std::string coll_name, std::string json_doc) :
objDB_access(client, db_name, coll_name), strJSON_doc(json_doc)
{}void operator()() {
objDB_access.insert(strJSON_doc);
}
};
and now we create a thread utility class MongoDBThread
which takes all the parameters required to do a insert operation in MongoDB.
Now let us see how we can use these classes to perform insertion of JSON objects:
void db_ops(std::string json_doc_a, std::string json_doc_b) {
MongoDBInstance::GetInstance()->createPool(mongodb_uri_path);
auto dbClient_A =
MongoDBInstance::GetInstance()->getClientFromPool(); auto dbClient_B =
MongoDBInstance::GetInstance()->getClientFromPool(); std::thread thread_A {
MongoDBThread(*dbClient_A, "db_name_A", "coll_a", json_doc_a)
}; std::thread thread_B {
MongoDBThread(*dbClient_B, "db_name_B", "coll_b", json_doc_b)
}; thread_A.join();
thread_B.join();
}
In the db_ops
function, we create an pool
object using the MongoDBInstance
singleton class and get two clients using its getClientFromPool
function.
After this we create two threads thread_A
and thread_B
and pass object of MongoDBThread
class as parameter.
This simple example shows how we can use MongoDB pool
in a multi-threaded environment which is efficient and prevent situation which may cause undefined behavior in the application.