Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export * from './src/core/searchResult/Profile';
export * from './src/core/searchResult/Role';
export * from './src/core/searchResult/Specifications';
export * from './src/core/searchResult/User';
export * from './src/core/Observer';
export * from './src/core/RealtimeDocument';

export * from './src/types';

Expand Down
6 changes: 3 additions & 3 deletions src/controllers/Document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ export class DocumentController extends BaseController {
*
* @param index Index name
* @param collection Collection name
* @param query Search query
* @param searchBody Search query
* @param options Additional options
* - `queuable` If true, queues the request during downtime, until connected to Kuzzle again
* - `from` Offset of the first document to fetch
Expand All @@ -874,7 +874,7 @@ export class DocumentController extends BaseController {
search (
index: string,
collection: string,
query: JSONObject = {},
searchBody: JSONObject = {},
options: {
queuable?: boolean;
from?: number;
Expand All @@ -885,7 +885,7 @@ export class DocumentController extends BaseController {
timeout?: number;
} = {}
): Promise<SearchResult<DocumentHit>> {
return this._search(index, collection, query, options)
return this._search(index, collection, searchBody, options)
.then(({ response, request, opts }) => (
new DocumentSearchResult(this.kuzzle, request, opts, response.result)
));
Expand Down
322 changes: 322 additions & 0 deletions src/core/Observer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,322 @@
import { Kuzzle } from '../Kuzzle';
import { RealtimeDocument } from './RealtimeDocument';
import { Document, DocumentNotification, JSONObject } from '../types';
import { RealtimeDocumentSearchResult } from './searchResult/RealtimeDocument';

/**
* Class based on a Set<string> that holds the observed documents IDs of
* a specific collection.
*/
class ObservedDocuments extends Set<string> {
/**
* Room ID for the realtime subscription on the collection of observed documents
*/
public roomId: string = null;

/**
* Gets documents IDs
*/
ids () {
return this.values();
}

/**
* Gets Koncorde filters for observed documents
*/
get filters () {
return {
ids: { values: Array.from(this.values()) }
};
}
}

type DocumentUrn = string;
type CollectionUrn = string;

function documentUrn (index: string, collection: string, id: string): DocumentUrn {
return `${index}:${collection}:${id}`;
}

function collectionUrn (index: string, collection: string): CollectionUrn {
return `${index}:${collection}`;
}

/**
* The Observer class allows to manipulate realtime documents.
*
* A RealtimeDocument is like a normal Document from Kuzzle except that it is
* connected to the realtime engine and it's content will change with changes
* occuring on the database.
*
* Realtime documents are resources that should be disposed either with the
* stop() or the dispose() method otherwise subscriptions will never be
* terminated, documents will be keep into memory and you will end with a
* memory leak.
*/
export class Observer {
/**
* Map used to keep track of the observed documents ids by collections.
*/
private documentsBycollections = new Map<CollectionUrn, ObservedDocuments>();

/**
* Map containing the list of realtime documents managed by this observer.
*
* This map is used to update realtime documents content when notifications
* are received.
*/
private documents = new Map<DocumentUrn, RealtimeDocument>();
private sdk: Kuzzle;

constructor (sdk: Kuzzle) {
Reflect.defineProperty(this, 'sdk', {
value: sdk
});
}

/**
* Stop observing a list of documents or all the realtime documents
* of a collection.
*
* @param index Index name
* @param collection Collection name
* @param documents Array of documents (optional)
*/
stop (index: string, collection: string, documents?: Array<{ _id: string }>): Promise<void> {
const observedDocuments = this.documentsBycollections.get(collectionUrn(index, collection));

if (! documents) {
for (const id of observedDocuments.ids()) {
this.documents.delete(documentUrn(index, collection, id));
}

this.documentsBycollections.delete(collectionUrn(index, collection))

return this.sdk.realtime.unsubscribe(observedDocuments.roomId);
}

for (const document of documents) {
const urn = documentUrn(index, collection, document._id);
const rtDocument = this.documents.get(urn);

if (! rtDocument) {
continue;
}

observedDocuments.delete(document._id);
}

return this.resubscribe(index, collection);
}

/**
* Unsubscribe from every collections and clear all the realtime documents.
*/
dispose () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since there is a stop method, i would call it stopAll () instead wdyt ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I was thinking about having only one method that can take index & collection, index & collection & documents or nothing to stop subscriptions.
I will update and you can tell me

const promises = [];

for (const subscription of this.documentsBycollections.values()) {
if (subscription.roomId) {
promises.push(this.sdk.realtime.unsubscribe(subscription.roomId));
}
}

this.documentsBycollections.clear();
this.documents.clear();

return Promise.all(promises);
}

/**
* Gets a realtime document
*
* @param index Index name
* @param collection Collection name
* @param id Document ID
*
* @returns The realtime document
*/
get (index: string, collection: string, id: string): Promise<RealtimeDocument> {
return this.sdk.document.get(index, collection, id)
.then(document => this.observe(index, collection, document));
}

/**
*
* Gets multiple realtime documents.
*
* @param index Index name
* @param collection Collection name
* @param ids Document IDs
*
* @returns An object containing 2 arrays: "successes" and "errors"
*/
mGet (
index: string,
collection: string,
ids: string[]
): Promise<{
/**
* Array of successfully retrieved documents
*/
successes: RealtimeDocument[];
/**
* Array of the IDs of not found documents.
*/
errors: string[];
}> {
const rtDocuments = [];
let _errors;

return this.sdk.document.mGet(index, collection, ids)
.then(({ successes, errors }) => {
_errors = errors;

for (const document of successes) {
rtDocuments.push(this.addDocument(index, collection, document));
}

return this.resubscribe(index, collection);
})
.then(() => ({ successes: rtDocuments, errors: _errors }));
}

/**
* Searches for documents and returns a SearchResult containing realtime
* documents.
*
* @param index Index name
* @param collection Collection name
* @param searchBody Search query
* @param options Additional options
* - `queuable` If true, queues the request during downtime, until connected to Kuzzle again
* - `from` Offset of the first document to fetch
* - `size` Maximum number of documents to retrieve per page
* - `scroll` When set, gets a forward-only cursor having its ttl set to the given value (e.g. `30s`)
* - `verb` (HTTP only) Forces the verb of the route
* - `timeout` Request Timeout in ms, after the delay if not resolved the promise will be rejected
*
* @returns A SearchResult containing realtime documents
*/
search (
index: string,
collection: string,
searchBody: JSONObject = {},
options: {
from?: number;
size?: number;
scroll?: string;
lang?: string;
verb?: string;
timeout?: number;
} = {}
): Promise<RealtimeDocumentSearchResult> {
return this.sdk.document['_search'](index, collection, searchBody, options)
.then(({ response, request, opts }) => {
const result = new RealtimeDocumentSearchResult(
this.sdk,
request,
opts,
response.result,
this);

return result.start();
});
}

/**
* Retrieve a realtime document from a document
*
* @param index Index name
* @param collection Collection name
* @param document Document to observe
*
* @returns A realtime document
*/
observe (
index: string,
collection: string,
document: Document,
): Promise<RealtimeDocument> {
const rtDocument = this.addDocument(index, collection, document);

return this.resubscribe(index, collection)
.then(() => rtDocument);
}

/**
* Adds a document and retrieve managed realtime document
*
* @internal
*
* Use observe() to retrieve a realtime document.
*/
addDocument (index: string, collection: string, document: Document): RealtimeDocument {
const rtDocument = new RealtimeDocument(document);

const urn = collectionUrn(index, collection);

if (! this.documentsBycollections.has(urn)) {
this.documentsBycollections.set(urn, new ObservedDocuments());
}

const subscription = this.documentsBycollections.get(urn);

subscription.add(document._id);

this.documents.set(documentUrn(index, collection, document._id), rtDocument);

return rtDocument;
}

/**
* Renew a collection subscription with filters according to the list of
* currently managed documents.
*@
* @internal
*/
resubscribe (index: string, collection: string): Promise<void> {
const subscription = this.documentsBycollections.get(collectionUrn(index, collection));
// @todo do not resubscribe if no documents
return this.sdk.realtime.subscribe(
index,
collection,
subscription.filters,
this.notificationHandler.bind(this)
)
.then(roomId => {
const oldRoomId = subscription.roomId;

subscription.roomId = roomId;

if (oldRoomId) {
return this.sdk.realtime.unsubscribe(oldRoomId);
}
});
}

/**
* Handler method to process notification and update realtime documents content.
*/
private notificationHandler (notification: DocumentNotification): Promise<void> {
const { index, collection, result } = notification;

const urn = documentUrn(index, collection, result._id);
const rtDocument = this.documents.get(urn);

// On "write", mutate document with changes
// On "publish", nothing
if (notification.event !== 'delete') {
if (notification.event === 'write') {
Object.assign(rtDocument._source, result._source);
}

return Promise.resolve();
}

rtDocument.deleted = true;
this.documents.delete(rtDocument._id);

return this.resubscribe(index, collection);
}
}
13 changes: 13 additions & 0 deletions src/core/RealtimeDocument.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { JSONObject } from '../types';

export class RealtimeDocument {
public _id: string;
public _source: JSONObject;
public deleted: boolean;

constructor ({ _id, _source }) {
this._id = _id;
this._source = _source;
this.deleted = false;
}
}
Loading