- 
                Notifications
    You must be signed in to change notification settings 
- Fork 17
Add Observer class #671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Observer class #671
Changes from 4 commits
3fa1d11
              8c5c0fc
              529b604
              9c041db
              659685e
              dc05293
              6fd0f34
              7500c16
              303b3d5
              768df07
              e2980a0
              1276dcb
              8c618eb
              b0a1267
              0212bf6
              167e833
              6e74ddb
              1896d86
              9aee284
              5b731bf
              802b9ad
              3d7526d
              7fa0fcc
              8478d1e
              9d29332
              0b1f4b6
              7e21f3c
              63c50c1
              b19008b
              aa990d0
              f12eff4
              c3946b2
              ffeca3c
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,322 @@ | ||
| import { Kuzzle } from '../Kuzzle'; | ||
| import { RealtimeDocument } from './RealtimeDocument'; | ||
| import { Document, DocumentNotification, JSONObject } from '../types'; | ||
| import { RealtimeDocumentSearchResult } from './searchResult/RealtimeDocument'; | ||
|  | ||
| /** | ||
| * Class based on a Set<string> that holds the observed documents IDs of | ||
| * a specific collection. | ||
| */ | ||
| class ObservedDocuments extends Set<string> { | ||
| /** | ||
| * Room ID for the realtime subscription on the collection of observed documents | ||
| */ | ||
| public roomId: string = null; | ||
|  | ||
| /** | ||
| * Gets documents IDs | ||
| */ | ||
| ids () { | ||
| return this.values(); | ||
| } | ||
|  | ||
| /** | ||
| * Gets Koncorde filters for observed documents | ||
| */ | ||
| get filters () { | ||
| return { | ||
| ids: { values: Array.from(this.values()) } | ||
| }; | ||
| } | ||
| } | ||
|  | ||
| type DocumentUrn = string; | ||
| type CollectionUrn = string; | ||
|  | ||
| function documentUrn (index: string, collection: string, id: string): DocumentUrn { | ||
| return `${index}:${collection}:${id}`; | ||
| } | ||
|  | ||
| function collectionUrn (index: string, collection: string): CollectionUrn { | ||
| return `${index}:${collection}`; | ||
| } | ||
|  | ||
| /** | ||
| * The Observer class allows to manipulate realtime documents. | ||
| * | ||
| * A RealtimeDocument is like a normal Document from Kuzzle except that it is | ||
| * connected to the realtime engine and it's content will change with changes | ||
|         
                  Aschen marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| * occuring on the database. | ||
| * | ||
| * Realtime documents are resources that should be disposed either with the | ||
| * stop() or the dispose() method otherwise subscriptions will never be | ||
| * terminated, documents will be keep into memory and you will end with a | ||
| * memory leak. | ||
| */ | ||
| export class Observer { | ||
| /** | ||
| * Map used to keep track of the observed documents ids by collections. | ||
| */ | ||
| private documentsBycollections = new Map<CollectionUrn, ObservedDocuments>(); | ||
|  | ||
| /** | ||
| * Map containing the list of realtime documents managed by this observer. | ||
| * | ||
| * This map is used to update realtime documents content when notifications | ||
| * are received. | ||
| */ | ||
| private documents = new Map<DocumentUrn, RealtimeDocument>(); | ||
| private sdk: Kuzzle; | ||
|  | ||
| constructor (sdk: Kuzzle) { | ||
| Reflect.defineProperty(this, 'sdk', { | ||
| value: sdk | ||
| }); | ||
| } | ||
|  | ||
| /** | ||
| * Stop observing a list of documents or all the realtime documents | ||
| * of a collection. | ||
| * | ||
| * @param index Index name | ||
| * @param collection Collection name | ||
| * @param documents Array of documents (optional) | ||
| */ | ||
| stop (index: string, collection: string, documents?: Array<{ _id: string }>): Promise<void> { | ||
| const observedDocuments = this.documentsBycollections.get(collectionUrn(index, collection)); | ||
|  | ||
| if (! documents) { | ||
| for (const id of observedDocuments.ids()) { | ||
| this.documents.delete(documentUrn(index, collection, id)); | ||
| } | ||
|  | ||
| this.documentsBycollections.delete(collectionUrn(index, collection)) | ||
|  | ||
| return this.sdk.realtime.unsubscribe(observedDocuments.roomId); | ||
| } | ||
|  | ||
| for (const document of documents) { | ||
| const urn = documentUrn(index, collection, document._id); | ||
| const rtDocument = this.documents.get(urn); | ||
|  | ||
| if (! rtDocument) { | ||
| continue; | ||
| } | ||
|  | ||
| observedDocuments.delete(document._id); | ||
| } | ||
|  | ||
| return this.resubscribe(index, collection); | ||
| } | ||
|  | ||
| /** | ||
| * Unsubscribe from every collections and clear all the realtime documents. | ||
| */ | ||
| dispose () { | ||
|          | ||
| const promises = []; | ||
|  | ||
| for (const subscription of this.documentsBycollections.values()) { | ||
| if (subscription.roomId) { | ||
| promises.push(this.sdk.realtime.unsubscribe(subscription.roomId)); | ||
| } | ||
| } | ||
|  | ||
| this.documentsBycollections.clear(); | ||
| this.documents.clear(); | ||
|  | ||
| return Promise.all(promises); | ||
| } | ||
|  | ||
| /** | ||
| * Gets a realtime document | ||
| * | ||
| * @param index Index name | ||
| * @param collection Collection name | ||
| * @param id Document ID | ||
| * | ||
| * @returns The realtime document | ||
| */ | ||
| get (index: string, collection: string, id: string): Promise<RealtimeDocument> { | ||
| return this.sdk.document.get(index, collection, id) | ||
| .then(document => this.observe(index, collection, document)); | ||
| } | ||
|  | ||
| /** | ||
| * | ||
| * Gets multiple realtime documents. | ||
| * | ||
| * @param index Index name | ||
| * @param collection Collection name | ||
| * @param ids Document IDs | ||
| * | ||
| * @returns An object containing 2 arrays: "successes" and "errors" | ||
| */ | ||
| mGet ( | ||
| index: string, | ||
| collection: string, | ||
| ids: string[] | ||
| ): Promise<{ | ||
| /** | ||
| * Array of successfully retrieved documents | ||
| */ | ||
| successes: RealtimeDocument[]; | ||
| /** | ||
| * Array of the IDs of not found documents. | ||
| */ | ||
| errors: string[]; | ||
| }> { | ||
| const rtDocuments = []; | ||
| let _errors; | ||
|  | ||
| return this.sdk.document.mGet(index, collection, ids) | ||
| .then(({ successes, errors }) => { | ||
| _errors = errors; | ||
|  | ||
| for (const document of successes) { | ||
| rtDocuments.push(this.addDocument(index, collection, document)); | ||
| } | ||
|  | ||
| return this.resubscribe(index, collection); | ||
| }) | ||
| .then(() => ({ successes: rtDocuments, errors: _errors })); | ||
| } | ||
|  | ||
| /** | ||
| * Searches for documents and returns a SearchResult containing realtime | ||
| * documents. | ||
| * | ||
| * @param index Index name | ||
| * @param collection Collection name | ||
| * @param searchBody Search query | ||
| * @param options Additional options | ||
| * - `queuable` If true, queues the request during downtime, until connected to Kuzzle again | ||
| * - `from` Offset of the first document to fetch | ||
| * - `size` Maximum number of documents to retrieve per page | ||
| * - `scroll` When set, gets a forward-only cursor having its ttl set to the given value (e.g. `30s`) | ||
| * - `verb` (HTTP only) Forces the verb of the route | ||
| * - `timeout` Request Timeout in ms, after the delay if not resolved the promise will be rejected | ||
| * | ||
| * @returns A SearchResult containing realtime documents | ||
| */ | ||
| search ( | ||
| index: string, | ||
| collection: string, | ||
| searchBody: JSONObject = {}, | ||
| options: { | ||
| from?: number; | ||
| size?: number; | ||
| scroll?: string; | ||
| lang?: string; | ||
| verb?: string; | ||
| timeout?: number; | ||
| } = {} | ||
| ): Promise<RealtimeDocumentSearchResult> { | ||
| return this.sdk.document['_search'](index, collection, searchBody, options) | ||
| .then(({ response, request, opts }) => { | ||
| const result = new RealtimeDocumentSearchResult( | ||
| this.sdk, | ||
| request, | ||
| opts, | ||
| response.result, | ||
| this); | ||
|  | ||
| return result.start(); | ||
| }); | ||
| } | ||
|  | ||
| /** | ||
| * Retrieve a realtime document from a document | ||
| * | ||
| * @param index Index name | ||
| * @param collection Collection name | ||
| * @param document Document to observe | ||
| * | ||
| * @returns A realtime document | ||
| */ | ||
| observe ( | ||
| index: string, | ||
| collection: string, | ||
| document: Document, | ||
| ): Promise<RealtimeDocument> { | ||
| const rtDocument = this.addDocument(index, collection, document); | ||
|  | ||
| return this.resubscribe(index, collection) | ||
| .then(() => rtDocument); | ||
| } | ||
|  | ||
| /** | ||
| * Adds a document and retrieve managed realtime document | ||
| * | ||
| * @internal | ||
| * | ||
| * Use observe() to retrieve a realtime document. | ||
| */ | ||
| addDocument (index: string, collection: string, document: Document): RealtimeDocument { | ||
| const rtDocument = new RealtimeDocument(document); | ||
|  | ||
| const urn = collectionUrn(index, collection); | ||
|  | ||
| if (! this.documentsBycollections.has(urn)) { | ||
| this.documentsBycollections.set(urn, new ObservedDocuments()); | ||
| } | ||
|  | ||
| const subscription = this.documentsBycollections.get(urn); | ||
|  | ||
| subscription.add(document._id); | ||
|  | ||
| this.documents.set(documentUrn(index, collection, document._id), rtDocument); | ||
|  | ||
| return rtDocument; | ||
| } | ||
|  | ||
| /** | ||
| * Renew a collection subscription with filters according to the list of | ||
| * currently managed documents. | ||
| *@ | ||
| * @internal | ||
| */ | ||
| resubscribe (index: string, collection: string): Promise<void> { | ||
| const subscription = this.documentsBycollections.get(collectionUrn(index, collection)); | ||
| // @todo do not resubscribe if no documents | ||
| return this.sdk.realtime.subscribe( | ||
| index, | ||
| collection, | ||
| subscription.filters, | ||
| this.notificationHandler.bind(this) | ||
| ) | ||
| .then(roomId => { | ||
| const oldRoomId = subscription.roomId; | ||
|  | ||
| subscription.roomId = roomId; | ||
|  | ||
| if (oldRoomId) { | ||
| return this.sdk.realtime.unsubscribe(oldRoomId); | ||
| } | ||
| }); | ||
| } | ||
|  | ||
| /** | ||
| * Handler method to process notification and update realtime documents content. | ||
|         
                  Aschen marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| */ | ||
| private notificationHandler (notification: DocumentNotification): Promise<void> { | ||
| const { index, collection, result } = notification; | ||
|  | ||
| const urn = documentUrn(index, collection, result._id); | ||
| const rtDocument = this.documents.get(urn); | ||
|  | ||
| // On "write", mutate document with changes | ||
| // On "publish", nothing | ||
| if (notification.event !== 'delete') { | ||
| if (notification.event === 'write') { | ||
| Object.assign(rtDocument._source, result._source); | ||
| } | ||
|  | ||
| return Promise.resolve(); | ||
| } | ||
|  | ||
| rtDocument.deleted = true; | ||
| this.documents.delete(rtDocument._id); | ||
|  | ||
| return this.resubscribe(index, collection); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| import { JSONObject } from '../types'; | ||
|  | ||
| export class RealtimeDocument { | ||
|         
                  Aschen marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| public _id: string; | ||
| public _source: JSONObject; | ||
| public deleted: boolean; | ||
|  | ||
| constructor ({ _id, _source }) { | ||
| this._id = _id; | ||
| this._source = _source; | ||
| this.deleted = false; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.