Skip to content

プロベナンス: 処理の実行制御(ケーススタディ)

当ドキュメントでは、想定されるケースに基づいてDDCに紐づいたステートメント確認する方法を記述しています。

実行方式は全て JSON-RPC v2.0 を利用します。

各メソッドの詳しい使用方法は テーブル・ビュー操作 または DDC に対する操作 を参照してください。

ステートメントの準備と実行

set_ddc(), join_ddc(), merge_ddc(), process() は、まず実行するステートメントを保持する activity レコードを作成し、その activity に含まれる query を DB 上で実行します。 実行結果も activity に記録されます。

下記は merge_ddc を実行した後に確認する例です。

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
             "on_conflict": "nothing"
            },
  "id": "provenance_jsonrpc_id"
}

merge_ddc の結果例は下記の通りです。

{
  "result": {
  "id": 1243,
  "session_id": 59,
  "usename": "xuser",
  "ddc_type": "table",
  "short_form": "xrain0", "long_form": "xrain0_1611536893.622533",
  "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
  "created_at": "2021-10-09 23:32:09.693272+09",
  "committed_at": null,
  "prepared_activity_id": null
}
,
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

実行された activity は get_ddc_last_activity() で確認できます。

{
  "jsonrpc": "2.0",
  "method": "prov.get_ddc_activities",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}
{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",,
      "end_at": "2021-10-09 23:42:09.693272+09",
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

実行済みの activity は、start_at にステートメントが準備された日時が、 end_at に終了した日時が記録されます。

処理が正常に終了した場合は successed が true に、relation_size に出力した Table のサイズが記録されます(ただし出力先が View の場合は 0)。 失敗した場合は successed が false になり、エラーメッセージが report に記録されます。

ステートメントの準備のみ(実行しない)

set_ddc(), join_ddc(), merge_ddc(), process() を呼びだす時にオプションパラメータ no_exec=True をセットすると、 ステートメントを準備して出力先 ddc のprepared_activity_id に記録したところで中断し、実行待機状態で終了します。

process() の場合は params'no_exec':True を 追加します。

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
            "no_exec": true,
             "on_conflict": "nothing"
            },
  "id": "provenance_jsonrpc_id"
}

prov.merge_ddc() のレスポンスの形式は以下の通りです。

{
  "result": {
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0", "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-15T10:09:40.123456+09:00",
      "committed_at": null,
      "prepared_activity_id": 1344
  },
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

prov.process のリクエスト例は以下の通りです。

{
  "jsonrpc": "2.0",
  "method": "prov.process",
  "params": {
    "method": "aggregate_geometry_table",
    "params": {
        "output_ddc": "ddc:agg_domingo",
        "input_ddc": "ddc:domingo",
        "start_datetime": "2021-10-15 10:09:30+09:00",
        "end_datetime": "2021-10-15 10:30:30+09:00",
        "spatial_extent": "{\"type":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}",
        "continuous_column_names": ["fld"],
        "discrete_column_names": [],
        "no_exec": true,
    },
  },
  "id": "provenance_jsonrpc_id"
}

準備されたステートメントは get_ddc_last_activity() で確認できます。

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": null,
      "relation_size": 0,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09",
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

準備されただけ、または処理中の activity は、start_at にステートメントが準備された日時が記録されますが、successed, end_at は None になります。

準備したステートメントのキャンセル

ステートメントを準備した後で実行せずにキャンセルしたい場合、あるいは後述する方法で強制終了したステートメントを削除したい場合は cancel_ddc_activity() を呼びます。

{
  "jsonrpc": "2.0",
  "method": "prov.cancel_ddc_activity",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}

レスポンス例は以下の通りです。

{
  "result": true,
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

prepared_activity_id をリセット (=NULL) し、紐付けられている activity レコードも削除されます。

ただし既に実行中の処理は中断されないので、後述する get_running_processes() を利用して実行中ではないことを確認するか、確実に実行前であることが分かる時にキャンセルするように注意する必要があります。

準備したステートメントの実行

準備済み(prepared_activity_id がセットされている)ddc に対してexec_ddc() を呼び出すと、ステートメントを実行します。

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}

