Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions packages/live-status-gateway/src/collectionBase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { CorelibPubSubCollections, CorelibPubSubTypes } from '@sofie-automation/corelib/dist/pubsub'
import {
StudioId,
CoreConnection,
ProtectedString,
Collection as CoreCollection,
CollectionDocCheck,
} from '@sofie-automation/server-core-integration'
import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick'
import * as _ from 'underscore'
import { Logger } from 'winston'
import { CoreHandler } from './coreHandler'
import { arePropertiesShallowEqual } from './helpers/equality'
import { CollectionHandlers } from './liveStatusServer'

export type ObserverCallback<T, K extends keyof T> = (data: Pick<T, K> | undefined) => void

export const DEFAULT_THROTTLE_PERIOD_MS = 20

export abstract class CollectionBase<T, TCollection extends keyof CorelibPubSubCollections> {
protected _name: string
protected _collectionName: TCollection
protected _logger: Logger
protected _coreHandler: CoreHandler
protected _studioId!: StudioId
protected _observers: Map<
ObserverCallback<T, keyof T>,
{ keysToPick: readonly (keyof T)[] | undefined; lastData: T | undefined }
> = new Map()
protected _collectionData: T | undefined

protected get _core(): CoreConnection<CorelibPubSubTypes, CorelibPubSubCollections> {
return this._coreHandler.core
}
protected throttledChanged: () => void

constructor(
collection: TCollection,
logger: Logger,
coreHandler: CoreHandler,
throttlePeriodMs = DEFAULT_THROTTLE_PERIOD_MS
) {
this._name = this.constructor.name
this._collectionName = collection
this._logger = logger
this._coreHandler = coreHandler

this.throttledChanged = throttleToNextTick(
throttlePeriodMs > 0
? _.throttle(() => this.changed(), throttlePeriodMs, { leading: true, trailing: true })
: () => this.changed()
)

this._logger.info(`Starting ${this._name} handler`)
}

init(_handlers: CollectionHandlers): void {
if (!this._coreHandler.studioId) throw new Error('StudioId is not defined')
this._studioId = this._coreHandler.studioId
}

close(): void {
this._logger.info(`Closing ${this._name} handler`)
}

subscribe<K extends keyof T>(callback: ObserverCallback<T, K>, keysToPick?: readonly K[]): void {
//this._logger.info(`${name}' added observer for '${this._name}'`)
if (this._collectionData) callback(this._collectionData)
this._observers.set(callback, { keysToPick, lastData: this.shallowClone(this._collectionData) })
}

/**
* Called after a batch of updates to documents in the collection
*/
protected changed(): void {
// override me
}

notify(data: T | undefined): void {
for (const [observer, o] of this._observers) {
if (
!o.lastData ||
!o.keysToPick ||
!data ||
!arePropertiesShallowEqual(o.lastData, data, undefined, o.keysToPick)
) {
observer(data)
o.lastData = this.shallowClone(data)
}
}
}

protected shallowClone(data: T | undefined): T | undefined {
if (data === undefined) return undefined
if (Array.isArray(data)) return [...data] as T
if (typeof data === 'object') return { ...data }
return data
}

protected logDocumentChange(documentId: string | ProtectedString<any>, changeType: string): void {
this._logger.silly(`${this._name} ${changeType} ${documentId}`)
}

protected logUpdateReceived(collectionName: string, updateCount: number | undefined): void
protected logUpdateReceived(collectionName: string, extraInfo?: string): void
protected logUpdateReceived(
collectionName: string,
extraInfoOrUpdateCount: string | number | undefined | null = null
): void {
let message = `${this._name} received ${collectionName} update`
if (typeof extraInfoOrUpdateCount === 'string') {
message += `, ${extraInfoOrUpdateCount}`
} else if (extraInfoOrUpdateCount !== null) {
message += `(${extraInfoOrUpdateCount})`
}
this._logger.debug(message)
}

protected logNotifyingUpdate(updateCount: number | undefined): void {
this._logger.debug(`${this._name} notifying update with ${updateCount} ${this._collectionName}`)
}

protected getCollectionOrFail(): CoreCollection<CollectionDocCheck<CorelibPubSubCollections[TCollection]>> {
const collection = this._core.getCollection<TCollection>(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
return collection
}
}
Original file line number Diff line number Diff line change
@@ -1,79 +1,11 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { AdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'

export class AdLibActionsHandler
extends CollectionBase<AdLibAction[], CorelibPubSub.adLibActions, CollectionName.AdLibActions>
implements Collection<AdLibAction[]>, CollectionObserver<SelectedPartInstances>
{
public observerName: string
private _curRundownId: RundownId | undefined
private _curPartInstance: DBPartInstance | undefined
import { RundownContentHandlerBase } from './rundownContentHandlerBase'

export class AdLibActionsHandler extends RundownContentHandlerBase<CorelibPubSub.adLibActions> {
constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibActionsHandler.name, CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
this.observerName = this._name
}

async changed(id: AdLibActionId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._curRundownId })
await this.notify(this._collectionData)
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._curRundownId
this._curPartInstance = data ? data.current ?? data.next : undefined
this._curRundownId = this._curPartInstance ? this._curPartInstance.rundownId : undefined

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._curRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._curRundownId && this._curPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._curRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._curRundownId,
})
await this.notify(this._collectionData)
}
}
}

