Skip to content

xData Federated Learning

This document has been machine translated.

xData Federated Learning is a federated learning platform capability provided by xData.

About Federated Learning

Federated learning is a method of machine learning in which data is distributed without centralized control, and accuracy is increased by aggregating distributed models.

Federated learning has the following advantages

  • Protection of data that you do not want to share, such as between organizations or devices
  • Improved processing performance through distributed learning

Coalitional learning has been attracting attention in recent years mainly as a method that allows learning while taking privacy into consideration.

Federated Learning with xData Federated Learning

xData Federated Learning provides the data protection required for federated learning by integrating federated learning functionality on the xData platform.

  • The xData platform's Web API provides an API Key/Secret authentication mechanism to ensure that the information you provide is not available to other users.
  • The xData platform's MMS Filter plugin mechanism allows the user-prepared information to be processed and stored in the database (e.g., by blurring input image data or performing other anonymization processes). This allows users to anonymize information as they prepare it prior to publication or distribution.

In addition, xData Edge, the distributed development environment of the xData platform, can be used for federated learning to enable efficient distributed learning.

Reference

xData Federated Learning combines the following components to create a federated learning platform.

  • xData Aggregation FW: A federated learning framework for xData that runs in the Python runtime environment, using the Federated Learning Web API as the model sharing space between the Aggregator and Party, connecting the federated learning formalism with the code of machine learning It connects the code of engineers and aggregation algorithm developers with the formal processes of federated learning.
  • Federated Learning Web API: Provides model store operations and queries, as well as model management and transfer functions such as model upload and download.

After familiarizing yourself with this page, please refer to the following references for more information.

xData Aggregation FW

xData Aggregation FW executes and manages federated learning with the following workflow.

Figure 1. Workflow

Workflow

Model Store

A storage area that exists on the Edge operated by Federated Learning Web API.

Remote Model Store: A storage area that exists on the Edge operated by Federated Learning Web API. See Federated Learning Web API for details. Local Model Store: A local storage area as seen from the execution environment. This is a temporary storage area used by the framework when downloading models. It is used internally by the framework for caching, etc., and users do not need to be particularly aware of it.

Trainer, Party and Aggregator

The coalition learning in xData handles distributed learning consisting of N Parties (Trainers) and an Aggregator.

The roles of the Trainer, Party and Aggregator are as follows.

Trainer: A program deployed on the execution environment (local environment or remote training on Edge) that performs training and generates trained models. Party: sends and receives models, such as receiving learned models from the Trainer and linking them to the Aggregator, or receiving models from the Aggregator and linking them to the Trainer (the Edge plays this role). Depending on the context, Party may include the meaning of Trainer Aggregator: Aggregates multiple trained models federated from the Party group to create a single trained model (Edge with Aggregator functionality plays this role).

Round

Figure 1. the numbers in the middle perform the following processes

  1. train: train the model in any execution environment (local environment or remote training on Edge)
  2. feedback: feeds back the model (local parameters) trained in 1. to the Aggregator
  3. aggregate: Aggregator aggregates the models fed back in 2.
  4. transfer: Party requests the models aggregated in step 3 to the Aggregator and uses them (global parameters) in the next round

In coalition learning, the above cycle is considered as one round, and the accuracy of the model is improved by repeating the rounds.

Communication

Models are shared between Party and Aggregator, and a tag called Model State is stored with the model on the Model Store when shared.

Model state can be used to implement any communication, such as querying or triggering events.

See Model Transfer API and Model State for details.

In addition, model sending/receiving between server and client is done by http request, and currently only pull type (general request where the client requests the server to process) is supported.

The Content-Type used when sending and receiving models is as follows

  • When a client sends a model to the server, it is sent as multipart/form-data.
  • When a client receives a model from the server, it receives it as application/octet-stream.

serialize/deserialize

Define the data format when storing to the model store or how it is loaded into memory.

When transferring data via xData Aggregation FW or Federated Learning Web API, Data is transferred as just binary data and no data format is specified.

It is the user's responsibility to define in serialize/deserialize what data format to manage.

train

Performs training and generates a learned model (local parameters).

aggregate

A set of learned models is aggregated with an arbitrary aggregation algorithm to generate an aggregated model (global parameters).

Execution Environment

A Python runtime environment in which the Aggregator and Trainer run.

The execution environment kicks off processing on the local, and the Aggregator or Trainer may run on the Edge (in this case, it is called Remote Trainer, Remote Aggregator, etc. to distinguish it).

Federated Learning Web API

Federated Learning Web API provides functions related to model management and transfer, such as model store operations and model upload/download, and federated learning management, as a WEB API.

This API is a Low-Level API, and the use of xData Aggregation FW hides the API inside the framework, so there may be few opportunities to use the API directly.

Execution Procedure

The Federated Learning API uses JSON-RPC v2.0.

