Communicate data across clients

FeatureCloud platform Supports a wide variety of federated applications that can handle various federated scenarios. Such applications’ flexibility demands multiple communication methods to transfer data across the platform. This story covers four supported communication methods in FC and shows what should be expected once users send data around and expect to receive it from another client. For naming the states, one should import AppState and define state classes:

from enum import Enum
from FeatureCloud.app.engine.app import app_state, AppState, Role, SMPCOperation
import numpy as np

class States(Enum):
initial = 'initial'
broadcaster = 'Broadcaster'
waiter = 'Waiter'
sender = 'Sender'
aggregator = 'Aggregator'
terminal = 'terminal'

Five states are defined (the terminal state is predefined and merely flags the end of the app execution).

Sending out the data

The first action in each communication round is sending out data, and there are multiple options in FeatureCloud to do so.

Broadcast data

Once the federated collaboration starts, all participants should start from a common point in terms of model, especially when it comes to Deep neural networks. i.e., centralized custody; in this case, the coordinator should instantiate the DNN model and share the randomly initialized parameters with all clients. Besides, conventionally, the aggregated model should be available to all clients, so they continue their work with the aggregated model after aggregation. The broadcast method allows the coordinator to share the same data with all participants. For simplicity, in the initial state, we distinguish between the coordinator and participants, so the coordinator transitions into the braodcaster state while other participants wait for the initial values.

@app_state(name=States.initial.value)
class InitialState(AppState):
def register(self):
self.register_transition(States.broadcaster.value, Role.COORDINATOR)
self.register_transition(States.waiter.value, Role.PARTICIPANT)

def run(self):
if self.is_coordinator:
return States.broadcaster.value
return States.waiter.value

The Coordinator enters the broadcaster state, then randomly instantiates a NumPy array of (100, 10)for each client, and then broadcast it across them.

@app_state(name=States.broadcaster.value, role=Role.COORDINATOR)
class BroadcasterState(AppState):
def register(self):
self.register_transition(States.sender.value, Role.COORDINATOR)

def run(self):
data = np.random.random((len(self.clients), 100, 10)).tolist()
self.broadcast_data(data, send_to_self=False)
ind = my_share(self.clients, self.id)
self.store('data', data[ind])
self.log('Initial data is broadcasted')
return States.sender.value

Because broadcast provides the same content to all clients, each client should distinguish its own share from the rest based on its id. The braodcast method has a send_to_self argument that clarifies whether either coordinator should receive the data or not. In this example, we did not send the data to the coordinator because it has it and can provide it to the following states using the shared memory and store method.

Communicating Data:

send_data_to_participant communicates data to another specific client using its id.

Sending data for the coordinator:

Developers can use the send_data_to_coordinator method to Communicate data with the coordinator. It provides the FeatureCloud Controller with the data that should be transferred to the coordinator. And if the coordinator calls it, data will be directly appended to its list of incoming data. Developers should decide whether they want to employ SMPC for securing the aggregation or not by setting use_smpc flag.

@app_state(name=States.sender.value)
class SenderState(AppState):
def register(self):
self.register_transition(States.aggregator.value, Role.COORDINATOR)
self.register_transition(States.terminal.value, Role.PARTICIPANT)

def run(self):
data = self.load('data')
self.send_data_to_coordinator(data)
self.log('Data is sent to the coordinator!')
if self.is_coordinator:
return States.aggregator.value
return States.terminal.value

Collecting data

For receiving data from other clients, regardless of their role, one can use await_data. To collect the clients data at the coordinator side, one can use gather_data and aggregate_data methods.

Waiting to receive data

await_data is general method for receiving data which other methods build upon it for special cases. await_data can be called for receiving data from n clients by polling for data arrival every DATA_POLL_INTERVAL seconds. Once the data arrives, it deserializes the received data.

@app_state(name=States.waiter.value, role=Role.PARTICIPANT)
class WaiterState(AppState):
def register(self):
self.register_transition(States.sender.value, Role.PARTICIPANT)
def run(self):
data = self.await_data()
ind = my_share(self.clients, self.id)
data = data[ind]
self.log(f"Data is received. Data shape: {np.shape(data)}")
self.store('data', data)
return States.sender.value

Gathering clients data:

FC app developers can call this method only for clients with the coordinator role. This method calls the await_data method to wait for receiving data of all clients. The gathered data will be in a list that includes each client’s data. In case there is more than one data part shared by each client, one solution to access related data is to loop over the clients’ data and access the relative data part on different clients. For instance, for two clients, A and B: A sends out:

data_to_send = [[1, 2, 3], "test_A"]

while client B sends:

data_to_send = [[5, 6], "test_B"]

the coordinator can aggregate the data as follows:

clients_lists, clients_str = [], []
for clients_data in self.gather_data():
clients_lists.append(clients_data[0])
clients_str.append(clients_data[1])

Aggregating clients data

aggregate_data method automatically handles SMPC usage and serialization and always returns the aggregated data. Aggregated data contains the same structure and shape as clients sent out data because it was summed up element-wise. Therefore, to have structural data consistency, it considers SMPC usage as follows:

  • Using SMPC: waits to receive the aggregated data from SMPC modules; it looks like waiting for just one client.
  • Without SMPC: awaits to receive all client’s data, then internally aggregates them.
@app_state(name=States.aggregator.value, role=Role.COORDINATOR)
class AggregatorState(AppState):
def register(self):
self.register_transition(States.terminal.value, Role.COORDINATOR)

def run(self):
data = self.aggregate_data(operation=SMPCOperation.ADD)
self.log(f"Data is aggregated: {np.shape(data)}")
self.log(f"Data is aggregated: {data}")
return States.terminal.value

Accordingly, FeatureCloud app developers no longer are required to consider SMPC usage because they always get the same aggregated results in the coordinator. Provided aggregated results are not the average ones; therefore, they need to be averaged, if it’s apt to, separately. If different data parts are being sent by clients, using aggregate_data may be troublesome because those data parts may vary in dimension and data type. Hence, in such scenarios, developers can use gather_data to access the same data part of different clients and pass them to _aggregate method separately to get the aggregated values.

Configuring SMPC Module :

Developers can configure the Secure Multi-Party Computation(SMPC) module by sending range, shards, operation, and serialization parameters. In case of not calling the method, default configurations will be used (More information here).

You can find the code that was used in this story in communicate package of the FeatureCloud app-tutorial repository.

--

--