
Controlling the execution of processing (case study)

This document has been machine translated.

This document uses example cases to show how to check and control the execution of the statements associated with a DDC.

All execution methods use JSON-RPC v2.0.

For detailed usage of each method, please refer to Table View Operation or Operation on DDC.
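Every request below is a plain JSON-RPC 2.0 object. The following is a minimal client sketch for building such requests and reading their results. It assumes the service accepts requests as HTTP POST bodies at some endpoint URL; the transport, the URL, and the names build_request, parse_response and make_caller are hypothetical and not part of this API.

```python
import itertools
import json
import urllib.request

_next_id = itertools.count(1)

def build_request(method, params=None, request_id=None):
    """Build a JSON-RPC 2.0 request object for a prov.* method."""
    request = {
        "jsonrpc": "2.0",
        "method": method,
        "id": request_id if request_id is not None else next(_next_id),
    }
    if params is not None:  # params may be omitted, as for get_running_processes
        request["params"] = params
    return request

def parse_response(request, response):
    """Return the "result" member of a response, raising on errors or mismatches."""
    if "error" in response:
        error = response["error"]
        raise RuntimeError(f"JSON-RPC error {error.get('code')}: {error.get('message')}")
    if response.get("jsonrpc") != "2.0" or response.get("id") != request["id"]:
        raise ValueError("response does not match the request")
    return response["result"]

def make_caller(url):
    """Return call(method, params) that POSTs to a hypothetical endpoint URL."""
    def call(method, params=None):
        request = build_request(method, params)
        http_request = urllib.request.Request(
            url,
            data=json.dumps(request).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(http_request) as http_response:
            return parse_response(request, json.loads(http_response.read()))
    return call
```

In JSON-RPC 2.0 the response id echoes the request id, which is why parse_response rejects a response whose id differs.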

Prepare and execute the statement

set_ddc(), join_ddc(), merge_ddc(), and process() first create an activity record that holds the statement to be executed, and then execute the query contained in the activity on the DB. The execution result is also recorded in the activity.

The following example executes merge_ddc and then checks the result.

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
             "on_conflict": "nothing"
            },
  "id": "occurrence_jsonrpc_id"
}

An example result of merge_ddc is shown below.

{
  "result": {
    "id": 1243,
    "session_id": 59,
    "usename": "xuser",
    "ddc_type": "table",
    "short_form": "xrain0",
    "long_form": "xrain0_1611536893.622533",
    "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
    "created_at": "2021-10-09 23:32:09.693272+09",
    "committed_at": null,
    "prepared_activity_id": null
  },
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}

The executed activity can be checked with get_ddc_last_activity().

{
  "jsonrpc": "2.0",
  "method": "prov.get_ddc_activities",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "occurrence_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09"
  }],
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}

For each executed activity, start_at is the date and time when the statement was prepared, and end_at is the date and time when its execution finished.

If execution succeeds, successed is set to true and relation_size is set to the size of the output table (0 if the output is a view). If it fails, successed is set to false and the error message is recorded in report.
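As a sketch, the elapsed execution time of an activity can be computed from start_at and end_at. This assumes the timestamps keep the format shown above, with an hour-only UTC offset such as +09; some versions of Python's datetime.fromisoformat() reject that form, so the offset is expanded to +09:00 first. parse_pg_timestamp and activity_duration are hypothetical helper names.

```python
import re
from datetime import datetime

def parse_pg_timestamp(text):
    """Parse timestamps such as '2021-10-09 23:32:09.693272+09'."""
    # Expand a bare '+HH' / '-HH' offset after the time part to '+HH:00'.
    if re.search(r":\d{2}(?:\.\d+)?[+-]\d{2}$", text):
        text += ":00"
    return datetime.fromisoformat(text)

def activity_duration(activity):
    """Seconds between start_at and end_at, or None while end_at is null."""
    if activity.get("end_at") is None:
        return None
    start = parse_pg_timestamp(activity["start_at"])
    end = parse_pg_timestamp(activity["end_at"])
    return (end - start).total_seconds()
```

For the merge_ddc activity above, this gives 600 seconds.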

Prepare statements only (do not execute)

If the optional parameter no_exec=True is set when calling set_ddc(), join_ddc(), merge_ddc(), or process(), the method stops after preparing the statement and recording it in the prepared_activity_id of the destination DDC, and returns without executing it. The statement then waits to be executed.

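In a request, add "no_exec": true to params, as in the merge_ddc example below. For process(), add it to the params object passed to the processing method (the inner params), as in the prov.process example further below.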

{
  "jsonrpc": "2.0",
  "method": "prov.merge_ddc",
  "params": {"target": "ddc:xrain1",
             "source": "ddc:xrain0",
             "no_exec": true,
             "on_conflict": "nothing"
            },
  "id": "occurrence_jsonrpc_id"
}

The format of the response to prov.merge_ddc() is as follows.

{
  "result": {
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0", "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-15T10:09:40.123456+09:00",
      "committed_at": null,
      "prepared_activity_id": 1344
  },
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}
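A minimal sketch of preparing a merge without executing it and reading back the prepared activity id. It assumes a function call(method, params) that sends one JSON-RPC 2.0 request and returns its result member; both that function and prepare_merge are hypothetical names.

```python
def prepare_merge(call, target, source, on_conflict="nothing"):
    """Prepare (but do not execute) a merge_ddc statement.

    Returns the prepared_activity_id recorded on the destination DDC.
    """
    ddc = call("prov.merge_ddc", {
        "target": target,
        "source": source,
        "on_conflict": on_conflict,
        "no_exec": True,  # stop after preparing the statement
    })
    activity_id = ddc.get("prepared_activity_id")
    if activity_id is None:
        raise RuntimeError("merge_ddc did not leave a prepared statement")
    return activity_id
```

With the response shown above, prepare_merge would return 1344.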

An example request for prov.process is as follows.

{
  "jsonrpc": "2.0",
  "method": "prov.process",
  "params": {
    "method": "aggregate_geometry_table",
    "params": {
        "output_ddc": "ddc:agg_domingo",
        "input_ddc": "ddc:domingo",
        "start_datetime": "2021-10-15 10:09:30+09:00",
        "end_datetime": "2021-10-15 10:30:30+09:00",
        "spatial_extent": "{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}",
        "continuous_column_names": ["fld"],
        "discrete_column_names": [],
        "no_exec": true
    }
  },
  "id": "occurrence_jsonrpc_id"
}
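Because spatial_extent is a GeoJSON string embedded inside the JSON request, hand-written escaping is easy to get wrong. As a sketch, the params for prov.process can be built so that json.dumps does the escaping; polygon_geojson and process_params are hypothetical helper names, and only the aggregate_geometry_table parameters shown above are used.

```python
import json

def polygon_geojson(ring):
    """Serialize a lon/lat ring as a compact GeoJSON Polygon string."""
    if ring[0] != ring[-1]:
        ring = ring + [ring[0]]  # GeoJSON linear rings must be closed
    return json.dumps({"type": "Polygon", "coordinates": [ring]},
                      separators=(",", ":"))

def process_params(output_ddc, input_ddc, start, end, ring,
                   continuous=(), discrete=(), no_exec=True):
    """Build the params of a prov.process call for aggregate_geometry_table."""
    return {
        "method": "aggregate_geometry_table",
        "params": {
            "output_ddc": output_ddc,
            "input_ddc": input_ddc,
            "start_datetime": start,
            "end_datetime": end,
            "spatial_extent": polygon_geojson(ring),  # a string, not an object
            "continuous_column_names": list(continuous),
            "discrete_column_names": list(discrete),
            "no_exec": no_exec,  # for process(), no_exec goes in the inner params
        },
    }
```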

The prepared statement can be checked with get_ddc_last_activity(). An example response is shown below.

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": null,
      "relation_size": 0,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": null
  }],
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}

For activities that have only been prepared or are still running, start_at records the date and time the statement was prepared, while successed and end_at are null.
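The three possible states of an activity record can be told apart from successed alone. A minimal sketch; activity_state and describe are hypothetical names, and the field names are spelled as the API returns them (including successed).

```python
def activity_state(activity):
    """Classify an activity record as 'pending', 'succeeded' or 'failed'."""
    succeeded = activity.get("successed")
    if succeeded is None:
        return "pending"  # only prepared, or still running; end_at is also null
    return "succeeded" if succeeded else "failed"

def describe(activity):
    """One-line summary of an activity record."""
    state = activity_state(activity)
    if state == "succeeded":
        return f"succeeded, relation_size={activity['relation_size']}"
    if state == "failed":
        return f"failed: {activity['report']}"  # error message is kept in report
    return "pending (prepared or running)"
```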

Cancel a prepared statement

If you have prepared a statement and want to cancel it without executing it, or want to delete a statement that was forcibly terminated as described below, call cancel_ddc_activity().


{
  "jsonrpc": "2.0",
  "method": "prov.cancel_ddc_activity",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "occurrence_jsonrpc_id"
}

An example response is as follows.

{
  "result": true,
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}

prepared_activity_id is reset to NULL and the associated activity record is deleted.

However, cancel_ddc_activity() does not stop a process that is already running. Before cancelling, use get_running_processes() (described below) to confirm that the statement is not running, or cancel only when you are sure that its execution has not yet started.
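The check described above can be sketched as follows. It assumes a function call(method, params) that returns a response's result member, and that the "ddc" field of each get_running_processes() entry holds the DDC's long_form (inferred from the example response further below). cancel_if_idle is a hypothetical name. A process could still start between the check and the cancel, and statements prepared and executed in a single call are not listed, so this narrows the risk rather than removing it.

```python
def cancel_if_idle(call, ddc_label, long_form):
    """Cancel a prepared statement only if no running process is listed for it.

    Returns True if cancel_ddc_activity() was called and reported success,
    False if the DDC appears in get_running_processes().
    """
    running = call("prov.get_running_processes", None)
    if any(proc.get("ddc") == long_form for proc in running):
        return False  # still executing: cancelling would not stop it
    return call("prov.cancel_ddc_activity", {"ddc_label": ddc_label}) is True
```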

Prepared statement execution

Calling exec_ddc() on a DDC that has a prepared statement (that is, one whose prepared_activity_id is set) executes that statement.

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain0"},
  "id": "occurrence_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "id": 1243,
      "session_id": 59,
      "usename": "xuser",
      "ddc_type": "table",
      "short_form": "xrain0",
      "long_form": "xrain0_1611536893.622533",
      "realname": "provenance.prov_3be2965ae5125f62936aed117ea81632",
      "created_at": "2021-10-09 23:32:09.693272+09",
      "committed_at": null,
      "prepared_activity_id": null
  }],
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}

As when a statement is prepared and executed in a single call, you can check the execution result with get_ddc_last_activity().

{
  "result": [{
      "id": 1343,
      "input_ddc_list": ["xrain1_1611536893.968535", "xrain0_1611536893.622533"],
      "output_ddc": "xrain0_1611536893.622533",
      "context": "merge_ddc(\"ddc:xrain1\",\"ddc:xrain0\",\"nothing\")",
      "query": "INSERT INTO provenance.prov_3be2965ae5125f62936aed117ea81632 SELECT * FROM provenance.prov_8482ed3baebd10c8024ec07141cef15b",
      "report": "",
      "successed": true,
      "relation_size": 6791168,
      "start_at": "2021-10-09 23:32:09.693272+09",
      "end_at": "2021-10-09 23:42:09.693272+09"
  }],
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}
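The two-step flow of this section (prepare with no_exec, then execute with exec_ddc()) can be sketched as below for set_ddc(), join_ddc() and merge_ddc(), where no_exec is a top-level parameter; for process() it would go in the inner params instead. call(method, params) and prepare_then_execute are hypothetical names.

```python
def prepare_then_execute(call, ddc_label, prepare_method, prepare_params):
    """Prepare a statement without running it, then execute it with exec_ddc().

    Returns True if every DDC record returned by exec_ddc() has its
    prepared_activity_id cleared, as in the example response above.
    """
    ddc = call(prepare_method, {**prepare_params, "no_exec": True})
    if ddc.get("prepared_activity_id") is None:
        raise RuntimeError("nothing was prepared")
    # A client could inspect the prepared activity at this point.
    result = call("prov.exec_ddc", {"ddc_label": ddc_label})
    records = result if isinstance(result, list) else [result]
    return all(rec.get("prepared_activity_id") is None for rec in records)
```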

Thread execution of prepared statements

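When exec_ddc() is called with the optional parameter thread=True, the statement is executed in a newly created sub-thread, in parallel with other processing. In the following example, set_ddc() first prepares the statement with no_exec, and exec_ddc() then executes it in a sub-thread.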

{
  "jsonrpc": "2.0",
  "method": "prov.set_ddc",
  "params": {"ddc_label": "ddc:domingo", "source": "event_domingo", "ddc_type": "table", "no_exec": true},
  "id": "occurrence_jsonrpc_id"
}

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc_domingo_1611536893.622533", "thread": true},
  "id": "occurrence_jsonrpc_id"
}

Note that if you are not using a session, a DDC being created in a sub-thread is not committed until the process is complete, so it cannot be referenced by its short_form. If you need to refer to it, specify its long_form, as in the example above.

Within a session, it can be referenced by its short_form, so it is recommended to start a session with begin_session() before using thread execution.
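The thread-execution sequence above, followed by wait_ddc(), can be sketched as below. It assumes a function call(method, params) that returns a response's result member, and that the caller supplies exec_label in whatever form the service accepts for a not-yet-committed DDC (the long_form, outside a session). Passing that same label to wait_ddc() is an assumption; run_in_thread is a hypothetical name.

```python
def run_in_thread(call, ddc_label, source, exec_label, ddc_type="table"):
    """Prepare a set_ddc statement, run it in a server-side sub-thread, then wait.

    exec_label identifies the DDC for exec_ddc() and wait_ddc(); outside a
    session this should be the long_form, because the short_form is not
    committed until the sub-thread finishes.
    """
    call("prov.set_ddc", {"ddc_label": ddc_label, "source": source,
                          "ddc_type": ddc_type, "no_exec": True})
    call("prov.exec_ddc", {"ddc_label": exec_label, "thread": True})
    # Other requests could be sent here while the sub-thread runs.
    return call("prov.wait_ddc", {"ddcs": [exec_label]})
```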

Wait for a running process

wait_ddc() waits for a process running in a thread or in another process to finish. If more than one DDC is specified, it waits until the processes associated with all of them have finished.

{
  "jsonrpc": "2.0",
  "method": "prov.wait_ddc",
  "params": {"ddcs": ["ddc:domingo"]},
  "id": "occurrence_jsonrpc_id"
}

Internally, wait_ddc() checks the prepared_activity_id of each specified DDC every 10 seconds to see whether its process has finished, and repeats the check until all of them have finished.
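The polling behaviour described above can be sketched on the client side as follows. get_prepared_activity_id(label) is a hypothetical lookup that returns a DDC's current prepared_activity_id (None once its process has finished); wait_for_ddcs is also a hypothetical name. Because a force-terminated statement keeps its prepared_activity_id (see the last section), the sketch takes an optional max_polls limit instead of looping forever.

```python
import time

def wait_for_ddcs(get_prepared_activity_id, labels, interval=10.0,
                  max_polls=None, sleep=time.sleep):
    """Poll until no DDC in labels has a prepared activity left.

    Returns the number of polls made; raises TimeoutError after max_polls.
    """
    pending = set(labels)
    polls = 0
    while True:
        pending = {label for label in pending
                   if get_prepared_activity_id(label) is not None}
        polls += 1
        if not pending:
            return polls
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"still running: {sorted(pending)}")
        sleep(interval)
```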

Get the running processes

get_running_processes() returns the list of processes that the user is running.

It queries the PostgreSQL statistics view pg_catalog.pg_stat_activity and searches for running server-side processes that were started with exec_ddc().

Therefore, processes that prepare and execute a statement in a single call are not retrieved.

An example request is shown below.

{
  "jsonrpc": "2.0",
  "method": "prov.get_running_processes",
  "id": "occurrence_jsonrpc_id"
}

An example response is shown below.

{
  "result": [{
      "activity_id": 1092,
      "procpid": 8238,
      "usename": "xuser",
      "query_start": "2021-10-09 23:32:09.693272+09",
      "context": "process(\"aggregate_geometry_table\", \"{\"output_ddc\": \"ddc:agg_domingo\", \"input_ddc\": \"ddc:domingo\", \"start_datetime\": \"2018-09-15 00:00:00+09\", \"end_datetime\": datetime.datetime(2018, 9, 15, 1, 0, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400))), \"spatial_extent\": \"{\"type\": \"Polygon\", \"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\", \"continuous_column_names\": [\"fld\"], \"discrete_column_names\": [], \"no_exec\": True}\")",
      "query": "SELECT analysis.aggregate_geometry_table(\"occurrence.prov_9d4cd27be962241a752030a7a53e6a21\",\"error\",\"occurrence.prov_04857b311ea5f93b4a152625289b4e94\",\"2018-09-15 00:00:00+09\",\"2018-09-15T01:00:00+09:00\"::timestamptz,ST_GeomFromGeoJSON(\"{\"type\":\"Polygon\",\"coordinates\":[[[139.25,35.25],[139.25,35.5],[139.625,35.5],[139.625,35.25],[139.25,35.25]]]}\"),ARRAY[\"fld\"],\"{}\")",
      "ddc": "agg_domingo_1611464268.123782"
  }],
  "id": "occurrence_jsonrpc_id",
  "jsonrpc": "2.0"
}
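Before forcing termination, it can help to see how long each process has been running. A minimal sketch that lists entries of a get_running_processes() result whose query_start is older than a threshold; the caller supplies a timezone-aware now, the timestamp format is assumed to match the example above, and parse_timestamp and long_running are hypothetical names.

```python
import re
from datetime import datetime, timedelta

def parse_timestamp(ts):
    """Parse '2021-10-09 23:32:09.693272+09', expanding '+09' to '+09:00'."""
    if re.search(r":\d{2}(?:\.\d+)?[+-]\d{2}$", ts):
        ts += ":00"
    return datetime.fromisoformat(ts)

def long_running(processes, now, threshold=timedelta(hours=1)):
    """Return (ddc, activity_id, elapsed) tuples, longest-running first."""
    rows = []
    for proc in processes:
        elapsed = now - parse_timestamp(proc["query_start"])
        if elapsed > threshold:
            rows.append((proc["ddc"], proc["activity_id"], elapsed))
    return sorted(rows, key=lambda row: row[2], reverse=True)
```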

Force termination of running processes

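A user can forcibly terminate a running process with stop_running_process(). For example, suppose a prepared statement is being executed as a result of the following request.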

{
  "jsonrpc": "2.0",
  "method": "prov.exec_ddc",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "occurrence_jsonrpc_id"
}

While that process is still running, you can terminate it with the following request.

{
  "jsonrpc": "2.0",
  "method": "prov.stop_running_process",
  "params": {"ddc_label": "ddc:xrain"},
  "id": "occurrence_jsonrpc_id"
}

After a forced termination, the process no longer appears in get_running_processes().

The prepared statement remains intact (prepared_activity_id is still set), so you can re-run it with exec_ddc() or discard it with cancel_ddc_activity().
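The terminate-then-decide step can be sketched as follows, assuming a function call(method, params) that returns a response's result member; stop_and_resolve is a hypothetical name.

```python
def stop_and_resolve(call, ddc_label, rerun=False):
    """Force-terminate a running statement, then re-run or discard it.

    stop_running_process() leaves prepared_activity_id set, so the same
    prepared statement can be passed to exec_ddc() again or removed with
    cancel_ddc_activity().
    """
    call("prov.stop_running_process", {"ddc_label": ddc_label})
    if rerun:
        return call("prov.exec_ddc", {"ddc_label": ddc_label})
    return call("prov.cancel_ddc_activity", {"ddc_label": ddc_label})
```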