Skip to content

Commit 6c50dba

Browse files
authored
Merge pull request #7775 from systeminit/jobelenus/mtm-fk-await-fixes
Await fixes for SQL write commands
2 parents b85efd7 + dd86b2b commit 6c50dba

File tree

1 file changed

+57
-36
lines changed

1 file changed

+57
-36
lines changed

app/web/src/workers/webworker.ts

Lines changed: 57 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -779,13 +779,13 @@ const handleHammer = async (db: Database, msg: WorkspaceAtomMessage) => {
779779
span.setAttributes({
780780
needToInsertMTM: true,
781781
});
782-
const inserted = insertAtomMTM(db, msg.atom, indexChecksum);
783-
span.setAttribute("insertedMTM", await inserted);
782+
const inserted = await insertAtomMTM(db, msg.atom, indexChecksum);
783+
span.setAttribute("insertedMTM", inserted);
784784
}
785785

786-
updateChangeSetWithNewIndex(db, msg.atom);
786+
await updateChangeSetWithNewIndex(db, msg.atom);
787787
span.setAttribute("updatedWithNewIndex", true);
788-
removeOldIndex(db, span);
788+
await removeOldIndex(db, span);
789789

790790
if (
791791
COMPUTED_KINDS.includes(msg.atom.kind) ||
@@ -899,7 +899,15 @@ const bulkInsertAtomMTMs = async (
899899
on conflict (index_checksum, kind, args) do update set checksum=excluded.checksum
900900
;`;
901901

902-
await dbWrite(db, { sql, bind });
902+
try {
903+
await dbWrite(db, { sql, bind });
904+
} catch (err: unknown) {
905+
const span = trace.getActiveSpan();
906+
span?.addEvent("error", {
907+
source: "bulkInsertAtomMTMs",
908+
error: err instanceof Error ? err.toString() : "unknown",
909+
});
910+
}
903911
}
904912
};
905913

@@ -1233,7 +1241,7 @@ const handleDeploymentPatchMessage = async (
12331241
modifications.push({ ...p, data: doc, checksum: p.toChecksum });
12341242
}
12351243

1236-
if (inserts.length > 0) writeDeploymentAtoms(db, inserts);
1244+
if (inserts.length > 0) await writeDeploymentAtoms(db, inserts);
12371245

12381246
if (removals.length > 0) {
12391247
const bind: string[] = [];
@@ -1250,7 +1258,7 @@ const handleDeploymentPatchMessage = async (
12501258
});
12511259
}
12521260

1253-
if (modifications.length > 0) writeDeploymentAtoms(db, modifications);
1261+
if (modifications.length > 0) await writeDeploymentAtoms(db, modifications);
12541262

12551263
const kinds: Set<GlobalEntity> = new Set();
12561264
[...modifications, ...inserts, ...removals]
@@ -1309,7 +1317,7 @@ const handleWorkspacePatchMessage = async (
13091317
if (data.patches.length === 0) {
13101318
try {
13111319
await initIndexAndChangeSet(db, data.meta, span);
1312-
updateChangeSetWithNewIndex(db, data.meta);
1320+
await updateChangeSetWithNewIndex(db, data.meta);
13131321
} catch (err) {
13141322
error("Failed to handle empty patch", data);
13151323
}
@@ -1411,9 +1419,9 @@ const handleWorkspacePatchMessage = async (
14111419
(a) => a.kind === EntityKind.IncomingConnections,
14121420
);
14131421

1414-
updateChangeSetWithNewIndex(db, data.meta);
1422+
await updateChangeSetWithNewIndex(db, data.meta);
14151423
span.setAttribute("updatedWithNewIndex", true);
1416-
removeOldIndex(db, span);
1424+
await removeOldIndex(db, span);
14171425

14181426
await Promise.all(
14191427
nonListAtomsToBust.map(async (atom) =>
@@ -1485,7 +1493,7 @@ const handlePatchOperations = async (
14851493
checksum: op.atom.toChecksum,
14861494
}));
14871495
const bulkMtmStart = performance.now();
1488-
bulkInsertAtomMTMs(db, noops, indexChecksum);
1496+
await bulkInsertAtomMTMs(db, noops, indexChecksum);
14891497
span.setAttribute(
14901498
"performance.NoopBulkMtm",
14911499
performance.now() - bulkMtmStart,
@@ -1576,14 +1584,14 @@ const handlePatchOperations = async (
15761584
// Ok we have all the patches we could apply, insert them into the database
15771585
if (atomsToInsert.length > 0) {
15781586
const startCreate = performance.now();
1579-
bulkCreateAtoms(db, atomsToInsert);
1587+
await bulkCreateAtoms(db, atomsToInsert);
15801588
span.setAttribute(
15811589
"performance.bulkCreateAtoms",
15821590
performance.now() - startCreate,
15831591
);
15841592

15851593
const startMtm = performance.now();
1586-
bulkInsertAtomMTMs(db, atomsToInsert, indexChecksum);
1594+
await bulkInsertAtomMTMs(db, atomsToInsert, indexChecksum);
15871595
span.setAttribute(
15881596
"performance.bulkCreateMtms",
15891597
performance.now() - startMtm,
@@ -1612,8 +1620,8 @@ const handlePatchOperations = async (
16121620
continue;
16131621
}
16141622

1615-
removeAtom(db, indexChecksum, atom.kind, atom.id, atom.checksum);
1616-
postProcess(
1623+
await removeAtom(db, indexChecksum, atom.kind, atom.id, atom.checksum);
1624+
await postProcess(
16171625
db,
16181626
workspaceId,
16191627
changeSetId,
@@ -1641,7 +1649,7 @@ const handlePatchOperations = async (
16411649
);
16421650

16431651
if (doc)
1644-
postProcess(
1652+
await postProcess(
16451653
db,
16461654
workspaceId,
16471655
changeSetId,
@@ -1660,7 +1668,7 @@ const handlePatchOperations = async (
16601668

16611669
for (const atom of atomsToInsert) {
16621670
try {
1663-
postProcess(
1671+
await postProcess(
16641672
db,
16651673
workspaceId,
16661674
changeSetId,
@@ -1828,8 +1836,8 @@ const mjolnirBulk = async (
18281836
}
18291837
}
18301838

1831-
bulkCreateAtoms(db, cachedAtoms);
1832-
bulkInsertAtomMTMs(db, cachedAtoms, indexChecksum);
1839+
await bulkCreateAtoms(db, cachedAtoms);
1840+
await bulkInsertAtomMTMs(db, cachedAtoms, indexChecksum);
18331841

18341842
const pattern = [
18351843
"v2",
@@ -1922,16 +1930,16 @@ const mjolnirBulk = async (
19221930
);
19231931
await handleHammer(db, msg);
19241932

1925-
bulkCreateAtoms(db, req.data.successful);
1926-
bulkInsertAtomMTMs(db, req.data.successful, indexChecksum);
1933+
await bulkCreateAtoms(db, req.data.successful);
1934+
await bulkInsertAtomMTMs(db, req.data.successful, indexChecksum);
19271935

19281936
for (const obj of req.data.successful) {
19291937
returnedFn(
19301938
changeSetId,
19311939
`${obj.frontEndObject.kind}.${obj.frontEndObject.id}`,
19321940
);
19331941

1934-
postProcess(
1942+
await postProcess(
19351943
db,
19361944
workspaceId,
19371945
changeSetId,
@@ -1941,7 +1949,7 @@ const mjolnirBulk = async (
19411949
indexChecksum,
19421950
);
19431951

1944-
bustCacheAndReferences(
1952+
await bustCacheAndReferences(
19451953
db,
19461954
workspaceId,
19471955
changeSetId,
@@ -2002,7 +2010,7 @@ const deploymentMjolnir = async (
20022010
if (!req || req.status !== 200) return;
20032011

20042012
const { checksum, data } = req.data;
2005-
writeDeploymentAtoms(db, [{ checksum, data, kind, id }]);
2013+
await writeDeploymentAtoms(db, [{ checksum, data, kind, id }]);
20062014
};
20072015

20082016
const mjolnir = async (
@@ -2025,17 +2033,17 @@ const mjolnir = async (
20252033
// storing the index becomes useful here, we can lookup the
20262034
// checksum we would expect to be returned, and see if we have it already
20272035
if (!checksum) {
2028-
return mjolnirJob(workspaceId, changeSetId, kind, id, checksum);
2036+
return await mjolnirJob(workspaceId, changeSetId, kind, id, checksum);
20292037
}
20302038

20312039
// these are sent after patches are completed
20322040
// double check that i am still necessary!
20332041
const exists = await workspaceAtomExistsOnIndexes(db, kind, id, checksum);
20342042
if (exists.length === 0) {
2035-
return mjolnirJob(workspaceId, changeSetId, kind, id, checksum);
2043+
return await mjolnirJob(workspaceId, changeSetId, kind, id, checksum);
20362044
} // if i have it, bust!
20372045
else
2038-
bustCacheAndReferences(
2046+
await bustCacheAndReferences(
20392047
db,
20402048
workspaceId,
20412049
changeSetId,
@@ -2231,7 +2239,7 @@ const pruneAtomsForClosedChangeSet = async (
22312239
`,
22322240
bind: [changeSetId],
22332241
});
2234-
removeOldIndex(db, span);
2242+
await removeOldIndex(db, span);
22352243
span.end();
22362244
});
22372245
};
@@ -2680,7 +2688,7 @@ const niflheim = async (
26802688
bulkInflight({ workspaceId, changeSetId });
26812689

26822690
// clear out references, no queries have been performed yet
2683-
clearAllWeakReferences(db, changeSetId);
2691+
await clearAllWeakReferences(db, changeSetId);
26842692

26852693
const pattern = CHANGE_SET_INDEX_URL(workspaceId, changeSetId);
26862694

@@ -2723,7 +2731,7 @@ const niflheim = async (
27232731
toIndexChecksum: indexChecksum,
27242732
fromIndexChecksum: indexChecksum,
27252733
};
2726-
initIndexAndChangeSet(db, meta, frigg);
2734+
await initIndexAndChangeSet(db, meta, frigg);
27272735
debug("niflheim atom count", atoms.length);
27282736
frigg.setAttribute("numEntries", atoms.length);
27292737
frigg.setAttribute("indexChecksum", indexChecksum);
@@ -2748,7 +2756,7 @@ const niflheim = async (
27482756
db,
27492757
chunk,
27502758
);
2751-
bulkInsertAtomMTMs(db, existingDocuments, indexChecksum, chunkSize);
2759+
await bulkInsertAtomMTMs(db, existingDocuments, indexChecksum, chunkSize);
27522760
hammerObjs.push(...hammers);
27532761
}
27542762