// override notify to implement empty array handling
async notify(data: AdLibAction[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
super(CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler)
}
}
75 changes: 3 additions & 72 deletions packages/live-status-gateway/src/collections/adLibsHandler.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,11 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece'
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { PieceId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'

export class AdLibsHandler
extends CollectionBase<AdLibPiece[], CorelibPubSub.adLibPieces, CollectionName.AdLibPieces>
implements Collection<AdLibPiece[]>, CollectionObserver<SelectedPartInstances>
{
public observerName: string
// private _core: CoreConnection
private _currentRundownId: RundownId | undefined
private _currentPartInstance: DBPartInstance | undefined
import { RundownContentHandlerBase } from './rundownContentHandlerBase'

export class AdLibsHandler extends RundownContentHandlerBase<CorelibPubSub.adLibPieces> {
constructor(logger: Logger, coreHandler: CoreHandler) {
super(AdLibsHandler.name, CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
this.observerName = this._name
}

async changed(id: PieceId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._currentRundownId })
await this.notify(this._collectionData)
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._currentRundownId
this._currentPartInstance = data ? data.current ?? data.next : undefined
this._currentRundownId = this._currentPartInstance?.rundownId

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._currentRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._currentRundownId && this._currentPartInstance) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._currentRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({
rundownId: this._currentRundownId,
})
await this.notify(this._collectionData)
}
}
}

// override notify to implement empty array handling
async notify(data: AdLibPiece[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
super(CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler)
}
}
Original file line number Diff line number Diff line change
@@ -1,85 +1,16 @@
import { Logger } from 'winston'
import { CoreHandler } from '../coreHandler'
import { CollectionBase, Collection, CollectionObserver } from '../wsHandler'
import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction'
import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections'
import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub'
import { RundownBaselineAdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { SelectedPartInstances } from './partInstancesHandler'

export class GlobalAdLibActionsHandler
extends CollectionBase<
RundownBaselineAdLibAction[],
CorelibPubSub.rundownBaselineAdLibActions,
CollectionName.RundownBaselineAdLibActions
>
implements Collection<RundownBaselineAdLibAction[]>, CollectionObserver<SelectedPartInstances>
{
public observerName: string
private _currentRundownId: RundownId | undefined
import { RundownContentHandlerBase } from './rundownContentHandlerBase'

export class GlobalAdLibActionsHandler extends RundownContentHandlerBase<CorelibPubSub.rundownBaselineAdLibActions> {
constructor(logger: Logger, coreHandler: CoreHandler) {
super(
GlobalAdLibActionsHandler.name,
CollectionName.RundownBaselineAdLibActions,
CorelibPubSub.rundownBaselineAdLibActions,
logger,
coreHandler
)
this.observerName = this._name
}

async changed(id: RundownBaselineAdLibActionId, changeType: string): Promise<void> {
this.logDocumentChange(id, changeType)
if (!this._collectionName) return
const col = this._core.getCollection(this._collectionName)
if (!col) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = col.find({ rundownId: this._currentRundownId })
await this.notify(this._collectionData)
}

async update(source: string, data: SelectedPartInstances | undefined): Promise<void> {
this.logUpdateReceived('partInstances', source)
const prevRundownId = this._currentRundownId
const partInstance = data ? data.current ?? data.next : undefined
this._currentRundownId = partInstance?.rundownId

await new Promise(process.nextTick.bind(this))
if (!this._collectionName) return
if (!this._publicationName) return
if (prevRundownId !== this._currentRundownId) {
if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId)
if (this._dbObserver) this._dbObserver.stop()
if (this._currentRundownId) {
this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [
this._currentRundownId,
])
this._dbObserver = this._coreHandler.setupObserver(this._collectionName)
this._dbObserver.added = (id) => {
void this.changed(id, 'added').catch(this._logger.error)
}
this._dbObserver.changed = (id) => {
void this.changed(id, 'changed').catch(this._logger.error)
}
this._dbObserver.removed = (id) => {
void this.changed(id, 'removed').catch(this._logger.error)
}

const collection = this._core.getCollection(this._collectionName)
if (!collection) throw new Error(`collection '${this._collectionName}' not found!`)
this._collectionData = collection.find({ rundownId: this._currentRundownId })
await this.notify(this._collectionData)
}
}
}

// override notify to implement empty array handling
async notify(data: RundownBaselineAdLibAction[] | undefined): Promise<void> {
this.logNotifyingUpdate(data?.length)
if (data !== undefined) {
for (const observer of this._observers) {
await observer.update(this._name, data)
}
}
}
}
Loading
Loading