C++ Day 5: BusTub — Execution Flow

Arjun Sunil Kumar
Distributed Systems Engineering
9 min read · Aug 27, 2024

Today we walk through BusTub's execution flow: how a SQL string is parsed and bound, planned, optimized, and finally executed.

- src/common/bustub_instance.cpp

/**
 * Parses, binds, plans, optimizes and executes every statement in `sql`, writing results to `writer`.
 *
 * NOTE: this is an excerpt. `....` marks elided switch cases. `is_successful` and `is_delete`
 * are declared, and the final return is produced, in code not shown here.
 *
 * @param sql           SQL text; the binder may produce several statement nodes from it
 * @param writer        sink for result tables and messages
 * @param txn           transaction the statements run under
 * @param check_options installed into the executor context only when non-null
 * @return declared as bool; the return statement is not visible in this excerpt
 */
auto BustubInstance::ExecuteSqlTxn(const std::string &sql, ResultWriter &writer, Transaction *txn,
                                   std::shared_ptr<CheckOptions> check_options) -> bool {


  // Parse under a shared catalog lock; the lock is released before binding below.
  std::shared_lock<std::shared_mutex> l(catalog_lock_);
  bustub::Binder binder(*catalog_);
  binder.ParseAndSave(sql);
  l.unlock();

  for (auto *stmt : binder.statement_nodes_) {
    // NOTE(review): BindStatement runs after `l.unlock()` -- confirm binding does not need the catalog lock.
    auto statement = binder.BindStatement(stmt);

    // Cases handled here `continue` to the next statement and never reach the planner
    // (only the CREATE case is shown in this excerpt).
    switch (statement->type_) {
      case StatementType::CREATE_STATEMENT: {
        const auto &create_stmt = dynamic_cast<const CreateStatement &>(*statement);
        HandleCreateStatement(txn, create_stmt, writer);
        continue;
      }
        ....
    }



    // Plan and optimize under a shared catalog lock. This `l` lives in the loop body and
    // shadows the function-scope `l` declared above.
    std::shared_lock<std::shared_mutex> l(catalog_lock_);

    // Plan the query.
    bustub::Planner planner(*catalog_);
    planner.PlanQuery(*statement);

    // Optimize the query.
    bustub::Optimizer optimizer(*catalog_, IsForceStarterRule());
    auto optimized_plan = optimizer.Optimize(planner.plan_);

    l.unlock();


    // Execute the query (outside the catalog lock).
    auto exec_ctx = MakeExecutorContext(txn, is_delete);
    if (check_options != nullptr) {
      exec_ctx->InitCheckOptions(std::move(check_options));
    }
    std::vector<Tuple> result_set{};
    // `&=`: a single failed statement makes the accumulated result false.
    is_successful &= execution_engine_->Execute(optimized_plan, &result_set, txn, exec_ctx.get());

    // Return the result set as a vector of string.
    // NOTE(review): the schema comes from the unoptimized `planner.plan_`, not `optimized_plan`;
    // this relies on the optimizer preserving the root's output schema.
    auto schema = planner.plan_->OutputSchema();


    // Generate header for the result set.
    writer.BeginTable(false);
    writer.BeginHeader();
    for (const auto &column : schema.GetColumns()) {
      writer.WriteHeaderCell(column.GetName());
    }
    writer.EndHeader();

    // Transforming result set into strings: one row per tuple, one cell per schema column.
    for (const auto &tuple : result_set) {
      writer.BeginRow();
      for (uint32_t i = 0; i < schema.GetColumnCount(); i++) {
        writer.WriteCell(tuple.GetValue(&schema, i).ToString());
      }
      writer.EndRow();
    }
    writer.EndTable();
  }

}

- src/common/bustub_ddl.cpp

