
xData Aggregation FW: API List


This document describes the APIs provided by the xData Aggregation FW.

class Manager

This class manages the configuration of the Aggregator and the Party group.

Code Example:

Declare it as follows:

class TutorialManager(Manager):
    AGGREGATOR = TutorialAggregator
    TRAINER = TutorialTrainer

Class Variables

  • AGGREGATOR: Specifies the Aggregator class.
  • TRAINER: Specifies the Trainer class.

__init__

arguments:

  • agg: RemoteStore | None: Specify the store to be used by the Aggregator.
  • parties: list[RemoteStore]: Specify the stores used by the Party group.

federate

The following sequence of operations is performed.

  1. Execute federate in each of the N Parties
  2. Execute federate in the Aggregator

arguments:

  • current_round: int: Pass the round to be executed.

Return Value:

  • result: undefined

Code Example:

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1

manager.federate(current_round=current_round)

The above code example is roughly equivalent to the following code.

manager = TutorialManager(agg=store, parties=[store, store])
current_round = 1

aggregator = manager.get_aggregator()

for i in range(len(manager.parties)):
    trainer = manager.get_trainer(i)
    trainer.federate(current_round=current_round)

aggregator.federate(current_round=current_round)

get_aggregator

Get an Aggregator instance.

Return Value:

  • result: Aggregator

get_trainer

Get a Trainer instance.

arguments:

  • index: int: Index of the store (within parties) specified at initialization.

Return Value:

  • result: Trainer

class Aggregator

As the Aggregator, this class executes a series of federated learning processes.

Code Example:

# Implement the aggregation algorithm.
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()

        if len(model_ids) == 0:
            raise Exception()

        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}

        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]

        return aggregated_model, aggregated_meta

federate

A series of processes is executed in the following flow (a rough sketch of this flow follows the list).

  1. load_feedback_list: Retrieves the set of fed-back models and their metadata.
  2. get_latest_feedbacked_models: Gets the set of models that have been fed back since the last aggregation. Called inside load_feedback_list.
  3. deserialize: Restores each model into memory (called by load_model inside load_feedback_list).
  4. aggregate: Receives the result of load_feedback_list and performs the aggregation process.
  5. serialize: Serializes the model returned by aggregate.
  6. put_aggregated_model: Stores the model and metadata serialized by serialize in the model store as the Aggregator's own aggregated model.
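
The following is a rough, simplified sketch of how these steps fit together. It is not the framework's actual federate implementation; in particular, the signature of put_aggregated_model is an assumption for illustration, while load_feedback_list, aggregate, and self.agg follow the TutorialAggregator example above.

# Rough sketch only: mirrors the numbered flow above, not the real framework code.
# The put_aggregated_model signature is an assumption.
def aggregator_federate_sketch(aggregator, current_round: int) -> dict:
    # Steps 1-3: collect the fed-back models (deserialization happens inside load_model).
    feedback_list = aggregator.load_feedback_list()

    # Step 4: run the user-defined aggregation.
    aggregated_model, aggregated_meta = aggregator.aggregate(current_round, feedback_list)

    # Steps 5-6: serialize and store the result as this Aggregator's aggregated model.
    model_info = aggregator.agg.put_aggregated_model(aggregated_model, aggregated_meta)
    return model_info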

arguments:

  • current_round: int: Pass the round to be executed.

Return Value:

  • result: dict: The model_info returned by put_aggregated_model.

load_feedback_list (abstract method)

Describe the process for retrieving a set of models that have been fed back. A default implementation is provided.

Return Value:

  • result: Iterable: Return any iterable. The default implementation yields tuples of (model body, model metadata).
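
As a hedged illustration of overriding this method, the hypothetical FilteringAggregator below skips feedback entries that report no data. It reuses only the store calls already shown in the TutorialAggregator example, and the "data_size" key is taken from that example.

class FilteringAggregator(TutorialAggregator):
    def load_feedback_list(self):
        for model_id in self.agg.get_latest_feedbacked_models():
            meta = self.agg.get_model_meta(model_id)
            if meta.get("data_size", 0) == 0:
                # Skip parties that contributed no data in this round.
                continue
            yield self.agg.load_model(model_id), meta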

aggregate (abstract method)

Describe the aggregation process.

arguments:

  • current_round: int: The current round is passed.
  • feedback_list: The iterable returned by load_feedback_list is passed.

Return Value:

  • result: Tuple[Any, dict]: Return the aggregated model and a metadata dict.
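
For example, a FedAvg-style weighted average could look like the sketch below. It follows the aggregate signature used in the TutorialAggregator example above; weighting by "data_size" is an illustrative assumption about the metadata returned by each Trainer.

class WeightedAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def aggregate(self, current_round: int, feedback_list):
        weighted_sum = 0
        total_size = 0

        for model, meta in feedback_list:
            # Weight each Party's model by the amount of data it trained on.
            weighted_sum += model * meta["data_size"]
            total_size += meta["data_size"]

        return weighted_sum / total_size, {"data_size": total_size}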

class Trainer

As a Party, this class executes a series of federated learning processes.

Code Example:

class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0

        parameters += 1
        return parameters, {"data_size": 2}

federate

A series of processes is executed in the following flow (a rough sketch of this flow follows the list).

  1. request_transfer: If current_round > 1, requests the latest aggregated model from upstream and stores it in the Trainer's own model store as the transferred model.
  2. deserialize: If current_round > 1, restores the transferred model into memory.
  3. train: Runs the implemented training process. If current_round > 1, the model obtained by request_transfer is passed.
  4. serialize: Serializes the model returned by train.
  5. put_model: Registers the model and metadata serialized by serialize in the Trainer's own store.
  6. feedback: Stores the model saved with put_model in the upstream model store as a fed-back model.
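
The following is a rough, simplified sketch of how these steps fit together on the Party side. It is not the framework's actual federate implementation; the signatures of request_transfer, put_model, and feedback are assumptions for illustration.

# Rough sketch only: mirrors the numbered flow above, not the real framework code.
# The request_transfer / put_model / feedback signatures are assumptions.
def trainer_federate_sketch(trainer, current_round: int) -> dict:
    parameters, meta = None, {}
    if current_round > 1:
        # Steps 1-2: fetch the latest aggregated model and deserialize it.
        parameters, meta = trainer.request_transfer()

    # Step 3: run the user-defined training process.
    model, model_meta = trainer.train(parameters, current_round, meta)

    # Steps 4-5: serialize the result and register it in the Trainer's own store.
    model_info = trainer.put_model(model, model_meta)

    # Step 6: feed the stored model back to the upstream model store.
    return trainer.feedback(model_info)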

arguments:

  • current_round: int: Pass the round to be executed.

Return Value:

  • result: dict: The model_info returned from upstream by feedback.

train (abstract method)

Describe the training process.

arguments:

  • parameters: If current_round > 1, the aggregated model is passed. Otherwise, None.
  • current_round: int: The current round is passed.
  • meta: If current_round > 1, the metadata given to the aggregated model is passed. Otherwise it is {}.

Return Value:

  • result: Tuple[Any, dict]: Return the trained model and a metadata dict.
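
As a slightly more concrete, hypothetical example, the Trainer below treats the model as a NumPy weight vector. The dataset size, learning rate, and gradient computation are placeholders, not part of the framework.

import numpy as np

class NumpyTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            # First round: start from zero-initialized weights.
            parameters = np.zeros(4)

        local_data_size = 100                # placeholder for this Party's dataset size
        gradient = np.ones_like(parameters)  # placeholder for a real gradient computation
        parameters = parameters - 0.01 * gradient

        # Report data_size so the Aggregator can weight this update.
        return parameters, {"data_size": local_data_size}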

class Serializer

This class defines how models are persisted and restored when they are sent and received.

Code Example:

import torch

class PytorchSerializer(Serializer):
    def serialize(self, model, output_path):
        torch.save(model, output_path)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        with open(input_path, 'rb') as f:
            return torch.load(f, map_location=torch.device('cpu'))

serialize (abstract method)

Describes how to persist the model.

arguments:

  • model: The model to be persisted.
  • output_path: str: Destination path, specified by the framework, where the file should be saved.

Return Value:

  • result: str: Return the destination path.

deserialize (abstract method)

Describes how to restore a persisted model.

arguments:

  • input_path: str: Path of the persisted file, specified by the framework.
  • model_meta: dict: Metadata attached to the model.

Return Value:

  • result: Return the restored model.
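
For models that are plain Python objects, a minimal pickle-based Serializer sketch might look like the following. It is shown only to illustrate the serialize/deserialize contract and is not part of the framework.

import pickle

class PickleSerializer(Serializer):
    def serialize(self, model, output_path):
        # Persist the model to the path chosen by the framework.
        with open(output_path, "wb") as f:
            pickle.dump(model, f)
        return output_path

    def deserialize(self, input_path, model_meta={}):
        # Restore the model from the persisted file.
        with open(input_path, "rb") as f:
            return pickle.load(f)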

class RemoteStore

A model store that uses the Federated Learning Web API as its backend. It works as a wrapper around the Federated Learning Web API, with model_store_ddc specified only once, at initialization.

For details, please refer to List of Federated Learning APIs.

Code Example:

from xdata_fl.client import Api
from xdata_agg.fw._abc import RemoteStore

class MyStore(RemoteStore):
    SERIALIZER = PytorchSerializer

model_api = Api(endpoint="", apikey="", apisecret="")
model_store_ddc = "ddc:my_first_model_store"

# If RemoteStore is specified, model_store_ddc can be omitted
with MyStore(model_api, model_store_ddc=model_store_ddc) as store:
    model_info = store.put_model(model)
    model = store.load_model(model_info["model_id"])

Class Variable

  • SERIALIZER: Specify the Serializer class (called when load_model or put_model is used)

__init__

arguments:

  • api: xdata_fl.client.Api: Specify the backend API client.
  • model_store_ddc: str: Specify the backend model store (DDC).