イベントデータを投入する
xData Edge環境で、イベントデータをEvWHに投入する方法を説明します。説明で使用するコードやデータはこちらからダウンロードできます。
1. 投入するデータの取得
今回は気象庁が公開している過去の気象情報からを2019年10月の神奈川県のデータを利用します。
ダウンロードしたデータは2019-10-kanagawa.csvです。
2. データの確認
ダウンロードしたデータをイベントデータとしてEvWHに登録するためには、時間・空間の情報が必要です。
時間はダウンロードしたCSVに記載されているが、空間の情報がないため気象庁のサイトの地域気象観測所一覧 [PDF形式]の情報を利用して、後ほど作成します。
またPDF記載の緯度経度は度分形式ですがイベントデータでは十進度形式を使うので、データを投入する際に値を変換します。
3. データを投入するテーブルの作成
投入するデータの準備ができたので、データローダWebAPIのloaderwebapi.register_loaderを利用してデータを投入するテーブルを作成します。
イベントデータを投入するテーブルには start_datetime, end_datetime, locationのカラムが必要です。これに加え、降雨量を示すrainfallカラムを追加し、スキーマeventにrain_jmaというデータ名でテーブルを作成します。
これらの実装例がregister_loader.pyです。
EdgeをインストールしたVMでregister_loader.pyを実行したあとに、以下でEwWH(DB)にログインし、データベースevwhにテーブル event.rain_jma_tbl_0 ができていることを確認します。
接続先(evwhdb01-67466c9dc5-fcpl2)は環境によって異なるのでkubectl get podsを実行し、evwhdb01から始まるPodを指定してください。
また、テーブル名(rain_jma_tbl_0)とデータ名(rain_jma)が異なるため注意してください。
kubectl exec -it evwhdb01-67466c9dc5-fcpl2 -- psql -d evwh -U evwh_admin
4. データ登録用の雛形の取得
次にデータ登録時に前項で定義したカラム定義に合わせてデータを登録するため、データローダWebAPIのloaderwebapi.init_recordでデータのフォーマットを取得します。
この実装例がinit_record.pyです。データのフォーマットがわかっている場合はこの手順を省略できます。
EdgeをインストールしたVMでinit_record.py を実行すると以下のような結果が得られるので、そのフォーマットを利用して次項でデータを投入します。
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"features": [
{
"geometry": null,
"properties": {
"end_datetime": null,
"rainfall": null,
"start_datetime": null
},
"type": "Feature"
}
],
"type": "FeatureCollection"
}
}
5. データの投入
前項で取得したデータのフォーマットを利用して、データローダWebAPIのloaderwebapi.add_recordを用いてデータを投入するコードadd_record.pyをEdge VM上で実行します。
実装例のadd_record.pyを元に重要な点を説明します。
データファイルの読み込み
1でダウンロードしたCSVをpandasのDataFrameとして読み込むように実装します。CSVの文字コードはSHIFT-JIS、ヘッダは3行あるのでそのように設定し、時刻情報が並ぶ初めのカラムをインデックスとします。
df = pd.read_csv('2019-10-kanagawa.csv', encoding='shift_jis', header=[1,2,3], index_col=0)
投入データの修正
1でダウンロードしたデータには位置情報が含まれていないため、2で取得した情報を追加します。
locations = [['三浦', 35, 10.7, 139, 37.8],
['辻堂', 35, 19.2, 139, 27],
['海老名', 35, 26, 139, 23.2],
['横浜', 35, 26.3, 139, 39.1],
['日吉', 35, 33.1, 139, 39],
['相模原中央', 35, 34.3, 139, 22.2],
['相模湖', 35, 36.8, 139, 11.6],
['丹沢湖', 35, 24.6, 139, 2.6],
['平塚', 35, 20.7, 139, 18.3],
['小田原', 35, 16.6, 139, 9.3],
['箱根', 35, 13.3, 139, 2.5]]
lat = location[1]+location[2]/60
lon = location[3]+location[4]/60
投入データの作成
DataFrameとして読み込んだCSVから、各地点ごとに時刻、降水量(mm)を取得し、4で取得した雛形を使ってAPIリクエストのBody部分を作成します。
位置情報は前項の情報を使いPoint型のgeometryとします。また、このデータは1時間刻みなのでend_datetimeはstart_datetimeの1時間後とします。
for index, row in df[(location[0],'降水量(mm)')].iterrows():
dt_str = index
s_dt = dt.strptime(index, '%Y/%m/%d %H:%M:%S')
e_dt = s_dt + timedelta(hours=1)
feature = {
"type":"Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties":{
"start_datetime": s_dt.strftime('%Y/%m/%d %H:%M:%S+09'),
"end_datetime": e_dt.strftime('%Y/%m/%d %H:%M:%S+09'),
"rainfall": row[0]
}
}
features.append(feature)
body_dict = {
"id": 1,
"jsonrpc": "2.0",
"method": "loaderwebapi.add_record",
"params": {
"data_name": "rain_jma",
"options": {"if_exists": "override"},
"record_set": features
}
}
6. 投入データの確認
手順3記載の方法でEwWH(DB)にログインし、テーブル event.rain_jma_tbl_0にデータが登録されていることを確認します。