xData Aggregation FW: API List
This document has been machine translated.
This document describes xData Aggregation FW.
class Manager
This class manages the configuration of Aggregator and Party.
Code Example:
Declare as follows:

class TutorialManager(Manager):
    AGGREGATOR = TutorialAggregator
    TRAINER = TutorialTrainer

Class Variables
- AGGREGATOR: Specifies the Aggregator class.
- TRAINER: Specifies the Trainer class.
__init__
Arguments:
- agg: RemoteStore | None: Specifies the store used by the Aggregator.
- parties: list[RemoteStore]: Specifies the stores used by the Party group.
federate
The following sequence of operations is performed:
1. Execute federate in each of the N Parties.
2. Execute federate in the Aggregator.
Arguments:
- current_round: int: The round to execute now.
Return Value:
- result: undefined
Code Example:

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1
manager.federate(current_round=current_round)
The above code example is roughly equivalent to the following code.

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1
aggregator = manager.get_aggregator()
for i in range(len(manager.parties)):
    trainer = manager.get_trainer(i)
    trainer.federate(current_round=current_round)
aggregator.federate(current_round=current_round)
get_aggregator
Get an Aggregator instance.
Return Value:
- result: Aggregator
get_trainer
Get a Trainer instance.
Arguments:
- index: int: Index of the store specified at initialization.
Return Value:
- result: Party
class Aggregator
As an Aggregator, this class executes a series of federated learning processes.
Code Example:

# Implement the aggregation algorithm.
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()
        if len(model_ids) == 0:
            raise Exception()
        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}
        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]
        return aggregated_model, aggregated_meta
federate
A series of processes are executed in the following flow.
1. load_feedback_list: Retrieves the set of fed-back models and their metadata.
   - get_latest_feedbacked_models: Gets the set of models that have been fed back since the last aggregation. Called in load_feedback_list.
   - deserialize: Restores the given model into memory. Called when load_model is called in load_feedback_list.
2. aggregate: Receives the results of load_feedback_list and performs the aggregation process.
3. serialize: Serializes the model returned by aggregate.
4. put_aggregated_model: Stores the model and metadata serialized with serialize in the model store as the Aggregator's own aggregated model.
Arguments:
- current_round: int: The round to be executed from now on.
Return Value:
- result: dict: Returns the model_info returned by put_aggregated_model.
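The federate flow above can be sketched as follows. Only the method names and their call order come from this document; the class name, constructor, dict-based store, and integer "model" are illustrative assumptions, not framework API.

```python
# Hypothetical sketch of the Aggregator.federate flow described above.
# Only the method names and their order come from the document; the class
# name, constructor, and in-memory "store" are illustrative assumptions.
class SketchAggregator:
    def __init__(self, feedbacked):
        self.feedbacked = feedbacked  # (model, meta) pairs fed back by Parties
        self.store = {}               # stand-in for a real model store

    def load_feedback_list(self):
        # 1. Retrieve the set of fed-back models and their metadata.
        yield from self.feedbacked

    def aggregate(self, current_round, feedback_list):
        # 2. Sum the models and data sizes, as TutorialAggregator does.
        model, meta = 0, {"data_size": 0}
        for m, md in feedback_list:
            model += m
            meta["data_size"] += md["data_size"]
        return model, meta

    def serialize(self, model):
        # 3. Persist the model (here: a trivial string form).
        return str(model)

    def put_aggregated_model(self, serialized, meta):
        # 4. Store it as this Aggregator's aggregated model; return model_info.
        self.store["aggregated"] = (serialized, meta)
        return {"model_id": "aggregated", "meta": meta}

    def federate(self, current_round):
        feedback_list = self.load_feedback_list()
        model, meta = self.aggregate(current_round, feedback_list)
        serialized = self.serialize(model)
        return self.put_aggregated_model(serialized, meta)

agg = SketchAggregator([(1, {"data_size": 2}), (2, {"data_size": 3})])
info = agg.federate(current_round=1)
```

Each numbered comment corresponds to one step of the flow; a real Aggregator would read and write a model store instead of the in-memory dict.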
load_feedback_list
(abstract method)
Describes the process for retrieving the set of fed-back models. A default implementation is provided.
Return Value:
- result: Iterable: Return any Iterable. The default implementation enumerates Tuples of the model body and the model metadata.
aggregate
(abstract method)
Describes the aggregation process.
Arguments:
- parameters: If current_round > 1, the aggregated model is passed. Otherwise, None.
- current_round: int: The current round is passed.
- meta: If current_round > 1, the metadata given to the aggregated model is passed. Otherwise it is {}.
Return Value:
- result: Tuple[Any, dict]: Return the model and a dict of metadata.
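As a variation on the summing aggregation shown in the TutorialAggregator example, a FedAvg-style weighted average can be computed over the same (model, meta) pairs. This is a sketch under the assumptions that models are numeric and each meta carries a "data_size" key, as in the tutorial; weighted_average is not a framework function.

```python
# Hypothetical helper for an aggregate() implementation: a FedAvg-style
# weighted average over the (model, meta) pairs that load_feedback_list
# yields. Assumes numeric models and a "data_size" key, as in the tutorial.
def weighted_average(feedback_list):
    weighted_sum = 0.0
    total_size = 0
    for model, meta in feedback_list:
        # Models trained on more data contribute proportionally more.
        weighted_sum += model * meta["data_size"]
        total_size += meta["data_size"]
    return weighted_sum / total_size, {"data_size": total_size}
```

For real model weights (e.g. tensors), the same weighting would be applied element-wise.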
class Trainer
This class executes a series of federated learning processes as a Party.
Code Example:

class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0
        parameters += 1
        return parameters, {"data_size": 2}
federate
A series of processes are executed in the following flow.
1. request_transfer: If current_round > 1, requests the latest aggregated model from upstream and stores it in its own model store as the transferred model.
2. deserialize: If current_round > 1, restores the given model into memory.
3. train: Performs the implemented training process. If current_round > 1, the model obtained by request_transfer is passed.
4. serialize: Serializes the model returned by train.
5. put_model: Registers the model and metadata serialized by serialize in its own store.
6. feedback: Stores the model saved with put_model in the upstream model store as a fed-back model.
Arguments:
- current_round: int: The round to be executed from now on.
Return Value:
- result: dict: Returns the model_info returned from upstream in feedback.
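The six steps of the Trainer's federate flow can be sketched in the same spirit. Only the method names and the current_round > 1 branching come from this document; the class name, dict-based stores, and integer "model" are illustrative assumptions.

```python
# Hypothetical sketch of the Trainer.federate flow described above.
# Dict-based stores and an integer "model" are illustrative assumptions.
class SketchTrainer:
    def __init__(self, upstream):
        self.upstream = upstream  # stand-in for the upstream (Aggregator) store
        self.store = {}           # stand-in for this Party's own store

    def request_transfer(self):
        # 1. Fetch the latest aggregated model from upstream.
        return self.upstream["aggregated"]

    def deserialize(self, serialized):
        # 2. Restore the model into memory.
        return int(serialized)

    def train(self, parameters, current_round, meta):
        # 3. TutorialTrainer-style training: increment the parameter.
        if parameters is None:
            parameters = 0
        return parameters + 1, {"data_size": 2}

    def serialize(self, model):
        # 4. Persist the model (here: a trivial string form).
        return str(model)

    def federate(self, current_round):
        parameters, meta = None, {}
        if current_round > 1:
            parameters = self.deserialize(self.request_transfer())
        model, meta = self.train(parameters, current_round, meta)
        serialized = self.serialize(model)
        self.store["latest"] = serialized       # 5. put_model
        self.upstream["feedback"] = serialized  # 6. feedback
        return {"model_id": "feedback"}

upstream = {"aggregated": "5"}
trainer = SketchTrainer(upstream)
info = trainer.federate(current_round=2)
```

Because current_round > 1, the transferred model ("5") is restored, trained to 6, then stored locally and fed back upstream.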
train
(abstract method)
Describes the training process.
Arguments:
- parameters: If current_round > 1, the aggregated model is passed. Otherwise, None.
- current_round: int: The current round is passed.
- meta: If current_round > 1, the metadata given to the aggregated model is passed. Otherwise it is {}.
Return Value:
- result: Tuple[Any, dict]: Return the model and a dict of metadata.
class Serializer
This class describes how models are persisted and restored when they are sent and received.
Code Example:

import torch

class PytorchSerializer:
    def serialize(self, model, output_path):
        torch.save(model, output_path)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        with open(input_path, 'rb') as f:
            return torch.load(f, map_location=torch.device('cpu'))
serialize
(abstract method)
Describes how to persist the model.
Arguments:
- model: The model to persist.
- output_path: str: Destination path, specified by the framework, where the file should be saved.
Return Value:
- result: str: Return the destination path.
deserialize
(abstract method)
Describes how to restore a persisted model.
Arguments:
- input_path: str: Path of the persisted model, specified by the framework.
- model_meta: dict: Metadata given to the model.
Return Value:
- result: Return the model.
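The same serialize/deserialize contract can also be met with only the standard library, as an alternative to the PyTorch example above. PickleSerializer is an illustrative name, not part of the framework; the method signatures mirror the PytorchSerializer example.

```python
# Hypothetical Serializer meeting the contract above using only the
# standard library. The class name is an assumption; the signatures
# mirror the PytorchSerializer example.
import pickle

class PickleSerializer:
    def serialize(self, model, output_path):
        # Persist the model to the framework-provided destination path.
        with open(output_path, "wb") as f:
            pickle.dump(model, f)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        # Restore the persisted model into memory.
        with open(input_path, "rb") as f:
            return pickle.load(f)
```

Note that pickle should only be used when the model files come from a trusted source.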
class RemoteStore
A model store that uses the Federated Learning Web API as its backend.
It functions like a wrapper for the Federated Learning Web API (specifying model_store_ddc only at initialization).
For details, please refer to the List of Federated Learning APIs.
Code Example:

from xdata_fl.client import Api
from xdata_agg.fw._abc import RemoteStore

class MyStore(RemoteStore):
    SERIALIZER = PytorchSerializer

model_api = Api(endpoint="", apikey="", apisecret="")
model_store_ddc = "ddc:my_first_model_store"

# If RemoteStore is specified, model_store_ddc can be omitted
with MyStore(model_api, model_store_ddc=model_store_ddc) as store:
    model_info = store.put_model(model)
    model = store.load_model(model_info["model_id"])
Class Variable
- SERIALIZER: Specifies the Serializer class (called when load_model or put_model is used).
__init__
Arguments:
- api: xdata_fl.client.Api: Specifies the backend.
- model_store_ddc: str: Specifies the backend model store.