Skip to content

Commit 82f5f6d

Browse files
committed
Coalesce GeoJSON "loadData" requests.
Re-implement PR #5902 including fix for issue #5970.
1 parent d11cf30 commit 82f5f6d

File tree

4 files changed

+245
-20
lines changed

4 files changed

+245
-20
lines changed

src/source/geojson_source.js

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,22 @@ class GeoJSONSource extends Evented implements Source {
201201
// target {this.type}.loadData rather than literally geojson.loadData,
202202
// so that other geojson-like source types can easily reuse this
203203
// implementation
204-
this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, result) => {
205-
this._loaded = true;
206-
207-
if (result && result.resourceTiming && result.resourceTiming[this.id])
208-
this._resourceTiming = result.resourceTiming[this.id].slice(0);
209-
210-
callback(err);
204+
this.workerID = this.dispatcher.send(`${this.type}.loadData`, options, (err, abandoned) => {
205+
if (!abandoned) {
206+
this._loaded = true;
207+
208+
if (result && result.resourceTiming && result.resourceTiming[this.id])
209+
this._resourceTiming = result.resourceTiming[this.id].slice(0);
210+
// Any `loadData` calls that piled up while we were processing
211+
// this one will get coalesced into a single call when this
212+
// 'coalesce' message is processed.
213+
// We would self-send from the worker if we had access to its
214+
// message queue. Waiting instead for the 'coalesce' to round-trip
215+
// through the foreground just means we're throttling the worker
216+
// to run at a little less than full-throttle.
217+
this.dispatcher.send(`${this.type}.coalesce`, this.workerOptions, null, this.workerID);
218+
callback(err);
219+
}
211220
}, this.workerID);
212221
}
213222

src/source/geojson_worker_source.js

Lines changed: 101 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const GeoJSONWrapper = require('./geojson_wrapper');
77
const vtpbf = require('vt-pbf');
88
const supercluster = require('supercluster');
99
const geojsonvt = require('geojson-vt');
10+
const assert = require('assert');
1011

1112
const VectorTileWorkerSource = require('./vector_tile_worker_source');
1213

@@ -32,6 +33,10 @@ export type LoadGeoJSONParameters = {
3233
geojsonVtOptions?: Object
3334
};
3435

36+
export type CoalesceParameters = {
37+
source: string
38+
};
39+
3540
export type LoadGeoJSON = (params: LoadGeoJSONParameters, callback: Callback<mixed>) => void;
3641

3742
export interface GeoJSONIndex {
@@ -41,11 +46,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
4146
const source = params.source,
4247
canonical = params.tileID.canonical;
4348

44-
if (!this._geoJSONIndexes[source]) {
49+
if (!this._sources[source] || !this._sources[source].geoJSONIndex) {
4550
return callback(null, null); // we couldn't load the file
4651
}
4752

48-
const geoJSONTile = this._geoJSONIndexes[source].getTile(canonical.z, canonical.x, canonical.y);
53+
const geoJSONTile = this._sources[source].geoJSONIndex.getTile(canonical.z, canonical.x, canonical.y);
4954
if (!geoJSONTile) {
5055
return callback(null, null); // nothing in the given tile
5156
}
@@ -67,6 +72,11 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
6772
});
6873
}
6974