/**
 * Executes a bound CREATE TABLE statement. It creates the table and, when a PRIMARY KEY is
 * declared, a unique index named "<table>_pk" over the key columns.
 *
 * The catalog lock is held exclusively for the whole operation.
 *
 * @param txn    transaction the table (and index) are created in
 * @param stmt   bound CREATE statement: table name, column list, primary-key column names
 * @param writer receives a single-cell message describing what was created
 * @throws bustub::Exception if `CreateTable` returns null
 * @throws NotImplementedException if a primary-key column is not an INTEGER column
 */
void BustubInstance::HandleCreateStatement(Transaction *txn, const CreateStatement &stmt, ResultWriter &writer) {
  std::unique_lock<std::shared_mutex> l(catalog_lock_);
  auto info = catalog_->CreateTable(txn, stmt.table_, Schema(stmt.columns_));
  // Must be checked before the primary-key handling below dereferences `info`.
  if (info == nullptr) {
    throw bustub::Exception("Failed to create table");
  }

  IndexInfo *index = nullptr;
  if (!stmt.primary_key_.empty()) {
    std::vector<uint32_t> col_ids;
    for (const auto &col : stmt.primary_key_) {
      auto idx = info->schema_.GetColIdx(col);
      col_ids.push_back(idx);
      // NOTE(review): the table has already been created at this point, so this throw leaves
      // the table in the catalog without its primary-key index.
      if (info->schema_.GetColumn(idx).GetType() != TypeId::INTEGER) {
        throw NotImplementedException("only support creating index on integer column");
      }
    }
    auto key_schema = Schema::CopySchema(&info->schema_, col_ids);

    index = catalog_->CreateIndex<IntegerKeyType, IntegerValueType, IntegerComparatorType>(
        txn, stmt.table_ + "_pk", stmt.table_, info->schema_, key_schema, col_ids, TWO_INTEGER_SIZE,
        IntegerHashFunctionType{}, true);
  }

  // output
  if (index != nullptr) {
    WriteOneCell(fmt::format("Table created with id = {}, Primary key index created with id = {}", info->oid_, index->index_oid_), writer);
  } else {
    WriteOneCell(fmt::format("Table created with id = {}", info->oid_), writer);
  }
}

- src/planner/planner.cpp

/**
 * Builds `plan_` for a bound statement.
 *
 * SELECT, INSERT, DELETE and UPDATE statements are dispatched to their dedicated planners.
 *
 * @param statement the bound statement to plan
 * @throws Exception for any other statement type
 */
void Planner::PlanQuery(const BoundStatement &statement) {
  const auto kind = statement.type_;

  if (kind == StatementType::SELECT_STATEMENT) {
    plan_ = PlanSelect(dynamic_cast<const SelectStatement &>(statement));
  } else if (kind == StatementType::INSERT_STATEMENT) {
    plan_ = PlanInsert(dynamic_cast<const InsertStatement &>(statement));
  } else if (kind == StatementType::DELETE_STATEMENT) {
    plan_ = PlanDelete(dynamic_cast<const DeleteStatement &>(statement));
  } else if (kind == StatementType::UPDATE_STATEMENT) {
    plan_ = PlanUpdate(dynamic_cast<const UpdateStatement &>(statement));
  } else {
    throw Exception(fmt::format("the statement {} is not supported in planner yet", statement.type_));
  }
}

- src/planner/plan_select.cpp

PlanSelect builds the plan bottom-up: TableRef → Filter (WHERE) → Projection / Aggregation / Window → Sort (ORDER BY).


/**
 * Plans a bound SELECT statement bottom-up:
 *   table reference (or a one-row VALUES node when there is no FROM)
 *   -> optional Filter (WHERE)
 *   -> Window, Aggregation, or plain Projection over the select list
 *   -> optional Sort (ORDER BY).
 *
 * @param statement the bound SELECT statement
 * @return the root of the generated plan tree
 * @throws Exception if HAVING or GROUP BY is combined with a window function
 */
