Federated Learning API: API List

Note

This document has been machine translated.

This document describes the Federated Learning API.

Most APIs are called using JSON-RPC v2.0; the exceptions are noted in the description of each API.

In a Python environment, you can also use the API Client, which makes the Federated Learning API easier to work with.

JSON-RPC v2.0

Request Example:

When using JSON-RPC v2.0, create a request like the following, referring to the specification of each API.

  • method: str: Specify the API to call
  • params: dict: Specify the API parameters. If parameters are not needed, they can be omitted.
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "fl.initialize_model_store",
  "params": {
    "model_store_ddc": "ddc:my_first_model_store"
  }
}
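
The request above could be assembled in Python with only the standard library, as in the following minimal sketch. `build_request` is a hypothetical helper name and not part of the Federated Learning API; sending the body to the server is left out because the endpoint URL depends on your deployment.

```python
import itertools
import json

# Hypothetical helper state: a counter so that each request gets a distinct id.
_request_ids = itertools.count(1)


def build_request(method, params=None):
    """Return a JSON-RPC v2.0 request body as a JSON string."""
    payload = {"jsonrpc": "2.0", "id": next(_request_ids), "method": method}
    if params:  # "params" may be omitted when the API takes no parameters
        payload["params"] = params
    return json.dumps(payload)


body = build_request(
    "fl.initialize_model_store",
    {"model_store_ddc": "ddc:my_first_model_store"},
)
print(body)
```

The `id` only needs to be unique among requests that are in flight at the same time; a simple counter is one way to achieve that.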

Response Example:

If the request succeeds, the following response is returned.

  • result: Response to a successful request
{"jsonrpc": "2.0", "result": {"id": "xxx", ...}, "id": 1}

If the request fails on the server side, the following response is returned.

  • error: Error information for the failed request
{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": 1}
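
A caller that talks JSON-RPC directly has to tell these two response shapes apart. The following is a minimal sketch of one way to do that; `JsonRpcError` and `unwrap_response` are hypothetical names introduced for illustration, not part of the Federated Learning API or the API Client.

```python
import json


class JsonRpcError(Exception):
    """Hypothetical exception type carrying the JSON-RPC error code."""

    def __init__(self, code, message):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message


def unwrap_response(raw):
    """Return the "result" member, or raise JsonRpcError if "error" is present."""
    response = json.loads(raw)
    if "error" in response:
        err = response["error"]
        raise JsonRpcError(err["code"], err["message"])
    return response["result"]


ok = '{"jsonrpc": "2.0", "result": {"id": "xxx"}, "id": 1}'
failed = '{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": 1}'

print(unwrap_response(ok))
try:
    unwrap_response(failed)
except JsonRpcError as exc:
    print("request failed:", exc)
```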

API Client (for Python)

Request Example:

When using the API Client, create a request like the following, referring to the specifications of each API.

from xdata_fl.client import Api

api = Api()
result = api.initialize_model_store(model_store_ddc="ddc:my_first_model_store")

# {"id": "xx", ...}

Logging

When using the API Client, you can output API execution information to any logger.

The logging levels are as follows.

  • INFO: Heavy operations that involve model transfers
  • DEBUG: All other operations

An example of logger configuration is as follows.

import logging
from xdata_fl.client import Api, set_logger

# Configure logger
handler = logging.StreamHandler()
formatter = logging.Formatter(
  # str.join takes a single iterable, so the fields are passed as a list
  " ".join([
    "%(asctime)s",
    "%(levelname)s",
    "%(filename)s",
    "%(funcName)s",
    "%(lineno)s",
    "%(message)s"
  ]),
  datefmt="%Y/%m/%d %H:%M:%S"
)
handler.setFormatter(formatter)

logger = logging.getLogger("your_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)


# Integrate logger
set_logger(logger)
  • The default logging level in Python is WARNING, so set the logger to the INFO level.
  • Timestamps are not included in the log output by default, so use logging.Formatter to add them.

For detailed logging usage, please refer to the official documentation.
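
The effect of the logger level can be checked with the standard library alone. The sketch below does not call the API Client; the logger name and the two messages are made up, standing in for an INFO-level and a DEBUG-level record emitted by the client.

```python
import io
import logging

stream = io.StringIO()  # stands in for a console or log file
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

logger = logging.getLogger("fl_logging_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo records out of the root logger
logger.addHandler(handler)

logger.info("model transfer started")  # emitted: INFO is at or above the logger level
logger.debug("lightweight API call")   # dropped: DEBUG is below the logger level

print(stream.getvalue())
```

With the logger set to INFO, only the records for heavy operations would appear; setting it to DEBUG would show the remaining API calls as well.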

fl.get_server_version

Returns the server version of the Federated Learning API.

response:

result: str: Server version of Federated Learning API

example request:

result = api.get_server_version()

fl.initialize_model_store

Creates a model store.

parameters:

model_store_ddc: str: Name of the model store to create (DDC)

response:

result: dict: Information about the created model store (ddc_info)

request example:

result = api.initialize_model_store("ddc:my_first_model_store")

fl.exists_store

Checks for the existence of a model store.

parameters:

model_store_ddc: str: Name of the model store to check (DDC)

response:

result: bool: Whether model store exists or not

request example:

result = api.exists_store("ddc:my_first_model_store")
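
A common pattern is to create a model store only when it is missing. The sketch below combines exists_store and initialize_model_store for that purpose. `ensure_model_store` is a hypothetical helper, and `InMemoryApi` is a stand-in for the real client so that the example runs without a server. This document does not state how initialize_model_store behaves for an existing store, so the sketch checks first instead of relying on that.

```python
def ensure_model_store(api, model_store_ddc):
    """Create the model store only if it does not exist yet.

    Returns True if this call created the store, False if it already existed.
    """
    if api.exists_store(model_store_ddc):
        return False
    api.initialize_model_store(model_store_ddc)
    return True


class InMemoryApi:
    """Stand-in for the real client, for illustration only."""

    def __init__(self):
        self._stores = set()

    def exists_store(self, model_store_ddc):
        return model_store_ddc in self._stores

    def initialize_model_store(self, model_store_ddc):
        self._stores.add(model_store_ddc)
        return {"id": "xxx"}


api = InMemoryApi()
print(ensure_model_store(api, "ddc:my_first_model_store"))  # first call creates the store
print(ensure_model_store(api, "ddc:my_first_model_store"))  # second call finds it
```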

fl.delete_store

Deletes a model store.

parameters:

  • model_store_ddc: str: Name of the model store to delete (DDC)
  • raise_if_no_exists: bool = True: If True, raises an exception if the specified model store does not exist

response:

  • result: int: 1 if the model store was deleted, 0 otherwise (if raise_if_no_exists = False)

request example:

result = api.delete_store("ddc:my_first_model_store")

fl.put_model

Stores a model in the specified model store. This is typically done by a Party.

Note:

This method sends the request as multipart/form-data, not JSON-RPC v2.0.

Also, the endpoint is ./jsonrpc/upload/fl.put_model.

parameters:

  • model_store_ddc: str: Name of the model store in which to store the model (DDC)
  • file: Union[str, io.BufferedReader]: file path or file object
  • model_kind: str: Information such as extension for the model (currently unused)
  • model_description: str: Arbitrary description to give to the model
  • model_meta: dict = {}: arbitrary metadata for the model
  • model_state: int = FlEvensts.CREATED: Model state in federated learning (normally, do not specify this)

response:

result: dict: model_info

request example:

# Passing a file path
result = api.put_model(
  "ddc:my_first_model_store",
  "./my_models/model_1",
  model_description="test model",
  model_meta={"additional": "additional data"}
)

# Passing a file object (opened in binary mode)
with open("./my_models/model_1", "rb") as f:
  result = api.put_model(
    "ddc:my_first_model_store",
    f,
    model_description="test model",
    model_meta={"additional": "additional data"}
  )

fl.put_aggregated_model

Stores an aggregated model in the specified model store. This is typically done by the Aggregator.

Note:

This method sends the request as multipart/form-data, not JSON-RPC v2.0.

Also, the endpoint is ./jsonrpc/upload/fl.put_aggregated_model.

parameters:

  • model_store_ddc: str: Name of the model store in which to store the model (DDC)
  • file: Union[str, io.BufferedReader]: file path or file object
  • model_kind: str: Information such as extension for the model (currently unused)
  • model_description: str: Arbitrary description to give to the model
  • model_meta: dict = {}: arbitrary metadata for the model
  • model_state: int = FlEvensts.AGGREGATED: Model state in federated learning (normally, do not specify this)

response:

result: dict: model_info

Request Example:

# Passing a file path
result = api.put_aggregated_model(
  "ddc:my_first_model_store",
  "./my_models/aggregated_model_1",
  model_description="test model",
  model_meta={"additional": "additional data"}
)

# Passing a file object (opened in binary mode)
with open("./my_models/aggregated_model_1", "rb") as f:
  result = api.put_aggregated_model(
    "ddc:my_first_model_store",
    f,
    model_description="test model",
    model_meta={"additional": "additional data"}
  )

fl.feedback

Sends the specified model in the specified model store to the upstream model store as feedback. The upstream model store is determined by configuration information.

The model is retrieved on the server side and forwarded upstream.

parameters:

  • model_store_ddc: str: model store name (DDC)
  • model_id: str: model id

response:

  • result: dict: return model_info returned from upstream

Request Example:

result = api.feedback("ddc:my_first_model_store", "model_id")
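
On the Party side, put_model and feedback are often used back to back: store the locally trained model, then send it upstream. The sketch below shows that sequence. `publish_local_model` is a hypothetical helper, `RecordingApi` is a stand-in for the real client, and reading the model id from `model_info["id"]` is an assumption, since the fields of model_info are not listed in this document.

```python
def publish_local_model(api, model_store_ddc, model_path, description=""):
    """Store a locally trained model, then send it upstream as feedback."""
    model_info = api.put_model(
        model_store_ddc,
        model_path,
        model_description=description,
    )
    # Assumption: model_info exposes the model id under the "id" key.
    return api.feedback(model_store_ddc, model_info["id"])


class RecordingApi:
    """Stand-in for the real client; it only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def put_model(self, model_store_ddc, file, model_description=""):
        self.calls.append(("put_model", model_store_ddc, file))
        return {"id": "model-1"}

    def feedback(self, model_store_ddc, model_id):
        self.calls.append(("feedback", model_store_ddc, model_id))
        return {"id": model_id}


api = RecordingApi()
publish_local_model(api, "ddc:my_first_model_store", "./my_models/model_1", "test model")
print(api.calls)
```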

fl.load_model

Downloads the specified model from the specified model store.

Note:

This method uses JSON-RPC v2.0 for the request, but the response does not follow JSON-RPC v2.0.

Also, the endpoint is ./jsonrpc/download/fl.load_model.

parameters:

  • model_store_ddc: str: model store name (DDC)
  • model_id: str: model id

response:

result: Iterable[bytes]: Model body

request example:

_result = api.load_model("ddc:my_first_model_store", "model_id")
result = b"".join(_result)
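
Joining all chunks holds the whole model in memory. For large models, the chunks can instead be written to disk as they arrive. The following is a minimal sketch: `save_model` is a hypothetical helper, and `fake_chunks` stands in for the iterable returned by api.load_model.

```python
import os
import tempfile


def save_model(chunks, path):
    """Write an iterable of byte chunks to path and return the number of bytes written."""
    written = 0
    with open(path, "wb") as f:
        for chunk in chunks:
            written += f.write(chunk)
    return written


# Stand-in for the iterable returned by api.load_model(...).
fake_chunks = iter([b"abc", b"def", b"gh"])

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "model_1")
    size = save_model(fake_chunks, target)
    print(size)
```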

fl.request_transfer

Retrieve the latest aggregated model from the upstream model store and store it in the specified model store.

The upstream model store is determined by configuration information.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: dict: model_info

request example:

result = api.request_transfer("ddc:my_first_model_store")

fl.proxy_request_transfer_info

Retrieve the latest aggregated model information from the upstream model store.

The upstream model store is determined by configuration information.

parameters:

model_store_ddc: str: Model store name (DDC) to refer to when this server is itself the upstream

response:

  • result: dict: model_info

request example:

result = api.proxy_request_transfer_info("ddc:my_first_model_store")

fl.get_model_info

Retrieves information on the specified model in the specified model store. The data of the model itself is not included.

parameters:

  • model_store_ddc: str: model store name (DDC)
  • model_id: str: model id

response:

  • result: dict: model_info

request example:

result = api.get_model_info("ddc:my_first_model_store", "model_id")

fl.exists_model

Checks for the existence of the specified model in the specified model store.

parameters:

  • model_store_ddc: str: model store name (DDC)
  • model_id: str: model id

response:

  • result: bool: True if model exists

request example:

result = api.exists_model("ddc:my_first_model_store", "model_id")

fl.delete_model

Deletes the specified model in the specified model store.

parameters:

  • model_store_ddc: str: model store name (DDC)
  • model_id: str: model id to be deleted

response:

  • result: int: 1 if the model was deleted, 0 otherwise (if raise_if_no_exists = false)

request example:

result = api.delete_model("ddc:my_first_model_store", "model_id")

fl.delete_all_model

Deletes all models in the specified model store.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: int: Number of models deleted

request example:

result = api.delete_all_model("ddc:my_first_model_store")

fl.get_models

Get all model information for a given model store.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: list[dict]: list of model_info

example request:

result = api.get_models("ddc:my_first_model_store")

fl.get_model_info_latest

Get the latest model information for a given model store.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: dict: model_info

request example:

result = api.get_model_info_latest("ddc:my_first_model_store")

fl.get_latest_feedbacked_models

Get a list of the latest feedbacked model information from the specified model store.

parameters:

  • model_store_ddc: str: Model store name (DDC)
  • round: int: Target round (not implemented)

response:

result: list[dict]: list of model_info

request example:

result = api.get_latest_feedbacked_models("ddc:my_first_model_store")
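
On the Aggregator side, the returned list is typically the starting point for downloading each model body before aggregation. The sketch below combines get_latest_feedbacked_models with load_model. `collect_feedbacked_models` is a hypothetical helper, `FakeApi` is a stand-in for the real client, and reading the model id from `model_info["id"]` is an assumption. The aggregation step itself is outside the scope of this API and is not shown.

```python
def collect_feedbacked_models(api, model_store_ddc):
    """Return a dict mapping model id to the downloaded model body (bytes)."""
    bodies = {}
    for model_info in api.get_latest_feedbacked_models(model_store_ddc):
        # Assumption: model_info exposes the model id under the "id" key.
        model_id = model_info["id"]
        bodies[model_id] = b"".join(api.load_model(model_store_ddc, model_id))
    return bodies


class FakeApi:
    """Stand-in for the real client, backed by an in-memory dict."""

    def __init__(self, models):
        self._models = models  # model id -> list of byte chunks

    def get_latest_feedbacked_models(self, model_store_ddc):
        return [{"id": model_id} for model_id in self._models]

    def load_model(self, model_store_ddc, model_id):
        return iter(self._models[model_id])


api = FakeApi({"m1": [b"ab", b"c"], "m2": [b"xyz"]})
print(collect_feedbacked_models(api, "ddc:my_first_model_store"))
```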

fl.get_latest_aggregated_model

Get the latest aggregated model information from the specified model store.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: dict: model_info

request example:

result = api.get_latest_aggregated_model("ddc:my_first_model_store")

fl.get_latest_transferred_model

Get the latest transferred model information from the specified model store.

parameters:

model_store_ddc: str: model store name (DDC)

response:

result: dict: model_info

request example:

result = api.get_latest_transferred_model("ddc:my_first_model_store")

fl.get_parties

Get parties from upstream.

response:

result: list[dict]: list of party_info

example request:

result = api.get_parties()

Response Example:

[
  {
    "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "tenant_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "tenant_name": "my tenant name",
    "description": "my description",
    "hop_count": 1,
    "created_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
    "updated_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
  }
]
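
Because the response is a plain list of dicts, it can be filtered with ordinary Python. The following minimal sketch looks up the ids of the parties that belong to a given tenant, for example before calling delete_party. `party_ids_for_tenant` is a hypothetical helper, and the sample values are made up; only the key names are taken from the response example.

```python
def party_ids_for_tenant(parties, tenant_name):
    """Return the ids of all parties registered under the given tenant name."""
    return [party["id"] for party in parties if party["tenant_name"] == tenant_name]


# Made-up sample data using the keys of the response example.
sample_parties = [
    {"id": "party-a", "tenant_id": "tenant-1", "tenant_name": "my tenant name", "hop_count": 1},
    {"id": "party-b", "tenant_id": "tenant-2", "tenant_name": "another tenant", "hop_count": 1},
]

print(party_ids_for_tenant(sample_parties, "my tenant name"))
```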

fl.create_party

Registers a party in the upstream.

The calling Edge gets the TENANT_ID and TENANT_NAME from the configuration information and transfers them to the upstream.

parameters:

description: str = "": Arbitrary description for the party

response:

result: dict: party_info

example request:

result = api.create_party("my experiment 1")

fl.delete_party

Deletes a party from the upstream.

parameters:

id: str: id to identify the party

response:

result: int: Number of parties deleted

request example:

result = api.delete_party("party_id")