75+
export type SourceState =
76+
| 'Idle' // Source empty or data loaded
77+
| 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced'
78+
| 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced'
79+
7080
/**
7181
* The {@link WorkerSource} implementation that supports {@link GeoJSONSource}.
7282
* This class is designed to be easily reused to support custom source types
@@ -78,8 +88,13 @@ function loadGeoJSONTile(params: WorkerTileParameters, callback: LoadVectorDataC
7888
* @private
7989
*/
8090
class GeoJSONWorkerSource extends VectorTileWorkerSource {
81-
_geoJSONIndexes: { [string]: GeoJSONIndex };
8291
loadGeoJSON: LoadGeoJSON;
92+
_sources: { [string]: {
93+
state?: SourceState,
94+
pendingCallback?: Callback<boolean>,
95+
pendingLoadDataParams?: LoadGeoJSONParameters,
96+
geoJSONIndex?: GeoJSONIndex // object mapping source ids to geojson-vt-like tile indexes
97+
}};
8398

8499
/**
85100
* @param [loadGeoJSON] Optional method for custom loading/parsing of
@@ -91,8 +106,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
91106
if (loadGeoJSON) {
92107
this.loadGeoJSON = loadGeoJSON;
93108
}
94-
// object mapping source ids to geojson-vt-like tile indexes
95-
this._geoJSONIndexes = {};
109+
this._sources = {};
96110
}
97111

98112
/**
@@ -103,11 +117,51 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
103117
* Defers to {@link GeoJSONWorkerSource#loadGeoJSON} for the fetching/parsing,
104118
* expecting `callback(error, data)` to be called with either an error or a
105119
* parsed GeoJSON object.
120+
*
121+
* When `loadData` requests come in faster than they can be processed,
122+
* they are coalesced into a single request using the latest data.
123+
* See {@link GeoJSONWorkerSource#coalesce}
124+
*
106125
* @param params
107126
* @param params.source The id of the source.
108127
* @param callback
109128
*/
110-
loadData(params: LoadGeoJSONParameters, callback: Callback<{[string]: {[string]: Array<PerformanceResourceTiming>}}>) {
129+
loadData(params: LoadGeoJSONParameters, callback: Callback<boolean>) {
130+
if (!this._sources[params.source]) {
131+
this._sources[params.source] = {};
132+
}
133+
const source = this._sources[params.source];
134+
135+
if (source.pendingCallback) {
136+
// Tell the foreground the previous call has been abandoned
137+
source.pendingCallback(null, true);
138+
}
139+
source.pendingCallback = callback;
140+
source.pendingLoadDataParams = params;
141+
142+
if (source.state &&
143+
source.state !== 'Idle') {
144+
source.state = 'NeedsLoadData';
145+
} else {
146+
source.state = 'Coalescing';
147+
this._loadData(params.source);
148+
}
149+
}
150+
151+
/**
152+
* Internal implementation: called directly by `loadData`
153+
* or by `coalesce` using stored parameters.
154+
*/
155+
_loadData(sourceId: string) {
156+
const source = this._sources[sourceId];
157+
if (!source.pendingCallback || !source.pendingLoadDataParams) {
158+
assert(false);
159+
return;
160+
}
161+
const callback = source.pendingCallback;
162+
const params = source.pendingLoadDataParams;
163+
delete source.pendingCallback;
164+
delete source.pendingLoadDataParams;
111165
this.loadGeoJSON(params, (err, data) => {
112166
if (err || !data) {
113167
return callback(err);
@@ -117,7 +171,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
117171
rewind(data, true);
118172

119173
try {
120-
this._geoJSONIndexes[params.source] = params.cluster ?
174+
source.geoJSONIndex = params.cluster ?
121175
supercluster(params.superclusterOptions).load(data.features) :
122176
geojsonvt(data, params.geojsonVtOptions);
123177
} catch (err) {
@@ -141,6 +195,39 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
141195
});
142196
}
143197

198+
/**
199+
* While processing `loadData`, we coalesce all further
200+
* `loadData` messages into a single call to _loadData
201+
* that will happen once we've finished processing the
202+
* first message. {@link GeoJSONSource#_updateWorkerData}
203+
* is responsible for sending us the `coalesce` message
204+
* at the time it receives a response from `loadData`
205+
*
206+
* State: Idle
207+
* ↑ |
208+
* 'coalesce' 'loadData'
209+
* | (triggers load)
210+
* | ↓
211+
* State: Coalescing
212+
* ↑ |
213+
* (triggers load) |
214+
* 'coalesce' 'loadData'
215+
* | ↓
216+
* State: NeedsLoadData
217+
*/
218+
coalesce(params: CoalesceParameters) {
219+
const source = this._sources[params.source];
220+
if (!source) {
221+
return; // coalesce queued after removeSource
222+
}
223+
if (source.state === 'Coalescing') {
224+
source.state = 'Idle';
225+
} else if (source.state === 'NeedsLoadData') {
226+
source.state = 'Coalescing';
227+
this._loadData(params.source);
228+
}
229+
}
230+
144231
/**
145232
* Implements {@link WorkerSource#reloadTile}.
146233
*
@@ -192,8 +279,13 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
192279
}
193280

194281
removeSource(params: {source: string}, callback: Callback<mixed>) {
195-
if (this._geoJSONIndexes[params.source]) {
196-
delete this._geoJSONIndexes[params.source];
282+
const removedSource = this._sources[params.source];
283+
if (removedSource) {
284+
if (removedSource.pendingCallback) {
285+
// Don't leak callbacks
286+
removedSource.pendingCallback(null, true);
287+
}
288+
delete this._sources[params.source];
197289
}
198290
callback();
199291
}

test/unit/source/geojson_source.test.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ test('GeoJSONSource#setData', (t) => {
5252
opts = util.extend(opts, { data: {} });
5353
return new GeoJSONSource('id', opts, {
5454
send: function (type, data, callback) {
55-
return setTimeout(callback, 0);
55+
if (callback) {
56+
return setTimeout(callback, 0);
57+
}
5658
}
5759
});
5860
}
@@ -172,7 +174,9 @@ test('GeoJSONSource#update', (t) => {
172174
t.test('fires event when metadata loads', (t) => {
173175
const mockDispatcher = {
174176
send: function(message, args, callback) {
175-
setTimeout(callback, 0);
177+
if (callback) {
178+
setTimeout(callback, 0);
179+
}
176180
}
177181
};
178182

@@ -188,7 +192,9 @@ test('GeoJSONSource#update', (t) => {
188192
t.test('fires "error"', (t) => {
189193
const mockDispatcher = {
190194
send: function(message, args, callback) {
191-
setTimeout(callback.bind(null, 'error'), 0);
195+
if (callback) {
196+
setTimeout(callback.bind(null, 'error'), 0);
197+
}
192198
}
193199
};
194200

@@ -209,7 +215,9 @@ test('GeoJSONSource#update', (t) => {
209215
if (message === 'geojson.loadData' && --expectedLoadDataCalls <= 0) {
210216
t.end();
211217
}
212-
setTimeout(callback, 0);
218+
if (callback) {
219+
setTimeout(callback, 0);
220+
}
213221
}
214222
};
215223

test/unit/source/geojson_worker_source.test.js

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ test('reloadTile', (t) => {
8787

8888
function addData(callback) {
8989
source.loadData({ source: 'sourceId', data: JSON.stringify(geoJson) }, (err) => {
90+
source.coalesce({ source: 'sourceId' });
9091
t.equal(err, null);
9192
callback();
9293
});
@@ -197,3 +198,118 @@ test('resourceTiming', (t) => {
197198

198199
t.end();
199200
});
201+
202+
test('loadData', (t) => {
203+
const layers = [
204+
{
205+
id: 'layer1',
206+
source: 'source1',
207+
type: 'symbol',
208+
},
209+
{
210+
id: 'layer2',
211+
source: 'source2',
212+
type: 'symbol',
213+
}
214+
];
215+
216+
const geoJson = {
217+
"type": "Feature",
218+
"geometry": {
219+
"type": "Point",
220+
"coordinates": [0, 0]
221+
}
222+
};
223+
224+
const layerIndex = new StyleLayerIndex(layers);
225+
function createWorker() {
226+
const worker = new GeoJSONWorkerSource(null, layerIndex);
227+
228+
// Making the call to loadGeoJSON to asynchronous
229+
// allows these tests to mimic a message queue building up
230+
// (regardless of timing)
231+
const originalLoadGeoJSON = worker.loadGeoJSON;
232+
worker.loadGeoJSON = function(params, callback) {
233+
setTimeout(() => {
234+
originalLoadGeoJSON(params, callback);
235+
}, 0);
236+
};
237+
return worker;
238+
}
239+
240+
t.test('abandons coalesced callbacks', (t) => {
241+
// Expect first call to run, second to be abandoned,
242+
// and third to run in response to coalesce
243+
const worker = createWorker();
244+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
245+
t.equal(err, null);
246+
t.notOk(abandoned);
247+
worker.coalesce({ source: 'source1' });
248+
});
249+
250+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
251+
t.equal(err, null);
252+
t.ok(abandoned);
253+
});
254+
255+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
256+
t.equal(err, null);
257+
t.notOk(abandoned);
258+
t.end();
259+
});
260+
});
261+
262+
t.test('does not mix coalesce state between sources', (t) => {
263+
// Expect first and second calls to run independently,
264+
// and third call should run in response to coalesce
265+
// from first call.
266+
const worker = createWorker();
267+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
268+
t.equal(err, null);
269+
t.notOk(abandoned);
270+
worker.coalesce({ source: 'source1' });
271+
});
272+
273+
worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
274+
t.equal(err, null);
275+
t.notOk(abandoned);
276+
});
277+
278+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
279+
t.equal(err, null);
280+
t.notOk(abandoned);
281+
t.end();
282+
});
283+
});
284+
285+
t.test('does not mix stored callbacks between sources', (t) => {
286+
// Two loadData calls per source means no calls should
287+
// be abandoned.
288+
const worker = createWorker();
289+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
290+
t.equal(err, null);
291+
t.notOk(abandoned);
292+
worker.coalesce({ source: 'source1' });
293+
});
294+
295+
worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
296+
t.equal(err, null);
297+
t.notOk(abandoned);
298+
worker.coalesce({ source: 'source2' });
299+
});
300+
301+
worker.loadData({ source: 'source2', data: JSON.stringify(geoJson) }, (err, abandoned) => {
302+
t.equal(err, null);
303+
t.notOk(abandoned);
304+
// test ends here because source2 has the last coalesce call
305+
t.end();
306+
});
307+
308+
worker.loadData({ source: 'source1', data: JSON.stringify(geoJson) }, (err, abandoned) => {
309+
t.equal(err, null);
310+
t.notOk(abandoned);
311+
});
312+
});
313+
314+
t.end();
315+
});

0 commit comments

Comments
 (0)