auto Planner::PlanSelect(const SelectStatement &statement) -> AbstractPlanNodeRef {
  auto ctx_guard = NewContext();
  if (!statement.ctes_.empty()) {
    // Make the statement's CTEs visible while planning this SELECT.
    ctx_.cte_list_ = &statement.ctes_;
  }

  AbstractPlanNodeRef plan = nullptr;

  // 1. FROM clause.
  switch (statement.table_->type_) {
    case TableReferenceType::EMPTY:
      // No FROM clause: a VALUES node holding a single row with no columns.
      plan = std::make_shared<ValuesPlanNode>(
          std::make_shared<Schema>(std::vector<Column>{}),
          std::vector<std::vector<AbstractExpressionRef>>{std::vector<AbstractExpressionRef>{}});
      break;
    default:
      plan = PlanTableRef(*statement.table_);
      break;
  }

  // 2. WHERE clause: a filter whose output schema is its child's schema.
  if (!statement.where_->IsInvalid()) {
    auto schema = plan->OutputSchema();
    auto [_, expr] = PlanExpression(*statement.where_, {plan});
    plan = std::make_shared<FilterPlanNode>(std::make_shared<Schema>(schema), std::move(expr), std::move(plan));
  }

  // 3. Select list. The binder already checked that normal aggregations and window aggregations
  //    cannot coexist, so the first item containing either kind decides which planner is used.
  bool has_agg = false;
  bool has_window_agg = false;
  for (const auto &item : statement.select_list_) {
    if (item->HasAggregation()) {
      has_agg = true;
      break;
    }
    if (item->HasWindowFunction()) {
      has_window_agg = true;
      break;
    }
  }

  if (has_window_agg) {
    if (!statement.having_->IsInvalid()) {
      throw Exception("HAVING on window function is not supported yet.");
    }
    if (!statement.group_by_.empty()) {
      throw Exception("Group by is not allowed to use with window function.");
    }
    plan = PlanSelectWindow(statement, std::move(plan));
  } else if (!statement.having_->IsInvalid() || !statement.group_by_.empty() || has_agg) {
    // Plan aggregation
    plan = PlanSelectAgg(statement, std::move(plan));
  } else {
    // Plan normal select: one projection expression per select-list item.
    std::vector<AbstractExpressionRef> exprs;
    std::vector<std::string> column_names;
    for (const auto &item : statement.select_list_) {
      auto [name, expr] = PlanExpression(*item, {plan});
      if (name == UNNAMED_COLUMN) {
        // Give unnamed columns a unique generated name.
        name = fmt::format("__unnamed#{}", universal_id_++);
      }
      exprs.emplace_back(std::move(expr));
      column_names.emplace_back(std::move(name));
    }
    plan = std::make_shared<ProjectionPlanNode>(std::make_shared<Schema>(ProjectionPlanNode::RenameSchema(
                                                    ProjectionPlanNode::InferProjectionSchema(exprs), column_names)),
                                                std::move(exprs), std::move(plan));
  }

  // 4. ORDER BY: a sort whose output schema is its child's schema.
  if (!statement.sort_.empty()) {
    std::vector<std::pair<OrderByType, AbstractExpressionRef>> order_bys;
    for (const auto &order_by : statement.sort_) {
      auto [_, expr] = PlanExpression(*order_by->expr_, {plan});
      order_bys.emplace_back(order_by->type_, std::move(expr));
    }
    plan = std::make_shared<SortPlanNode>(std::make_shared<Schema>(plan->OutputSchema()), plan, std::move(order_bys));
  }

  return plan;
}
- filter_plan.h
/**
* The FilterPlanNode represents a filter operation.
* It retains any tuple that satisfies the predicate in the child.
*/
class FilterPlanNode : public AbstractPlanNode {
public:
/**
* Construct a new FilterPlanNode instance.
* @param output The output schema of this filter plan node
* @param predicate The predicate applied during the scan operation
* @param child The child plan node
*/
FilterPlanNode(SchemaRef output, AbstractExpressionRef predicate, AbstractPlanNodeRef child)
: AbstractPlanNode(std::move(output), {std::move(child)}), predicate_{std::move(predicate)} {}

}
- abstract_plan.h
/**
 * Base class of all plan nodes. A plan node has an output schema and a list of child plan nodes.
 */
