Skip to content

Commit 2171b8f

Browse files
authored
Set actionTracingId to tracingId for editableMappingUpdates (#8361)
* Set actionTracingId to tracingId for editableMappingUpdates * fix sql syntax * include annotation id * wip repair update actions script * wip fetch relevant updates * iterate on repariring updates * put updated updates * skip the reverse, the order in which we deal with the update groups doesnt matter here * undo application.conf change
1 parent 014a8e8 commit 2171b8f

File tree

4 files changed

+189
-6
lines changed

4 files changed

+189
-6
lines changed

tools/migration-unified-annotation-versioning/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ counts.py
44
logs/
55
*.dat
66
result.json
7+
mapping_tracing_mapping.json
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import logging
2+
from utils import setup_logging, log_since
3+
import argparse
4+
from connections import connect_to_fossildb, connect_to_postgres, assert_grpc_success
5+
import psycopg2
6+
import psycopg2.extras
7+
import time
8+
import fossildbapi_pb2 as proto
9+
import VolumeTracing_pb2 as Volume
10+
from typing import Optional
11+
import msgspec
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def main():
17+
logger.info("Hello from find_mapping_tracing_mapping")
18+
setup_logging()
19+
parser = argparse.ArgumentParser()
20+
parser.add_argument("--src", type=str, help="Source fossildb host and port. Example: localhost:7155", required=True)
21+
parser.add_argument("--postgres", help="Postgres connection specifier, default is postgresql://postgres@localhost:5432/webknossos", type=str, default="postgresql://postgres@localhost:5432/webknossos")
22+
args = parser.parse_args()
23+
before = time.time()
24+
annotations = read_annotation_list(args)
25+
src_stub = connect_to_fossildb(args.src, "source")
26+
mappings = {}
27+
for annotation in annotations:
28+
annotation_id = annotation["_id"]
29+
id_mapping_for_annotation = {}
30+
for tracing_id, layer_type in annotation["layers"].items():
31+
if layer_type == 'Volume':
32+
try:
33+
editable_mapping_id = get_editable_mapping_id(src_stub, tracing_id, layer_type)
34+
if editable_mapping_id is not None:
35+
id_mapping_for_annotation[editable_mapping_id] = tracing_id
36+
except Exception as e:
37+
logger.info(f"exception while checking layer {tracing_id} of {annotation_id}: {e}")
38+
if id_mapping_for_annotation:
39+
mappings[annotation_id] = id_mapping_for_annotation
40+
41+
outfile_name = "mapping_tracing_mapping.json"
42+
logger.info(f"Writing mapping to {outfile_name}...")
43+
with open(outfile_name, "wb") as outfile:
44+
outfile.write(msgspec.json.encode(mappings))
45+
46+
log_since(before, f"Wrote full id mapping to {outfile_name}. Checked {len(annotations)} annotations, wrote {len(mappings)} annotation id mappings.")
47+
48+
49+
def get_newest_tracing_raw(src_stub, tracing_id, collection) -> Optional[bytes]:
50+
getReply = src_stub.Get(
51+
proto.GetRequest(collection=collection, key=tracing_id, mayBeEmpty=True)
52+
)
53+
assert_grpc_success(getReply)
54+
return getReply.value
55+
56+
57+
def get_editable_mapping_id(src_stub, tracing_id: str, layer_type: str) -> Optional[str]:
58+
if layer_type == "Skeleton":
59+
return None
60+
tracing_raw = get_newest_tracing_raw(src_stub, tracing_id, "volumes")
61+
if tracing_raw is None:
62+
return None
63+
volume = Volume.VolumeTracing()
64+
volume.ParseFromString(tracing_raw)
65+
if volume.hasEditableMapping:
66+
return volume.mappingName
67+
return None
68+
69+
70+
def read_annotation_list(args):
71+
before = time.time()
72+
connection = connect_to_postgres(args.postgres)
73+
cursor = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
74+
cursor.execute(f"SELECT COUNT(*) FROM webknossos.annotations")
75+
annotation_count = cursor.fetchone()['count']
76+
logger.info(f"Loading infos of {annotation_count} annotations from postgres ...")
77+
query = f"""SELECT
78+
a._id,
79+
JSON_OBJECT_AGG(al.tracingId, al.typ) AS layers,
80+
JSON_OBJECT_AGG(al.tracingId, al.name) AS layerNames
81+
FROM webknossos.annotation_layers al
82+
JOIN webknossos.annotations a on al._annotation = a._id
83+
GROUP BY a._id
84+
"""
85+
cursor.execute(query)
86+
annotations = cursor.fetchall()
87+
log_since(before, "Loading annotation infos from postgres")
88+
return annotations
89+
90+
if __name__ == '__main__':
91+
main()

tools/migration-unified-annotation-versioning/migration.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def build_mapping_id_map(self, annotation) -> MappingIdMap:
105105
mapping_id_map[tracing_id] = editable_mapping_id
106106
return mapping_id_map
107107

108-
def fetch_updates(self, tracing_or_mapping_id: str, layer_type: str, collection: str, json_encoder, json_decoder) -> Tuple[List[Tuple[int, int, bytes]], bool]:
108+
def fetch_updates(self, tracing_id: str, tracing_or_mapping_id: str, layer_type: str, collection: str, json_encoder, json_decoder) -> Tuple[List[Tuple[int, int, bytes]], bool]:
109109
batch_size = 100
110110
newest_version = self.get_newest_version(tracing_or_mapping_id, collection)
111111
updates_for_layer = []
@@ -118,7 +118,7 @@ def fetch_updates(self, tracing_or_mapping_id: str, layer_type: str, collection:
118118
for version, update_group in reversed(update_groups):
119119
if version > next_version:
120120
continue
121-
update_group, timestamp, revert_source_version = self.process_update_group(tracing_or_mapping_id, layer_type, update_group, json_encoder, json_decoder)
121+
update_group, timestamp, revert_source_version = self.process_update_group(tracing_id, layer_type, update_group, json_encoder, json_decoder)
122122
if revert_source_version is not None:
123123
next_version = revert_source_version
124124
included_revert = True
@@ -135,7 +135,7 @@ def includes_revert(self, annotation) -> bool:
135135
layers = list(annotation["layers"].items())
136136
for tracing_id, layer_type in layers:
137137
collection = self.update_collection_for_layer_type(layer_type)
138-
_, layer_included_revert = self.fetch_updates(tracing_id, layer_type, collection, json_encoder=json_encoder, json_decoder=json_decoder)
138+
_, layer_included_revert = self.fetch_updates(tracing_id, tracing_id, layer_type, collection, json_encoder=json_encoder, json_decoder=json_decoder)
139139
if layer_included_revert:
140140
return True
141141
return False
@@ -148,12 +148,12 @@ def migrate_updates(self, annotation, mapping_id_map: MappingIdMap) -> LayerVers
148148
tracing_ids_and_mapping_ids = []
149149
for tracing_id, layer_type in layers:
150150
collection = self.update_collection_for_layer_type(layer_type)
151-
layer_updates, _ = self.fetch_updates(tracing_id, layer_type, collection, json_encoder=json_encoder, json_decoder=json_decoder)
151+
layer_updates, _ = self.fetch_updates(tracing_id, tracing_id, layer_type, collection, json_encoder=json_encoder, json_decoder=json_decoder)
152152
all_update_groups.append(layer_updates)
153153
tracing_ids_and_mapping_ids.append(tracing_id)
154154
if tracing_id in mapping_id_map:
155155
mapping_id = mapping_id_map[tracing_id]
156-
layer_updates, _ = self.fetch_updates(mapping_id, "editableMapping", "editableMappingUpdates", json_encoder=json_encoder, json_decoder=json_decoder)
156+
layer_updates, _ = self.fetch_updates(tracing_id, mapping_id, "editableMapping", "editableMappingUpdates", json_encoder=json_encoder, json_decoder=json_decoder)
157157
all_update_groups.append(layer_updates)
158158
tracing_ids_and_mapping_ids.append(mapping_id)
159159

@@ -239,7 +239,7 @@ def process_update_group(self, tracing_id: str, layer_type: str, update_group_ra
239239

240240
# add actionTracingId
241241
if not name == "updateTdCamera":
242-
update["value"]["actionTracingId"] = tracing_id
242+
update["value"]["actionTracingId"] = tracing_id # even for mappings, this is the tracing_id of their corresponding volume layer
243243

244244
# identify compact update actions, and mark them
245245
if (name == "updateBucket" and "position" not in update_value) \
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import logging
2+
from utils import setup_logging, log_since, batch_range
3+
import argparse
4+
from connections import connect_to_fossildb, assert_grpc_success
5+
import time
6+
import fossildbapi_pb2 as proto
7+
from typing import List, Tuple
8+
import msgspec
9+
10+
logger = logging.getLogger("migration-logs")
11+
12+
13+
def main():
14+
logger.info("Hello from repair_editable_mapping_updates")
15+
setup_logging()
16+
parser = argparse.ArgumentParser()
17+
parser.add_argument("--fossil", type=str, help="Fossildb host and port. Example: localhost:7155", required=True)
18+
parser.add_argument("--id_mapping", type=str, help="json file containing the id mapping determined by find_mapping_tracing_mapping.py", required=True)
19+
args = parser.parse_args()
20+
before = time.time()
21+
stub = connect_to_fossildb(args.fossil, "target")
22+
23+
json_encoder = msgspec.json.Encoder()
24+
json_decoder = msgspec.json.Decoder()
25+
with open(args.id_mapping, "rb") as infile:
26+
id_mapping = json_decoder.decode(infile.read())
27+
for annotation_id in id_mapping.keys():
28+
repair_updates_of_annotation(stub, annotation_id, id_mapping[annotation_id], json_encoder, json_decoder)
29+
30+
log_since(before, f"Repairing all {len(id_mapping)} annotations")
31+
32+
33+
def repair_updates_of_annotation(stub, annotation_id, id_mapping_for_annotation, json_encoder, json_decoder):
34+
get_batch_size = 100 # in update groups
35+
put_buffer_size = 100 # in update groups
36+
37+
before = time.time()
38+
put_buffer = []
39+
changed_update_count = 0
40+
newest_version = get_newest_version(stub, annotation_id, "annotationUpdates")
41+
if newest_version > 10000:
42+
logger.info(f"Newest version of {annotation_id} is {newest_version}. This may take some time...")
43+
for batch_start, batch_end in list(batch_range(newest_version + 1, get_batch_size)):
44+
update_groups_batch = get_update_batch(stub, annotation_id, batch_start, batch_end - 1)
45+
for version, update_group_bytes in update_groups_batch:
46+
update_group = json_decoder.decode(update_group_bytes)
47+
group_changed = False
48+
for update in update_group:
49+
if "value" in update:
50+
update_value = update["value"]
51+
if "actionTracingId" in update_value and update_value["actionTracingId"] in id_mapping_for_annotation:
52+
update_value["actionTracingId"] = id_mapping_for_annotation[update_value["actionTracingId"]]
53+
group_changed = True
54+
changed_update_count += 1
55+
if group_changed:
56+
versioned_key_value_pair = proto.VersionedKeyValuePairProto()
57+
versioned_key_value_pair.key = annotation_id
58+
versioned_key_value_pair.version = version
59+
versioned_key_value_pair.value = json_encoder.encode(update_group)
60+
put_buffer.append(versioned_key_value_pair)
61+
if len(put_buffer) >= put_buffer_size:
62+
put_multiple_keys_versions(stub, "annotationUpdates", put_buffer)
63+
put_buffer = []
64+
if len(put_buffer) > 0:
65+
put_multiple_keys_versions(stub, "annotationUpdates", put_buffer)
66+
log_since(before, f"Repaired {changed_update_count} updates of annotation {annotation_id},")
67+
68+
69+
def put_multiple_keys_versions(stub, collection: str, to_put) -> None:
70+
reply = stub.PutMultipleKeysWithMultipleVersions(proto.PutMultipleKeysWithMultipleVersionsRequest(collection=collection, versionedKeyValuePairs = to_put))
71+
assert_grpc_success(reply)
72+
73+
74+
def get_update_batch(stub, annotation_id: str, batch_start: int, batch_end_inclusive: int) -> List[Tuple[int, bytes]]:
75+
reply = stub.GetMultipleVersions(
76+
proto.GetMultipleVersionsRequest(collection="annotationUpdates", key=annotation_id, oldestVersion=batch_start, newestVersion=batch_end_inclusive)
77+
)
78+
assert_grpc_success(reply)
79+
return list(zip(reply.versions, reply.values))
80+
81+
82+
def get_newest_version(stub, tracing_id: str, collection: str) -> int:
83+
reply = stub.Get(
84+
proto.GetRequest(collection=collection, key=tracing_id, mayBeEmpty=True)
85+
)
86+
assert_grpc_success(reply)
87+
return reply.actualVersion
88+
89+
90+
if __name__ == '__main__':
91+
main()

0 commit comments

Comments
 (0)