Skip to content

Tutorial

チュートリアル

ここでは、xData FL がどのように動作するか紹介します。

チュートリアルの Python コードでは xdata_prov クライアントパッケージを利用します。 独自の Python 環境でチュートリアルを実行する場合は、事前に xdata_prov クライアントパッケージをインストールしてください。

モデルストアの準備

Federated Learning Web API を使用して、モデルストアの準備とモデルを保存してみます。

アップストリームの指定

初めに、構成情報で次の項目を設定します。

  • FEEDBACK_ENDPOINT: 空文字を指定
  • FEEDBACK_APIKEY: 空文字を指定
  • FEEDBACK_SECRET: 空文字を指定
  • FEEDBACK_MODEL_STORE_DDC: 空文字を指定

FEEDBACK_ENDPOINTを指定すると、アップストリームを変更できますが、まずは自身のEdgeを対象とした構成での動作検証を推奨します。

なお、構成情報を変更すると、実行中の連合学習がある場合は影響が生じるため注意してください。

モデルストアの作成

モデルを保存するためには、初めにモデルストアを作成する必要があります。

まず、設定ファイルを作成します。

次のシェルを実行し、fl_api.jsonを作成した上で、endpoint apikey secret を入力します。

cat << EOF > fl_api.json
{
  "version": "0.1",
  "name": "tutorial_model",
  "memo": "this is tutorial.",
  "store": {
    "api": {
      "endpoint": "YOUR_ENDPOINT",
      "apikey": "YOUR_API_KEY",
      "secret": "YOUR_SECRET"
    },
    "model_store_ddc": "ddc:tutorial_model_store"
  },
  "state": {
    "next_round": 1
  }
}
EOF

次のコードを作成し、fl_api_tutorial.pyで保存します。

import json
from xdata_fl.client import Api

def create_store(config):
    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    api = Api(**credential)

    result = api.initialize_model_store(model_store_ddc)
    print(result)
    return result

if __name__ == "__main__":
    with open("fl_api.json", "r") as f:
        config = json.load(f)
    create_store(config)

python3 fl_api_tutorial.py を実行します。 すでに作成されている場合は、スキップしてもかまいません。

モデル保存と読み込み

先ほどのfl_api_tutorial.pyを次のように修正してください。

# 関数を追加
def run_tutorial(config):
    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    api = Api(**credential)

    # アップロードするモデルを作成します
    with open("tutorial_input_model.txt", "wb") as f:
        f.write(b"xxx")

    # パスを指定してファイルをアップロードします
    model_info = api.put_model(model_store_ddc, "tutorial_input_model.txt")

    # 登録されたファイルの情報を表示します
    print(model_info)

    # モデルidを指定しして、モデルを読み込みます
    stream = api.load_model(model_store_ddc, model_info["model_id"])

    # 読み込んだモデルを保存します
    with open("tutorial_output_model.txt", "wb") as f:
        for s in stream:
            f.write(s)

    # input と output が同じであることを確認します
    with open("tutorial_output_model.txt", "rb") as f:
        assert b"xxx" == f.read()

# create_store でなく run_tutorialを実行するように修正
if __name__ == "__main__":
    with open("fl_api.json", "r") as f:
        config = json.load(f)
    # create_store(config)
    run_tutorial(config)

python3 fl_api_tutorial.py を実行します。

処理がエラーなく完了すれば、モデルストアの作成とモデル保存と読み込みの動作検証は完了です。

連合学習の実行(スタンドアロン)

ここでは、次のような構成(スタンドアロン)で連合学習を実行します。

  • aggregator: 自身
  • party1: 自身
  • party2: 自身

fl_api.jsonは先ほどの設定を流用します。

複数台で分散して学習を実行する場合は、同じコードを別の実行環境に配布し、fl_api.jsonの必要な部分を変更すれば同様に動作させることができます。

雛形コードの準備

fl_fw_tutorial.py を作成し、次のコードを張り付けてください。

# 必要なパッケージをインポートします。
import json
from xdata_fl.client import Api
from xdata_agg.fw._abc import (
    Aggregator,
    RemoteStore,
    Trainer
)


# 学習処理を実装します。
# ここでは、簡単な動作確認のため、モデルは単なる数字とし、メタデータに`data_size`を返すこととします。
class TutorialTrainer(Trainer):
    def train(self, parameters, current_round, meta):
        if parameters is None:
            parameters = 0

        parameters += 1
        return parameters, {"data_size": 2}