class AbstractPlanNode {
 public:
  /**
   * Create a new AbstractPlanNode with the specified output schema and children.
   * @param output_schema The schema for the output of this plan node
   * @param children The children of this plan node
   */
  AbstractPlanNode(SchemaRef output_schema, std::vector<AbstractPlanNodeRef> children)
      : output_schema_(std::move(output_schema)), children_(std::move(children)) {}

  /** Virtual destructor: this is a polymorphic base that derived plan nodes extend. */
  virtual ~AbstractPlanNode() = default;

  /** The schema for the output of this plan node. */
  SchemaRef output_schema_;

  /**
   * The children of this plan node. Populated by the constructor above; derived nodes such as
   * FilterPlanNode forward their child plans to it from their own constructors.
   */
  std::vector<AbstractPlanNodeRef> children_;

 protected:
  /** @return the string representation of the plan node itself */
  virtual auto PlanNodeToString() const -> std::string { return "<unknown>"; }

  /** @return the string representation of the plan node's children */
  auto ChildrenToString(int indent, bool with_schema = true) const -> std::string;
};

- src/optimizer/optimizer.cpp

/**
 * Optimizes a plan tree.
 *
 * By default the user-defined rule pipeline (`OptimizeCustom`) is used. When
 * `force_starter_rule_` is set, a fixed sequence of starter rules is applied instead, each rule
 * consuming the output of the previous one.
 *
 * @param plan the plan to optimize
 * @return the optimized plan
 */