To execute the FL API using JSON-RPC, POST the request data (string) described in JSON format to the URL of the web service endpoint provided by EvWH.

However, for optimization of file sending/receiving, some APIs related to file sending/receiving will return multipart/form-data on sending or application/octet-stream on receiving, instead of JSON-RPC.

Please refer to the API Reference for the target APIs.

API Key

An API key and private key are required to use this API. Please apply to your system administrator to obtain them.

They must be sent in the HTTP header section when sending an HTTP request with the following header names respectively.

  • API key: APIKey
  • Private key: Secret

Endpoints

The endpoint for this API is https://pf.xdata.nict.jp/api/v1/fl/jsonrpc.

Model Store

We provide a function called model store as a space to store and share models.

A model store is created with an arbitrary identifier (DDC), and multiple models can be stored in one model store.

  • At this time, there is a size limit of up to 255 MB for models that can be stored.

Model / Model Information

A model is an arbitrary file. Models are assigned an id, called model_id, which is used to identify the model.

The one-to-one metadata (update date, model state, etc.) that the system assigns to a model is called model information. Model information can also contain user-specific or federated learning algorithm-specific metadata, which is referred to simply as metadata.

Model Transfer API and Model State

In the Federated Learning Web API, each procedure in a round is performed by the following API, which assigns a tag called model state (entity is a numeric type) to the model at the time of processing execution, and then stores it in the model store.

Model Transfer API Supported Procedures Processing Overview Model State
put_model train Party (Trainer) registers trained models in its own model store. 10: trained
feedback feedback Party registers trained models in the model store of the
Upstream (Aggregator)
20: feedbacked
put_aggregated_model aggregate Aggregator registers models after aggregation in its own model store. 30: aggregated
request_transfer transfer Party obtains the post-aggregation model from the
upstream (Aggregator) and registers the model in its own model store.
40: transferred

Upstream

An upstream Aggregator is called an upstream.

The upstream can be changed in the configuration information.

xData plans to support hierarchical configurations in which an Aggregator acts like a Party and works with further upstream Aggregators.

Configuration Information

The Federated Learning Web API references configuration information.

Each configuration value has the following effect.

