Text Classification with KDB.AI

Jonathan Kantor
KX Systems
Published Jul 29, 2024 · 11 min read

Introduction:

This article presents a novel approach to text classification that combines the power of the Text-to-Text Transfer Transformer (T5) model with KDB.AI’s nearest neighbor search to re-rank neighbors based on rich semantic meaning. This exploration focuses on classifying books from the Project Gutenberg Library into five distinct categories. We compare single dense embedding flat search with a hybrid dense and sparse embedding search. The methods and techniques described can be adapted for various text classification tasks.

Setup:

To follow along with this approach, ensure you have PyKX and KDB.AI set up. If you don’t, follow the steps below:

  1. KDB.AI Cloud Database Setup:
  • Sign up for an account at KDB.AI
  • Find your Endpoint URL and add an API Key in the Connection Details tab.
  • Optional: Save your Endpoint URL and API Key as environment variables.

  2. PyKX Setup:

Once both are set up, you can pip install the libraries:

pip install pykx kdbai_client

Then you can import the Python libraries and connect to the KDB.AI cloud server using your Python client:

import pykx as kx
import kdbai_client as kdbai
import os
from getpass import getpass

# Read the credentials from the environment, or prompt for them if they are not set
KDBAI_ENDPOINT = (
    os.environ["KDBAI_ENDPOINT"]
    if "KDBAI_ENDPOINT" in os.environ
    else input("KDB.AI endpoint: ")
)
KDBAI_API_KEY = (
    os.environ["KDBAI_API_KEY"]
    if "KDBAI_API_KEY" in os.environ
    else getpass("KDB.AI API key: ")
)

session = kdbai.Session(endpoint=KDBAI_ENDPOINT, api_key=KDBAI_API_KEY)

Background: The Classification Challenge

This project focused on classifying books from the Project Gutenberg library into distinct categories. Given the complexity and length of book texts, we implemented a chunking strategy to handle the 512 token limit of our models. This approach allowed us to process longer texts effectively while maintaining the context necessary for accurate classification.
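As a rough illustration of the chunking idea, the sketch below splits a token sequence into windows of at most 512 tokens. The function name `chunk_tokens`, the optional `overlap` parameter, and the whitespace tokenization are assumptions made for this example; the actual project would count tokens with the model's own tokenizer.

```python
def chunk_tokens(tokens, max_len=512, overlap=0):
    """Split a token list into consecutive windows of at most max_len tokens."""
    if max_len <= 0 or not 0 <= overlap < max_len:
        raise ValueError("require max_len > 0 and 0 <= overlap < max_len")
    step = max_len - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + max_len])
        # Stop once the current window already reaches the end of the text
        if start + max_len >= len(tokens):
            break
    return chunks

# Whitespace splitting stands in for the model tokenizer here (an assumption).
book_text = "word " * 1200
chunks = chunk_tokens(book_text.split(), max_len=512)
print([len(c) for c in chunks])  # [512, 512, 176]
```

A non-zero `overlap` repeats the tail of one chunk at the head of the next, which is one common way to preserve context across chunk boundaries.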

For our embeddings, we utilized SBERT (Sentence-BERT) to generate dense vector representations of the text chunks. Additionally, we employed a companion tokenizer to create sparse embeddings, capturing different aspects of the text’s semantic structure.

The books are categorized into five distinct genres (Animal, CIA, Fantasy, Islam, and Slavery), which were prelabeled by Project Gutenberg. Our goal is to accurately assign each text chunk to one of these categories based on its content, and then aggregate the chunk predictions by book ID to classify each book.
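One simple way to aggregate chunk predictions by book ID is a majority vote. The sketch below assumes that rule; the article does not spell out the exact aggregation, and `classify_books` and the sample predictions are hypothetical.

```python
from collections import Counter, defaultdict

def classify_books(chunk_predictions):
    """Majority-vote the chunk-level labels of each book.

    chunk_predictions: iterable of (book_id, predicted_label) pairs.
    Ties go to the label seen first, since Counter keeps insertion order.
    """
    votes = defaultdict(Counter)
    for book_id, label in chunk_predictions:
        votes[book_id][label] += 1
    return {book_id: counts.most_common(1)[0][0] for book_id, counts in votes.items()}

# Hypothetical chunk predictions for two books
preds = [(101, "Fantasy"), (101, "Fantasy"), (101, "Animal"), (202, "Slavery")]
book_labels = classify_books(preds)
print(book_labels)  # {101: 'Fantasy', 202: 'Slavery'}
```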

With this model, you can insert any book into the system, and it will classify the text into the appropriate category.

Text-to-Text Transfer Transformer (T5):

T5 is a state-of-the-art language model developed by Google Research. Unlike traditional models that are designed for specific tasks, T5 treats every NLP problem as a “text-to-text” problem, making it highly versatile. Key features of T5 include:

  • Unified framework for various NLP tasks
  • Pre-trained on a large corpus of text data
  • Ability to fine-tune on specific tasks

In our classifier, we use a fine-tuned version of T5ForSequenceClassification, which adapts the T5 architecture for sequence classification tasks.

Data Structure:

Our classifier operates on a dataset with the following structure:

Where:

  • ID: Unique identifier for each book
  • Target: True category label
  • Text: Content of text chunk
  • Dense Embedding: SBERT Embedding of Text
  • Sparse Embedding: Sparse Dictionary from BertTokenizerFast

This structure allows for efficient storage and retrieval of text samples along with their associated metadata and embeddings and is configurable for any text and corresponding embeddings.
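To make the row layout concrete, here is a minimal sketch of a single record. It assumes the sparse embedding is a dictionary mapping token IDs to their counts with special tokens dropped; the article only says the dictionary comes from BertTokenizerFast, so the helper `to_sparse_embedding` and all values below are illustrative.

```python
from collections import Counter

def to_sparse_embedding(token_ids, special_ids=()):
    """Map each token id to the number of times it occurs in the chunk."""
    counts = Counter(t for t in token_ids if t not in special_ids)
    return dict(counts)

# Hypothetical token ids; 101 and 102 play the role of start/end markers.
token_ids = [101, 2023, 2338, 2023, 102]
row = {
    "ID": 1,
    "Target": "Fantasy",
    "Text": "this book this",
    "Dense Embedding": [0.12, -0.03, 0.48],  # toy 3-dimensional vector
    "Sparse Embedding": to_sparse_embedding(token_ids, special_ids={101, 102}),
}
print(row["Sparse Embedding"])  # {2023: 2, 2338: 1}
```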

This structured data is then loaded into KDB.AI, a high-performance vector database that offers several key advantages for our text classification system:

  1. Efficient Vector Storage: KDB.AI is optimized for storing and retrieving high-dimensional vector data, making it ideal for managing our dense and sparse embeddings.
  2. Fast Flat Similarity Search: The database provides built-in search functionality for rapid nearest neighbor searches, which is crucial for our flat neighbor classification approach.
  3. Hybrid Search Capabilities: KDB.AI’s hybrid_search functionality allows us to combine different types of searches (e.g., combining dense and sparse embeddings), enhancing the accuracy and flexibility of our nearest neighbor computations.
  4. Scalability: KDB.AI can handle large volumes of data and concurrent queries, allowing our system to scale efficiently as the dataset grows.
  5. Integration with q: As part of the kdb+ ecosystem, KDB.AI integrates with q, enabling high-performance data manipulation and analysis.

By leveraging KDB.AI, we can perform complex vector operations and similarity searches with exceptional speed, which is essential for both the training and inference stages of our text classifier.

Pipeline Architecture:

We implement our training and preprocessing pipelines using a Directed Acyclic Graph (DAG) pipeline structure from the ML Toolkit. This architecture offers several significant advantages:

  1. Parallelization: The DAG structure allows for efficient parallel processing of independent tasks, significantly reducing computation time.
  2. Python to q Conversion: Using PyKX, we convert Python objects to q objects. This conversion lets us use the high-performance capabilities of q, resulting in faster data operations.
  3. Modular Design: The DAG structure allows for easy modification and extension of the pipeline, making it adaptable to different machine learning tasks.
  4. Efficient Data Flow: The pipeline manages the flow of data between different processing stages, ensuring optimal use of computational resources.

We encapsulate the DAG pipeline within a Pythonic interface using PyKX to enhance interpretability. This pipeline architecture is employed across both the training and preprocessing stages, ensuring efficient data management throughout the system.
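The general idea of a DAG pipeline can be sketched in plain Python. This is not the ML Toolkit API: `run_dag` and the toy nodes are hypothetical, and the standard-library `graphlib` module is used only to show how nodes run once their upstream results exist.

```python
from graphlib import TopologicalSorter

def run_dag(nodes, edges, inputs):
    """Run each node once all of its upstream nodes have produced a result.

    nodes:  {name: function taking the upstream results as keyword arguments}
    edges:  {name: set of upstream names}
    inputs: {name: value} for configuration nodes that have no function
    """
    results = dict(inputs)
    for name in TopologicalSorter(edges).static_order():
        if name in results:  # configuration value, nothing to compute
            continue
        upstream = {dep: results[dep] for dep in edges.get(name, ())}
        results[name] = nodes[name](**upstream)
    return results

nodes = {
    "tokens": lambda text: text.split(),
    "count": lambda tokens: len(tokens),
    "upper": lambda tokens: [t.upper() for t in tokens],
}
edges = {"tokens": {"text"}, "count": {"tokens"}, "upper": {"tokens"}}
out = run_dag(nodes, edges, {"text": "a small example"})
print(out["count"], out["upper"])
```

Here `count` and `upper` depend only on `tokens`, so a scheduler would be free to run them in parallel; this sketch runs them sequentially for simplicity.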

Flat vs Hybrid Search

Stage 1: Flat Search with Dense Embeddings

In the Flat search approach, we utilize only dense embeddings generated by SBERT. This method provides a solid baseline for our text classification task.

nearest_neighbors = table.search(vectors = query_vectors, n = num_neighbors)

Stage 2: Hybrid Search with Dense and Sparse Embeddings

To improve accuracy, we evolved our approach to incorporate both dense and sparse embeddings in a hybrid search. This method allowed us to capture both the overall semantic meaning and specific textual features.

nearest_neighbors = table.hybrid_search(dense_vectors = dense_query_vectors, sparse_vectors = sparse_query_vectors, alpha = 0.9, n = num_neighbors)

The alpha parameter (set to 0.9 in this example) allows us to adjust the balance between dense and sparse embeddings in the search process.
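To build intuition for this kind of weighting, the sketch below blends a dense and a sparse similarity score linearly. It assumes that alpha weights the dense score and 1 - alpha the sparse score; the article does not state how KDB.AI combines the two internally, so check the KDB.AI documentation for the exact convention. `hybrid_score` and the candidate scores are hypothetical.

```python
def hybrid_score(dense_score, sparse_score, alpha=0.9):
    """Blend two similarity scores: alpha weights dense, (1 - alpha) weights sparse."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must lie in [0, 1]")
    return alpha * dense_score + (1.0 - alpha) * sparse_score

# Hypothetical, already-normalized similarity scores (higher = more similar),
# stored as (dense, sparse) per candidate chunk
candidates = {"chunk_a": (0.80, 0.20), "chunk_b": (0.70, 0.95)}

def rank(alpha):
    return sorted(candidates, key=lambda c: hybrid_score(*candidates[c], alpha=alpha), reverse=True)

print(rank(0.9))  # dense-heavy weighting favors chunk_a
print(rank(0.5))  # equal weighting favors chunk_b
```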

Training Process:

Visualization of Training DAG Pipeline

The training process involves several key steps:

  1. K-Nearest Neighbors (KNN) Selection: We utilize KDB.AI’s flat and hybrid search functionality to identify the top N nearest neighbors for each text sample in the training set.
  2. Text Pair Generation: We create text pairs based on the KNN results. For instance: If “Hello” and “Good Morning” both belong to the “Greetings” category, we label this pair as 1. If “Hello” and “Goodbye” belong to different categories, we label this pair as 0.
  3. T5 Model Fine-tuning: We use these labeled text pairs to fine-tune a T5ForSequenceClassification model. This process leverages transfer learning, adapting a pre-trained model to our specific classification task.

Each of these steps is implemented as a node in our DAG pipeline, allowing for parallel processing and efficient conversion between Python and q objects where possible.
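The pair-labeling rule from step 2 can be sketched independently of the pipeline. The function `build_pairs`, the "Farewells" category, and the neighbor lists are hypothetical stand-ins for the KNN output.

```python
def build_pairs(texts, targets, neighbor_indices):
    """Pair each sample with its neighbors; label 1 if the categories match, else 0."""
    pairs, labels = [], []
    for i, neighbors in enumerate(neighbor_indices):
        for j in neighbors:
            pairs.append((texts[i], texts[j]))
            labels.append(1 if targets[i] == targets[j] else 0)
    return pairs, labels

texts = ["Hello", "Good Morning", "Goodbye"]
targets = ["Greetings", "Greetings", "Farewells"]
neighbor_indices = [[1, 2], [0], [0]]  # hypothetical KNN output, one list per sample
pairs, labels = build_pairs(texts, targets, neighbor_indices)
print(list(zip(pairs, labels)))
```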

Now, let’s take a look at the implementation of the training pipeline:

T5TrainingPipeline Implementation

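# Pipeline (the DAG wrapper) and TextPairDataset are project helpers not shown in this article
import numpy as np
import torch
from transformers import T5ForSequenceClassification, T5Tokenizer, Trainer, TrainingArguments

class T5TrainingPipeline(Pipeline):
    def __init__(self, table):
        super().__init__()
        self.table = table
        self.model = T5ForSequenceClassification.from_pretrained('t5-small')
        self.tokenizer = T5Tokenizer.from_pretrained('t5-small')
        self.model.to(torch.device('cuda:0' if torch.cuda.is_available() else "cpu"))
        self.training_args = TrainingArguments(
            output_dir = './results',
            num_train_epochs = 2,
            per_device_train_batch_size = 2,
            # Likely intended as a 10% warmup; if your transformers version treats
            # warmup_steps as a step count, use warmup_ratio = 0.1 instead
            warmup_steps = 0.1,
            weight_decay = 0.01,
            logging_dir = './logs',
            save_strategy='epoch',
        )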

    def knn_neighbors(self, train_text, num_neighbors):
        # Use every stored chunk as a query against the same table
        rows = self.table.query()
        dense_query_vectors = rows['Dense Embedding'].apply(lambda x: x.tolist()).tolist()
        sparse_query_vectors = rows['Sparse Embedding'].tolist()
        # for hybrid search
        nearest_neighbors = self.table.hybrid_search(dense_vectors = dense_query_vectors, sparse_vectors = sparse_query_vectors, alpha = 0.9, n = num_neighbors['num_neighbors'].py())

        # for flat search, replace the above with:
        # nearest_neighbors = self.table.search(vectors = dense_query_vectors, n = num_neighbors['num_neighbors'].py())
        indices = np.array([nn['index'].tolist() for nn in nearest_neighbors])
        distances = np.array([nn['__nn_distance'].tolist() for nn in nearest_neighbors])
        return {"distances": distances, "indices": indices}

    def get_labels(self, train_text, neighbors):
        # Label each (sample, neighbor) pair 1 if both share a category, else 0
        indices = neighbors['indices'].py()
        train_text = train_text['train_text'].pd()
        labels = [1 if train_text['target'][i] == train_text['target'][j] else 0 for i in range(len(indices)) for j in indices[i]]
        return labels

    def get_text_pairs(self, train_text, neighbors):
        # Build the (sample text, neighbor text) pairs in the same order as the labels
        indices = neighbors['indices'].py()
        train_text = train_text['train_text'].pd()
        text_pairs = [(train_text['Text'][i], train_text['Text'][j]) for i in range(len(indices)) for j in indices[i]]
        return text_pairs

    def create_dataset(self, text_pairs, labels):
        self.dataset = TextPairDataset(text_pairs.py(), labels.py(), self.tokenizer)
        return True

    def run_train(self, dataset):
        # The dataset itself is read from self.dataset, set by create_dataset
        self.trainer = Trainer(
            model = self.model,
            args = self.training_args,
            train_dataset = self.dataset,
        )
        self.trainer.train()

    def build_pipeline(self, table, num_neighbors):

        # Add configuration nodes to the pipeline
        self.add_config("train_text", {"train_text": table.query()})
        self.add_config("num_neighbors", {"num_neighbors": num_neighbors})

        # Add processing nodes to the pipeline
        self.add_node("knn_neighbors", {
            "inputs": {"train_text": b"!", "num_neighbors": b"!"},
            "outputs": b"!",
            "function": self.knn_neighbors
        })

        self.add_node("labels", {
            "inputs": {"train_text": b"!", "knn_neighbors": b"!"},
            "outputs": b"!",
            "function": self.get_labels
        })

        self.add_node("text_pairs", {
            "inputs": {"train_text": b"!", "knn_neighbors": b"!"},
            "outputs": b"!",
            "function": self.get_text_pairs
        })

        self.add_node("dataset", {
            "inputs": {"text_pairs": b"!", "labels": b"!"},
            "outputs": b"!",
            "function": self.create_dataset
        })

        self.add_node("run_train", {
            "inputs": b"!",
            "outputs": b"!",
            "function": self.run_train
        })

        # Connect the nodes in the pipeline
        self.connect_edge("train_text", "output", "knn_neighbors", "train_text")
        self.connect_edge("num_neighbors", "output", "knn_neighbors", "num_neighbors")
        self.connect_edge("train_text", "output", "labels", "train_text")
        self.connect_edge("knn_neighbors", "output", "labels", "knn_neighbors")
        self.connect_edge("train_text", "output", "text_pairs", "train_text")
        self.connect_edge("knn_neighbors", "output", "text_pairs", "knn_neighbors")
        self.connect_edge("text_pairs", "output", "dataset", "text_pairs")
        self.connect_edge("labels", "output", "dataset", "labels")
        self.connect_edge("dataset", "output", "run_train", "input")

    def run(self, table, num_neighbors):
        self.build_pipeline(table, num_neighbors)
        pipeline = self.create_pipeline()
        result = self.run_pipeline(pipeline)
        return result

Using the Training Pipeline

To put our training pipeline into action, we need to prepare our data and initialize the pipeline. Here’s a step-by-step guide on how to use the T5TrainingPipeline:

1. First, we load our training data and create a KDB.AI table:


# load_csv and create_hybrid_table are project helpers not shown in this article
train = load_csv('train.csv', hybrid = True) # load train data from CSV
table = create_hybrid_table(session, len(train['Dense Embedding'][0])) # create KDB.AI table
for i in range(0, len(train), 200): # insert data into table in batches of 200
    table.insert(train[i:i+200].reset_index())

2. Then, we initialize and run the training pipeline:

num_neighbors = 100 # number of nearest neighbors to use for training
training = T5TrainingPipeline(table)
training_output = training.run(table, num_neighbors)

The fine-tuned model is created and stored in the results/ folder.

Inference Process:

Visualization of Inference DAG Pipeline

The inference process consists of the following steps:

  1. KNN Selection: We identify the top 2N nearest neighbors using KDB.AI’s flat or hybrid search.
  2. Text Pair Generation: We create text pairs from these nearest neighbors.
  3. Similarity Scoring: The text pairs are fed into our fine-tuned T5 model, which produces similarity scores ranging from 0 (completely different) to 1 (exactly the same).
  4. Reranking: We re-rank the 2N candidates by their similarity scores and keep the top N neighbors.
  5. Filtering: We apply a score threshold to filter out dissimilar neighbors, and use an uncertainty threshold to ensure a minimum percentage of similar neighbors for confident predictions.
  6. Classification: We aggregate the labels of each text’s nearest neighbors to determine the final classification.
  7. Manual Review: Texts that fall below the uncertainty threshold are flagged for manual human labeling.
  8. Retraining: These flagged texts are then labeled and used to retrain the model, helping to mitigate model drift.

Like the training process, the inference pipeline is also structured as a DAG, allowing for efficient parallelization and data processing.
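Steps 4 to 7 can be sketched for a single text in plain Python. The function `classify_with_rerank` is hypothetical; its default thresholds mirror the values used in the implementation below (a 0.9 score threshold, and at most floor(0.9 * N) filtered neighbors), while the labels and scores are made up for illustration.

```python
import math
from collections import Counter

def classify_with_rerank(neighbor_labels, scores, n, score_thresh=0.9):
    """Rerank 2N candidates by score, keep the top N, filter, then vote.

    Returns the majority label, or None when too many neighbors fall at or
    below the score threshold (the text would be flagged for manual review).
    """
    max_filtered = math.floor(0.9 * n)
    ranked = sorted(zip(scores, neighbor_labels), key=lambda p: p[0], reverse=True)[:n]
    kept = [label for score, label in ranked if score > score_thresh]
    if n - len(kept) > max_filtered:
        return None
    return Counter(kept).most_common(1)[0][0]

# Hypothetical candidates for N = 2 (so 2N = 4 neighbors)
labels = ["Fantasy", "Animal", "Fantasy", "CIA"]
confident = classify_with_rerank(labels, [0.95, 0.40, 0.97, 0.92], n=2)
uncertain = classify_with_rerank(labels, [0.50, 0.40, 0.30, 0.20], n=2)
print(confident, uncertain)  # Fantasy None
```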

Let’s examine the implementation of the inference pipeline:

InferencePipeline Implementation

# Requires the imports from the training pipeline, plus tqdm for the progress bar
from tqdm import tqdm

class InferencePipeline(Pipeline):
    def __init__(self, output_dir, table):
        super().__init__()
        self.model = T5ForSequenceClassification.from_pretrained(output_dir)
        self.tokenizer = T5Tokenizer.from_pretrained('t5-small')
        self.model.to(torch.device('cuda:0' if torch.cuda.is_available() else "cpu"))
        self.table = table

    def predict_similarity(self, text_pairs):
        self.model.eval()
        scores = []
        with torch.no_grad():
            for text1, text2 in tqdm(text_pairs, desc="Predicting similarities"):
                inputs = self.tokenizer(
                    text1, text2,
                    truncation=True,
                    padding='max_length',
                    max_length=512,
                    return_tensors='pt'
                ).to(self.model.device)  # keep the inputs on the same device as the model
                outputs = self.model(**inputs)
                logits = outputs.logits
                # Probability of class 1, i.e. that the two texts share a category
                score = torch.softmax(logits, dim=1)[0][1].item()
                scores.append(score)
        return scores

    def knn_neighbors(self, text, num_neighbors):
        # Query with the test chunks and search only among the training chunks
        test_rows = self.table.query(filter = [('like', 'flag', 'test')])
        dense_query_vectors = test_rows['Dense Embedding'].apply(lambda x: x.tolist()).tolist()
        sparse_query_vectors = test_rows['Sparse Embedding'].tolist()
        # for hybrid search
        nearest_neighbors = self.table.hybrid_search(dense_vectors = dense_query_vectors, sparse_vectors = sparse_query_vectors, alpha = 0.9, n = 2 * num_neighbors['num_neighbors'].py(), filter = [("like", "flag", "train")])

        # for flat search, replace the above with:
        # nearest_neighbors = self.table.search(vectors = dense_query_vectors, n = 2 * num_neighbors['num_neighbors'].py(), filter = [("like", "flag", "train")])
        indices = np.array([nn['index'].tolist() for nn in nearest_neighbors])
        self.indices = indices
        distances = np.array([nn['__nn_distance'].tolist() for nn in nearest_neighbors])
        return {"distances": distances, "indices": indices}

    def get_text_pairs(self, text, knn_neighbors):
        train_text = self.table.query(filter = [("like", "flag", "train")])
        test_text = self.table.query(filter = [("like", "flag", "test")])
        indices = knn_neighbors['indices'].np()
        # Look up each neighbor's text by its 'index' value and take the single matching string
        text_pairs = [(test_text['Text'][i], train_text.loc[train_text['index'] == j, 'Text'].iloc[0]) for i in range(len(indices)) for j in indices[i]]
        return text_pairs

    def get_similarity(self, text_pairs, text, num_neighbors):
        test_text = self.table.query(filter = [("like", "flag", "test")])['Text']
        scores = self.predict_similarity(text_pairs.py())
        self.scores = scores
        # One row per test chunk, one column per candidate neighbor (2N columns)
        scores_reshaped = np.array(scores).reshape(len(test_text), 2*num_neighbors['num_neighbors'].py())
        return scores_reshaped

    def get_text_predictions(self, knn_neighbors, text, scores_reshaped, num_neighbors):
        n = num_neighbors['num_neighbors'].py()
        accuracy_thresh, uncertain_thresh = 0.9, np.floor(0.9 * n)
        indices = knn_neighbors['indices'].np()
        scores = np.vstack(scores_reshaped.np())

        # Rerank the 2N candidates by similarity score (descending) and keep the top N,
        # so that top_scores stays aligned with top_indices
        order = np.argsort(scores, axis=1)[:, ::-1]
        sorted_indices = np.array([indices[i][order[i]] for i in range(len(indices))])
        top_indices = sorted_indices[:, :n]
        top_scores = np.take_along_axis(scores, order, axis=1)[:, :n]

        # Mark neighbors at or below the score threshold with -1
        mask = top_scores > accuracy_thresh
        filtered_indices = np.where(mask, top_indices, -1)

        # A chunk is uncertain when too many of its neighbors were filtered out
        count_uncertain = np.sum(filtered_indices == -1, axis=1)
        good_indexes = count_uncertain <= uncertain_thresh
        uncertain_indexes = count_uncertain > uncertain_thresh

        filtered_indices = filtered_indices[good_indexes]
        # get_mode is a project helper not shown here; train is the DataFrame loaded before the pipeline runs
        filtered_text_predictions = [get_mode(train.loc[index_list[index_list != -1], 'target']) for index_list in filtered_indices]

        self.good_indexes = good_indexes
        self.uncertain_indexes = uncertain_indexes
        self.top_scores = top_scores
        self.top_indices = top_indices

        return filtered_text_predictions

    def build_pipeline(self, table, num_neighbors):

        # Add configuration nodes to the pipeline
        self.add_config("text", {"text": table.query()})
        self.add_config("num_neighbors", {"num_neighbors": num_neighbors})

        # Add processing nodes to the pipeline
        self.add_node("knn_neighbors", {
            "inputs": {"text": b"!", "num_neighbors": b"!"},
            "outputs": b"!",
            "function": self.knn_neighbors
        })

        self.add_node("text_pairs", {
            "inputs": {"text": b"!", "indices": b"!"},
            "outputs": b"!",
            "function": self.get_text_pairs
        })

        self.add_node("similarities", {
            "inputs": {"text_pairs": b"!", "text": b"!", "num_neighbors": b"!"},
            "outputs": b"!",
            "function": self.get_similarity
        })

        self.add_node("text_predictions", {
            "inputs": {"indices": b"!", "text": b"!", "similarities": b"!", "num_neighbors": b"!"},
            "outputs": b"!",
            "function": self.get_text_predictions
        })

        # Connect the nodes in the pipeline
        self.connect_edge("text", "output", "knn_neighbors", "text")
        self.connect_edge("num_neighbors", "output", "knn_neighbors", "num_neighbors")
        self.connect_edge("text", "output", "text_pairs", "text")
        self.connect_edge("knn_neighbors", "output", "text_pairs", "indices")
        self.connect_edge("text_pairs", "output", "similarities", "text_pairs")
        self.connect_edge("text", "output", "similarities", "text")
        self.connect_edge("num_neighbors", "output", "similarities", "num_neighbors")
        self.connect_edge("knn_neighbors", "output", "text_predictions", "indices")
        self.connect_edge("text", "output", "text_predictions", "text")
        self.connect_edge("similarities", "output", "text_predictions", "similarities")
        self.connect_edge("num_neighbors", "output", "text_predictions", "num_neighbors")

    def run(self, table, num_neighbors):
        self.build_pipeline(table, num_neighbors)
        pipeline = self.create_pipeline()
        result = self.run_pipeline(pipeline)
        return result

Using the Inference Pipeline

To use the inference pipeline for classifying new text data, follow these steps:

1. First, prepare your data by loading both the test and train datasets:

import pandas as pd

# load_csv and create_table are project helpers not shown in this article
test = load_csv('test.csv', hybrid = True).assign(flag='test').sample(n=250, random_state=42).reset_index(drop = True) # Load a sample of 250 chunks and label test data
train = load_csv('train.csv', hybrid = True).assign(flag='train') # Load and label train data
text = pd.concat([train, test]).reset_index(drop=True) # Combine and reset index
text = text.loc[:, text.columns != 'Sparse Embedding'] # Remove sparse embedding column (flat search only; the hybrid search path reads this column)
table = create_table(session, len(test['Dense Embedding'][0]), both=True) # Create KDB.AI table
for i in range(0, len(text), 200): # insert data into table in batches of 200
    table.insert(text[i:i+200].reset_index())

2. Next, initialize and run the inference pipeline:

num_neighbors = 10 # number of nearest neighbors to classify with
inference = InferencePipeline(output_dir, table) # output_dir: path to the fine-tuned model saved during training
inference_output = inference.run(table, num_neighbors= num_neighbors)

Performance Evaluation:

When evaluated on a random sample of 250 text chunks from books in the Project Gutenberg library, our study revealed:

Flat Search (Dense Embeddings only):

  • Text Chunks: 98% accuracy
  • Books: 100% accuracy
Confusion Matrix Results for Inference with Flat Search

Hybrid Search (Dense + Sparse Embeddings):

  • Text Chunks: 100% accuracy
  • Books: 100% accuracy
Confusion Matrix Results for Inference with Hybrid Search

The hybrid search method showed a slight improvement in chunk-level accuracy (100% versus 98% for flat search), while both methods classified every book correctly. This improvement can likely be attributed to the method’s ability to capture both overall semantic meaning (through dense embeddings) and specific textual features (through sparse embeddings).
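For readers who want to reproduce this kind of evaluation, the sketch below computes accuracy and confusion-matrix counts from true and predicted labels. The helper names and the four toy labels are hypothetical and are not the article's results.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def confusion_counts(y_true, y_pred):
    """Count each (true label, predicted label) combination."""
    return Counter(zip(y_true, y_pred))

# Toy labels for illustration only
y_true = ["Animal", "CIA", "Fantasy", "Fantasy"]
y_pred = ["Animal", "CIA", "Fantasy", "Animal"]
acc = accuracy(y_true, y_pred)
cm = confusion_counts(y_true, y_pred)
print(acc)                        # 0.75
print(cm[("Fantasy", "Animal")])  # 1
```

The same two functions apply at the book level once chunk predictions have been aggregated per book ID.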

Conclusion:

Our study demonstrates the power of combining advanced NLP techniques, efficient search methods, and high-performance computing to create a robust text classification system. By combining the T5 model with KDB.AI’s hybrid search capabilities, we’ve developed a classifier that achieves high accuracy in categorizing books.

Importantly, while this case focused on book classification, the architecture we’ve developed can be readily adapted to a wide range of text classification problems, making it a versatile solution for various domains and industries.

Don’t hesitate to contact jkantor@kx.com if you have any suggestions or questions.