auto Optimizer::Optimize(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef {
  if (!force_starter_rule_) {
    return OptimizeCustom(plan);
  }

  // Starter pipeline; the order of the rules matters.
  AbstractPlanNodeRef current = OptimizeMergeProjection(plan);
  current = OptimizeMergeFilterNLJ(current);
  current = OptimizeOrderByAsIndexScan(current);
  current = OptimizeSortLimitAsTopN(current);
  current = OptimizeMergeFilterScan(current);
  current = OptimizeSeqScanAsIndexScan(current);
  return current;
}

- src/optimizer/order_by_index_scan.cpp


/**
 * Rewrites `Sort(SeqScan(t))` into an IndexScan on `t` when both of these hold:
 *   - every ORDER BY key is ASC (or DEFAULT) and is a plain column reference, and
 *   - the ORDER BY column list equals the key attributes of some index on `t`.
 * The rule is applied bottom-up over the whole tree. The first matching index is used.
 *
 * @param plan the plan to optimize
 * @return the rewritten plan, or the plan with optimized children when no rewrite applies
 */
auto Optimizer::OptimizeOrderByAsIndexScan(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef {
  std::vector<AbstractPlanNodeRef> optimized_children;
  for (const auto &child : plan->GetChildren()) {
    optimized_children.emplace_back(OptimizeOrderByAsIndexScan(child));
  }
  auto optimized_plan = plan->CloneWithChildren(std::move(optimized_children));

  // Only Sort nodes are candidates for this rewrite.
  if (optimized_plan->GetType() != PlanType::Sort) {
    return optimized_plan;
  }

  const auto &sort_plan = dynamic_cast<const SortPlanNode &>(*optimized_plan);

  // Collect the sort-key column indices; give up on any non-ascending key or non-column expression.
  std::vector<uint32_t> sort_key_columns;
  for (const auto &[order_type, expr] : sort_plan.GetOrderBy()) {
    const bool ascending = order_type == OrderByType::ASC || order_type == OrderByType::DEFAULT;
    if (!ascending) {
      return optimized_plan;
    }
    const auto *column_ref = dynamic_cast<ColumnValueExpression *>(expr.get());
    if (column_ref == nullptr) {
      return optimized_plan;
    }
    sort_key_columns.push_back(column_ref->GetColIdx());
  }

  BUSTUB_ENSURE(optimized_plan->children_.size() == 1, "Sort with multiple children?? Impossible!");
  const auto &child_plan = optimized_plan->children_[0];
  if (child_plan->GetType() != PlanType::SeqScan) {
    return optimized_plan;
  }

  const auto &seq_scan = dynamic_cast<const SeqScanPlanNode &>(*child_plan);
  const auto *table_info = catalog_.GetTable(seq_scan.GetTableOid());
  // An index qualifies only if its key columns match the sort keys exactly (same columns, same order).
  for (const auto *index : catalog_.GetTableIndexes(table_info->name_)) {
    if (index->index_->GetKeyAttrs() == sort_key_columns) {
      return std::make_shared<IndexScanPlanNode>(optimized_plan->output_schema_, table_info->oid_,
                                                 index->index_oid_);
    }
  }

  return optimized_plan;
}

- src/include/execution/execution_engine.h

// NOLINTNEXTLINE
auto Execute(const AbstractPlanNodeRef &plan, std::vector<Tuple> *result_set, Transaction *txn,
ExecutorContext *exec_ctx) -> bool {
BUSTUB_ASSERT((txn == exec_ctx->GetTransaction()), "Broken Invariant");

// Construct the executor for the abstract plan node
auto executor = ExecutorFactory::CreateExecutor(exec_ctx, plan);

// Initialize the executor
auto executor_succeeded = true;

try {
executor->Init();
PollExecutor(executor.get(), plan, result_set);
PerformChecks(exec_ctx);
} catch (const ExecutionException &ex) {
executor_succeeded = false;
if (result_set != nullptr) {
result_set->clear();
}
}

return executor_succeeded;
}

private:
/**
 * Pull tuples from the root executor until Next() returns false; any exception propagates
 * to the caller.
 * @param executor The root executor (Execute() calls Init() on it before polling)
 * @param plan The plan being executed (not read by this function)
 * @param result_set If non-null, every produced tuple is appended to it
 */
static void PollExecutor(AbstractExecutor *executor, const AbstractPlanNodeRef &plan,
                         std::vector<Tuple> *result_set) {
  RID current_rid{};
  Tuple current_tuple{};
  for (;;) {
    if (!executor->Next(&current_tuple, &current_rid)) {
      return;
    }
    if (result_set == nullptr) {
      continue;
    }
    result_set->push_back(current_tuple);
  }
}

- src/execution/executor_factory.cpp

/**
 * Recursively builds the executor tree for `plan`. Child plans are turned into executors first
 * and handed to their parent executor.
 *
 * NOTE(review): if a case needs the check options (for example, NLJ checks), read
 * `exec_ctx->GetCheckOptions()->check_options_set_` by reference inside that case rather than
 * copying the set on every recursive call.
 *
 * @param exec_ctx executor context passed to every executor in the tree
 * @param plan     plan node to build an executor for
 * @return the executor for `plan`, owning its child executors
 */
auto ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPlanNodeRef &plan)
    -> std::unique_ptr<AbstractExecutor> {
  switch (plan->GetType()) {
    // Create a new sequential scan executor
    case PlanType::SeqScan: {
      return std::make_unique<SeqScanExecutor>(exec_ctx, dynamic_cast<const SeqScanPlanNode *>(plan.get()));
    }

    // Create a new index scan executor
    case PlanType::IndexScan: {
      return std::make_unique<IndexScanExecutor>(exec_ctx, dynamic_cast<const IndexScanPlanNode *>(plan.get()));
    }

    // Create a new nested-index join executor; only the child plan is built into an executor here.
    case PlanType::NestedIndexJoin: {
      const auto *nested_index_join_plan = dynamic_cast<const NestedIndexJoinPlanNode *>(plan.get());
      auto left = ExecutorFactory::CreateExecutor(exec_ctx, nested_index_join_plan->GetChildPlan());
      return std::make_unique<NestIndexJoinExecutor>(exec_ctx, nested_index_join_plan, std::move(left));
    }

    // Create a new hash join executor
    case PlanType::HashJoin: {
      const auto *hash_join_plan = dynamic_cast<const HashJoinPlanNode *>(plan.get());
      auto left = ExecutorFactory::CreateExecutor(exec_ctx, hash_join_plan->GetLeftPlan());
      auto right = ExecutorFactory::CreateExecutor(exec_ctx, hash_join_plan->GetRightPlan());
      return std::make_unique<HashJoinExecutor>(exec_ctx, hash_join_plan, std::move(left), std::move(right));
    }

    // Create a new projection executor
    case PlanType::Projection: {
      const auto *projection_plan = dynamic_cast<const ProjectionPlanNode *>(plan.get());
      auto child = ExecutorFactory::CreateExecutor(exec_ctx, projection_plan->GetChildPlan());
      return std::make_unique<ProjectionExecutor>(exec_ctx, projection_plan, std::move(child));
    }

    default:
      UNREACHABLE("Unsupported plan type.");
  }
}

- src/include/execution/executors/abstract_executor.h

/**
 * Base class of all executors. Callers invoke Init() once, then pull tuples one at a time with
 * Next() until it returns false.
 */
class AbstractExecutor {
 public:
  /**
   * @param exec_ctx the executor context this executor runs with; stored as a raw pointer and
   *                 not deleted by this class
   */
  explicit AbstractExecutor(ExecutorContext *exec_ctx) : exec_ctx_(exec_ctx) {}

  /** Virtual destructor so derived executors are destroyed correctly through a base pointer. */
  virtual ~AbstractExecutor() = default;

  /**
   * Prepare the executor for producing tuples.
   * @warning This function must be called before Next() is called!
   */
  virtual void Init() = 0;

  /**
   * Produce the next tuple.
   * @param[out] tuple receives the produced tuple
   * @param[out] rid receives the RID of the produced tuple
   * @return `true` if a tuple was produced, `false` once the executor is exhausted
   */
  virtual auto Next(Tuple *tuple, RID *rid) -> bool = 0;

  /** @return the schema of every tuple this executor produces */
  virtual auto GetOutputSchema() const -> const Schema & = 0;

  /** @return the executor context this executor was constructed with */
  auto GetExecutorContext() -> ExecutorContext * {
    return exec_ctx_;
  }

 protected:
  /** Executor context supplied at construction. */
  ExecutorContext *exec_ctx_;
};
} // namespace bustub

