コンテンツにスキップ

イベントデータを投入する

xData Edge環境で、イベントデータをEvWHに投入する方法を説明します。説明で使用するコードやデータはこちらからダウンロードできます。

1. 投入するデータの取得

今回は気象庁が公開している過去の気象情報からを2019年10月の神奈川県のデータを利用します。 ダウンロードしたデータは2019-10-kanagawa.csvです。

2. データの確認

ダウンロードしたデータをイベントデータとしてEvWHに登録するためには、時間・空間の情報が必要です。
時間はダウンロードしたCSVに記載されているが、空間の情報がないため気象庁のサイトの地域気象観測所一覧 [PDF形式]の情報を利用して、後ほど作成します。
またPDF記載の緯度経度は度分形式ですがイベントデータでは十進度形式を使うので、データを投入する際に値を変換します。

3. データを投入するテーブルの作成

投入するデータの準備ができたので、データローダWebAPIのloaderwebapi.register_loaderを利用してデータを投入するテーブルを作成します。
イベントデータを投入するテーブルには start_datetime, end_datetime, locationのカラムが必要です。これに加え、降雨量を示すrainfallカラムを追加し、スキーマeventrain_jmaというデータ名でテーブルを作成します。
これらの実装例がregister_loader.pyです。

EdgeをインストールしたVMでregister_loader.pyを実行したあとに、以下でEwWH(DB)にログインし、データベースevwhにテーブル event.rain_jma_tbl_0 ができていることを確認します。 接続先(evwhdb01-67466c9dc5-fcpl2)は環境によって異なるのでkubectl get podsを実行し、evwhdb01から始まるPodを指定してください。 また、テーブル名(rain_jma_tbl_0)とデータ名(rain_jma)が異なるため注意してください。

kubectl exec -it evwhdb01-67466c9dc5-fcpl2 -- psql -d evwh -U evwh_admin

4. データ登録用の雛形の取得

次にデータ登録時に前項で定義したカラム定義に合わせてデータを登録するため、データローダWebAPIのloaderwebapi.init_recordでデータのフォーマットを取得します。
この実装例がinit_record.pyです。データのフォーマットがわかっている場合はこの手順を省略できます。

EdgeをインストールしたVMでinit_record.py を実行すると以下のような結果が得られるので、そのフォーマットを利用して次項でデータを投入します。

{
    "id": 1,
    "jsonrpc": "2.0",
    "result": {
        "features": [
            {
                "geometry": null,
                "properties": {
                    "end_datetime": null,
                    "rainfall": null,
                    "start_datetime": null
                },
                "type": "Feature"
            }
        ],
        "type": "FeatureCollection"
    }
}

5. データの投入

前項で取得したデータのフォーマットを利用して、データローダWebAPIのloaderwebapi.add_recordを用いてデータを投入するコードadd_record.pyをEdge VM上で実行します。
実装例のadd_record.pyを元に重要な点を説明します。

データファイルの読み込み

1でダウンロードしたCSVをpandasのDataFrameとして読み込むように実装します。CSVの文字コードはSHIFT-JIS、ヘッダは3行あるのでそのように設定し、時刻情報が並ぶ初めのカラムをインデックスとします。

df = pd.read_csv('2019-10-kanagawa.csv', encoding='shift_jis', header=[1,2,3], index_col=0)

投入データの修正

1でダウンロードしたデータには位置情報が含まれていないため、2で取得した情報を追加します。

locations = [['三浦', 35, 10.7, 139, 37.8],
             ['辻堂', 35, 19.2, 139, 27],
             ['海老名', 35, 26, 139, 23.2],
             ['横浜', 35, 26.3, 139, 39.1],
             ['日吉', 35, 33.1, 139, 39],
             ['相模原中央', 35, 34.3, 139, 22.2],
             ['相模湖', 35, 36.8, 139, 11.6],
             ['丹沢湖', 35, 24.6, 139, 2.6],
             ['平塚', 35, 20.7, 139, 18.3],
             ['小田原', 35, 16.6, 139, 9.3],
             ['箱根', 35, 13.3, 139, 2.5]]
また、緯度経度を度分から十進度に変換します。
    lat = location[1]+location[2]/60
    lon = location[3]+location[4]/60

投入データの作成

DataFrameとして読み込んだCSVから、各地点ごとに時刻降水量(mm)を取得し、4で取得した雛形を使ってAPIリクエストのBody部分を作成します。
位置情報は前項の情報を使いPoint型のgeometryとします。また、このデータは1時間刻みなのでend_datetimestart_datetimeの1時間後とします。

    for index, row in df[(location[0],'降水量(mm)')].iterrows():
        dt_str = index
        s_dt = dt.strptime(index, '%Y/%m/%d %H:%M:%S')
        e_dt = s_dt +  timedelta(hours=1)

        feature = {
            "type":"Feature", 
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties":{
                "start_datetime": s_dt.strftime('%Y/%m/%d %H:%M:%S+09'), 
                "end_datetime": e_dt.strftime('%Y/%m/%d %H:%M:%S+09'), 
                "rainfall": row[0] 
            }
        }
        features.append(feature)

    body_dict = {
        "id": 1,
        "jsonrpc": "2.0",
        "method": "loaderwebapi.add_record",
        "params": {
            "data_name": "rain_jma",
            "options": {"if_exists": "override"},
            "record_set": features
        }
    }

6. 投入データの確認

手順3記載の方法でEwWH(DB)にログインし、テーブル event.rain_jma_tbl_0にデータが登録されていることを確認します。