レスポンス例は下記の通りです。

{
  "result": [{
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0",
      "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-09 23:32:09.693272+09",
      "committed_at": null,
      "prepared_activity_id": null,
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

実行結果はステートメントの準備と実行を一度に行なった場合と同様にget_ddc_last_activity() で取得すれば確認できます。

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09",
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

準備したステートメントのスレッド実行

exec_ddc() を呼ぶ際に、オプションパラメータ thread=True を指定すると、新しくサブスレッドを作成して並列実行します。

{
  "jsonrpc": "2.0",
  "method": "prov.set_ddc",
  "params":{"ddc_label":"ddc:domingo","source":"event_domingo","ddc_type":"table", "no_exec": true},
  "id": "provenance_jsonrpc_id"
}

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc_domingo_1611536893.622533", "thread": true},
  "id": "provenance_jsonrpc_id"
}

セッションを利用していない場合、サブスレッドで作成中の ddc は処理が完了するまで commit されないため、short_form では参照できないことにご注意ください。

参照する必要がある場合は上の例のように long_form を指定してください。

セッション内であれば short_form でも参照できるので、 スレッド実行を行う場合はbegin_session() でセッションを開始しておくことを推奨します。

実行中の処理を待機

スレッドまたは別プロセスで実行中の処理が終了するまで待機します。 複数の ddc を指定した場合、そのすべての ddc に紐づけられている処理が終了するまで待機します。

{
  "jsonrpc": "2.0",
  "method": "prov.wait_ddc",
  "params": {"ddcs": ["ddc:domingo"]},
  "id": "provenance_jsonrpc_id"
}

wait_ddc() は、実際には ddc の prepared_activity_id を10秒ごとにチェックして処理が終了しているかどうかを確認し、すべて終了するまで繰り返します。

実行中プロセスの取得

ユーザが実行中のプロセス一覧を取得します。

この処理は PostgreSQL の統計情報テーブル pg_catalog.pg_stat_activity を参照し、実行中のサーバサイドプロセスの中から exec_ddc() で実行中のものを検索します。

そのため、ステートメントの準備と実行を一気に行なっている処理は取得できません。

{
  "jsonrpc": "2.0",
  "method": "prov.get_running_processes",
  "id": "provenance_jsonrpc_id"
}

レスポンス例は下記の通りです。

{
  "result": [{
      "activity_id": 1092,
      "procpid": 8238,
      "usename": "xuser",
      "query_start": "2021-10-09 23:32:09.693272+09",
      "context": "process(\"aggregate_geometry_table\", \"{\"output_ddc\": \"ddc:agg_domingo\", \"input_ddc\": \"ddc:domingo\", \"start_datetime\": \"2018-09-15 00:00:00+09\", \"end_datetime\": datetime.datetime(2018, 9, 15, 1, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400))), \"spatial_extent\": \"{"type":"Polygon","coordinates":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\", \"continuous_column_names\": [\"fld\"], \"discrete_column_names\": [], \"no_exec\": True}\")",
      "query": "SELECT analysis.aggregate_geometry_table(\"provenance.prov_9d4cd27be962241a752030a7a53e6a21\",\"error\",\"provenance.prov_04857b311ea5f93b4a152625289b4e94\",\"2018-09-15 00:00:00+09\",\"2018-09-15T01:00:00+09:00\"::timestamptz,ST_GeomFromGeoJSON(\"{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\"),ARRAY[\"fld\"],\"{}\")",
      "ddc": "agg_domingo_1611464268.123782"
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

実行中プロセスの強制終了

ユーザが実行中のプロセスは stop_running_process() で強制終了できます。

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "provenance_jsonrpc_id"
}

上記のような処理実行をした後、下記のリクエストで(実行中であれば)キャンセルできます。

{
  "jsonrpc": "2.0",
  "method": "prov.stop_running_process",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "provenance_jsonrpc_id"
}

強制終了した場合、 get_running_processes() では表示されなくなります。

準備されたステートメントはそのまま残る(prepared_activity_id はセットされたまま)ので、再実行するか、 cancel_ddc_activity() を実行してキャンセルすることができます。