- src/execution/mock_scan_executor.cpp

namespace bustub {

/**
 * Construct a MockScanExecutor over the plan's generated rows.
 *
 * When the plan requests shuffling, the row indices [0, size_) are shuffled once here with a
 * randomly seeded std::mt19937. Because the permutation is fixed at construction, re-running
 * Init() replays the same order.
 */
MockScanExecutor::MockScanExecutor(ExecutorContext *exec_ctx, const MockScanPlanNode *plan)
    : AbstractExecutor{exec_ctx}, plan_{plan}, func_(GetFunctionOf(plan)), size_(GetSizeOf(plan)) {
  if (GetShuffled(plan)) {
    for (size_t i = 0; i < size_; i++) {
      shuffled_idx_.push_back(i);
    }
    std::random_device rd;
    std::mt19937 g(rd());
    std::shuffle(shuffled_idx_.begin(), shuffled_idx_.end(), g);
  }
}

/** Reset the cursor so the scan restarts from its first row. */
void MockScanExecutor::Init() {
  cursor_ = 0;
}

/**
 * Emit the next generated row.
 * @param[out] tuple the row produced by the plan's generator function
 * @param[out] rid a dummy RID (mock rows are not stored in a table)
 * @return EXECUTOR_EXHAUSTED once all size_ rows have been emitted, EXECUTOR_ACTIVE otherwise
 */
auto MockScanExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  if (cursor_ == size_) {
    // Scan complete
    return EXECUTOR_EXHAUSTED;
  }
  // Without shuffling, rows come out in generator order; otherwise, in the permuted order.
  if (shuffled_idx_.empty()) {
    *tuple = func_(cursor_);
  } else {
    *tuple = func_(shuffled_idx_[cursor_]);
  }
  ++cursor_;
  *rid = MakeDummyRID();
  return EXECUTOR_ACTIVE;
}

}  // namespace bustub