Key Type Initial Value Effect
TENANT_ID str(uuid4 automatic creation It is used to track where the model is linked from. Do not change this value.
FEEDBACK_ENDPOINT str(URL empty string Determines the upstream for 'feedback' 'request_transfer'. If empty, registers the model in its own model store
FEEDBACK_APIKEY str empty string Used for upstream authentication if FEEDBACK_ENDPOINT is not empty
FEEDBACK_SECRET str empty string Used for upstream authentication if FEEDBACK_ENDPOINT is not empty
FEEDBACK_MODEL_STORE_DDC str(DDC empty string Used to identify the upstream model store if FEEDBACK_ENDPOINT is not empty

See also this reference for configuration information.

Tutorial

Here is how xData Federated Learning works.

Preparing the Model Store

Using the Federated Learning Web API, let's prepare the model store and save the model.

Specify upstream

First, set the following items in the configuration information

  • FEEDBACK_ENDPOINT: set to an empty string.
  • FEEDBACK_APIKEY: specify an empty string.
  • FEEDBACK_SECRET: specify an empty string.
  • FEEDBACK_MODEL_STORE_DDC: specify an empty string.

If you specify FEEDBACK_ENDPOINT, you can change the upstream, but we recommend that you first verify the operation with your own Edge configuration.

Note that changing the configuration information will affect any federated learning that is running.

Creating a Model Store

To store a model, you must first create a model store.

First, create a configuration file.

Run the following shell to create fl_api.json and enter endpoint apikey secret.

cat << EOF > fl_api.json
{
  "version": "0.1",
  "name": "tutorial_model",
  "memo": "this is tutorial.",
  "store": {
    "api": {
      "endpoint": "YOUR_ENDPOINT",
      "apikey": "YOUR_API_KEY",
      "secret": "YOUR_SECRET"
    },
    "model_store_ddc": "ddc:tutorial_model_store"
  },
  "state": {
    "next_round": 1
  }
}
EOF

Create the following code and save it as fl_api_tutorial.py.

import json
from xdata_fl.client import Api

def create_store(config):
    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    api = Api(**credential)

    result = api.initialize_model_store(model_store_ddc)
    print(result)
    return result

if __name__ == "__main__":
    with open("fl_api.json", "r") as f:
        config = json.load(f)
    create_store(config)

Run python3 fl_api_tutorial.py. You may skip this step if it has already been created.

Model saving and loading

Modify the fl_api_tutorial.py from earlier as follows


# Add function
def run_tutorial(config):
    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    api = Api(**credential)

    # Create a model to upload
    with open("tutorial_input_model.txt", "wb") as f:
        f.write(b"xxx")

    # Upload file with path
    model_info = api.put_model("tutorial_input_model.txt")

    # Display information about registered files
    print(model_info)

    # Load model by specifying model id
    stream = api.load_model(model_info["id"])

    # Save the loaded model
    with open("tutorial_output_model.txt", "wb) as f:
        for s in stream:
            f.write(s)

    # Make sure input and output are the same
    with open("tutorial_output_model.txt", "rb) as f:
        assert b"xxx" == f.read()

# Fix to run run_tutorial instead of create_store
if __name__ == "__main__":
    with open("fl_api.json", "r") as f:
        config = json.load(f)
    # create_store(config)
    run_tutorial(config)

Run python3 fl_api_tutorial.py.

If the process completes without error, the creation of the model store and the verification of model saving and loading are complete.

Running Coalitional Learning (Standalone)

In this section, we will run coalition learning in the following configuration (stand-alone).

  • aggregator: itself
  • party1: itself
  • party2: itself

The fl_api.json file is the same as the previous configuration.

If you wish to distribute the learning across multiple units, you can distribute the same code to different execution environments and modify the necessary parts of fl_api.json to make it work in the same way.

Prepare the template code

Create fl_fw_tutorial.py and stick the following code in it.

# Import the required packages.
import json
from xdata_fl.client import Api
from xdata_agg.fw._abc import (
    Aggregator,
    RemoteStore,
    Trainer
)


# Implement the learning process.
# For the sake of simplicity here, let's assume the model is just a number and return `data_size` in the metadata.
class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0

        parameters += 1
        return parameters, {"data_size": 2}


# Implement how to save the model (here just numbers).
class TutorialSerializer:
    def serialize(self, model, output_path):
        """
        Parameters
        ----------
        model :
            model object.
        output_name : str
            Destination specified(temporary file abs path) by the framework.
        """
        with open(output_path, "w") as f:
            json.dump(model, f)

        return output_path

    def deserialize(self, input_path, model_meta={}):
        """
        Parameters
        ----------
        input_path : str
            Source specified(serialized file) by the framework.
        """
        with open(input_path, "r") as f:
            model = json.load(f)

        return model

# Implement the aggregation algorithm.
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()

        if len(model_ids) == 0:
            raise Exception()

        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}

        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]

        return aggregated_model, aggregated_meta


# Define model store.
class TutorialStore(RemoteStore):
    SERIALIZER = TutorialSerializer


# Implement a function to direct the framework.
def federate(store:RemoteStore, current_round: int, as_aggregator=False):
    if as_aggregator:
        manager = TutorialAggregator(agg=store)

        # Execute a series of steps to retrieve and aggregate a set of models and transfer the aggregated models to the store
        model_info = manager.run_aggregate(current_round=current_round)
    else:
        manager = TutorialAggregator(agg=None, parties=[store]) # Multiple parties can be specified
        trainer = manager.get_trainer(0)  # In this example, only the first party will be run by itself

        # Get the learned model (when round > 1), perform additional learning based on it, # and then transfer the learned model to the store,
        # execute the sequence of transferring the learned model to the store
        model_info = trainer.federate(current_round=current_round)

    print(model_info)
    print(f"model: {store.load_model(model_info["id"])}")
    print(f"meta: {model_info["model_meta"]}")


# Read configuration information
def configure(config_path):
    with open(config_path, "r") as f:
        config = json.load(f)

    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    next_round=config["state"]["next_round"]
    api = Api(**credential)
    return config, api, model_store_ddc, next_round

Create fl_fw_tutorial_federate.py and paste the following code

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        federate(
            store,
            current_round=next_round,
            as_aggregator=False # Run as a Party
        )

Create fl_fw_tutorial_aggregate.py and paste the following code

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        federate(
            store,
            current_round=next_round,
            as_aggregator=True # Run as Aggregator
        )

Create fl_fw_tutorial_delete_all_model.py and paste the following code.

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        store.delete_all_model()

Clean up the model store

First, clean up the model store, since the models registered in fl_api_tutorial.py are not needed (even without cleaning up, the coalition training results will be the same since feedback is not running).

python3 fl_fw_tutorial_delete_all_model.py

Execution (round 1)

Execute the coalition learning as party1 (the first execution is considered as party1).

python3 fl_fw_tutorial_federate.py

Execute the coalition learning as party2 (the second execution is considered as party2).

python3 fl_fw_tutorial_federate.py

erforms coalition learning as an aggregator.

python3 fl_fw_tutorial_aggregate.py

This completes the first round. After executing the above script, you will get the following output.

model: 2
meta: {"data_size": 4}

Run (round 2 and beyond)

Increase next_round in fl_api.json by 1 (if round is 1, aggregated models are not linked).

{
  ...
  "state": {
    "next_round": 2
  }
}

Run the script following the same steps as round 1 (no cleanup required).

This completes the second round. After running fl_fw_tutorial_aggregate.py, you will get the following output

model: 6
meta: {"data_size": 4}

This concludes the tutorial on coalition learning.

API List