Federated Learning API: API 一覧
当ドキュメントでは、Federated Learning APIについて記述しています。
実行方式は一部APIを除き JSON-RPC v2.0 を利用します。
pythonの実行環境では Federated Learning API をより簡単に扱うための API Client を使用することもできます。
JSON-RPC v2.0
リクエスト例:
JSON-RPC v2.0 を使用する場合は、各APIの仕様を参考に次のようなリクエストを作成してください。
- method: str: 呼び出すAPIを指定します
- params: dict: APIのパラメータを指定します。パラメータが不要な場合は、省略可能です。
{
"jsonrpc": "2.0",
"id": 1,
"method": "fl.initialize_model_store",
"params": {
"model_store_ddc": "ddc:my_first_model_store"
}
}
レスポンス例:
リクエストが成功した場合、次のようなレスポンスが返します。
- result: 成功したリクエストのレスポンス
{"jsonrpc": "2.0", "result": {"id": "xxx", ...}, "id": 1}
リクエストがサーバ側で失敗した場合、次のようなレスポンスが返ります。
- error: 失敗したリクエストのエラー情報
{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": "1"}
API Client(Python用)
リクエスト例:
API Client を使用する場合は、各APIの仕様を参考に次のようなリクエストを作成してください。
from xdata_fl.client import Api
api = Api()
result = api.initialize_model_store(model_store_ddc="ddc:my_first_model_store")
# {"id": "xx", ...}
ロギングについて
API Client を使用する場合、任意のロガーにAPI の実行情報を出力できます。
ログレベルは次の通りです。
INFO
: モデル転送が生じる重たい処理DEBUG
: その他
ロガーの構成例は次の通りです。
import logging
from xdata_fl.client import Api, set_logger
# Configure logger
handler = logging.StreamHandler()
formatter = logging.Formatter(
" ".join(
"%(asctime)s",
"%(levelname)s"
"%(filename)s",
"%(funcName)s",
"%(lineno)s",
"%(message)s"
),
datefmt="%Y/%m/%d %H:%M:%S"
)
handler.setFormatter(formatter)
logger = logging.getLogger("your_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Integrate logger
set_logger(logger)
- Pythonのデフォルトログレベルは
WARNING
のため、INFO
レベルでロガーを構成してください。 - 出力時の時間は出力されないので、
logging.Formatter
で調整ください
詳しいロギングの使用方法は公式ドキュメントを参照ください。
fl.get_server_version
Federated Learning API のサーバーバージョンを返します。
レスポンス:
- result: str: Federated Learning API のサーバーバージョン
リクエスト例:
result = api.get_server_version()
fl.initialize_model_store
モデルストアを作成します。
パラメータ:
- model_store_ddc: str: 作成するモデルストア名(DDC)
レスポンス:
- result: dict: 作成されたモデルストアに関する情報 (ddc_info)
リクエスト例:
result = api.initialize_model_store("ddc:my_first_model_store")
fl.exists_store
モデルストアの存在を確認します。
パラメータ:
- model_store_ddc: str: 確認したいモデルストア名(DDC)
レスポンス:
- result: bool: モデルストアが存在するか否か
リクエスト例:
result = api.exists_store("ddc:my_first_model_store")
fl.delete_store
モデルストアを削除します。
パラメータ:
- model_store_ddc: str: 削除するモデルストア名(DDC)
- raise_if_no_exists: bool = True:
True
の場合、指定したモデルストアが存在しない場合、例外を上げます
レスポンス:
- result: int: モデルストアが削除された場合
1
、そうでなければ(raise_if_no_exists = False
の場合)0
リクエスト例:
result = api.delete_store("ddc:my_first_model_store")
fl.put_model
指定したモデルストアにモデルを格納します。 通常、Party で実行します。
注意:
本メソッドは JSON-RPC v2.0 でなく、multipart/form-data
としてリクエストを送信します。
また、エンドポイントは ../jsonrpc/upload/fl.put_model
となります。
パラメータ:
- model_store_ddc: str: 削除するモデルストア名(DDC)
- file: Union[str, io.BufferedReader]: ファイルパス、または、ファイルオブジェクト
- model_kind: str: モデルに対する拡張子等の情報(現在は未使用)
- model_description: str: モデルに付与する任意の説明
- model_meta: dict = {}: モデルに付与する任意のメタデータ
- model_state: int = FlEvensts.CREATED: (基本的に指定しないでください)連合学習におけるモデルの状態
レスポンス:
- result: dict:
model_info
を返す
リクエスト例:
# ファイルパスを渡す場合
result = api.put_model(
"ddc:my_first_model_store",
"./my_models/model_1",
model_description="test model",
model_meta={"additonal": "additional data"}
)
# ファイルオブジェクトを渡す場合
with Open("./my_models/model_1", "rb) as f:
result = api.put_model(
"ddc:my_first_model_store",
f,
model_description="test model",
model_meta={"additonal": "additional data"}
)
fl.put_aggregated_model
指定したモデルストアにモデルを格納します。 通常、Aggregator で実行します。
注意:
本メソッドは JSON-RPC v2.0 でなく、multipart/form-data
としてリクエストを送信します。
また、エンドポイントは ../jsonrpc/upload/fl.put_aggregated_model
となります。
パラメータ:
- model_store_ddc: str: 削除するモデルストア名(DDC)
- file: Union[str, io.BufferedReader]: ファイルパス、または、ファイルオブジェクト
- model_kind: str: モデルに対する拡張子等の情報(現在は未使用)
- model_description: str: モデルに付与する任意の説明
- model_meta: dict = {}: モデルに付与する任意のメタデータ
- model_state: int = FlEvensts.AGGREGATED: (基本的に指定しないでください)連合学習におけるモデルの状態
レスポンス:
- result: dict:
model_info
を返す
リクエスト例:
# ファイルパスを渡す場合
result = api.put_aggregated_model(
"ddc:my_first_model_store",
"./my_models/aggregated_model_1",
model_description="test model",
model_meta={"additonal": "additional data"}
)
# ファイルオブジェクトを渡す場合
with Open("./my_models/aggregated_model_1", "rb) as f:
result = api.put_aggregated_model(
"ddc:my_first_model_store",
f,
model_description="test model",
model_meta={"additonal": "additional data"}
)
fl.feedback
指定したモデルストアの指定したモデルをアップストリームのモデルストアへ feedback します。 アップストリームのモデルストアは、構成情報によって定まります。
サーバ内部で取得されたモデルはアップストリームに転送されます。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- model_id: str: モデルid
レスポンス:
- result: dict: アップストリームから返された
model_info
を返す
リクエスト例:
result = api.feedback("ddc:my_first_model_store", "model_id")
fl.load_model
指定したモデルストアの指定したモデルをダウンロードします。
注意:
本メソッドはリクエストは JSON-RPC v2.0 ですが、レスポンスはJSON JSON-RPC v2.0 に準じていません。
また、エンドポイントは ../jsonrpc/download/fl.load_model
となります。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- model_id: str: モデルid
レスポンス:
- result: Iterable[bytes]: モデル本体
リクエスト例:
_result = api.load_model("ddc:my_first_model_store", "model_id")
result = b"".join(_result)
fl.request_transfer
アップストリームのモデルストアから最新の集約されたモデルを取得し、指定したモデルストアに格納します。
アップストリームのモデルストアは、構成情報によって定まります。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: dict:
model_info
を返す
リクエスト例:
_result = api.request_transfer("ddc:my_first_model_store")
fl.get_model_info
指定したモデルストアの指定したモデルの情報を取得します。 モデル本体のデータは含まれません。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- model_id: str: モデルid
レスポンス:
- result: dict:
model_info
を返す
リクエスト例:
result = api.get_model_info("ddc:my_first_model_store", "model_id")
fl.exists_model
指定したモデルストアの指定したモデルの存在を確認します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- model_id: str: モデルid
レスポンス:
- result: bool: モデルが存在する場合
True
リクエスト例:
result = api.exists_model("ddc:my_first_model_store", "model_id")
fl.delete_model
指定したモデルストアの指定したモデルを削除します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- model_id: str: 削除するモデルid
レスポンス:
- result: int: モデルが削除された場合
1
、そうでなければ(raise_if_no_exists = false
の場合)0
リクエスト例:
result = api.delete_model("ddc:my_first_model_store", "model_id")
fl.delete_all_model
指定したモデルストアの全てのモデルを削除します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: int: 削除されたモデル数
リクエスト例:
result = api.delete_all_model("ddc:my_first_model_store")
fl.get_models
指定したモデルストアの全てのモデル情報を取得します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: list[dict]:
model_info
の一覧
リクエスト例:
result = api.get_models("ddc:my_first_model_store")
fl.get_model_info_latest
指定したモデルストアの最新のモデル情報を取得します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: dict:
model_info
リクエスト例:
result = api.get_model_info_latest("ddc:my_first_model_store")
fl.get_latest_feedbacked_models
指定したモデルストアから最新のfeedbacked
モデル情報の一覧を取得します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
- round: int: 対象のラウンド(未実装)
レスポンス:
- result: list[dict]:
model_info
の一覧
リクエスト例:
result = api.get_latest_feedbacked_models("ddc:my_first_model_store")
fl.get_latest_aggregated_model
指定したモデルストアから最新のaggregated
モデル情報を取得します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: dict:
model_info
リクエスト例:
result = api.get_latest_aggregated_model("ddc:my_first_model_store")
fl.get_latest_transferred_model
指定したモデルストアから最新のtransferred
モデル情報を取得します。
パラメータ:
- model_store_ddc: str: モデルストア名(DDC)
レスポンス:
- result: dict:
model_info
リクエスト例:
result = api.get_latest_transferred_model("ddc:my_first_model_store")
fl.get_parties
アップストリームからパーティを取得します。
レスポンス:
- result: list[dict]:
party_info
の一覧を返します
リクエスト例:
result = api.get_parties()
レスポンス例:
[
{
"id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"tenant_id: "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"tenant_name: "my tenant name",
"description": "my description",
"hop_count": 1,
"created_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
"updated_at": datetime.datetime.now(tz=LOCAL_TIMEZONE),
}
]
fl.create_party
アップストリームにパーティを登録します。
呼び出し元となったEdgeは、構成情報から TENANT_ID
と TENANT_NAME
を取得し、アップストリームへ転送します。
パラメータ:
- description: str = "" : パーティに対する任意の説明
レスポンス:
- result: dict:
party_info
リクエスト例:
result = api.create_party("my experiment 1")
fl.delete_party
アップストリームからパーティを削除します。
パラメータ:
- id: str: パーティを識別するid
レスポンス:
- result: int: 削除された数を返します
リクエスト例:
result = api.delete_party("party_id")