Skip to content

Federated Learning API: API 一覧

当ドキュメントでは、Federated Learning APIについて記述しています。

実行方式は一部APIを除き JSON-RPC v2.0 を利用します。

pythonの実行環境では Federated Learning API をより簡単に扱うための API Client を使用することもできます。

JSON-RPC v2.0

リクエスト例:

JSON-RPC v2.0 を使用する場合は、各APIの仕様を参考に次のようなリクエストを作成してください。

  • method: str: 呼び出すAPIを指定します
  • params: dict: APIのパラメータを指定します。パラメータが不要な場合は、省略可能です。
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "fl.initialize_model_store",
  "params": {
    "model_store_ddc": "ddc:my_first_model_store"
  }
}

レスポンス例:

リクエストが成功した場合、次のようなレスポンスが返します。

  • result: 成功したリクエストのレスポンス
{"jsonrpc": "2.0", "result": {"id": "xxx", ...}, "id": 1}

リクエストがサーバ側で失敗した場合、次のようなレスポンスが返ります。

  • error: 失敗したリクエストのエラー情報
{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "1"}

API Client(Python用)

リクエスト例:

API Client を使用する場合は、各APIの仕様を参考に次のようなリクエストを作成してください。

from xdata_fl.client import Api

api = Api()
result = api.initialize_model_store(model_store_ddc="ddc:my_first_model_store")

# {"id": "xx", ...}

ロギングについて

API Client を使用する場合、任意のロガーにAPI の実行情報を出力できます。

ログレベルは次の通りです。

  • INFO: モデル転送が生じる重たい処理
  • DEBUG: その他

ロガーの構成例は次の通りです。

import logging
from xdata_fl.client import Api, set_logger

# Configure logger
handler = logging.StreamHandler()
formatter = logging.Formatter(
  " ".join(
    "%(asctime)s",
    "%(levelname)s"
    "%(filename)s",
    "%(funcName)s",
    "%(lineno)s",
    "%(message)s"
  ),
   datefmt="%Y/%m/%d %H:%M:%S"
)
handler.setFormatter(formatter)

