Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
Bug 1370985 - Change getBatched() to return records directly instead …
Browse files Browse the repository at this point in the history
…of using a callback. r=markh,tcsc

MozReview-Commit-ID: HfwPe8jSH66
  • Loading branch information
eoger committed May 30, 2017
1 parent 2a48d17 commit e828d81
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 321 deletions.
16 changes: 7 additions & 9 deletions services/sync/modules/bookmark_validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -823,16 +823,14 @@ class BookmarkValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
let resp = await collection.getBatched();
if (!resp.success) {
throw resp;
let result = await collection.getBatched();
if (!result.response.success) {
throw result.response;
}
return items;
return result.records.map(record => {
record.decrypt(collectionKey);
return record.cleartext;
});
}

async validate(engine) {
Expand Down
16 changes: 7 additions & 9 deletions services/sync/modules/collection_validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,14 @@ class CollectionValidator {
let collection = engine.itemSource();
let collectionKey = engine.service.collectionKeys.keyForCollection(engine.name);
collection.full = true;
let items = [];
collection.recordHandler = function(item) {
item.decrypt(collectionKey);
items.push(item.cleartext);
};
let resp = await collection.getBatched();
if (!resp.success) {
throw resp;
let result = await collection.getBatched();
if (!result.response.success) {
throw result.response;
}
return items;
return result.records.map(record => {
record.decrypt(collectionKey);
return record.cleartext;
});
}

// Should return a promise that resolves to an array of client items.
Expand Down
33 changes: 21 additions & 12 deletions services/sync/modules/engines.js
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ SyncEngine.prototype = {
// called for every incoming record.
let self = this;

newitems.recordHandler = function(item) {
let recordHandler = function(item) {
if (aborting) {
return;
}
Expand Down Expand Up @@ -1231,13 +1231,17 @@ SyncEngine.prototype = {

// Only bother getting data from the server if there's new things
if (this.lastModified == null || this.lastModified > this.lastSync) {
let resp = Async.promiseSpinningly(newitems.getBatched());
doApplyBatchAndPersistFailed.call(this);
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
let { response, records } = Async.promiseSpinningly(newitems.getBatched());
if (!response.success) {
response.failureCode = ENGINE_DOWNLOAD_FAIL;
throw response;
}

for (let record of records) {
recordHandler(record);
}
doApplyBatchAndPersistFailed.call(this);

if (aborting) {
throw aborting;
}
Expand Down Expand Up @@ -1285,13 +1289,18 @@ SyncEngine.prototype = {
newitems.newer = 0;
newitems.ids = fetchBatch.slice(0, batchSize);

// Reuse the existing record handler set earlier
let resp = Async.promiseSpinningly(newitems.get());
if (!resp.success) {
resp.failureCode = ENGINE_DOWNLOAD_FAIL;
throw resp;
}

for (let json of resp.obj) {
let record = new this._recordObj();
record.deserialize(json);
recordHandler(record);
}

// This batch was successfully applied. Not using
// doApplyBatchAndPersistFailed() here to avoid writing toFetch twice.
fetchBatch = fetchBatch.slice(batchSize);
Expand Down Expand Up @@ -1815,15 +1824,15 @@ SyncEngine.prototype = {
test.full = true;

let key = this.service.collectionKeys.keyForCollection(this.name);
test.recordHandler = function recordHandler(record) {
record.decrypt(key);
canDecrypt = true;
};

// Any failure fetching/decrypting will just result in false
try {
this._log.trace("Trying to decrypt a record from the server..");
Async.promiseSpinningly(test.get());
let json = Async.promiseSpinningly(test.get()).obj[0];
let record = new this._recordObj();
record.deserialize(json);
record.decrypt(key);
canDecrypt = true;
} catch (ex) {
if (Async.isShutdownException(ex)) {
throw ex;
Expand Down
58 changes: 9 additions & 49 deletions services/sync/modules/record.js
Original file line number Diff line number Diff line change
Expand Up @@ -707,22 +707,18 @@ Collection.prototype = {
async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
let totalLimit = Number(this.limit) || Infinity;
if (batchSize <= 0 || batchSize >= totalLimit) {
// Invalid batch sizes should arguably be an error, but they're easy to handle
return this.get();
throw new Error("Invalid batch size");
}

if (!this.full) {
throw new Error("getBatched is unimplemented for guid-only GETs");
}

// _onComplete and _onProgress are reset after each `get` by AsyncResource.
// We overwrite _onRecord to something that stores the data in an array
// until the end.
let { _onComplete, _onProgress, _onRecord } = this;
let { _onComplete, _onProgress } = this;
let recordBuffer = [];
let resp;
try {
this._onRecord = r => recordBuffer.push(r);
let lastModifiedTime;
this.limit = batchSize;

Expand All @@ -736,8 +732,14 @@ Collection.prototype = {
// Actually perform the request
resp = await this.get();
if (!resp.success) {
recordBuffer = [];
break;
}
for (let json of resp.obj) {
let record = new this._recordObj();
record.deserialize(json);
recordBuffer.push(record);
}

// Initialize last modified, or check that something broken isn't happening.
let lastModified = resp.headers["x-last-modified"];
Expand All @@ -759,54 +761,12 @@ Collection.prototype = {
// handler so that we can more convincingly pretend to be a normal get()
// call. Note: we're resetting these to the values they had before this
// function was called.
this._onRecord = _onRecord;
this._limit = totalLimit;
this._offset = null;
delete this._headers["x-if-unmodified-since"];
this._rebuildURL();
}
if (resp.success && Async.checkAppReady()) {
// call the original _onRecord (e.g. the user supplied record handler)
// for each record we've stored
for (let record of recordBuffer) {
this._onRecord(record);
}
}
return resp;
},

set recordHandler(onRecord) {
// Save this because onProgress is called with this as the ChannelListener
let coll = this;

// Switch to newline separated records for incremental parsing
coll.setHeader("Accept", "application/newlines");

this._onRecord = onRecord;

this._onProgress = function(httpChannel) {
let newline, length = 0, contentLength = "unknown";

try {
// Content-Length of the value of this response header
contentLength = httpChannel.getResponseHeader("Content-Length");
} catch (ex) { }

while ((newline = this._data.indexOf("\n")) > 0) {
// Split the json record from the rest of the data
let json = this._data.slice(0, newline);
this._data = this._data.slice(newline + 1);

length += json.length;
coll._log.trace("Record: Content-Length = " + contentLength +
", ByteCount = " + length);

// Deserialize a record from json and give it to the callback
let record = new coll._recordObj();
record.deserialize(json);
coll._onRecord(record);
}
};
return { response: resp, records: recordBuffer };
},

// This object only supports posting via the postQueue object.
Expand Down
11 changes: 8 additions & 3 deletions services/sync/tests/unit/head_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ ServerCollection.prototype = {
return c;
},

get(options) {
get(options, request) {
let result;
if (options.full) {
let data = [];
Expand All @@ -317,8 +317,13 @@ ServerCollection.prototype = {
} else if (start) {
data = data.slice(start);
}
// Our implementation of application/newlines.
result = data.join("\n") + "\n";

if (request && request.getHeader("accept") == "application/newlines") {
this._log.error("Error: client requesting application/newlines content");
throw new Error("This server should not serve application/newlines content");
} else {
result = JSON.stringify(data);
}

// Use options as a backchannel to report count.
options.recordCount = data.length;
Expand Down
2 changes: 1 addition & 1 deletion services/sync/tests/unit/test_bookmark_duping.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async function createBookmark(parentId, url, title, index = bms.DEFAULT_INDEX) {
function getServerRecord(collection, id) {
let wbo = collection.get({ full: true, ids: [id] });
// Whew - lots of json strings inside strings.
return JSON.parse(JSON.parse(JSON.parse(wbo).payload).ciphertext);
return JSON.parse(JSON.parse(JSON.parse(JSON.parse(wbo)[0]).payload).ciphertext);
}

async function promiseNoLocalItem(guid) {
Expand Down
16 changes: 1 addition & 15 deletions services/sync/tests/unit/test_bookmark_engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -535,23 +535,9 @@ add_task(async function test_misreconciled_root() {
};

let rec = new FakeRecord(BookmarkFolder, to_apply);
let encrypted = encryptPayload(rec.cleartext);
encrypted.decrypt = function() {
for (let x in rec) {
encrypted[x] = rec[x];
}
};

_("Applying record.");
engine._processIncoming({
getBatched() {
return this.get();
},
async get() {
this.recordHandler(encrypted);
return {success: true}
},
});
store.applyIncoming(rec);

// Ensure that afterwards, toolbar is still there.
// As of 2012-12-05, this only passes because Places doesn't use "toolbar" as
Expand Down
Loading

0 comments on commit e828d81

Please sign in to comment.