xData Federated Learning
xData Federated Learning は xData が提供する連合学習プラットフォーム機能です。
連合学習について
連合学習(Federated learning)とは、データを中央管理せずに分散した状態で機械学習を行い、分散しているモデルを集約することで精度を高めていく手法です。
連合学習には、下記のようなメリットがあります。
- 組織間・デバイス間など共有したくないデータを保護
- 分散学習による処理性能の向上
連合学習は主にプライバシーに配慮しながら学習できる手法として、近年注目されています。
xData Federated Learning での連合学習
xData Federated Learning では、xData プラットフォーム上に連合学習機能を統合することによって、連合学習に求められるデータ保護機能を提供します。
- xData プラットフォームの Web API が備える API キー/Secret による認証機構により、利用者が準備する情報を他の利用者が利用できないようセキュリティ上の配慮がなされています
- xData プラットフォームの MMS Filter プラグイン機構により、利用者が準備する情報を加工して (例えば、入力画像データのぼかし処理等、匿名化処理を施して) データベースに格納できます。これにより、公開・配信前に利用者が情報を準備する段階で匿名化処理を施せます
また、xData プラットフォームの分散開発環境である xData Edge を連合学習に利用することで、効率的な分散学習を可能としています。
リファレンス
xData Federated Learning は、次のコンポーネントを組み合わせて連合学習プラットフォームを実現します。
- xData Aggregation FW: Python実行環境で動作する xData 用の連合学習フレームワークです。Federated Learning Web API をAggregatorとParty間のモデル共有空間として利用し、連合学習の定形処理と機械学習エンジニア、集約アルゴリズム開発者のコードを結びつけます。
- Federated Learning Web API: モデルストアの操作・問い合わせ、およびに、モデルのアップロード・ダウンロード等のモデル管理・転送機能を提供します。
本ページについてご理解いただいた後、詳細については以下のリファレンスを参照ください。
xData Aggregation FW
xData Aggregation FW は、連合学習を次のワークフローで実行・管理します。
図1. ワークフロー
Model Store
モデルの保存領域です。
- Remote Model Store: Federated Learning Web API で操作する Edge 上に存在する保存領域です。詳しくは Federated Learning Web API を参照ください。
- Local Model Store: 実行環境から見た時、ローカル上の保存領域です。フレームワークがモデルをダウンロードする時に使用する一時保存領域です。フレームワーク内部でキャッシュ等で利用され、ユーザーは特に意識する必要はありません。
Trainer と Party と Aggregator
xData における連合学習では、N台の Party(Trainer) と 1台の Aggregator により構成される分散学習を取り扱います。
Trainer と Party と Aggregator の役割は次の通りです。
- Trainer: 学習を実行し学習済みモデルを生成する、実行環境(ローカル環境やEdgeでのリモート学習)上に配置されるプログラムです
- Party: Trainer から学習済みモデルを受け取り Aggregator へ連携したり、Aggregator からモデルを受け取り Trainer に連携するなど、モデルの送受信を行います(Edgeが役割を果たします)。文脈により、Party は Trainer の意味を含むことがあります
- Aggregator: Party 群から連携された複数の学習済みモデルを集約し、1つの学習済みモデルを生成します(Aggregator 機能を有するEdgeが役割を果たします)
Round
図1. 中の数字は次の処理を行います。
- train: 任意の実行環境(ローカル環境やEdgeでのリモート学習)でモデルの学習を行います
- feedback: 1. で学習したモデル(ローカルパラメータ)を Aggregator へ連携します
- aggregate: Aggregator は、2. で連携されたモデル群を集約します
- transfer: Party は Aggregator へ、 3. で集約されたモデルを要求し、次のラウンドではそのモデル(グローバルパラメータ)を使用します
連合学習では上記のサイクルを1ラウンドとし、ラウンドを繰り返すことでモデルの精度を高めていきます。
Communication
モデルは Party と Aggregator 間で共有され、共有時にモデルステートと呼ばれるタグがモデルストア上にモデルとともに保存されます。
モデルステートは、問い合わせやイベントのトリガーなど、任意の Communication を実装するために使用できます。
詳しくは モデル転送APIとモデルステート を参照ください。
また、サーバーとクライアント間のモデル送受信はhttpリクエストにより行われ、現時点ではプル型(クライアントがサーバーに対して処理を要求する一般的なリクエスト)のみサポートしています。
モデル送受信時に利用するContent-Type
は次の通りです。
- クライアントからサーバーへモデルを送信する際は、
multipart/form-data
で送信します。 - クライアントがサーバーからモデルを受信する際は、
application/octet-stream
で受信します。
serialize/deserialize
モデルストアに保存する際のデータフォーマット、あるいは、メモリに読み込む際の方法を定義します。
xData Aggregation FW や Federated Learning Web API を介してデータを転送する際、 データは単なるバイナリデータとして転送され、データフォーマットは規定されていません。
どのようなデータフォーマットで管理するかは、ユーザーの責務で serialize/deserialize に定義することになります。
train
学習を実施し、学習済みモデル(ローカルパラメータ)を生成します。
aggregate
学習済みのモデル群を任意の集約アルゴリズムで集約し、集約済みモデル(グローバルパラメータ)を生成します。
Execution Environment
Aggregator や Trainer が動作するPythonの実行環境です。
実行環境は、local上で処理をキックし、Edge上でAggregator や Trainer が動くこともあります(この場合、Remote Trainer や Remote Aggregator などと呼び区別します)
Federated Learning Web API
Federated Learning Web API は、モデルストアの操作およびにモデルのアップロード・ダウンロード等のモデル管理・転送と連合学習管理に関する機能をWEB APIとして提供します。
本APIはLow-Level APIとなり、xData Aggregation FW を使用するとフレームワーク内部にAPIを隠蔽するため、直接APIを使用する機会は少ないかもしれません。
実行手順
Federated Learning APIでは JSON-RPC v2.0 を利用します。
JSON-RPC を用いてFL APIを実行するには、 JSON 形式で記述した リクエストデータ(文字列)を EvWH が提供するウェブサービスのエンドポイントの URL に POST してください。
ただし、ファイル送受信の最適化のため、ファイル送受信に関する一部APIはJSON-RPCでなく、送信時にmultipart/form-data
、あるいは、受信時にapplication/octet-stream
を返します。
対象となるAPIについては、APIレファレンスを参照してください。
APIキー
当APIの利用にはAPIキーと秘密鍵が必要です。システムの管理者に申請して取得してください。
HTTPリクエスト送信時のHTTPヘッダ部に、それぞれ下記のヘッダ名で送信する必要があります。
- APIキー:APIKey
- 秘密鍵:Secret
エンドポイント
当APIのエンドポイントは https://pf.xdata.nict.jp/api/v1/fl/jsonrpc となります。
モデルストア
モデルを保存・共有する空間として、モデルストアと呼ばれる機能を提供します。
モデルストアは、任意の識別子(DDC
)で作成され、1つのモデルストアに複数のモデルを保存することができます。
※ 現時点で、保存できるモデルは最大255MBのサイズ制限があります
モデル・モデル情報
モデルは、任意のファイルです。
モデルには、model_id
と呼ばれるidが付与され、モデルを特定するために使用されます。
モデルに対してシステムが付与する1対1のメタデータ(更新日付やモデルステート等)をモデル情報と呼びます。 また、モデル情報にユーザー固有あるいは連合学習アルゴリズム固有のメタデータを含むことができ、これを単にメタデータと呼びます。
モデル転送APIとモデルステート
Federated Learning Web API では、ラウンド中の各手続きは次のAPIにより行われ、処理実行時にモデルにモデルステートと呼ばれるタグ(実体は数字型)が付与された上で、モデルストアに格納されます。
モデル転送API | 対応する手続き | 処理概要 | モデルステート |
---|---|---|---|
put_model |
train | Party(Trainer) が、 自身のモデルストアに学習済みモデルを登録する |
10: trained |
feedback |
feedback | Party が、 アップストリーム(Aggregator)のモデルストアに学習済みモデルを登録する |
20: feedbacked |
put_aggregated_model |
aggregate | Aggregatorが、 自身のモデルストアに集約後モデルを登録する |
30: aggregated |
request_transfer |
transfer | Party が、 アップストリーム(Aggregator)から集約後モデルを取得し、自身のモデルストアにモデルを登録する |
40: transferred |
アップストリーム
上流の Aggregator を アップストリーム と呼びます。
アップストリームは、構成情報で変更可能です。
xData では、Aggregator が Party のように振舞い、さらに上流の Aggregator と連携する階層型構成もサポートする予定です。
構成情報
Federated Learning Web API は、構成情報を参照します。
各設定値が及ぼす影響は次の通りです。
キー | 型 | 初期値 | 影響 |
---|---|---|---|
TENANT_ID |
str(uuid4 ) |
自動生成 | どこから連携されたモデルかを追跡するために使用されます。 この値は変更しないでください |
FEEDBACK_ENDPOINT |
str(URL ) |
空文字 | feedback request_transfer 時のアップストリームを決定します。空文字の場合は、自身のモデルストアにモデルを登録します |
FEEDBACK_APIKEY |
str | 空文字 | FEEDBACK_ENDPOINT が空文字でない場合に、アップストリームの認証で使用されます |
FEEDBACK_SECRET |
str | 空文字 | FEEDBACK_ENDPOINT が空文字でない場合に、アップストリームの認証で使用されます |
FEEDBACK_MODEL_STORE_DDC |
str(DDC ) |
空文字 | FEEDBACK_ENDPOINT が空文字でない場合に、アップストリームのモデルストアを特定するために使用されます |
構成情報について、こちらのリファレンスも参照ください。
チュートリアル
ここでは、xData Federated Learning がどのように動作するか紹介します。
モデルストアの準備
Federated Learning Web API を使用して、モデルストアの準備とモデルを保存してみます。
アップストリームの指定
初めに、構成情報で次の項目を設定します。
FEEDBACK_ENDPOINT
: 空文字を指定FEEDBACK_APIKEY
: 空文字を指定FEEDBACK_SECRET
: 空文字を指定FEEDBACK_MODEL_STORE_DDC
: 空文字を指定
FEEDBACK_ENDPOINT
を指定すると、アップストリームを変更できますが、まずは自身のEdgeを対象とした構成での動作検証を推奨します。
なお、構成情報を変更すると、実行中の連合学習がある場合は影響が生じるため注意してください。
モデルストアの作成
モデルを保存するためには、初めにモデルストアを作成する必要があります。
まず、設定ファイルを作成します。
次のシェルを実行し、fl_api.json
を作成した上で、endpoint
apikey
secret
を入力します。
cat << EOF > fl_api.json
{
"version": "0.1",
"name": "tutorial_model",
"memo": "this is tutorial.",
"store": {
"api": {
"endpoint": "YOUR_ENDPOINT",
"apikey": "YOUR_API_KEY",
"secret": "YOUR_SECRET"
},
"model_store_ddc": "ddc:tutorial_model_store"
},
"state": {
"next_round": 1
}
}
EOF
次のコードを作成し、fl_api_tutorial.py
で保存します。
import json
from xdata_fl.client import Api
def create_store(config):
credential = config["store"]["api"]
model_store_ddc = config["store"]["model_store_ddc"]
api = Api(**credential)
result = api.initialize_model_store(model_store_ddc)
print(result)
return result
if __name__ == "__main__":
with open("fl_api.json", "r") as f:
config = json.load(f)
create_store(config)
python3 fl_api_tutorial.py
を実行します。
すでに作成されている場合は、スキップしてもかまいません。
モデル保存と読み込み
先ほどのfl_api_tutorial.py
を次のように修正してください。
# 関数を追加
def run_tutorial(config):
credential = config["store"]["api"]
model_store_ddc = config["store"]["model_store_ddc"]
api = Api(**credential)
# アップロードするモデルを作成します
with open("tutorial_input_model.txt", "wb") as f:
f.write(b"xxx")
# パスを指定してファイルをアップロードします
model_info = api.put_model("tutorial_input_model.txt")
# 登録されたファイルの情報を表示します
print(model_info)
# モデルidを指定しして、モデルを読み込みます
stream = api.load_model(model_info["id"])
# 読み込んだモデルを保存します
with open("tutorial_output_model.txt", "wb) as f:
for s in stream:
f.write(s)
# input と output が同じであることを確認します
with open("tutorial_output_model.txt", "rb) as f:
assert b"xxx" == f.read()
# create_store でなく run_tutorialを実行するように修正
if __name__ == "__main__":
with open("fl_api.json", "r") as f:
config = json.load(f)
# create_store(config)
run_tutorial(config)
python3 fl_api_tutorial.py
を実行します。
処理がエラーなく完了すれば、モデルストアの作成とモデル保存と読み込みの動作検証は完了です。
連合学習の実行(スタンドアロン)
ここでは、次のような構成(スタンドアロン)で連合学習を実行します。
- aggregator: 自身
- party1: 自身
- party2: 自身
fl_api.json
は先ほどの設定を流用します。
複数台で分散して学習を実行する場合は、同じコードを別の実行環境に配布し、fl_api.json
の必要な部分を変更すれば同様に動作させることができます。
雛形コードの準備
fl_fw_tutorial.py
を作成し、次のコードを張り付けてください。
# 必要なパッケージをインポートします。
import json
from xdata_fl.client import Api
from xdata_agg.fw._abc import (
Aggregator,
RemoteStore,
Trainer
)
# 学習処理を実装します。
# ここでは、簡単な動作確認のため、モデルは単なる数字とし、メタデータに`data_size`を返すこととします。
class TutorialTrainer(Trainer):
def train(self, parameters, current_round, meta):
if parameters is None:
parameters = 0
parameters += 1
return parameters, {"data_size": 2}
# モデル(ここでは単なる数字)の保存方法を実装します。
class TutorialSerializer:
def serialize(self, model, output_path):
"""
Parameters
----------
model :
model object.
output_name : str
Destination specified(temporary file abs path) by the framework.
"""
with open(output_path, "w") as f:
json.dump(model, f)
return output_path
def deserialize(self, input_path, model_meta={}):
"""
Parameters
----------
input_path : str
Source specified(serialized file) by the framework.
"""
with open(input_path, "r") as f:
model = json.load(f)
return model
# 集約アルゴリズムを実装します。
class TutorialAggregator(Aggregator):
TRAINER = TutorialTrainer
SERIALIZER = TutorialSerializer
def load_feedback_list(self):
model_ids = self.agg.get_latest_feedbacked_models()
if len(model_ids) == 0:
raise Exception()
for model_id in model_ids:
model = self.agg.load_model(model_id)
meta = self.agg.get_model_meta(model_id)
yield model, meta
def aggregate(self, current_round: int, feedback_list):
aggregated_model = 0
aggregated_meta = {"data_size": 0}
for model, meta in feedback_list:
aggregated_model += model
aggregated_meta["data_size"] += meta["data_size"]
return aggregated_model, aggregated_meta
# モデルストアを定義します。
class TutorialStore(RemoteStore):
SERIALIZER = TutorialSerializer
# フレームワークに指示を出す関数を実装します。
def federate(store:RemoteStore, current_round: int, as_aggregator=False):
if as_aggregator:
manager = TutorialAggregator(agg=store)
# モデル群を取得・集約し、集約後のモデルをストアに転送する一連の流れを実行します
model_info = manager.run_aggregate(current_round=current_round)
else:
manager = TutorialAggregator(agg=None, parties=[store]) # partiesは複数指定可能
trainer = manager.get_trainer(0) # ここでは、先頭のpartyのみ単体実行させることとする
# 学習済みモデルを取得(ラウンド > 1 の時)し、それをベースに追加の学習を行い、
# 学習済みモデルをストアに転送する一連の流れを実行します
model_info = trainer.federate(current_round=current_round)
print(model_info)
print(f"model: {store.load_model(model_info["id"])}")
print(f"meta: {model_info["model_meta"]}")
# 構成情報を読み込みます
def configure(config_path):
with open(config_path, "r") as f:
config = json.load(f)
credential = config["store"]["api"]
model_store_ddc = config["store"]["model_store_ddc"]
next_round=config["state"]["next_round"]
api = Api(**credential)
return config, api, model_store_ddc, next_round
fl_fw_tutorial_federate.py
を作成し次のコードを貼り付けます。
from fl_fw_tutorial import configure, TutorialStore
if __name__ == "__main__":
config, api, model_store_ddc, next_round = configure("fl_api.json")
with TutorialStore(api, model_store_ddc) as store:
federate(
store,
current_round=next_round,
as_aggregator=False # Party として実行
)
fl_fw_tutorial_aggregate.py
を作成し次のコードを貼り付けます。
from fl_fw_tutorial import configure, TutorialStore
if __name__ == "__main__":
config, api, model_store_ddc, next_round = configure("fl_api.json")
with TutorialStore(api, model_store_ddc) as store:
federate(
store,
current_round=next_round,
as_aggregator=True # Aggregator として実行
)
fl_fw_tutorial_delete_all_model.py
を作成し次のコードを貼り付けます。
from fl_fw_tutorial import configure, TutorialStore
if __name__ == "__main__":
config, api, model_store_ddc, next_round = configure("fl_api.json")
with TutorialStore(api, model_store_ddc) as store:
store.delete_all_model()
モデルストアのクリーンアップ
まず、fl_api_tutorial.py
で登録したモデルは不要なので、モデルストアをクリーンアップします(クリーンアップしなくとも、feedback
を実行していないので、連合学習結果は同じになります)。
python3 fl_fw_tutorial_delete_all_model.py
実行(ラウンド1)
party1(1回目の実行を party1 と見立てる) として連合学習を実行します。
python3 fl_fw_tutorial_federate.py
party2(2回目の実行を party2 と見立てる) として連合学習を実行します。
python3 fl_fw_tutorial_federate.py
aggregator として連合学習を実行します。
python3 fl_fw_tutorial_aggregate.py
これで、1ラウンド目が完了です。 上記スクリプトを実行後、次のような出力が得られます。
model: 2
meta: {"data_size": 4}
実行(ラウンド2以降)
fl_api.json
のnext_round
を 1 増やします(ラウンドが1の場合、集約済みモデルが連携されません)。
{
...
"state": {
"next_round": 2
}
}
ラウンド1と同様の手順でスクリプトを実行します(クリーンアップは不要です)。
これで、2ラウンド目が完了です。
fl_fw_tutorial_aggregate.py
を実行後、次のような出力が得られます。
model: 6
meta: {"data_size": 4}
以上で連合学習のチュートリアルはおしまいです。