- src/execution/projection_executor.cpp


namespace bustub {

/**
 * Construct a projection executor.
 * @param exec_ctx executor context
 * @param plan projection plan supplying the expressions and the output schema
 * @param child_executor executor producing the input tuples; ownership is taken
 */
ProjectionExecutor::ProjectionExecutor(ExecutorContext *exec_ctx, const ProjectionPlanNode *plan,
                                       std::unique_ptr<AbstractExecutor> &&child_executor)
    : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}

/** Initialization is delegated entirely to the child executor. */
void ProjectionExecutor::Init() { child_executor_->Init(); }

/**
 * Produce the next projected tuple.
 * @param[out] tuple built from the plan's expressions evaluated over the child's next tuple
 * @param[out] rid passed straight through from the child executor
 * @return false once the child executor is exhausted
 */
auto ProjectionExecutor::Next(Tuple *tuple, RID *rid) -> bool {
  Tuple input{};
  if (!child_executor_->Next(&input, rid)) {
    return false;
  }

  // Evaluate each projection expression against the child's tuple and schema.
  std::vector<Value> projected{};
  projected.reserve(GetOutputSchema().GetColumnCount());
  for (const auto &expression : plan_->GetExpressions()) {
    projected.push_back(expression->Evaluate(&input, child_executor_->GetOutputSchema()));
  }

  *tuple = Tuple{projected, &GetOutputSchema()};
  return true;
}
}  // namespace bustub

- src/include/execution/expressions/abstract_expression.h


namespace bustub {

class AbstractExpression;
using AbstractExpressionRef = std::shared_ptr<AbstractExpression>;

/**
 * Base class of every expression. Expressions form trees: each node holds an ordered list of
 * child expressions (possibly empty) and the Column describing the value it evaluates to.
 */
class AbstractExpression {
 public:
  /**
   * @param children the child expressions, in order
   * @param ret_type the column describing the value this expression evaluates to
   */
  AbstractExpression(std::vector<AbstractExpressionRef> children, Column ret_type)
      : children_(std::move(children)), ret_type_(std::move(ret_type)) {}

  /** Virtual destructor so derived expressions are destroyed correctly through a base pointer. */
  virtual ~AbstractExpression() = default;

  /** @return the result of evaluating this expression over `tuple`, interpreted with `schema` */
  virtual auto Evaluate(const Tuple *tuple, const Schema &schema) const -> Value = 0;

  /**
   * Evaluate this expression over a pair of tuples, as in a join.
   * @param left_tuple the left-side tuple
   * @param left_schema schema of `left_tuple`
   * @param right_tuple the right-side tuple
   * @param right_schema schema of `right_tuple`
   * @return the value produced from the two tuples
   */
  virtual auto EvaluateJoin(const Tuple *left_tuple, const Schema &left_schema, const Tuple *right_tuple,
                            const Schema &right_schema) const -> Value = 0;

  /** @return the child at position `child_idx` (no bounds check) */
  auto GetChildAt(uint32_t child_idx) const -> const AbstractExpressionRef & {
    return children_[child_idx];
  }

