Find all connected components in a dataset with minimal memory consumption

Thomas Meißner
7 min read · Aug 23, 2023


Everything has a cost

Image created by AI on https://www.imagine.art/ with the term “computer memory efficiency”

In our last article we created a script to find all connected components within a dataset. While we managed to find all connections, we did not fulfill our goal of being memory efficient.

In this article we put the focus on memory efficiency, but accept some other tradeoffs instead.

The dataset

First we will make use of our synthetic dataset to validate that the approach works. The dataset looks like this:

network_1 = [
    ("A", "12"),
    ("B", "12"),
    ("B", "13"),
    ("C", "13"),
    ("C", "1045"),
    ("D", "1045"),
    ("D", "2095"),
    ("E", "2095"),
    ("E", "883"),
    ("E", "6634"),
    ("F", "6634"),
    ("F", "7777"),
    ("G", "7777"),
]

network_2 = [
    ("M", "7768"),
    ("N", "7768"),
    ("N", "8998"),
    ("O", "7768"),
    ("O", "9000"),
    ("P", "9000"),
]
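
For reference, we can check with a quick in-memory union-find (just a validation sketch, deliberately not memory efficient) which components the script should recover: all of A to G form one component and M, N, O and P form the second one.

def connected_users(edges):
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    # treat users and identifiers as nodes of one bipartite graph
    for user, identifier in edges:
        union(("user", user), ("id", identifier))

    components = {}
    for user, _ in edges:
        components.setdefault(find(("user", user)), set()).add(user)
    return list(components.values())


print(connected_users(network_1 + network_2))
# expected (order may vary): [{'A', 'B', 'C', 'D', 'E', 'F', 'G'}, {'M', 'N', 'O', 'P'}]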

Changing the approach

The last article followed the motto “dictionaries is all you need”, as dictionaries are blazingly fast. If we want to be memory efficient, though, we cannot store all information in memory. Thus we will offload everything we don’t need onto local storage.

We make use of a custom class that stores all relevant information for each entity:

class EntityMapper(ABC):
    def __init__(self, entity_name: str, map_type: Literal["entity_to_identifier", "identifier_to_entity"]):
        self.entity_name: str = entity_name
        self.entity_to_identifier: List[Union[str, int]] = []
        self.identifier_to_entity: List[Union[str, int]] = []
        self.map_type = map_type
        self.entity_to_entity: List[Union[str, int]] = []
        self.entity_to_entity_chain_len: int = 0  # length of the longest known entity chain
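
As a small usage sketch (the real loops further below do the same before persisting each instance to disk), the record for entity "A" from the synthetic dataset would be built up like this:

entity_instance = EntityMapper(entity_name="A", map_type="entity_to_identifier")
entity_instance.entity_to_identifier.append("12")  # identifiers observed for "A"
entity_instance.entity_to_entity.append("B")       # entities connected to "A" so far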

Next we need some helper functions to save and load our class instances:

def save_to_production(
    class_instance, file_name: str = None, file_type=".dat", file_path=""
):
    """
    Takes a class and saves it via pickle.
    :param class_instance: Takes instance of a class.
    :param file_name: Takes a string containing the whole file name.
    :param file_type: Takes the expected type of file to export.
    :return:
    """
    filehandler = open(file_path + "/" + file_name + file_type, "wb")
    pickle.dump(class_instance, filehandler)
    filehandler.close()


def load_for_production(file_name: str = None, file_type=".dat", file_path=""):
    """
    Load in a class. This function will try to load the class as provided.
    It has a fallback logic to impute .dat as file_type in case the import fails initially.
    :param file_name:
    :param file_type:
    :return:
    """
    try:
        filehandler = open(file_path + "/" + file_name, "rb")
    except Exception:
        filehandler = open(file_path + "/" + file_name + file_type, "rb")
    class_instance = pickle.load(filehandler)
    filehandler.close()
    return class_instance
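
As a quick sanity check, a save and load round trip could look like this (assuming the target folder from the next snippet already exists):

mapper = EntityMapper(entity_name="A", map_type="entity_to_identifier")
mapper.entity_to_identifier.append("12")

save_to_production(mapper, file_name="entity_to_identifier_A", file_path="path/test_folder")
restored = load_for_production(file_name="entity_to_identifier_A", file_path="path/test_folder")
print(restored.entity_to_identifier)  # ['12']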

When running on a local system it is also recommended to create a dedicated folder for all of these class files:

folder_name = "path/test_folder"  # whole path is required
permission_mode = 0o700  # Replace with desired permission mode

if not os.path.exists(folder_name):
    os.makedirs(folder_name)
    os.chmod(folder_name, permission_mode)
    print(f"Folder '{folder_name}' created.")
else:
    shutil.rmtree(folder_name)
    print(f"Folder '{folder_name}' already exists. Removing...")
    os.makedirs(folder_name)
    os.chmod(folder_name, permission_mode)
    print(f"Folder '{folder_name}' created.")

This way we can delete the folder as a whole from the OS if anything goes wrong, which can be much more convenient.

We do not drop in-memory dictionaries altogether though. In fact we keep two dictionaries during our first loop to speed up the script massively:

entities_seen = {}
identifiers_seen = {}

We could alternatively look up all file names in the folder to check whether a file already exists, but this would slow down the loop massively as the system would have to scan through more and more files. Dictionary lookups are just so much faster here.
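
To illustrate the difference with the variable names of the full script below, the per-row check boils down to one of these two options:

# fast: average O(1) lookup in an in-memory dictionary
entity_already_seen = row[entity_idx] in entities_seen

# slow alternative: scan the ever-growing folder on every single row
entity_already_seen = f"entity_to_identifier_{row[entity_idx]}.dat" in os.listdir(folder_name)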

Apart from this the initial loop stays the same as in our first article. Instead of storing everything in dictionaries, we load the class instances from disk, update them and save them again.
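
In condensed form (an excerpt of the entity side of the full script below), the per-row pattern becomes a load, update and save cycle:

if not entity_already_seen:
    entity_instance = EntityMapper(entity_name=row[entity_idx], map_type="entity_to_identifier")
else:
    entity_instance = load_for_production(file_name=f"entity_to_identifier_{row[entity_idx]}", file_path=folder_name)

entity_instance.entity_to_identifier.append(row[identitiy_idx])
save_to_production(entity_instance, file_name=f"entity_to_identifier_{row[entity_idx]}", file_path=folder_name)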

For the final multi-hop loop we have similar changes. As writing to the local file system is much slower than writing into memory, we try to compensate by using multiple threads:

# leave one core free for the main thread, but always keep at least one worker
max_workers = max((os.cpu_count() or 2) - 1, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Submit the function for each entity code
    futures = {executor.submit(update_shared_entities, code, entity): code for code in shared_entities}

    # Wait for all threads to complete
    concurrent.futures.wait(futures)

    for future, shared_entity_code in futures.items():
        shared_entity, chain_length_inc = future.result()
        shared_entities_results.append(shared_entity.entity_to_entity)
        if chain_length_inc:
            chain_length_increased = chain_length_inc

        save_to_production(shared_entity, file_name=f"entity_to_identifier_{shared_entity_code}", file_path=folder_name)
        del shared_entity

The multi-threading however adds the risk that we have not found the full chains yet. Thus we add one extra pass before the early stopping comes into effect:

if not chain_length_increased:
    multihop()
    break

Impact of these changes

We made all of these changes to find connected components with minimal memory consumption. Indeed, we found all components in our synthetic dataset.

We also tried to run the script on our dataset of more than 5 million rows. Here, however, the local storage ran out of disk space! This should be a big warning: such a script should only be tested in a safe environment, as out-of-storage errors can fully crash an operating system. We tested it within a virtual machine.

The memory consumption was indeed barely noticeable.

The biggest drawback, though, was the long runtime. For just 500k rows the script needed 20 minutes! On top of that, so many write operations will massively reduce the lifetime of an SSD. Thus it cannot be recommended to run such a script on a local system.

Conclusion

While we managed to find all connected components with minimal memory consumption, the drawbacks are too big: slow runtime, high demand for storage space and an excessive number of write operations.

However, this can still be optimized. Instead of one object per entity we could create an id for every entity, group the entities into fewer objects (via modulo, for example) and then read and write fewer, bigger objects. This would consume more memory, but it would be faster and demand less storage.
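
A rough sketch of that bucketing idea could look like this (NUM_BUCKETS and bucket_for are hypothetical names, not part of the script below):

NUM_BUCKETS = 256  # hypothetical: tune against the available memory

def bucket_for(entity_id: int) -> str:
    # group many entities into one file so we read and write fewer, bigger objects
    return f"bucket_{entity_id % NUM_BUCKETS}"

print(bucket_for(1045))  # entity id 1045 would be stored in "bucket_21" with every other id sharing that remainder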

The full script (only test this in a safe environment!):

from abc import ABC
import csv
import _pickle as pickle
import gc
import datetime
import concurrent.futures
import pandas as pd
#import pickle
import os
import shutil
from typing import Dict, List, Literal, Union



def create_synthetic_data() -> None:
    synth = pd.DataFrame(
        {
            "user": [
                "A",
                "B",
                "B",
                "C",
                "C",
                "D",
                "D",
                "E",
                "E",
                "E",
                "F",
                "F",
                "G",
                "M",
                "N",
                "N",
                "O",
                "O",
                "P",
            ],
            "message_id": [
                "12",
                "12",
                "13",
                "13",
                "1045",
                "1045",
                "2095",
                "2095",
                "883",
                "6634",
                "6634",
                "7777",
                "7777",
                "7768",
                "7768",
                "8998",
                "7768",
                "9000",
                "9000",
            ],
        }
    )
    synth.to_csv("/path/account_linking_synth.csv", index=False)


class EntityMapper(ABC):
    def __init__(self, entity_name: str, map_type: Literal["entity_to_identifier", "identifier_to_entity"]):
        self.entity_name: str = entity_name
        self.entity_to_identifier: List[Union[str, int]] = []
        self.identifier_to_entity: List[Union[str, int]] = []
        self.map_type = map_type
        self.entity_to_entity: List[Union[str, int]] = []
        self.entity_to_entity_chain_len: int = 0  # length of the longest known entity chain


def save_to_production(
    class_instance, file_name: str = None, file_type=".dat", file_path=""
):
    """
    Takes a class and saves it via pickle.
    :param class_instance: Takes instance of a class.
    :param file_name: Takes a string containing the whole file name.
    :param file_type: Takes the expected type of file to export.
    :return:
    """
    filehandler = open(file_path + "/" + file_name + file_type, "wb")
    pickle.dump(class_instance, filehandler)
    filehandler.close()


def load_for_production(file_name: str = None, file_type=".dat", file_path=""):
    """
    Load in a class. This function will try to load the class as provided.
    It has a fallback logic to impute .dat as file_type in case the import fails initially.
    :param file_name:
    :param file_type:
    :return:
    """
    try:
        filehandler = open(file_path + "/" + file_name, "rb")
    except Exception:
        filehandler = open(file_path + "/" + file_name + file_type, "rb")
    class_instance = pickle.load(filehandler)
    filehandler.close()
    return class_instance


filename = '/path/account_linking_synth.csv'

folder_name = "/path/test_folder" # whole path is required
permission_mode = 0o700 # Replace with desired permission mode

if not os.path.exists(folder_name):
    os.makedirs(folder_name)
    os.chmod(folder_name, permission_mode)
    print(f"Folder '{folder_name}' created.")
else:
    shutil.rmtree(folder_name)
    print(f"Folder '{folder_name}' already exists. Removing...")
    os.makedirs(folder_name)
    os.chmod(folder_name, permission_mode)
    print(f"Folder '{folder_name}' created.")

entity_to_entity_chain_increased: Dict[str, bool] = {}

create_synthetic_data()

# define which idx the columns have in the CSV
entity_idx = 1
identitiy_idx = 0
header = True
rows_parsed = 0
chain_length_increased: bool = False

entities_seen = {}
identifiers_seen = {}

# one hop
with open(filename, 'r') as csvfile:
    datareader = csv.reader(csvfile)
    # expect 1st column to be entity & 2nd column identifier
    for row in datareader:
        rows_parsed += 1
        if header and rows_parsed <= 1:
            continue

        if row[entity_idx] in entities_seen:
            entity_already_seen = True
        else:
            entities_seen[row[entity_idx]] = 1
            entity_already_seen = False

        if row[identitiy_idx] in identifiers_seen:
            identifier_already_seen = True
        else:
            identifiers_seen[row[identitiy_idx]] = 1
            identifier_already_seen = False

        # check if entity is already there and append identifier
        if not entity_already_seen:
            entity_instance = EntityMapper(entity_name=row[entity_idx], map_type="entity_to_identifier")
        else:
            entity_instance = load_for_production(file_name=f"entity_to_identifier_{row[entity_idx]}", file_path=folder_name)

        entity_instance.entity_to_identifier.append(row[identitiy_idx])

        # likewise check if identifier is listed already
        if not identifier_already_seen:
            identifier_instance = EntityMapper(entity_name=row[identitiy_idx], map_type="identifier_to_entity")
        else:
            identifier_instance = load_for_production(file_name=f"identifier_to_entity_{row[identitiy_idx]}", file_path=folder_name)

        identifier_instance.identifier_to_entity.append(row[entity_idx])

        for entity in identifier_instance.identifier_to_entity:
            if entity not in entity_instance.entity_to_entity:
                entity_instance.entity_to_entity = identifier_instance.identifier_to_entity
            else:
                entity_instance.entity_to_entity = list(set(
                    entity_instance.entity_to_entity + identifier_instance.identifier_to_entity
                ))

        save_to_production(entity_instance, file_name=f"entity_to_identifier_{row[entity_idx]}", file_path=folder_name)
        save_to_production(identifier_instance, file_name=f"identifier_to_entity_{row[identitiy_idx]}", file_path=folder_name)


def update_shared_entities(shared_entity_code: str, entity_inst: EntityMapper):
    shared_entity = load_for_production(file_name=f"entity_to_identifier_{shared_entity_code}", file_path=folder_name)
    if entity_inst.entity_name == shared_entity.entity_name:
        shared = []
        chain_length_increased = False
    else:
        prev_length = entity_inst.entity_to_entity_chain_len
        shared = list(set(entity_inst.entity_to_entity + shared_entity.entity_to_entity))
        shared_entity.entity_to_entity = shared
        # we keep track if any chain got longer, otherwise we do early stopping
        chain_length = max(len(entity_inst.entity_to_entity), len(shared_entity.entity_to_entity))
        if chain_length > prev_length:
            entity_inst.entity_to_entity_chain_len = chain_length
            chain_length_increased = True
        else:
            chain_length_increased = False
    return shared_entity, chain_length_increased


def multihop():
    global chain_length_increased
    global entity_to_entity_chain_increased

    chain_length_increased = False

    for entity_path in os.listdir(f'{folder_name}/'):
        if "entity_to_identifier" in entity_path:
            entity = load_for_production(file_name=str(entity_path), file_path=folder_name)
            shared_entities = entity.entity_to_entity
        else:
            continue

        shared_entities_results = []

        # leave one core free for the main thread, but always keep at least one worker
        max_workers = max((os.cpu_count() or 2) - 1, 1)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit the function for each entity code
            futures = {executor.submit(update_shared_entities, code, entity): code for code in shared_entities}

            # Wait for all threads to complete
            concurrent.futures.wait(futures)

            for future, shared_entity_code in futures.items():
                shared_entity, chain_length_inc = future.result()
                shared_entities_results.append(shared_entity.entity_to_entity)
                if chain_length_inc:
                    chain_length_increased = chain_length_inc

                save_to_production(shared_entity, file_name=f"entity_to_identifier_{shared_entity_code}", file_path=folder_name)
                del shared_entity

        entity.entity_to_entity = list(set(entity.entity_to_entity + [item for row in shared_entities_results for item in row]))

        save_to_production(entity, file_name=f"entity_to_identifier_{entity.entity_name}", file_path=folder_name)
        del entity


for i in range(10):
    print(f"Start multihop iteration {i} at {datetime.datetime.now()}")
    multihop()
    # if no chain got longer we do early stopping
    if not chain_length_increased:
        multihop()
        break