logger = logging.getLogger("your_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)


# Integrate logger
set_logger(logger)
  • PythonのデフォルトログレベルはWARNINGのため、INFOレベルでロガーを構成してください。
  • 出力時の時間は出力されないので、logging.Formatterで調整ください

詳しいロギングの使用方法は公式ドキュメントを参照ください。

fl.get_server_version

Federated Learning API のサーバーバージョンを返します。

レスポンス:

  • result: str: Federated Learning API のサーバーバージョン

リクエスト例:

result = api.get_server_version()

fl.initialize_model_store

モデルストアを作成します。

パラメータ:

  • model_store_ddc: str: 作成するモデルストア名(DDC)

レスポンス:

  • result: dict: 作成されたモデルストアに関する情報 (ddc_info)

リクエスト例:

result = api.initialize_model_store("ddc:my_first_model_store")

fl.exists_store

モデルストアの存在を確認します。

パラメータ:

  • model_store_ddc: str: 確認したいモデルストア名(DDC)

レスポンス:

  • result: bool: モデルストアが存在するか否か

リクエスト例:

result = api.exists_store("ddc:my_first_model_store")

fl.delete_store

モデルストアを削除します。

パラメータ:

  • model_store_ddc: str: 削除するモデルストア名(DDC)
  • raise_if_no_exists: bool = True: Trueの場合、指定したモデルストアが存在しない場合、例外を上げます

レスポンス:

  • result: int: モデルストアが削除された場合1、そうでなければ(raise_if_no_exists = Falseの場合)0

リクエスト例:

result = api.delete_store("ddc:my_first_model_store")

fl.put_model

指定したモデルストアにモデルを格納します。 通常、Party で実行します。

注意:

本メソッドは JSON-RPC v2.0 でなく、multipart/form-data としてリクエストを送信します。

また、エンドポイントは ../jsonrpc/upload/fl.put_model となります。

パラメータ:

  • model_store_ddc: str: 削除するモデルストア名(DDC)
  • file: Union[str, io.BufferedReader]: ファイルパス、または、ファイルオブジェクト
  • model_kind: str: モデルに対する拡張子等の情報(現在は未使用)
  • model_description: str: モデルに付与する任意の説明
  • model_meta: dict = {}: モデルに付与する任意のメタデータ
  • model_state: int = FlEvensts.CREATED: (基本的に指定しないでください)連合学習におけるモデルの状態

レスポンス:

  • result: dict: model_infoを返す

リクエスト例:

# ファイルパスを渡す場合
result = api.put_model(
  "ddc:my_first_model_store",
  "./my_models/model_1",
  model_description="test model",
  model_meta={"additonal": "additional data"}
)

# ファイルオブジェクトを渡す場合
with Open("./my_models/model_1", "rb) as f:
  result = api.put_model(
    "ddc:my_first_model_store",
    f,
    model_description="test model",
    model_meta={"additonal": "additional data"}
  )

fl.put_aggregated_model

指定したモデルストアにモデルを格納します。 通常、Aggregator で実行します。

注意:

本メソッドは JSON-RPC v2.0 でなく、multipart/form-data としてリクエストを送信します。

また、エンドポイントは ../jsonrpc/upload/fl.put_aggregated_model となります。

パラメータ:

  • model_store_ddc: str: 削除するモデルストア名(DDC)
  • file: Union[str, io.BufferedReader]: ファイルパス、または、ファイルオブジェクト
  • model_kind: str: モデルに対する拡張子等の情報(現在は未使用)
  • model_description: str: モデルに付与する任意の説明
  • model_meta: dict = {}: モデルに付与する任意のメタデータ
  • model_state: int = FlEvensts.AGGREGATED: (基本的に指定しないでください)連合学習におけるモデルの状態

レスポンス:

  • result: dict: model_infoを返す

リクエスト例:

# ファイルパスを渡す場合
result = api.put_aggregated_model(
  "ddc:my_first_model_store",
  "./my_models/aggregated_model_1",
  model_description="test model",
  model_meta={"additonal": "additional data"}
)

# ファイルオブジェクトを渡す場合
with Open("./my_models/aggregated_model_1", "rb) as f:
  result = api.put_aggregated_model(
    "ddc:my_first_model_store",
    f,
    model_description="test model",
    model_meta={"additonal": "additional data"}
  )

fl.feedback

指定したモデルストアの指定したモデルをアップストリームのモデルストアへ feedback します。 アップストリームのモデルストアは、構成情報によって定まります。

サーバ内部で取得されたモデルはアップストリームに転送されます。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • model_id: str: モデルid

レスポンス:

  • result: dict: アップストリームから返されたmodel_infoを返す

リクエスト例:

result = api.feedback("ddc:my_first_model_store", "model_id")

fl.load_model

指定したモデルストアの指定したモデルをダウンロードします。

注意:

本メソッドはリクエストは JSON-RPC v2.0 ですが、レスポンスはJSON JSON-RPC v2.0 に準じていません。

また、エンドポイントは ../jsonrpc/download/fl.load_model となります。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • model_id: str: モデルid

レスポンス:

  • result: Iterable[bytes]: モデル本体

リクエスト例:

_result = api.load_model("ddc:my_first_model_store", "model_id")
result = b"".join(_result)

fl.request_transfer

アップストリームのモデルストアから最新の集約されたモデルを取得し、指定したモデルストアに格納します。

アップストリームのモデルストアは、構成情報によって定まります。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: dict: model_infoを返す

リクエスト例:

_result = api.request_transfer("ddc:my_first_model_store")

fl.get_model_info

指定したモデルストアの指定したモデルの情報を取得します。 モデル本体のデータは含まれません。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • model_id: str: モデルid

レスポンス:

  • result: dict: model_infoを返す

リクエスト例:

result = api.get_model_info("ddc:my_first_model_store", "model_id")

fl.exists_model

指定したモデルストアの指定したモデルの存在を確認します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • model_id: str: モデルid

レスポンス:

  • result: bool: モデルが存在する場合True

リクエスト例:

result = api.exists_model("ddc:my_first_model_store", "model_id")

fl.delete_model

指定したモデルストアの指定したモデルを削除します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • model_id: str: 削除するモデルid

レスポンス:

  • result: int: モデルが削除された場合1、そうでなければ(raise_if_no_exists = falseの場合)0

リクエスト例:

result = api.delete_model("ddc:my_first_model_store", "model_id")

fl.delete_all_model

指定したモデルストアの全てのモデルを削除します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: int: 削除されたモデル数

リクエスト例:

result = api.delete_all_model("ddc:my_first_model_store")

fl.get_models

指定したモデルストアの全てのモデル情報を取得します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: list[dict]: model_infoの一覧

リクエスト例:

result = api.get_models("ddc:my_first_model_store")

fl.get_model_info_latest

指定したモデルストアの最新のモデル情報を取得します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: dict: model_info

リクエスト例:

result = api.get_model_info_latest("ddc:my_first_model_store")

fl.get_latest_feedbacked_models

指定したモデルストアから最新のfeedbackedモデル情報の一覧を取得します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)
  • round: int: 対象のラウンド(未実装)

レスポンス:

  • result: list[dict]: model_infoの一覧

リクエスト例:

result = api.get_latest_feedbacked_models("ddc:my_first_model_store")

fl.get_latest_aggregated_model

指定したモデルストアから最新のaggregatedモデル情報を取得します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: dict: model_info

リクエスト例:

result = api.get_latest_aggregated_model("ddc:my_first_model_store")

fl.get_latest_transferred_model

指定したモデルストアから最新のtransferredモデル情報を取得します。

パラメータ:

  • model_store_ddc: str: モデルストア名(DDC)

レスポンス:

  • result: dict: model_info

リクエスト例:

result = api.get_latest_transferred_model("ddc:my_first_model_store")

fl.get_parties

アップストリームからパーティを取得します。

レスポンス:

  • result: list[dict]: party_infoの一覧を返します

リクエスト例:

result = api.get_parties()

レスポンス例:

[
  {
    "id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "tenant_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
    "tenant_name: "my tenant name",
    "description": "my description",
    "hop_count": 1,
    "created_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
    "updated_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
  }
]

fl.create_party

アップストリームにパーティを登録します。

呼び出し元となったEdgeは、構成情報から TENANT_IDTENANT_NAME を取得し、アップストリームへ転送します。

パラメータ:

  • description: str = "" : パーティに対する任意の説明

レスポンス:

  • result: dict: party_info

リクエスト例:

result = api.create_party("my experiment 1")

fl.delete_party

アップストリームからパーティを削除します。

パラメータ:

  • id: str: パーティを識別するid

レスポンス:

  • result: int: 削除された数を返します

リクエスト例:

result = api.delete_party("party_id")