Skip to content

Commit

Permalink
fix: updatePartInstancesSegmentIds: take into account when multiple segments have been merged into one.
Browse files Browse the repository at this point in the history

Also ensure that the bulkWrite uses a db index

(cherry picked from commit bdab8c4)
  • Loading branch information
nytamin committed Nov 21, 2024
1 parent 97fc3ea commit 1d7617e
Showing 1 changed file with 61 additions and 27 deletions.
88 changes: 61 additions & 27 deletions packages/job-worker/src/ingest/commit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
import { DatabasePersistedModel } from '../modelBase'
import { updateSegmentIdsForAdlibbedPartInstances } from './commit/updateSegmentIdsForAdlibbedPartInstances'
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
import { AnyBulkWriteOperation } from 'mongodb'

export type BeforePartMapItem = { id: PartId; rank: number }
export type BeforeIngestOperationPartMap = ReadonlyMap<SegmentId, Array<BeforePartMapItem>>
Expand Down Expand Up @@ -301,26 +302,32 @@ async function updatePartInstancesSegmentIds(
renamedSegments: ReadonlyMap<SegmentId, SegmentId> | null,
beforePartMap: BeforeIngestOperationPartMap
) {
// A set of rules which can be translated to mongo queries for PartInstances to update
/**
* Maps new SegmentId ->
* A set of rules which can be translated to mongo queries for PartInstances to update
*/
const renameRules = new Map<
SegmentId,
{
/** Parts that have been moved to the new SegmentId */
partIds: PartId[]
fromSegmentId: SegmentId | null
/** Segments that have been renamed to the new SegmentId */
fromSegmentIds: SegmentId[]
}
>()

// Add whole segment renames to the set of rules
if (renamedSegments) {
for (const [fromSegmentId, toSegmentId] of renamedSegments) {
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentId: null }
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentIds: [] }
renameRules.set(toSegmentId, rule)

rule.fromSegmentId = fromSegmentId
rule.fromSegmentIds.push(fromSegmentId)
}
}

// Reverse the structure
// Reverse the Map structure
/** Maps Part -> SegmentId-of-the-part-before-ingest-changes */
const beforePartSegmentIdMap = new Map<PartId, SegmentId>()
for (const [segmentId, partItems] of beforePartMap.entries()) {
for (const partItem of partItems) {
Expand All @@ -331,8 +338,11 @@ async function updatePartInstancesSegmentIds(
// Some parts may have gotten a different segmentId to the base rule, so track those separately in the rules
for (const partModel of ingestModel.getAllOrderedParts()) {
const oldSegmentId = beforePartSegmentIdMap.get(partModel.part._id)

if (oldSegmentId && oldSegmentId !== partModel.part.segmentId) {
const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentId: null }
// The part has moved to another segment, add a rule to update its corresponding PartInstances:

const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentIds: [] }
renameRules.set(partModel.part.segmentId, rule)

rule.partIds.push(partModel.part._id)
Expand All @@ -341,30 +351,52 @@ async function updatePartInstancesSegmentIds(

// Perform a mongo update to modify the PartInstances
if (renameRules.size > 0) {
await context.directCollections.PartInstances.bulkWrite(
Array.from(renameRules.entries()).map(([newSegmentId, rule]) => ({
updateMany: {
filter: {
$or: _.compact([
rule.fromSegmentId
? {
segmentId: rule.fromSegmentId,
}
: undefined,
{
'part._id': { $in: rule.partIds },
const rulesInOrder = Array.from(renameRules.entries()).sort((a, b) => {
// Ensure that the ones with partIds are processed last,
// as that should take precedence.

if (a[1].partIds.length && !b[1].partIds.length) return 1
if (!a[1].partIds.length && b[1].partIds.length) return -1
return 0
})

const writeOps: AnyBulkWriteOperation<DBPartInstance>[] = []

for (const [newSegmentId, rule] of rulesInOrder) {
if (rule.fromSegmentIds.length) {
writeOps.push({
updateMany: {
filter: {
rundownId: ingestModel.rundownId,
segmentId: { $in: rule.fromSegmentIds },
},
update: {
$set: {
segmentId: newSegmentId,
'part.segmentId': newSegmentId,
},
]),
},
},
update: {
$set: {
segmentId: newSegmentId,
'part.segmentId': newSegmentId,
})
}
if (rule.partIds.length) {
writeOps.push({
updateMany: {
filter: {
rundownId: ingestModel.rundownId,
'part._id': { $in: rule.partIds },
},
update: {
$set: {
segmentId: newSegmentId,
'part.segmentId': newSegmentId,
},
},
},
},
}))
)
})
}
}
if (writeOps.length) await context.directCollections.PartInstances.bulkWrite(writeOps)
}
}

Expand Down Expand Up @@ -691,8 +723,10 @@ async function removeSegments(
for (const segment of ingestModel.getAllSegments()) {
const segmentId = segment.segment._id
if (segment.segment.isHidden) {
// Blueprints want to hide the Segment

if (!canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
// Protect live segment from being hidden
// The Segment is live, so we need to protect it from being hidden
logger.warn(`Cannot hide live segment ${segmentId}, it will be orphaned`)
switch (segment.segment.orphaned) {
case SegmentOrphanedReason.DELETED:
Expand Down

0 comments on commit 1d7617e

Please sign in to comment.