
xData Aggregation FW

Note

This document has been machine translated.

This document describes xData Aggregation FW.

class Manager

This class manages Aggregator and Party configurations.

**Example code:**

Declare it as follows:

class TutorialManager(Manager):
    AGGREGATOR = TutorialAggregator
    TRAINER = TutorialTrainer

Class variables:

  • AGGREGATOR: specifies the Aggregator class.
  • TRAINER: specifies the Trainer class.

__init__

Arguments:

  • agg: RemoteStore | None: specifies the store used by the Aggregator.
  • parties: list[RemoteStore]: specifies the stores used by the Party group.

federate

The following sequence of operations is performed:

  1. Run federate in each of the N Parties.
  2. Run federate in the Aggregator.

Arguments:

  • current_round: int: pass the round to execute now.

Return value:

  • result: undefined

Code example:

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1

manager.federate(current_round=current_round)

The above code example is roughly equivalent to the following code:

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1

aggregator = manager.get_aggregator()

for i in range(len(manager.parties)):
  trainer = manager.get_trainer(i)
  trainer.federate(current_round=current_round)

aggregator.federate(current_round=current_round)

get_aggregator

Get an Aggregator instance.

Return value:

  • result: Aggregator

get_trainer

Get a Trainer instance.

Arguments:

  • index: int: index of the store specified at initialization

Return value:

  • result: Party

class Aggregator

This class executes a series of federated learning processes as the Aggregator.

Code example:

# Implement the aggregation algorithm.
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()

        if len(model_ids) == 0:
            raise Exception()

        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}

        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]

        return aggregated_model, aggregated_meta

federate

The following sequence of steps is performed:

  1. load_feedback_list: retrieve a set of models and their metadata.
  2. get_latest_feedbacked_models: get the models that have been fed back since the last aggregation (called in load_feedback_list).
  3. deserialize: restore the given models in memory (called on the load_model call in load_feedback_list).
  4. aggregate: take the result of load_feedback_list and perform aggregation.
  5. serialize: serialize the model returned by aggregate.
  6. put_aggregated_model: store the model and metadata serialized by serialize in the model store as its own aggregated model.

Arguments:

  • current_round: int: pass the round to run now.

Return value:

  • result: dict: the model_info returned by put_aggregated_model.

load_feedback_list (abstract method)

Describes the process of retrieving the set of fed-back models. A default implementation is provided.

Return value:

  • result: Iterable: return any Iterable. The default implementation yields tuples of (model body, model metadata).

aggregate (abstract method)

Describe the aggregation process.

Arguments:

  • current_round: int: the current round is passed.
  • feedback_list: the Iterable returned by load_feedback_list.

Return value:

  • result: Tuple[Any, dict]: return the model and a dict of metadata.
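For illustration, a data_size-weighted averaging variant of the aggregation shown in TutorialAggregator can be sketched in plain Python. This is a standalone stand-in, not the framework API: feedback_list is an ordinary list of (model, meta) tuples here, and the models are plain numbers.

```python
# Sketch: aggregate by weighting each model with its reported data_size,
# instead of the simple sum used in the TutorialAggregator example.
def aggregate(current_round, feedback_list):
    total_size = 0
    weighted_sum = 0.0
    for model, meta in feedback_list:
        weighted_sum += model * meta["data_size"]
        total_size += meta["data_size"]
    # Return (model, metadata dict), matching the documented contract.
    return weighted_sum / total_size, {"data_size": total_size}

model, meta = aggregate(1, [(1.0, {"data_size": 10}), (3.0, {"data_size": 30})])
# model == 2.5, meta["data_size"] == 40
```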

class Trainer

This class executes a series of federated learning processes as a Party.

Code Example:

class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0

        parameters += 1
        return parameters, {"data_size": 2}
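The trainer and aggregator logic from the TutorialTrainer and TutorialAggregator examples can be exercised end to end without the framework. The following standalone sketch simulates two rounds with two parties; plain functions stand in for the framework classes and stores.

```python
# Framework-free simulation of two federate rounds, mirroring the
# TutorialTrainer / TutorialAggregator examples on this page.
def train(parameters, current_round, meta):
    # Same logic as TutorialTrainer.train: start from 0 on round 1.
    if parameters is None:
        parameters = 0
    parameters += 1
    return parameters, {"data_size": 2}

def aggregate(current_round, feedback_list):
    # Same logic as TutorialAggregator.aggregate: sum models and data sizes.
    aggregated_model = 0
    aggregated_meta = {"data_size": 0}
    for model, meta in feedback_list:
        aggregated_model += model
        aggregated_meta["data_size"] += meta["data_size"]
    return aggregated_model, aggregated_meta

aggregated, agg_meta = None, {}
for current_round in (1, 2):
    # Two parties train on the previous aggregated model, then aggregate.
    feedback = [train(aggregated, current_round, agg_meta) for _ in range(2)]
    aggregated, agg_meta = aggregate(current_round, feedback)

# Round 1: each party trains 0 -> 1, aggregate = 2.
# Round 2: each party trains 2 -> 3, aggregate = 6, data_size = 4.
```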

federate

The following sequence of steps is performed:

  1. request_transfer: when current_round > 1, request the latest aggregated model from the upstream and store it in its own model store as the transferred model.
  2. deserialize: restore the given model in memory if current_round > 1.
  3. train: perform the implemented training process. If current_round > 1, the model obtained by request_transfer is passed.
  4. serialize: serialize the model returned by train.
  5. [put_model](fl_api_methods.md#flput_model): register the model and metadata serialized by serialize in its own store.
  6. feedback: store the model saved with put_model in the upstream model store as a fed-back model.

Arguments:

  • current_round: int: pass the round to execute now.

Return value:

  • result: dict: the model_info returned from the upstream by feedback.

train (abstract method)

Describe the training process.

Arguments:

  • parameters: If current_round > 1, the aggregated model is passed. Otherwise, None.
  • current_round: int: The current round is passed.
  • meta: If current_round > 1, the metadata given to the aggregated model is passed. Otherwise, {}.

Return value:

  • result: Tuple[Any, dict] Return the model and dict.

class Serializer

This class describes the method of persistence and restoration when sending and receiving models.

Code Example:

import torch

class PytorchSerializer:
    def serialize(self, model, output_path):
        torch.save(model, output_path)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        with open(input_path, 'rb') as f:
            # torch.load takes a file-like object (or a path), not raw bytes
            return torch.load(f, map_location=torch.device('cpu'))

serialize (abstract method)

Describes how to persist the model.

Arguments:

  • model: the model to persist
  • output_path: str: destination path, specified by the framework, where the model should be saved

Return value:

  • result: str: return the destination path

deserialize (abstract method)

Describes how to restore a persisted model.

Arguments:

  • input_path: str: path of the persisted model file, specified by the framework
  • model_meta: dict: metadata attached to the model

Return value:

  • result: return the restored model.
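The same serialize/deserialize contract can be satisfied without PyTorch. The sketch below uses pickle from the standard library; "PickleSerializer" is a hypothetical name for illustration, not part of the framework.

```python
import pickle

class PickleSerializer:
    """Illustrative Serializer implementing the same contract with pickle."""

    def serialize(self, model, output_path):
        # Persist the model to output_path and return the path,
        # as the framework expects.
        with open(output_path, "wb") as f:
            pickle.dump(model, f)
        return output_path

    def deserialize(self, input_path, model_meta=None):
        # Restore and return the model persisted at input_path.
        with open(input_path, "rb") as f:
            return pickle.load(f)
```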

class RemoteStore

A model store backed by the Federated Learning Web API. It functions as a wrapper around the Federated Learning Web API, with model_store_ddc specified only at initialization time.

For details, see the Federated Learning API List.

Code example:

from xdata_fl.client import Api
from xdata_agg.fw._abc import RemoteStore

class MyStore(RemoteStore):
    SERIALIZER = PytorchSerializer

model_api = Api(endpoint="", apikey="", apisecret="")
model_store_ddc = "ddc:my_first_model_store"

# If RemoteStore is specified, model_store_ddc can be omitted
with MyStore(model_api, model_store_ddc=model_store_ddc) as store:
    model_info = store.put_model(model)
    model = store.load_model(model_info["model_id"])

Class variables:

  • SERIALIZER: Specifies the Serializer class (called on load_model or put_model).

__init__

Arguments:

  • api: xdata_fl.client.Api: specify backend
  • model_store_ddc: str: specify backend model store