# モデル(ここでは単なる数字)の保存方法を実装します。
class TutorialSerializer:
    def serialize(self, model, output_path):
        """
        Parameters
        ----------
        model :
            model object.
        output_name : str
            Destination specified(temporary file abs path) by the framework.
        """
        with open(output_path, "w") as f:
            json.dump(model, f)

        return output_path

    def deserialize(self, input_path, model_meta={}):
        """
        Parameters
        ----------
        input_path : str
            Source specified(serialized file) by the framework.
        """
        with open(input_path, "r") as f:
            model = json.load(f)

        return model

# 集約アルゴリズムを実装します。
class TutorialAggregator(Aggregator):
    TRAINER = TutorialTrainer
    SERIALIZER = TutorialSerializer

    def load_feedback_list(self):
        model_ids = self.agg.get_latest_feedbacked_models()

        if len(model_ids) == 0:
            raise Exception()

        for model_id in model_ids:
            model = self.agg.load_model(model_id)
            meta = self.agg.get_model_meta(model_id)
            yield model, meta

    def aggregate(self, current_round: int, feedback_list):
        aggregated_model = 0
        aggregated_meta = {"data_size": 0}

        for model, meta in feedback_list:
            aggregated_model += model
            aggregated_meta["data_size"] += meta["data_size"]

        return aggregated_model, aggregated_meta


# モデルストアを定義します。
class TutorialStore(RemoteStore):
    SERIALIZER = TutorialSerializer


# フレームワークに指示を出す関数を実装します。
def federate(storeRemoteStore, current_round: int, as_aggregator=False):
    if as_aggregator:
        manager = TutorialAggregator(agg=store)

        # モデル群を取得・集約し、集約後のモデルをストアに転送する一連の流れを実行します
        model_info = manager.run_aggregate(current_round=current_round)
    else:
        manager = TutorialAggregator(agg=None, parties=[store]) # partiesは複数指定可能
        trainer = manager.get_trainer(0)  # ここでは、先頭のpartyのみ単体実行させることとする

        # 学習済みモデルを取得(ラウンド > 1 の時)し、それをベースに追加の学習を行い、
        # 学習済みモデルをストアに転送する一連の流れを実行します
        model_info = trainer.federate(current_round=current_round)

    print(model_info)
    print(f"model: {store.load_model(model_info["id"])}")
    print(f"meta: {model_info["model_meta"]}")


# 構成情報を読み込みます
def configure(config_path):
    with open(config_path, "r") as f:
        config = json.load(f)

    credential = config["store"]["api"]
    model_store_ddc = config["store"]["model_store_ddc"]
    next_round=config["state"]["next_round"]
    api = Api(**credential)
    return config, api, model_store_ddc, next_round

fl_fw_tutorial_federate.py を作成し次のコードを貼り付けます。

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        federate(
            store,
            current_round=next_round,
            as_aggregator=False # Party として実行
        )

fl_fw_tutorial_aggregate.py を作成し次のコードを貼り付けます。

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        federate(
            store,
            current_round=next_round,
            as_aggregator=True # Aggregator として実行
        )

fl_fw_tutorial_delete_all_model.py を作成し次のコードを貼り付けます。

from fl_fw_tutorial import configure, TutorialStore

if __name__ == "__main__":
    config, api, model_store_ddc, next_round = configure("fl_api.json")
    with TutorialStore(api, model_store_ddc) as store:
        store.delete_all_model()

モデルストアのクリーンアップ

まず、fl_api_tutorial.py で登録したモデルは不要なので、モデルストアをクリーンアップします(クリーンアップしなくとも、feedback を実行していないので、連合学習結果は同じになります)。

python3 fl_fw_tutorial_delete_all_model.py

実行(ラウンド1)

party1(1回目の実行を party1 と見立てる) として連合学習を実行します。

python3 fl_fw_tutorial_federate.py

party2(2回目の実行を party2 と見立てる) として連合学習を実行します。

python3 fl_fw_tutorial_federate.py

aggregator として連合学習を実行します。

python3 fl_fw_tutorial_aggregate.py

これで、1ラウンド目が完了です。 上記スクリプトを実行後、次のような出力が得られます。

model: 2
meta: {"data_size": 4}

実行(ラウンド2以降)

fl_api.jsonnext_roundを 1 増やします(ラウンドが1の場合、集約済みモデルが連携されません)。

{
  ...
  "state": {
    "next_round": 2
  }
}

ラウンド1と同様の手順でスクリプトを実行します(クリーンアップは不要です)。

これで、2ラウンド目が完了です。 fl_fw_tutorial_aggregate.pyを実行後、次のような出力が得られます。

model: 6
meta: {"data_size": 4}

連合学習のチュートリアルは以上です。