  /** @return all children, in order; the order can be significant */
  auto GetChildren() const -> const std::vector<AbstractExpressionRef> & {
    return children_;
  }

  /** @return a copy of the column describing this expression's result */
  virtual auto GetReturnType() const -> Column {
    return ret_type_;
  }

  /** @return a printable form of the expression; "<unknown>" unless overridden */
  virtual auto ToString() const -> std::string {
    return "<unknown>";
  }

  /** @return a copy of this expression that uses `children` as its children */
  virtual auto CloneWithChildren(std::vector<AbstractExpressionRef> children) const
      -> std::unique_ptr<AbstractExpression> = 0;

  /** Child expressions; their order can be significant. */
  std::vector<AbstractExpressionRef> children_;

 private:
  /** Column describing the value this expression evaluates to. */
  Column ret_type_;
};

}  // namespace bustub

- src/include/execution/expressions/arithmetic_expression.h

/**
 * ArithmeticExpression represents two expressions being computed, ONLY SUPPORT INTEGER FOR NOW.
 *
 * Both children are evaluated against the same tuple, read as int32_t, and combined according
 * to `compute_type_` (Plus and Minus are handled).
 * NOTE(review): the constructor, the `compute_type_` member and the remaining overrides
 * (EvaluateJoin, ToString, CloneWithChildren) are omitted from this excerpt.
 */
class ArithmeticExpression : public AbstractExpression {
 public:
  /**
   * @param tuple  tuple both operands are evaluated against
   * @param schema schema of `tuple`
   * @return an INTEGER value, or an INTEGER NULL when either operand is NULL
   */
  auto Evaluate(const Tuple *tuple, const Schema &schema) const -> Value override {
    Value lhs = GetChildAt(0)->Evaluate(tuple, schema);
    Value rhs = GetChildAt(1)->Evaluate(tuple, schema);
    auto res = PerformComputation(lhs, rhs);
    if (res == std::nullopt) {
      return ValueFactory::GetNullValueByType(TypeId::INTEGER);
    }
    return ValueFactory::GetIntegerValue(*res);
  }

 private:
  /**
   * Add two int32_t values with two's-complement wrap-around instead of signed-overflow UB.
   * The sum is formed in uint32_t, where overflow wraps by definition. Converting back to int32_t
   * is modular since C++20, and implementation-defined (modular on mainstream compilers) before that.
   */
  static auto WrappingAdd(int32_t a, int32_t b) -> int32_t {
    return static_cast<int32_t>(static_cast<uint32_t>(a) + static_cast<uint32_t>(b));
  }

  /** Subtract two int32_t values with two's-complement wrap-around (see WrappingAdd). */
  static auto WrappingSub(int32_t a, int32_t b) -> int32_t {
    return static_cast<int32_t>(static_cast<uint32_t>(a) - static_cast<uint32_t>(b));
  }

  /**
   * @return std::nullopt if either operand is NULL, otherwise the int32_t result.
   * NOTE(review): on overflow the result wraps; callers wanting an error or NULL must check for it.
   */
  auto PerformComputation(const Value &lhs, const Value &rhs) const -> std::optional<int32_t> {
    if (lhs.IsNull() || rhs.IsNull()) {
      return std::nullopt;
    }
    switch (compute_type_) {
      case ArithmeticType::Plus:
        return WrappingAdd(lhs.GetAs<int32_t>(), rhs.GetAs<int32_t>());
      case ArithmeticType::Minus:
        return WrappingSub(lhs.GetAs<int32_t>(), rhs.GetAs<int32_t>());
      default:
        UNREACHABLE("Unsupported arithmetic type.");
    }
  }
};

Conclusion

I hope this walkthrough helps anyone tracing a query through the BusTub code base. Subscribe for more related content.

--

--

Arjun Sunil Kumar
Distributed Systems Engineering

Writes on Database Kernel, Distributed Systems, Cloud Technology, Data Engineering & SDE Paradigm. github.com/arjunsk