2 changes: 1 addition & 1 deletion bdit_dag_utils
2 changes: 1 addition & 1 deletion collisions/GettingStarted.md
@@ -52,7 +52,7 @@ accyear | n_collisions | n_involved
2018 | 62323 | 108142
2019 | 64209 | 107997

but be aware that due to the ever-refreshing nature of collisions mentioned in the [Readme.md](Readme.md), these numbers will surely change by small amounts with time.
but be aware that due to the ever-refreshing nature of collisions mentioned in the [readme.md](readme.md), these numbers will surely change by small amounts with time.

For the vast majority of records, it's possible to determine whether the collision event comes from TPS or CRC from the length of the `ACCNB` field. A value other than `NULL` in the `events.changed` field indicates that some aspect of the collision data (usually information about people involved in collision events) has been validated. If we want the number of events, involved and validated involved from the data subdivided by year and data source, we'd do:

67 changes: 67 additions & 0 deletions collisions/Readme.md → collisions/readme.md
@@ -1,3 +1,16 @@
- [Collisions](#collisions)
- [Data Sources \& Pipelines](#data-sources--pipelines)
- [Table Structure on the `bigdata` PostgreSQL Database](#table-structure-on-the-bigdata-postgresql-database)
- [Collisions Schema (`events` and `involved` Tables)](#collisions-schema-events-and-involved-tables)
- [Important Limitations and Caveats About `collisions.acc`](#important-limitations-and-caveats-about-collisionsacc)
- [The Daily Collision Replicator](#the-daily-collision-replicator)
- [Replicating New Tables](#replicating-new-tables)
- [Updating Existing Tables](#updating-existing-tables)
- [Changing Column Names During Replication](#changing-column-names-during-replication)
- [Using Updatable VIEW as replication destination](#using-updatable-view-as-replication-destination)
- [Using TABLE + VIEW](#using-table--view)


# Collisions

The collisions dataset contains information about traffic collisions, and the people involved in them, that occurred in the City of Toronto's right-of-way from approximately 1985 to the present, as reported by the Toronto Police Service (TPS) or Collision Reporting Centres (CRCs). Most of the information in this document pertains to collision data stored in the `bigdata` PostgreSQL database.
@@ -101,3 +114,57 @@ ALTER TABLE collisions.a OWNER TO collisions_bot;
### Updating Existing Tables

If you need to update an existing table to match new modifications introduced by the MOVE team, e.g., dropped columns or changed column types, update the replicated table definition according to those changes. If the updated table has dependencies, save and drop them before applying the changes, then update and re-create the dependencies afterwards. The [`public.deps_save_and_drop_dependencies_dryrun`](https://github.com/CityofToronto/bdit_pgutils/blob/master/create-function-deps_save_and_drop_dependencies_dryrun.sql) and `public.deps_restore_dependencies` functions can help with updating dependencies in complex cases. Finally, if the table's name or schema changes, also update the `collisions_tables` variable.

### Changing Column Names During Replication

#### Using Updatable VIEW as replication destination

If you want to use different column names in the destination table than the source table, use the following method:
1. Alter the column names in the destination table to your liking:
```sql
ALTER TABLE IF EXISTS traffic.centreline2_midblocks RENAME centreline_id TO midblock_id;
```
2. Create a view which effectively reverses that change, so the column names match the source table:
```sql
CREATE OR REPLACE VIEW traffic.centreline2_midblocks_view AS
SELECT
--midblock_id is the name in the destination table, centreline_id is the name in the source table
centreline2_midblocks.midblock_id AS centreline_id,
--all the other columns without renaming:
centreline2_midblocks.midblock_name,
...
FROM traffic.centreline2_midblocks;

ALTER TABLE traffic.centreline2_midblocks_view OWNER TO traffic_bot;
COMMENT ON VIEW traffic.centreline2_midblocks_view IS '(DO NOT USE) Use `traffic.centreline2_midblocks` instead.';

--permissions are the same as `traffic.centreline2_midblocks`, except:
REVOKE ALL ON TABLE traffic.centreline2_midblocks_view FROM bdit_humans;
```
3. Update the Airflow variable with the replication table pair.
`["move_staging.centreline2_midblocks", "traffic.centreline2_midblocks"]` becomes:
```json
[
"move_staging.centreline2_midblocks", --source
"traffic.centreline2_midblocks_view", --insert target (updatable VIEW)
"traffic.centreline2_midblocks" --target for `COMMENT ON TABLE/VIEW...`
]
```
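As the comments above suggest, a replication entry can be either a two-element `[source, destination]` pair or a three-element entry whose insert target (the updatable VIEW) differs from the `COMMENT` target. A minimal sketch of how such entries might be normalized — `normalize_entry` is a hypothetical helper for illustration, not part of this repo:

```python
def normalize_entry(entry):
    """Normalize a replication entry to (source, insert_target, comment_target).

    Two-element entries use the same table as both insert and comment target;
    three-element entries split them, e.g. when an updatable VIEW receives the
    inserts but the COMMENT belongs on the underlying table.
    """
    if len(entry) == 2:
        source, dest = entry
        return source, dest, dest
    if len(entry) == 3:
        source, insert_target, comment_target = entry
        return source, insert_target, comment_target
    raise ValueError(f"expected 2 or 3 elements, got {len(entry)}")
```

With a plain two-element pair, the destination simply plays both roles.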

#### Using TABLE + VIEW

If you want to make more extensive changes to the final data than renaming columns, you can also use this alternate method:

1. Move the replication destination table to the `traffic_staging` schema.

2. Create a view in `traffic` that references the new `traffic_staging` destination.

3. Update the Airflow variable with the replication table pair:
`["move_staging.centreline2_midblocks", "traffic.centreline2_midblocks"]` becomes:
```json
[
"move_staging.centreline2_midblocks", --source
"traffic_staging.centreline2_midblocks", --data gets inserted here
"traffic.centreline2_midblocks_view" --target for `COMMENT ON TABLE/VIEW...`; also the view which transforms the `traffic_staging` data
]
```
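Note that the `--` annotations in the snippets above are illustrative only; Airflow variables are plain JSON, which has no comment syntax. A sketch of the two entry shapes side by side, as they might actually be stored (assuming, per the docs above, the variable holds a list of two- or three-element entries):

```python
import json

# The two entry shapes, without the illustrative `--` comments:
tables = [
    # two-element entry: source, destination
    ["move_staging.centreline2_midblocks", "traffic.centreline2_midblocks"],
    # three-element entry: source, insert target, COMMENT target
    [
        "move_staging.centreline2_midblocks",
        "traffic_staging.centreline2_midblocks",
        "traffic.centreline2_midblocks_view",
    ],
]

print(json.dumps(tables, indent=2))
```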
2 changes: 1 addition & 1 deletion dags/replicator_table_check.py
@@ -28,7 +28,7 @@
DAG_NAME = 'replicator_table_check'
DAG_OWNERS = Variable.get("dag_owners", deserialize_json=True).get(DAG_NAME, ["Unknown"])

README_PATH = os.path.join(repo_path, 'collisions/Readme.md')
README_PATH = os.path.join(repo_path, 'collisions/readme.md')
DOC_MD = get_readme_docmd(README_PATH, DAG_NAME)

default_args = {
27 changes: 21 additions & 6 deletions dags/replicators.py
@@ -4,8 +4,7 @@
import os
import sys
from functools import partial
from datetime import timedelta
import pendulum
from pendulum import datetime, duration
# pylint: disable=import-error
from airflow.decorators import dag, task, task_group
from airflow.models import Variable
@@ -98,8 +97,24 @@ def status_message(tables, **context):

return generated_dag

#get replicator details from airflow variable
REPLICATORS = Variable.get('replicators', deserialize_json=True)
"""Dictionary of replicator DAGs to create

dag_name: dag_id
tables: name of Airflow variable to get list of (src/dest) tables to replicate
conn: name of Airflow connection of bot used for replication
"""
REPLICATORS = {
"counts": {
"dag_name": "counts_replicator",
"tables": "counts_tables",
"conn": "traffic_bot"
},
"collisions": {
"dag_name": "collisions_replicator",
"tables": "collisions_tables",
"conn": "collisions_bot"
}
}

#generate replicator DAGs from dict
for replicator, dag_items in REPLICATORS.items():
@@ -109,10 +124,10 @@ def status_message(tables, **context):
default_args = {
"owner": ",".join(DAG_OWNERS),
"depends_on_past": False,
"start_date": pendulum.datetime(2023, 10, 31, tz="America/Toronto"),
"start_date": datetime(2023, 10, 31, tz="America/Toronto"),
"email_on_failure": False,
"retries": 3,
"retry_delay": timedelta(minutes=60),
"retry_delay": duration(minutes=60),
"on_failure_callback": task_fail_slack_alert,
}
