Improving Chest X-ray Pneumonia Detection with Federated Learning and Covalent
--
Introduction
Federated learning is an innovative approach to machine learning that allows AI models to be trained across multiple devices or servers while keeping the data localized. Instead of centralizing data to a single server, federated learning decentralizes the training process, enabling data to remain on its original device. This approach is particularly beneficial in industries like healthcare, where patient data is sensitive. For instance, training an automatic X-ray diagnostic system requires a vast database to capture the full spectrum of possible anatomies and pathological patterns. Federated learning allows hospitals and medical institutions to enhance their AI models collaboratively without compromising data privacy.
However, the practical implementation of federated learning, especially on a large scale, is fraught with challenges. One of the primary challenges is the need for heterogeneous computing environments. While the centralized coordination of the model might require minimal compute resources, the actual training processes, distributed across various nodes, often demand high GPU compute power. Data scattered across different regions or devices can lead to inconsistencies in data access speeds and computational capabilities. The unavailability of specific GPUs in certain cloud environments (like geographic regions) can further complicate the training process. Moreover, the nature of federated learning makes rapid prototyping extremely challenging. The iterative process of model development, crucial for refining and improving models, becomes cumbersome and time-consuming.
Covalent, an open-source and cloud-agnostic tool, streamlines the process of federated learning. Designed for swift prototyping, it allows users to conduct experiments directly within their Jupyter notebooks, bypassing the complexities of cloud consoles. This efficiency not only enhances the productivity of ML teams but also shortens model development cycles. Covalent’s design facilitates workflows that utilize resources from major cloud providers, seamlessly integrating from a local machine to compute-intensive cloud environments. This makes it a valuable asset for researchers and professionals aiming to harness the benefits of centralized federated learning without the typical infrastructure and data privacy concerns.
In this blog post, we will provide a comprehensive guide on federated learning, dive into the technical aspects required for building Covalent workflows, and showcase how the two can be combined to build powerful experiments. The exemplar case we will employ involves the identification of pneumonia from chest X-rays.
Blog highlights
Before we proceed, here is a list of the advantages Covalent offers prospective users:
- Multi-cloud experiments: Covalent opens the gateway to conducting experiments that seamlessly traverse various clouds and regions. This capability proves particularly valuable when data is securely stored and must remain within its designated confines.
- Dynamic workflows: Dataset preprocessing may differ, especially when working with heterogeneous and non-identically distributed data. Covalent allows for defining dynamic workflows which adapt on-the-fly at runtime to cater to the unique characteristics of each dataset.
- Dynamic hardware allocation: In tandem with dynamic workflows, Covalent offers the flexibility to allocate compute resources based on dataset characteristics, for example dataset size. Moreover, users can dynamically choose between leveraging a graphics card or a CPU, ensuring optimal performance tailored to the specific requirements of each task.
Federated Learning on Pneumonia Chest X-rays
In our demonstration of federated learning, we will train three separate Convolutional Neural Networks (CNNs) using distinct Pneumonia Chest X-ray datasets. Subsequently, the trained models and their corresponding weights will be transmitted back to a central node. On this central node, we will perform weight aggregation using a simple weighted arithmetic mean:
$$w_j = \frac{\sum_{k=1}^{K} d_k \, w_j^{(k)}}{\sum_{k=1}^{K} d_k} \qquad (1)$$
Here, $w_j$ represents the aggregated set of weights in CNN layer $j$, $w_j^{(k)}$ the weights of that layer in model $k$, $K$ is the number of models/datasets, which, in our case, is three, and $d_k$ represents the size of dataset $k$.
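To make the weighted mean concrete, here is a minimal sketch in plain Python. The function name `aggregate_layer` and the toy numbers are illustrative assumptions, not part of the workflow code: each "model" is reduced to a single layer with two parameters, and the dataset sizes act as the averaging weights.

```python
def aggregate_layer(layer_weights, dataset_sizes):
    """Weighted arithmetic mean of one layer's parameters across models.

    layer_weights: one list of parameters per model (all the same length).
    dataset_sizes: number of training samples behind each model.
    """
    total = sum(dataset_sizes)
    n_params = len(layer_weights[0])
    return [
        sum(d * w[i] for w, d in zip(layer_weights, dataset_sizes)) / total
        for i in range(n_params)
    ]


# Three hypothetical models, each contributing a two-parameter layer.
layer_weights = [[1.0, 0.0], [2.0, 4.0], [4.0, 8.0]]
dataset_sizes = [100, 100, 200]

aggregated = aggregate_layer(layer_weights, dataset_sizes)
print(aggregated)  # [2.75, 5.0]
```

The third model was trained on as much data as the other two combined, so it pulls the aggregate towards its own parameters.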
This federated learning variant is commonly called centralized federated learning, where a single central node oversees the entire learning process. Utilizing weight averaging represents just one approach among several options available in federated learning. Other alternatives include exchanging gradients instead of weights or implementing techniques such as dynamic regularization.
Broadly speaking, our federated learning workflow encompasses the subsequent stages:
1. Training autonomous CNN models in isolation.
2. Consolidating the distinct CNN models into a singular aggregated model.
3. Iteratively revisiting steps 1–2, this time refining the autonomous models by fine-tuning the aggregated model.
We execute the workflow for a predefined number of rounds. Each round entails training separate CNN models on isolated nodes without sharing any data. Subsequently, we aggregate the models, and evaluation is performed on each individual test set. Throughout the process, the centralized node solely possesses information about the model weights, dataset size, and accuracy scores.
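The round structure can be illustrated without any cloud resources. The sketch below is a toy stand-in, not the CNN workflow itself: it assumes a one-parameter model `y = weight * x`, made-up private data on three nodes, and hypothetical helper names (`local_train`, `federated_rounds`). As in the description above, the central loop only ever sees each node's trained parameter and dataset size.

```python
def local_train(weight, data, epochs=1, lr=0.01):
    """Gradient descent on mean squared error, using only this node's data."""
    for _ in range(epochs):
        grad = sum(2 * (weight * x - y) * x for x, y in data) / len(data)
        weight -= lr * grad
    return weight, len(data)


def federated_rounds(node_data, rounds, epochs_per_round=1):
    """Alternate isolated local training with size-weighted aggregation."""
    global_weight = 0.0
    for _ in range(rounds):
        # Each node starts from the current aggregated parameter.
        results = [
            local_train(global_weight, data, epochs_per_round)
            for data in node_data
        ]
        # The central node receives only (parameter, dataset size) pairs.
        total = sum(size for _, size in results)
        global_weight = sum(w * size for w, size in results) / total
    return global_weight


# Toy private datasets; every sample follows y = 3 * x.
node_data = [
    [(1.0, 3.0), (2.0, 6.0)],
    [(3.0, 9.0)],
    [(1.0, 3.0), (2.0, 6.0), (3.0, 9.0), (4.0, 12.0)],
]

final_weight = federated_rounds(node_data, rounds=200)
print(round(final_weight, 6))
```

With real CNNs the parameter becomes a set of weight tensors, and each local training step runs on the node that holds the data.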
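For datasets, we will utilize three datasets currently available in the 🤗 Hugging Face Datasets repository:
- keremberke/chest-xray-classification
- mmenendezg/raw_pneumonia_x_ray
- alkzar90/NIH-Chest-X-ray-dataset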
We create a custom Python dataclass to represent a single dataset.
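from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple

@dataclass
class HFDataset:
    """
    A class to represent a dataset available
    in the Hugging Face repository.
    """
    # Repository path, optionally followed by a configuration name
    name: Tuple[str, ...]
    cloud_provider: str
    filter_func: Optional[Callable[[Dict[str, str]], bool]] = None
    transform_label_func: Optional[Callable[[Dict[str, str]], int]] = None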
In order to standardize the datasets, we apply filtering (filter_func) and transformations (transform_label_func) to ensure that each dataset becomes the foundation of a binary supervised learning problem. Here, class 0 denotes X-rays with a negative diagnosis, while class 1 represents X-rays indicating the presence of pneumonia in the person. We associate each dataset with a specific cloud provider (cloud_provider) to simulate the unshared nature of the datasets by designating the location where the dataset remains exclusively visible and accessible.
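As a rough illustration of what such a filter and label transformation might look like, consider the sketch below. The record layout (a dict with a single `"label"` string), the label names, and the function names are all assumptions made for this example; the real datasets use their own schemas.

```python
# Hypothetical label names; actual datasets may encode labels differently.
KEPT_LABELS = {"No Finding", "Pneumonia"}


def filter_by_label_sketch(record):
    """Keep only records that fit the binary problem."""
    return record["label"] in KEPT_LABELS


def to_binary_label_sketch(record):
    """Map a kept record to class 1 (pneumonia) or class 0 (negative)."""
    return 1 if record["label"] == "Pneumonia" else 0


records = [
    {"label": "No Finding"},
    {"label": "Pneumonia"},
    {"label": "Effusion"},  # neither class, so it is filtered out
]

binary_labels = [
    to_binary_label_sketch(r) for r in records if filter_by_label_sketch(r)
]
print(binary_labels)  # [0, 1]
```

Functions of this shape can be passed as the filter_func and transform_label_func fields of a dataset entry, so that differently labeled datasets all reduce to the same two classes.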
Developing with Covalent
The Covalent SDK is a Python framework that consists of three key components: the electron, the executor, and the lattice.
These components help users to define complex workflows in a lightweight and non-destructive manner, with minimal and non-intrusive changes to their experimental code. We will next explain how to utilize the electron, executor, and lattice elements.
Electron
The electron element converts a function into a remotely executable task that Covalent can deploy to arbitrary resources. The ct.electron decorator is all that a Python function needs to become an electron. Users specify resources and constraints for each electron by passing executor objects to the electron decorator.
Executors
Covalent executors are used to define the environment in which the workflow will be executed. Each electron can be assigned a different executor, such that the workflow is executed across several different machines.
import covalent as ct

gcp_batch_executor = ct.executor.GCPBatchExecutor(
    vcpus=2,  # Number of vCPUs to allocate
    memory=512,  # Memory in MB to allocate
    time_limit=60,  # Time limit of job in seconds
    poll_freq=3,  # Number of seconds to pause before polling for the job's status
)

aws_batch_executor = ct.executor.AWSBatchExecutor(
    vcpu=2,
    memory=4,
    num_gpus=0,
    time_limit=60,
    poll_freq=3,
)

azure_executor = ct.executor.AzureBatchExecutor(
    retries=3,
    time_limit=300,
)
# use `@ct.electron` to make electrons from individual tasks
@ct.electron(executor=aws_batch_executor)
def preprocess(
    dataset, transform_func, transform_label_func=None,
    filter_func=None
):
    """function that preprocesses the data"""
    ...

@ct.electron(executor=gcp_batch_executor)
def train_model(model, epoch_count, train_dataloader, use_gpu=False):
    """function that trains the model"""
    ...

@ct.electron(executor=azure_executor)
def evaluate(model, test_dataloader, use_gpu=False):
    """function that evaluates the model on test data"""
    ...
Lattice
The lattice decorator (@ct.lattice) converts a function composed of electrons into a manageable workflow. It does so by wrapping the function in a callable Lattice object. We transform a workflow into a Lattice simply by adding this decorator to a function:
@ct.lattice
def federated_learning_workflow(
    datasets: HFDataset, round_number, epoch_per_round, batch_size,
    model_agg=None, image_dimension=64,
):
    ...
Finally, once a lattice is defined, you must dispatch it to run the workflow. You dispatch a lattice by passing the workflow function to ct.dispatch and then calling the result with the workflow's parameters.
hf_datasets = [
    HFDataset(
        name=("keremberke/chest-xray-classification", "full"),
        cloud_provider="aws",
    ),
    HFDataset(
        name=("mmenendezg/raw_pneumonia_x_ray", ),
        cloud_provider="gcp",
    ),
    HFDataset(
        name=(
            "alkzar90/NIH-Chest-X-ray-dataset",
            "image-classification"
        ),
        cloud_provider="azure",
        filter_func=filter_by_label,
        transform_label_func=map_labels_to_single_label
    )
]

dispatch_id = ct.dispatch(federated_learning_workflow)(
    hf_datasets, round_number=50,
    epoch_per_round=1, batch_size=32
)
The diagram below portrays the overarching design of a Covalent workflow example that spans AWS, GCP, and Azure environments.
Pneumonia Chest X-ray Federated Learning using Covalent
We now demonstrate how to utilize Covalent to run and perform federated learning across several cloud providers. To achieve this, we establish a dynamic sublattice, which creates an executor at runtime. This approach allows us to allocate cloud resources on the fly, based on the workflow’s input parameters.
@ct.electron
@ct.lattice
def cloud_pnemonia_classifier(cloud_provider, use_gpu=False, **kwargs):
    # Match provider to an executor.
    cloud_service_executor_mapping = {
        "aws": ct.executor.AWSBatchExecutor(vcpu=2, num_gpus=int(use_gpu)),
        "gcp": ct.executor.GCPBatchExecutor(vcpus=2),
        "azure": ct.executor.AzureBatchExecutor(),
    }
    # Create a new electron (task) that uses this executor.
    electron = ct.electron(
        build_pneumonia_classifier,
        executor=cloud_service_executor_mapping.get(cloud_provider),
        deps_pip=ct.DepsPip(
            packages=[
                "torch==2.0.1", "torchvision==0.15.2", "datasets==2.14.0"
            ]
        )
    )
    return electron(**kwargs)
It is important to observe that in this case build_pneumonia_classifier is turned into a ct.electron object dynamically at runtime, which is also when an executor is assigned.
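The same runtime decision can also take dataset characteristics into account, as mentioned in the highlights. The following is a minimal sketch of such a selection step, written without Covalent so it can be run anywhere; the function name `choose_resources`, the threshold, and the vCPU counts are illustrative assumptions rather than values used in this experiment.

```python
SUPPORTED_PROVIDERS = {"aws", "gcp", "azure"}


def choose_resources(cloud_provider, dataset_size, gpu_threshold=10_000):
    """Pick a resource specification from workflow inputs at runtime.

    gpu_threshold is a hypothetical cut-off: datasets at or above it
    are assigned a GPU, smaller ones stay on CPU.
    """
    if cloud_provider not in SUPPORTED_PROVIDERS:
        # Fail loudly instead of silently falling back to a default.
        raise ValueError(f"Unsupported cloud provider: {cloud_provider!r}")
    use_gpu = dataset_size >= gpu_threshold
    return {
        "cloud_provider": cloud_provider,
        "use_gpu": use_gpu,
        "vcpus": 4 if use_gpu else 2,
    }


print(choose_resources("aws", 50_000))
print(choose_resources("gcp", 500))
```

A specification like this could then be translated into executor arguments inside the sublattice. Raising an error for an unknown provider is a design choice of the sketch: a plain dictionary lookup with .get returns None for an unrecognized key, which may hide a misspelled provider name.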
💡 Ensure that you have pip installed the necessary plugins
pip install covalent-awsbatch-plugin covalent-gcpbatch-plugin covalent-azurebatch-plugin
Executor default configuration parameters are available in the Covalent config (typically located at $HOME/.config/covalent/covalent.conf). More information on running batch executors is available in the Covalent docs for AWSBatch, AzureBatch, and GCPBatch. Python package dependencies to be installed on the cloud resources are specified using ct.DepsPip.
With the main @ct.lattice defined, let's delve into the specifics of the build_pneumonia_classifier method. Its primary tasks include:
- Loading and preprocessing the dataset,
- Creating a new model or continuing the training of a previously trained supervised model, and
- Evaluating the performance of the model on the loaded test dataset.
def build_pneumonia_classifier(
    dataset_name=None, filter_func=None, transform_label=None,
    model=None, epoch_number=2, batch_size=64,
    image_dimension=64
):
    train_ds, test_ds, maj_test_acc = prepare_dataset(
        dataset_name, filter_func, transform_label,
        image_dimension=image_dimension
    )
    train_dataloader, test_dataloader = create_dataloaders(
        train_ds, test_ds, batch_size
    )
    if not model:
        model = create_pneumonia_network(image_dimension)
    train_losses, ds_size = train_model(
        model, epoch_number, train_dataloader
    )
    test_acc, test_loss = evaluate(model, test_dataloader)
    return model, ds_size, test_acc
For a comprehensive understanding of the dataset preprocessing, model architecture implementation in PyTorch, and the methods employed for model training and evaluation, you can find detailed information here.
Lastly, to finalize our federated learning workflow, we showcase the process of constructing an aggregated model from the individual models through weighted averaging, as defined in equation (1).
import numpy as np
import torch

@ct.electron
def create_aggregated_network(
    model_list, ds_sizes, image_dimension=64
):
    """
    Simple aggregation mechanism where the weights of the networks
    are combined using a weighted average, with each model weighted
    by the size of its dataset
    """
    dataset_weights = np.array(ds_sizes) / sum(ds_sizes)
    whole_aggregator = []
    # compute a weighted average, one parameter tensor at a time
    for p_index, layer in enumerate(model_list[0].parameters()):
        params_aggregator = torch.zeros(layer.size())
        for model_index, model in enumerate(model_list):
            params_aggregator = params_aggregator + dataset_weights[
                model_index
            ] * list(model.parameters())[p_index].data
        whole_aggregator.append(params_aggregator)
    # load the averaged parameters into a fresh network
    net_avg = create_pneumonia_network(image_dimension)
    for param_index, layer in enumerate(net_avg.parameters()):
        layer.data = whole_aggregator[param_index]
    return net_avg
Now we expand on the previously introduced federated_learning_workflow, which interconnects all the tasks (electrons) into a main workflow (lattice). This workflow creates a pneumonia classifier for each dataset, with each one trained inside its own isolated cloud environment. After the models are trained, they are transmitted back to the central node. Here, the resulting models are aggregated as described in create_aggregated_network, producing a refined model for the isolated cloud environments to improve upon in the next round.
@ct.lattice
def federated_learning_workflow(
    datasets: HFDataset, round_number, epoch_per_round, batch_size,
    model_agg=None, image_dimension=64,
):
    test_accuracies = []
    for round_idx in range(round_number):
        print(f"Round {round_idx + 1}")
        models = []
        dataset_sizes = []
        for ds in datasets:
            # Route training through the dynamic sublattice so that each
            # dataset is processed by its own cloud provider's executor.
            trained_model, ds_size, test_acc = cloud_pnemonia_classifier(
                ds.cloud_provider, dataset_name=ds.name,
                model=model_agg, image_dimension=image_dimension,
                epoch_number=epoch_per_round, batch_size=batch_size,
                transform_label=ds.transform_label_func,
                filter_func=ds.filter_func
            )
            models.append(trained_model)
            dataset_sizes.append(ds_size)
            test_accuracies.append(
                (round_idx, ds.name, test_acc)
            )
        model_agg = create_aggregated_network(
            models, dataset_sizes, image_dimension=image_dimension,
        )
    return test_accuracies
The iterative process of centralized federated learning continues for a duration determined by the round_number hyper-parameter. The illustration below provides a visual depiction of how the Covalent UI presents the aforementioned workflow when round_number is set to 2, and the workflow encompasses the processing of three distinct datasets across three different cloud providers.
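The workflow returns a flat list of (round index, dataset name, test accuracy) tuples. Once the result of the dispatched workflow has been retrieved, a small helper can regroup that list into one accuracy curve per dataset for plotting or comparison. The helper name `accuracy_by_dataset` is hypothetical, and the numbers below are placeholders chosen for illustration, not measured results.

```python
from collections import defaultdict


def accuracy_by_dataset(test_accuracies):
    """Group (round_idx, dataset_name, accuracy) tuples into per-dataset curves."""
    curves = defaultdict(list)
    # Sort by round so each curve is in chronological order.
    for _, ds_name, acc in sorted(test_accuracies, key=lambda rec: rec[0]):
        curves[ds_name].append(acc)
    return dict(curves)


# Placeholder values in the same shape the workflow returns.
example = [
    (0, ("dataset-a", "full"), 0.50),
    (0, ("dataset-b",), 0.60),
    (1, ("dataset-a", "full"), 0.70),
    (1, ("dataset-b",), 0.65),
]

curves = accuracy_by_dataset(example)
for name, accs in curves.items():
    print(name, accs)
```

Because dataset names are tuples, they can be used directly as dictionary keys.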
Evaluation of applying Federated Learning to X-ray data
In this evaluation, we aim to shed further light on the advantages of federated learning, particularly its emphasis on preserving data privacy and enabling collaboration across healthcare institutions. To do so, we compare the test accuracies of a federated learning setup against a classically trained setup, in which a model is trained on a single dataset, using identical CNN architectures in both cases. Three models are trained individually, one per dataset, and compared with a model trained through federated learning, with the comparison performed on each of the three test sets. The following figure shows the federated learning model consistently outperforming the individually trained models.
In an attempt to comprehend the reasons behind the federated model performance, we generate feature maps using the convolutional layer for separate models (upper row) and the federated learning model (lower row). The visual representations offer insights into the evolution of specialized and distinct filters by individual classifiers, whereas the federated model illustrates traits that span across all individual models.
Conclusion
Leveraging Covalent, we established a federated learning framework across three cloud computing resources while maintaining strict data isolation protocols. This allowed us to build an X-ray pneumonia classifier seamlessly by utilizing dynamic sublattices for on-the-fly allocation of compute resources, even with heterogeneous datasets.
The Covalent-enabled design of the federated learning experiment simplifies adding datasets, adjusting training hyperparameters, and altering the aggregation technique. The workflow ensures reproducibility with secure metadata storage in a scalable database.
This blog post aims to showcase the remarkable potential of Covalent in conjunction with various resources such as AWS Batch, Azure Batch, and GCP Batch. The full code for the sample implementation of this post can be found here.
Want to learn more?
Visit Covalent's GitHub page.
References
[1] Kermany, Daniel; Zhang, Kang; Goldbaum, Michael, Labeled Optical Coherence Tomography (OCT) and Chest X-Ray Images for Classification (2018), Mendeley Data, V2, doi: 10.17632/rscbjbr9sj.2
[2] Wang, X., Peng, Y., Lu, L., Lu, Z., Bagheri, M., & Summers, R. M., Chestx-ray8: Hospital-scale chest x-ray database and benchmarks on weakly-supervised classification and localization of common thorax diseases. (2017), Proceedings of the IEEE conference on computer vision and pattern recognition