
Provenance API - Control Execution of Processes

Note

This document has been machine translated.

This document uses example use cases to describe how to inspect and control the execution of the statements associated with a DDC. All methods are called using JSON-RPC 2.0.
For detailed usage of each method, refer to the table/view operations and DDC operations documentation.

Prepare and execute statements

set_ddc(), join_ddc(), merge_ddc(), and process() first create an activity record that holds the statement to be executed. They then execute the query contained in that activity on the database. The result of the execution is also recorded in the activity.

The following is an example merge_ddc request.

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
             "on_conflict": "nothing"
            },
  "id": "provenance_jsonrpc_id"
}

An example result of merge_ddc is shown below.

{
  "result": {
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0",
      "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-09 23:32:09.693272+09",
      "committed_at": null,
      "prepared_activity_id": null
  },
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}
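Because every call is a plain JSON-RPC 2.0 message, any HTTP client can send these requests. The following is a minimal Python sketch that uses only the standard library. It assumes the API accepts HTTP POST requests at a single endpoint. ENDPOINT is a hypothetical placeholder because this document does not give the server URL. build_request() and call() are illustrative helper names and are not part of the Provenance API.

```python
import json
import urllib.request

# Hypothetical: the actual URL of the Provenance API server is not given here.
ENDPOINT = "http://localhost:8000/jsonrpc"


def build_request(method, params=None, request_id="provenance_jsonrpc_id"):
    """Build a JSON-RPC 2.0 request object for a prov.* method."""
    request = {"jsonrpc": "2.0", "method": "prov." + method, "id": request_id}
    if params is not None:
        # params may be omitted, as in the prov.get_running_processes request
        request["params"] = params
    return request


def call(method, params=None, endpoint=ENDPOINT):
    """POST one request and return its "result" member.

    A JSON-RPC 2.0 error reply carries an "error" member instead of "result".
    """
    body = json.dumps(build_request(method, params)).encode("utf-8")
    http_request = urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(http_request) as response:
        reply = json.loads(response.read().decode("utf-8"))
    if "error" in reply:
        raise RuntimeError(f"JSON-RPC error: {reply['error']}")
    return reply["result"]
```

With a reachable server, call("merge_ddc", {"target": "ddc:xrain1", "source": "ddc:xrain0", "on_conflict": "nothing"}) would return the "result" object shown above as a Python dict.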

The executed activity can be checked with get_ddc_last_activity().

{
  "jsonrpc": "2.0",
  "method": "prov.get_ddc_activities",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09"
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

For an executed activity, start_at is the date and time when the statement was prepared, and end_at is the date and time when its execution finished.

If the process succeeds, successed is set to true and relation_size is set to the size of the output table (0 if the output is a view). If it fails, successed is set to false and the error message is recorded in report.
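These rules can be turned into a small helper that summarizes an activity record. This is a sketch that assumes the record is the decoded JSON object shown above. summarize_activity() and parse_pg_timestamp() are hypothetical names. Timestamps such as "2021-10-09 23:32:09.693272+09" use an hour-only UTC offset, which the sketch pads to "+09:00" so that datetime.fromisoformat() accepts it on older Python versions.

```python
import re
from datetime import datetime

# Matches a timestamp ending in an hour-only UTC offset such as "...:09.693272+09"
_HOUR_ONLY_OFFSET = re.compile(r"\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}$")


def parse_pg_timestamp(text):
    """Parse a timestamp such as "2021-10-09 23:32:09.693272+09"."""
    if _HOUR_ONLY_OFFSET.search(text):
        text += ":00"  # "+09" -> "+09:00" for datetime.fromisoformat()
    return datetime.fromisoformat(text)


def summarize_activity(activity):
    """Summarize the outcome of one activity record."""
    start, end = activity.get("start_at"), activity.get("end_at")
    elapsed = None
    if start and end:
        elapsed = (parse_pg_timestamp(end) - parse_pg_timestamp(start)).total_seconds()
    if activity.get("successed") is True:
        # relation_size: size of the output table, 0 when the output is a view
        return {"status": "succeeded", "relation_size": activity.get("relation_size"),
                "elapsed_seconds": elapsed}
    if activity.get("successed") is False:
        # On failure, the error message is recorded in report
        return {"status": "failed", "error": activity.get("report", ""),
                "elapsed_seconds": elapsed}
    # successed is null while the statement is only prepared or still running
    return {"status": "pending", "elapsed_seconds": elapsed}
```

For the executed merge_ddc activity above, the helper reports a successful run of 600 seconds with a relation_size of 6791168.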

Prepare statements only (do not execute)

If the optional parameter no_exec=True is set when calling set_ddc(), join_ddc(), merge_ddc(), or process(), processing stops once the statement has been prepared. The activity is recorded in the prepared_activity_id of the output DDC, and the call returns with the statement waiting to be executed.

For process(), add "no_exec": true to the nested params object of the analysis method, as in the prov.process example below.

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
             "no_exec": true,
             "on_conflict": "nothing"
            },
  "id": "provenance_jsonrpc_id"
}

The response to prov.merge_ddc() has the following format:

{
  "result": {
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0", "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-15T10:09:40.123456+09:00",
      "committed_at": null,
      "prepared_activity_id": 1344
  },
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

An example request for prov.process is shown below:

{
  "jsonrpc": "2.0",
  "method": "prov.process",
  "params": {
    "method": "aggregate_geometry_table",
    "params": {
        "output_ddc": "ddc:agg_domingo",
        "input_ddc": "ddc:domingo",
        "start_datetime": "2021-10-15 10:09:30+09:00",
        "end_datetime": "2021-10-15 10:30:30+09:00",
        "spatial_extent": "{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}",
        "continuous_column_names": ["fld"],
        "discrete_column_names": [],
        "no_exec": true
    }
  },
  "id": "provenance_jsonrpc_id"
}
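The placement of no_exec differs between process() and the other methods, and the escaped spatial_extent string is easy to get wrong by hand. The following Python sketch builds both parameter sets. with_no_exec() is a hypothetical helper. Generating spatial_extent with json.dumps() is a suggestion, not a documented requirement of the API.

```python
import json


def with_no_exec(method, params):
    """Return a copy of params that asks the server to prepare but not execute.

    set_ddc(), join_ddc() and merge_ddc() take no_exec at the top level;
    process() takes it inside the nested "params" of the analysis method.
    """
    params = dict(params)
    if method == "process":
        inner = dict(params.get("params", {}))
        inner["no_exec"] = True
        params["params"] = inner
    else:
        params["no_exec"] = True
    return params


merge_params = with_no_exec("merge_ddc", {
    "target": "ddc:xrain1", "source": "ddc:xrain0", "on_conflict": "nothing",
})

# Build spatial_extent with json.dumps instead of escaping quotes by hand.
polygon = {
    "type": "Polygon",
    "coordinates": [[[139.25, 35.25], [139.25, 35.5], [139.625, 35.5],
                     [139.625, 35.25], [139.25, 35.25]]],
}
process_params = with_no_exec("process", {
    "method": "aggregate_geometry_table",
    "params": {
        "output_ddc": "ddc:agg_domingo",
        "input_ddc": "ddc:domingo",
        "start_datetime": "2021-10-15 10:09:30+09:00",
        "end_datetime": "2021-10-15 10:30:30+09:00",
        "spatial_extent": json.dumps(polygon, separators=(",", ":")),
        "continuous_column_names": ["fld"],
        "discrete_column_names": [],
    },
})
```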

The prepared statement can be checked with get_ddc_last_activity().

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": null,
      "relation_size": 0,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": null
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

For activities that have only been prepared or are still being processed, start_at records the date and time the statement was prepared, while successed and end_at are null.

Canceling prepared statements

To cancel a prepared statement without executing it, or to delete a statement whose process was forcibly terminated as described below, call cancel_ddc_activity().

{
  "jsonrpc": "2.0",
  "method": "prov.cancel_ddc_activity",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}

An example response is shown below:

{
  "result": true,
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

The prepared_activity_id is reset to NULL, and the associated activity record is deleted.

However, cancel_ddc_activity() does not interrupt a process that is already running. Before canceling, use get_running_processes() (described below) to confirm that the statement is not running, or cancel only when you are certain that it has not started yet.
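One way to follow this advice in client code is to check get_running_processes() immediately before canceling. The sketch below assumes that the activity_id reported by get_running_processes() has the same value as the DDC's prepared_activity_id. This document does not state that link. call is any function that sends a prov.* request and returns its result, and cancel_if_not_running() is a hypothetical name.

```python
def cancel_if_not_running(call, ddc_label, prepared_activity_id):
    """Cancel a prepared statement only if no running process is executing it.

    `call(method, params)` sends a prov.* request and returns its "result".
    Assumption: get_running_processes() reports the prepared activity's id
    in its activity_id field.
    """
    running = call("get_running_processes", None)
    if any(proc["activity_id"] == prepared_activity_id for proc in running):
        # Still running: cancel_ddc_activity() would not stop it
        return False
    return call("cancel_ddc_activity", {"ddc_label": ddc_label})
```

A small race remains: the statement could start between the check and the cancellation. The check therefore reduces the risk but does not eliminate it.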

Executing prepared statements

Calling exec_ddc() on a prepared DDC (one whose prepared_activity_id is set) executes its statement.

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "provenance_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0",
      "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-09 23:32:09.693272+09",
      "committed_at": null,
      "prepared_activity_id": null
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}

The result of the execution can be checked with get_ddc_last_activity(), in the same way as for a statement that was prepared and executed in a single call.

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09"
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}
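The two-step flow in this section (prepare with no_exec, then execute with exec_ddc()) can be sketched in Python as follows. call is any function that sends a prov.* request and returns its result, and prepare_then_execute() is a hypothetical name. The sketch refers to the DDC as "ddc:" plus its short_form, as in the exec_ddc() example above. This assumes the DDC is visible under that name.

```python
def prepare_then_execute(call, merge_params):
    """Prepare a merge_ddc() statement with no_exec, then run it with exec_ddc().

    `call(method, params)` sends a prov.* request and returns its "result".
    """
    ddc = call("merge_ddc", dict(merge_params, no_exec=True))
    # A prepared DDC has prepared_activity_id set (see the no_exec response above)
    if ddc.get("prepared_activity_id") is None:
        raise RuntimeError("merge_ddc() did not leave a prepared statement")
    # Assumption: the DDC can be referenced as "ddc:<short_form>"; outside a
    # session, a not-yet-committed DDC may need its long_form instead.
    return call("exec_ddc", {"ddc_label": "ddc:" + ddc["short_form"]})
```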

Thread execution of prepared statements

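When calling exec_ddc(), setting the optional parameter thread=True executes the statement in parallel on a new sub-thread. The example below first prepares a statement with set_ddc() and no_exec, then executes it in a sub-thread.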

{
  "jsonrpc": "2.0",
  "method": "prov.set_ddc",
  "params":{"ddc_label":"ddc:domingo","source":"event_domingo","ddc_type":"table", "no_exec": true},
  "id": "provenance_jsonrpc_id"
}

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc_domingo_1611536893.622533", "thread": true},
  "id": "provenance_jsonrpc_id"
}

Note that if you are not using a session, a DDC being created in a sub-thread is not committed until the process completes, so it cannot be referenced by its short_form. To refer to it, specify its long_form, as in the example above. When running statements in threads, it is recommended to start a session with begin_session() first.

Wait for running processes

wait_ddc() waits until the process running on a thread or in another process finishes. If multiple DDCs are specified, it waits until the processes associated with all of them have finished.

{
  "jsonrpc": "2.0",
  "method": "prov.wait_ddc",
  "params": {"ddcs": ["ddc:domingo"]},
  "id": "provenance_jsonrpc_id"
}

Internally, wait_ddc() checks the prepared_activity_id of each DDC every 10 seconds to determine whether its process has finished, and repeats until all of them have finished.
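Clients that prefer to wait on their own side can use the same polling pattern, sketched here in Python. The caller supplies is_finished, for example a check that the DDC's prepared_activity_id has become null (an assumption about how completion shows up). wait_for_ddcs() and its timeout parameter are hypothetical and are not part of wait_ddc().

```python
import time


def wait_for_ddcs(labels, is_finished, interval=10.0, timeout=None,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll until is_finished(label) is true for every label.

    Checks all DDCs, sleeps for `interval` seconds, and repeats, like the
    behavior described for wait_ddc(). Raises TimeoutError if `timeout`
    seconds pass before everything has finished.
    """
    pending = set(labels)
    deadline = None if timeout is None else clock() + timeout
    while True:
        # Drop every DDC whose process has finished since the last check
        pending = {label for label in pending if not is_finished(label)}
        if not pending:
            return
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(f"still running: {sorted(pending)}")
        sleep(interval)
```

The sleep and clock parameters exist so the loop can be exercised without real delays.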

Get running processes

get_running_processes() returns a list of the processes that the user is currently running.

It looks up server-side processes started with exec_ddc() in the PostgreSQL statistics view pg_catalog.pg_stat_activity.

Therefore, processes that prepare and execute a statement in a single call are not included in the list.

{
  "jsonrpc": "2.0",
  "method": "prov.get_running_processes",
  "id": "provenance_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "activity_id": 1092,
      "procpid": 8238,
      "usename": "xuser",
      "query_start": "2021-10-09 23:32:09.693272+09",
      "context": "process(\"aggregate_geometry_table\", \"{\"output_ddc\": \"ddc:agg_domingo\", \"input_ddc\": \"ddc:domingo\", \"start_datetime\": \"2018-09-15 00:00:00+09\", \"end_datetime\": datetime.datetime(2018, 9, 15, 1, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400))), \"spatial_extent\": \"{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\", \"continuous_column_names\": [\"fld\"], \"discrete_column_names\": [], \"no_exec\": True}\")",
      "query": "SELECT analysis.aggregate_geometry_table(\"provenance.prov_9d4cd27be962241a752030a7a53e6a21\",\"error\",\"provenance.prov_04857b311ea5f93b4a152625289b4e94\",\"2018-09-15 00:00:00+09\",\"2018-09-15T01:00:00+09:00\"::timestamptz,ST_GeomFromGeoJSON(\"{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\"),ARRAY[\"fld\"],\"{}\")",
      "ddc": "agg_domingo_1611464268.123782"
  }],
  "id": "provenance_jsonrpc_id",
  "jsonrpc": "2.0"
}
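To find the entries for a particular DDC in this list, a client can filter on the ddc field. The Python sketch below assumes that a long_form is the short_form followed by "_" and a timestamp, as in "agg_domingo_1611464268.123782". That pattern is inferred only from the examples in this document. find_running() is a hypothetical helper.

```python
def find_running(processes, short_form):
    """Return the running processes whose output DDC has the given short_form.

    Assumption (inferred from the examples, not a documented rule):
    long_form == short_form + "_" + timestamp.
    """
    matches = []
    for proc in processes:
        # Split "agg_domingo_1611464268.123782" into "agg_domingo" and the timestamp
        base, _sep, _stamp = proc["ddc"].rpartition("_")
        if base == short_form:
            matches.append({
                "activity_id": proc["activity_id"],
                "procpid": proc["procpid"],
                "query_start": proc["query_start"],
            })
    return matches
```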

Force termination of a running process

A user can forcibly terminate a running process with stop_running_process(). For example, suppose a prepared statement is executed with the following request.

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "provenance_jsonrpc_id"
}

While this statement is still running, it can be stopped with the following request.

{
  "jsonrpc": "2.0",
  "method": "prov.stop_running_process",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "provenance_jsonrpc_id"
}

Once a process has been forcibly terminated, it no longer appears in the output of get_running_processes().

The prepared statement remains (prepared_activity_id stays set), so it can be rerun with exec_ddc() or canceled with cancel_ddc_activity().
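The stop-then-resolve sequence can be sketched in Python as follows. call is any function that sends a prov.* request and returns its result, and stop_and_resolve() is a hypothetical name. The sketch assumes that the process has stopped by the time stop_running_process() returns. If that is not guaranteed, check get_running_processes() before continuing.

```python
def stop_and_resolve(call, ddc_label, rerun=False):
    """Force-stop a running statement, then rerun it or discard it.

    `call(method, params)` sends a prov.* request and returns its "result".
    After stop_running_process() the prepared statement remains
    (prepared_activity_id stays set), so it can be executed again with
    exec_ddc() or removed with cancel_ddc_activity().
    """
    call("stop_running_process", {"ddc_label": ddc_label})
    if rerun:
        return call("exec_ddc", {"ddc_label": ddc_label})
    return call("cancel_ddc_activity", {"ddc_label": ddc_label})
```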