@@ -2763,7 +2771,7 @@ const niflheim = async (
27632771
setCachedDocument(atom, doc);
27642772
}
27652773

2766-
postProcess(
2774+
await postProcess(
27672775
db,
27682776
workspaceId,
27692777
changeSetId,
@@ -2776,7 +2784,7 @@ const niflheim = async (
27762784
false,
27772785
);
27782786

2779-
bustCacheAndReferences(
2787+
await bustCacheAndReferences(
27802788
db,
27812789
workspaceId,
27822790
changeSetId,
@@ -2804,7 +2812,7 @@ const niflheim = async (
28042812
if (atomsToUnlink.length > 0) {
28052813
// We are not awaiting this promise so that we can continue forward since we don't
28062814
// need to see the result
2807-
bulkRemoveAtoms(db, atomsToUnlink, indexChecksum);
2815+
await bulkRemoveAtoms(db, atomsToUnlink, indexChecksum);
28082816
}
28092817

28102818
// store the MvIndex itself
@@ -2816,10 +2824,10 @@ const niflheim = async (
28162824
toChecksum: indexChecksum,
28172825
};
28182826
await createAtom(db, mvAtom, req.data.frontEndObject.data);
2819-
insertAtomMTM(db, mvAtom, indexChecksum);
2827+
await insertAtomMTM(db, mvAtom, indexChecksum);
28202828

28212829
// link the checksum to the change set (just in case its not done in init)
2822-
updateChangeSetWithNewIndex(db, meta);
2830+
await updateChangeSetWithNewIndex(db, meta);
28232831

28242832
// Now to deal with all the atoms we don't have present. Throw the big hammer.
28252833
if (hammerObjs.length > 0) {
@@ -3573,6 +3581,18 @@ const getReferences = async (
35733581
source: "getReferences",
35743582
sourceKind: kind,
35753583
});
3584+
/**
3585+
* NOTE: neither `mjolnir` or `weakReference` are await'd on purpose
3586+
* this `getReferences` call is inside the client "read" path.
3587+
*
3588+
* We want to return from that path ASAP to keep the UI responsive.
3589+
* If there is a thrown hammer, when it returns, it busts cache to re-read
3590+
*
3591+
* Keeping track of references is needed for the write path, so we don't need
3592+
* to `await` and slow down the read path. This will resolve before subsequent writes
3593+
* from a wholly different patch message.
3594+
* */
3595+
35763596
mjolnir(
35773597
db,
35783598
workspaceId,
@@ -3612,6 +3632,7 @@ const getReferences = async (
36123632
source: "getReferences",
36133633
sourceKind: kind,
36143634
});
3635+
// no await on purpose
36153636
mjolnir(
36163637
db,
36173638
workspaceId,

0 commit comments

Comments
 (0)