xData Aggregation FW
Note
This document has been machine translated.
This document describes xData Aggregation FW.
class Manager
This class manages Aggregator and Party configurations.
Example code:

Declare it as follows:

```python
class TutorialManager(Manager):
    AGGREGATOR = TutorialAggregator
    TRAINER = TutorialTrainer
```
Class variables:

- `AGGREGATOR`: specifies the Aggregator class.
- `TRAINER`: specifies the Trainer class.
__init__

Arguments:

- `agg: RemoteStore | None`: specifies the store used by the Aggregator.
- `parties: list[RemoteStore]`: specifies the stores used by the parties.
federate

The following operations are performed in order:

1. Run `federate` on each of the N parties.
2. Run `federate` on the Aggregator.

Arguments:

- `current_round: int`: the round to execute now.

Return value:

- `result`: undefined
Code example:

```python
manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1
manager.federate(current_round=current_round)
```
The above code example is roughly equivalent to the following:

```python
manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1
aggregator = manager.get_aggregator()
for i in range(len(manager.parties)):
    trainer = manager.get_trainer(i)
    trainer.federate(current_round=current_round)
aggregator.federate(current_round=current_round)
```
get_aggregator

Gets an Aggregator instance.

Return value:

- `result`: Aggregator
get_trainer

Gets a Trainer instance.

Arguments:

- `index: int`: index into the stores specified at initialization.

Return value:

- `result`: Trainer
class Aggregator
This class executes a series of federated learning processes as an Aggregator.
Code example:

```python
# Implement the aggregation algorithm.
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()
        if len(model_ids) == 0:
            raise Exception()
        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}
        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]
        return aggregated_model, aggregated_meta
```
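Because the tutorial model is just a number, the aggregation logic from the example above can be exercised on its own with plain Python values, without any framework objects:

```python
# Standalone check of the tutorial's aggregation logic, using plain
# Python values in place of framework objects.
def aggregate(feedback_list):
    aggregated_model = 0
    aggregated_meta = {"data_size": 0}
    for model, meta in feedback_list:
        aggregated_model += model
        aggregated_meta["data_size"] += meta["data_size"]
    return aggregated_model, aggregated_meta

feedback = [(1, {"data_size": 2}), (3, {"data_size": 5})]
model, meta = aggregate(feedback)
# model == 4, meta == {"data_size": 7}
```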
federate

The following steps are performed in order:

1. `load_feedback_list`: retrieves the set of models and their metadata.
2. `get_latest_feedbacked_models`: gets the models that have been fed back since the last aggregation (called inside `load_feedback_list`).
3. `deserialize`: restores the given models in memory (called on each `load_model` call inside `load_feedback_list`).
4. `aggregate`: takes the result of `load_feedback_list` and performs the aggregation.
5. `serialize`: serializes the model returned by `aggregate`.
6. `put_aggregated_model`: stores the model and metadata serialized by `serialize` in the model store as its own aggregated model.

Arguments:

- `current_round: int`: the round to execute now.

Return value:

- `result: dict`: the model_info returned by `put_aggregated_model`.
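The six steps above can be sketched with stand-in objects. Everything here is illustrative: `FakeStore`, the `"agg-1"` id, and the fixed metadata are assumptions for the sketch, not the real store API.

```python
import pickle

class FakeStore:
    """Hypothetical stand-in for the Aggregator's model store."""

    def __init__(self, feedbacked):
        self._feedbacked = feedbacked   # model_id -> serialized model bytes
        self.aggregated = None

    def get_latest_feedbacked_models(self):
        return list(self._feedbacked)                    # step 2

    def load_model(self, model_id):
        return pickle.loads(self._feedbacked[model_id])  # step 3 (deserialize)

    def get_model_meta(self, model_id):
        return {"data_size": 2}

    def put_aggregated_model(self, blob, meta):          # step 6
        self.aggregated = (blob, meta)
        return {"model_id": "agg-1", "meta": meta}

def federate(store, current_round):
    # Step 1: load_feedback_list -- collect (model, meta) pairs.
    feedback = [(store.load_model(mid), store.get_model_meta(mid))
                for mid in store.get_latest_feedbacked_models()]
    # Step 4: aggregate -- sum models and data sizes, as in the tutorial.
    model, meta = 0, {"data_size": 0}
    for m, md in feedback:
        model += m
        meta["data_size"] += md["data_size"]
    # Steps 5-6: serialize and store the aggregated model.
    return store.put_aggregated_model(pickle.dumps(model), meta)

store = FakeStore({"m1": pickle.dumps(1), "m2": pickle.dumps(3)})
info = federate(store, current_round=1)
# Aggregated model: 1 + 3 == 4; total data_size: 2 + 2 == 4
```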
load_feedback_list (abstract method)

Describes the process of retrieving the set of models that have been fed back. A default implementation is provided.

Return value:

- `result: Iterable`: return any Iterable. The default implementation yields tuples of the model body and the model metadata.
aggregate (abstract method)

Describes the aggregation process.

Arguments:

- `parameters`: if `current_round > 1`, the aggregated model is passed; otherwise `None`.
- `current_round: int`: the current round is passed.
- `meta`: if `current_round > 1`, the metadata attached to the aggregated model is passed; otherwise `{}`.

Return value:

- `result: Tuple[Any, dict]`: return the model and a dict.
class Trainer
This class executes a series of federated learning processes as a Party.
Code example:

```python
class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0
        parameters += 1
        return parameters, {"data_size": 2}
```
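The tutorial `train` above can be traced by hand: since the "model" is a plain integer, each round simply increments it, starting from 0 on the first round or from the passed-in aggregated model afterwards:

```python
def train(parameters, current_round, meta):
    # Tutorial training step: the model is just a counter.
    if parameters is None:
        parameters = 0
    parameters += 1
    return parameters, {"data_size": 2}

first, meta = train(None, 1, {})   # first round: starts from 0
# first == 1
later, meta = train(5, 2, meta)    # later round: starts from the aggregated model
# later == 6
```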
federate

The following steps are performed in order:

1. `request_transfer`: when `current_round > 1`, requests the latest aggregated model from upstream and stores it in its own model store as the transferred model.
2. `deserialize`: restores the given model in memory when `current_round > 1`.
3. `train`: performs the implemented training process. If `current_round > 1`, the model obtained by `request_transfer` is passed.
4. `serialize`: serializes the model returned by `train`.
5. [put_model](fl_api_methods.md#flput_model): registers the model and metadata serialized by `serialize` in its own store.
6. `feedback`: stores the model saved with `put_model` in the upstream model store as a fed-back model.

Arguments:

- `current_round: int`: the round to execute now.

Return value:

- `result: dict`: the model_info returned from upstream by `feedback`.
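The steps above can likewise be sketched with stand-in objects; `FakeUpstream`, the `model_info` layout, and the id scheme are assumptions for the sketch, not the real framework API.

```python
import pickle

class FakeUpstream:
    """Hypothetical stand-in for the upstream (Aggregator) store."""

    def __init__(self):
        self.fed_back = []
        self._aggregated = pickle.dumps(5)  # pretend an aggregated model exists

    def request_transfer(self):             # step 1
        return self._aggregated

    def feedback(self, model_info):         # step 6
        self.fed_back.append(model_info)
        return model_info

def train(parameters, current_round, meta):
    # Tutorial training step: the model is just a counter.
    if parameters is None:
        parameters = 0
    parameters += 1
    return parameters, {"data_size": 2}

def federate(upstream, current_round):
    parameters, meta = None, {}
    if current_round > 1:
        # Steps 1-2: transfer and deserialize the aggregated model.
        parameters = pickle.loads(upstream.request_transfer())
    model, meta = train(parameters, current_round, meta)       # step 3
    blob = pickle.dumps(model)                                 # step 4
    model_info = {"model_id": f"local-round-{current_round}",  # step 5 (put_model)
                  "data_size": meta["data_size"]}
    return upstream.feedback(model_info)                       # step 6

up = FakeUpstream()
info1 = federate(up, current_round=1)  # trains from scratch
info2 = federate(up, current_round=2)  # starts from the transferred model
```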
train (abstract method)

Describes the training process.

Arguments:

- `parameters`: if `current_round > 1`, the aggregated model is passed; otherwise `None`.
- `current_round: int`: the current round is passed.
- `meta`: if `current_round > 1`, the metadata attached to the aggregated model is passed; otherwise `{}`.

Return value:

- `result: Tuple[Any, dict]`: return the model and a dict.
class Serializer
This class describes the method of persistence and restoration when sending and receiving models.
Code example:

```python
import torch

class PytorchSerializer:
    def serialize(self, model, output_path):
        torch.save(model, output_path)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        # Pass the file object itself (not its bytes) to torch.load.
        with open(input_path, 'rb') as f:
            return torch.load(f, map_location=torch.device('cpu'))
```
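For models that are plain Python objects, the same two-method interface can be met with only the standard library. The `PickleSerializer` below is a hedged sketch, not part of the framework:

```python
import os
import pickle
import tempfile

class PickleSerializer:
    """Illustrative Serializer implementation backed by pickle."""

    def serialize(self, model, output_path):
        # Persist the model to the path chosen by the framework.
        with open(output_path, "wb") as f:
            pickle.dump(model, f)
        return output_path

    def deserialize(self, input_path, model_meta=None):
        # Restore the persisted model from disk.
        with open(input_path, "rb") as f:
            return pickle.load(f)

# Round-trip a toy model through a temporary file.
path = os.path.join(tempfile.mkdtemp(), "model.pkl")
serializer = PickleSerializer()
serializer.serialize({"weights": [1, 2, 3]}, path)
restored = serializer.deserialize(path)
```
Note that pickle, like `torch.load`, should only be used on model files from trusted sources.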
serialize (abstract method)

Describes how to persist the model.

Arguments:

- `model`: the model.
- `output_path: str`: destination path, specified by the framework, where the model should be saved.

Return value:

- `result: str`: return the destination path.
deserialize (abstract method)

Describes how to restore a persisted model.

Arguments:

- `input_path: str`: path of the persisted model file, specified by the framework.
- `model_meta: dict`: metadata attached to the model.

Return value:

- `result`: return the restored model.
class RemoteStore
A model store with Federated Learning Web API as its backend.
It functions as a wrapper around the Federated Learning Web API, with `model_store_ddc` specified only once, at initialization time.
See the Federated Learning API List for details.
Code example:

```python
from xdata_fl.client import Api
from xdata_agg.fw._abc import RemoteStore

class MyStore(RemoteStore):
    SERIALIZER = PytorchSerializer

model_api = Api(endpoint="", apikey="", apisecret="")
model_store_ddc = "ddc:my_first_model_store"

# If RemoteStore is specified, model_store_ddc can be omitted
with MyStore(model_api, model_store_ddc=model_store_ddc) as store:
    model_info = store.put_model(model)
    model = store.load_model(model_info["model_id"])
```
Class variables:

- `SERIALIZER`: specifies the `Serializer` class (used by `load_model` and `put_model`).
__init__

Arguments:

- `api: xdata_fl.client.Api`: specifies the backend API client.
- `model_store_ddc: str`: specifies the backend model store.
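To experiment with the store interface without the Web API backend, an in-memory stand-in can mimic the `put_model` / `load_model` round trip shown above. Everything here (class name, id scheme, internal layout) is illustrative, not part of the framework:

```python
import pickle

class InMemoryStore:
    """Illustrative stand-in for RemoteStore; not backed by the Web API."""

    def __init__(self):
        self._blobs = {}
        self._next_id = 0

    def put_model(self, model):
        # Serialize the model and register it under a fresh id.
        model_id = f"model-{self._next_id}"
        self._next_id += 1
        self._blobs[model_id] = pickle.dumps(model)
        return {"model_id": model_id}

    def load_model(self, model_id):
        # Restore the serialized model body.
        return pickle.loads(self._blobs[model_id])

store = InMemoryStore()
model_info = store.put_model({"weights": [0.1, 0.2]})
restored = store.load_model(model_info["model_id"])
```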