Skip to content

Commit

Permalink
Added trace of flush summary stats. Added dropIndexes to factory. Add…
Browse files Browse the repository at this point in the history
…ed support for 'text' order in index. Updated to latest async module.
  • Loading branch information
meirgottlieb committed Aug 4, 2016
1 parent b436ea2 commit 337c797
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 102 deletions.
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "hydrate-mongodb",
"description": "An Object Document Mapper (ODM) for MongoDB.",
"version": "0.9.2",
"version": "0.9.3",
"author": {
"name": "Artifact Health, LLC",
"url": "http://www.artifacthealth.com"
Expand All @@ -20,9 +20,9 @@
"test": "gulp"
},
"dependencies": {
"async": "^0.9.0",
"async": "^2.0.1",
"change-case": "^2.2.0",
"mongodb": "^2.1.4",
"mongodb": "^2.1.19",
"reflect-helper": "^1.0.2"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions src/config/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ export class Configuration {
async.each(this._mappings, (provider, done) => {

provider.getMapping(this, (err, r) => {
if(err) return done(err, null);
if(err) return done(err);

// Merge all registries. Duplicates will cause an error.
registry.addMappings(<ClassMapping[]>r);
done(null, null);
done();
});
}, (err) => {
if(err) return callback(err);
Expand Down
5 changes: 5 additions & 0 deletions src/core/timerUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export function getDuration(start: number[]): number {

var stop = process.hrtime(start);
return (stop[0] * 1000) + stop[1] / 1000000;
}
2 changes: 1 addition & 1 deletion src/mapping/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ import {IndexOptions} from "./indexOptions";

export interface Index {

keys: [string, number][];
keys: [string, number | string][];
options?: IndexOptions;
}
43 changes: 18 additions & 25 deletions src/mapping/providers/annotations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,30 +316,8 @@ export class IndexAnnotation extends Annotation implements ClassAnnotation, Prop

processPropertyAnnotation(context: MappingBuilderContext, mapping: MappingModel.EntityMapping, property: MappingModel.Property, symbol: Property, annotation: IndexAnnotation): void {

// TODO: allow indexes in embedded types and map to containing root type
if(!context.assertEntityMapping(mapping)) return;

var keys: [string, number][] = [];

var order: number;
if(annotation.order !== undefined) {
order = annotation.order;
if(order != 1 && order != -1) {
context.addError("Valid values for property 'order' are 1 or -1.");
return;
}
// TODO: 'order' property should not be passed in options object. When we validate options in the future, this will throw an error.
// However, we can't just delete it from the object because then that removes it from the annotation as well and subsequent
// processing of the annotation would then not have the order value. Instead we should copy properties from the annotation value
// to the index options.
}
else {
order = 1;
}
keys.push([property.name, order]);

this._addIndex(context, mapping, {
keys: keys,
keys: [[property.name, annotation.order || 1]],
options: annotation.options
});
}
Expand All @@ -349,11 +327,26 @@ export class IndexAnnotation extends Annotation implements ClassAnnotation, Prop
// TODO: allow indexes in embedded types and map to containing root type
if(context.assertEntityMapping(mapping)) {

if (!value.keys) {
context.addError("Missing require property 'keys'.");
if (!Array.isArray(value.keys) || value.keys.length == 0) {
context.addError("Missing or invalid property 'keys'.");
return;
}

for (let i = 0; i < value.keys.length; i++) {

let key = value.keys[i];
if (!Array.isArray(key) || key.length != 2 || typeof key[0] !== "string") {
context.addError(`Index key ${i} is invalid. Key must be a tuple [path, order].`);
return;
}

let order = value.keys[i][1];
if (order != 1 && order != -1 && order != 'text') {
context.addError("Valid values for index order are 1, -1, or 'text'.");
return;
}
}

// TODO: validate index options
mapping.addIndex(value);
}
Expand Down
6 changes: 1 addition & 5 deletions src/persister.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {WriteContext} from "./mapping/writeContext";
import {MappingModel} from "./mapping/mappingModel";
import {Readable} from "stream";
import {PersistenceError} from "./persistenceError";
import {getDuration} from "./core/timerUtil";

interface FindOneQuery {

Expand Down Expand Up @@ -1015,11 +1016,6 @@ class FindQueue {
}
}

function getDuration(start: number[]): number {

var stop = process.hrtime(start);
return (stop[0] * 1000) + stop[1] / 1000000;
}
/*
class QueryStream extends Readable {
Expand Down
58 changes: 56 additions & 2 deletions src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {QueryDefinition} from "./query/queryDefinition";
import {Observer} from "./observer";
import {PersistenceError} from "./persistenceError";
import {WriteContext} from "./mapping/writeContext";
import {getDuration} from "./core/timerUtil";

/**
* The state of an object.
Expand Down Expand Up @@ -281,11 +282,13 @@ export class SessionImpl extends EventEmitter implements InternalSession {
*/
private _queue: TaskQueue;

private _traceEnabled: boolean;

constructor(public factory: InternalSessionFactory) {
super();

this._createTaskQueue();
this._traceEnabled = this.factory.logger != null;
}

save(obj: any, callback?: Callback): void {
Expand Down Expand Up @@ -826,14 +829,22 @@ export class SessionImpl extends EventEmitter implements InternalSession {
// clear list of scheduled objects
this._scheduleHead = this._scheduleTail = null;

if (this._traceEnabled) {
var start = process.hrtime();
}

var batch = new Batch();
this._buildBatch(batch, head, (err) => {
if (err) return callback(err);

batch.execute((err) => {
if (err) return callback(err);

this._batchCompleted(head, callback);
if (start) {
var duration = getDuration(start);
}

this._batchCompleted(head, duration, callback);
});
});
});
Expand Down Expand Up @@ -922,9 +933,14 @@ export class SessionImpl extends EventEmitter implements InternalSession {
/**
* Called after a batch is successfully executed.
* @param head The head of the scheduled operation list.
* @param duration The duration of the flush operation, for trace logging.
* @param callback Called when processing is completed.
*/
private _batchCompleted(head: ObjectLinks, callback: Callback): void {
private _batchCompleted(head: ObjectLinks, duration: number, callback: Callback): void {

if (this._traceEnabled) {
this._logFlushStats(head, duration);
}

var links = head;
while(links) {
Expand Down Expand Up @@ -954,6 +970,44 @@ export class SessionImpl extends EventEmitter implements InternalSession {

callback();
}

private _logFlushStats(head: ObjectLinks, duration: number): void {

var inserted = 0, updated = 0, removed = 0, dirtyChecked = 0;

var links = head;
while (links) {

switch (links.scheduledOperation) {
case ScheduledOperation.Update:
updated++;
break;
case ScheduledOperation.Delete:
inserted++;
break;
case ScheduledOperation.Insert:
removed++;
break;
case ScheduledOperation.DirtyCheck:
dirtyChecked++;
break;
}

links = links.next;
}

this.factory.logger.trace(
{
duration,
operations: {
inserted,
updated,
removed,
dirtyChecked: dirtyChecked + updated
}
},
`[Hydrate] Completed flush.`);
}

private _close(callback: Callback): void {

Expand Down
39 changes: 37 additions & 2 deletions src/sessionFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ export interface SessionFactory {
* @param callback Called after the operation has completed.
*/
createIndexes(options: CreateIndexesOptions, callback: Callback): void;
/**
* Drops all indexes for all collections managed by Hydrate. This includes indexes that were not created by Hydrate.
* @param callback Called after the operation has completed.
*/
dropIndexes(callback: Callback): void;
}

/**
Expand Down Expand Up @@ -144,8 +149,8 @@ export class SessionFactoryImpl implements InternalSessionFactory {
}

let order = index.keys[i][1];
if (order !== 1 && order !== -1) {
indexDone(new PersistenceError(`Invalid order for key '${propertyName}' on index for '${mapping.name}'. Order must be 1 or -1.`));
if (order !== 1 && order !== -1 && order != 'text') {
indexDone(new PersistenceError(`Invalid order for key '${propertyName}' on index for '${mapping.name}'. Order must be 1, -1, or 'text'.`));
return;
}

Expand Down Expand Up @@ -185,4 +190,34 @@ export class SessionFactoryImpl implements InternalSessionFactory {
},
callback);
}

/**
* Drops all indexes for all collections managed by Hydrate. This includes indexes that were not created by Hydrate.
* @param callback Called after the operation has completed.
*/
dropIndexes(callback: Callback): void {

async.each(
this._mappingRegistry.getEntityMappings().filter(mapping => mapping.hasFlags(MappingModel.MappingFlags.InheritanceRoot)),
(mapping, done) => {

let collection = this._collections[mapping.id];
if (collection) {
collection.dropAllIndexes((err: Error) => {
if (err) return done(err);

if (this.logger) {
this.logger.trace(
{
collection: mapping.collectionName
},
`[Hydrate] Dropped all indexes on '${collection.collectionName}' collection.`);
}

done();
})
}
},
callback);
}
}
Loading

0 comments on commit 337c797

Please sign in to comment.