Skip to content

Commit

Permalink
fix(aggregations, explain-plan): remove out stages before running exp…
Browse files Browse the repository at this point in the history
…lain plan COMPASS-7012 (#4830)

fix(aggregations, explain-plan): remove out stages before running explain plan
  • Loading branch information
gribnoysup authored Sep 12, 2023
1 parent 75b346b commit a390314
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 47 deletions.
38 changes: 38 additions & 0 deletions packages/compass-aggregations/src/modules/insights.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,42 @@ describe('fetchExplainForPipeline', function () {

expect(dataService.explainAggregate).to.be.calledOnce;
});

it('should remove $out stage before passing pipeline to explain', async function () {
const dataService = {
explainAggregate: Sinon.stub().resolves(simpleExplain),
isCancelError: Sinon.stub().returns(false),
};

const store = configureStore({
namespace: 'test.test',
dataProvider: { dataProvider: dataService as any },
pipeline: [{ $match: { foo: 1 } }, { $out: 'test' }],
});

await store.dispatch(fetchExplainForPipeline());

expect(dataService.explainAggregate).to.be.calledWith('test.test', [
{ $match: { foo: 1 } },
]);
});

it('should remove $merge stage before passing pipeline to explain', async function () {
const dataService = {
explainAggregate: Sinon.stub().resolves(simpleExplain),
isCancelError: Sinon.stub().returns(false),
};

const store = configureStore({
namespace: 'test.test',
dataProvider: { dataProvider: dataService as any },
pipeline: [{ $merge: { into: 'test' } }, { $match: { bar: 2 } }],
});

await store.dispatch(fetchExplainForPipeline());

expect(dataService.explainAggregate).to.be.calledWith('test.test', [
{ $match: { bar: 2 } },
]);
});
});
14 changes: 13 additions & 1 deletion packages/compass-aggregations/src/modules/insights.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { PipelineBuilderThunkAction } from '.';
import { ActionTypes as ConfirmNewPipelineActions } from './is-new-pipeline-confirm';
import { RESTORE_PIPELINE } from './saved-pipeline';
import { AIPipelineActionTypes } from './pipeline-builder/pipeline-ai';
import { getStageOperator, isOutputStage } from '../utils/stage';

const FETCH_EXPLAIN_PLAN_SUCCESS =
'compass-aggregations/FETCH_EXPLAIN_PLAN_SUCCESS';
Expand Down Expand Up @@ -52,7 +53,18 @@ export const fetchExplainForPipeline = (): PipelineBuilderThunkAction<
try {
// Debounce action to allow for user typing to stop
await cancellableWait(300, abortSignal);
const pipeline = getPipelineFromBuilderState(getState(), pipelineBuilder);
const pipeline = getPipelineFromBuilderState(
getState(),
pipelineBuilder
).filter((stage) => {
// Getting explain plan for a pipeline with an out / merge stage can
// cause data corruption issues in non-genuine MongoDB servers, for
// example CosmosDB actually executes pipeline and persists data, even
// when the stage is not at the end of the pipeline. To avoid
// introducing branching logic based on MongoDB genuineness, we just
// filter out all output stages here instead
return !isOutputStage(getStageOperator(stage));
});
const rawExplainPlan = await dataService.dataService?.explainAggregate?.(
namespace,
pipeline,
Expand Down
6 changes: 3 additions & 3 deletions packages/compass-aggregations/src/utils/stage.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export const filterStageOperators = (options) => {
};

/**
* @param {unknown} stage
* @param {unknown | undefined} stage
* @returns {string | undefined}
*/
export function getStageOperator(stage) {
Expand Down Expand Up @@ -181,7 +181,7 @@ const ATLAS_ONLY_OPERATOR_NAMES = new Set(
);

/**
* @param {string} stageOperator
* @param {string | undefined} stageOperator
* @returns {boolean}
*/
export function isOutputStage(stageOperator) {
Expand All @@ -190,7 +190,7 @@ export function isOutputStage(stageOperator) {

/**
*
* @param {string} stageOperator
* @param {string | undefined} stageOperator
* @returns {boolean}
*/
export function isAtlasOnlyStage(stageOperator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from './explain-plan-modal-store';
import { expect } from 'chai';
import type { Document } from 'mongodb';
import Sinon from 'sinon';

const localAppRegistry = new AppRegistry();

Expand Down Expand Up @@ -48,31 +49,40 @@ const simplePlan = {
ok: 1,
};

function configureStore(explainPlan: Document | Error = simplePlan) {
const explain = () => {
if ((explainPlan as Error).name) {
return Promise.reject(explainPlan);
}
return Promise.resolve(explainPlan);
};
const dataProvider: ExplainPlanModalConfigureStoreOptions['dataProvider'] = {
dataProvider: {
explainAggregate: explain,
explainFind: explain,
isCancelError() {
return false;
describe('explain plan modal store', function () {
const sandbox = Sinon.createSandbox();
let dataProvider: ExplainPlanModalConfigureStoreOptions['dataProvider'];

function configureStore(explainPlan: Document | Error = simplePlan) {
const explain = sandbox.stub().callsFake(() => {
if ((explainPlan as Error).name) {
return Promise.reject(explainPlan);
}
return Promise.resolve(explainPlan);
});

dataProvider = {
dataProvider: {
explainAggregate: explain,
explainFind: explain,
isCancelError() {
return false;
},
},
},
};
return _configureStore({
localAppRegistry,
dataProvider,
namespace: 'test.test',
isDataLake: false,
};

return _configureStore({
localAppRegistry,
dataProvider,
namespace: 'test.test',
isDataLake: false,
});
}

afterEach(function () {
sandbox.resetHistory();
});
}

describe('explain plan modal store', function () {
it('should open modal on `open-explain-plan-modal` event', function () {
const store = configureStore();
localAppRegistry.emit('open-explain-plan-modal', {
Expand Down Expand Up @@ -122,4 +132,32 @@ describe('explain plan modal store', function () {
store.dispatch(closeExplainPlanModal());
expect(store.getState()).to.have.property('isModalOpen', false);
});

it('should remove $out stage before passing pipeline to explain', async function () {
const store = configureStore();
await store.dispatch(
openExplainPlanModal({
aggregation: { pipeline: [{ $match: { foo: 1 } }, { $out: 'test' }] },
})
);
expect(dataProvider?.dataProvider?.explainAggregate).to.be.calledWith(
'test.test',
[{ $match: { foo: 1 } }]
);
});

it('should remove $merge stage before passing pipeline to explain', async function () {
const store = configureStore();
await store.dispatch(
openExplainPlanModal({
aggregation: {
pipeline: [{ $merge: { into: 'test' } }, { $match: { bar: 2 } }],
},
})
);
expect(dataProvider?.dataProvider?.explainAggregate).to.be.calledWith(
'test.test',
[{ $match: { bar: 2 } }]
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Stage } from '@mongodb-js/explain-plan-helper';
import { ExplainPlan } from '@mongodb-js/explain-plan-helper';
import { capMaxTimeMSAtPreferenceLimit } from 'compass-preferences-model';
import type AppRegistry from 'hadron-app-registry';
import type { DataService, ExplainExecuteOptions } from 'mongodb-data-service';
import type { DataService } from 'mongodb-data-service';
import type { Action, AnyAction, Reducer } from 'redux';
import { applyMiddleware, createStore } from 'redux';
import type { ThunkAction } from 'redux-thunk';
Expand Down Expand Up @@ -198,21 +198,11 @@ function cleanupAbortSignal(id: number) {
return ExplainPlanAbortControllerMap.delete(id);
}

const getAggregationExplainVerbosity = (
pipeline: unknown[],
isDataLake: boolean
): ExplainExecuteOptions['explainVerbosity'] => {
// dataLake does not have $out/$merge operators
if (isDataLake) {
return 'queryPlannerExtended';
}
const lastStage = pipeline[pipeline.length - 1] ?? {};
const isOutOrMergePipeline =
Object.prototype.hasOwnProperty.call(lastStage, '$out') ||
Object.prototype.hasOwnProperty.call(lastStage, '$merge');
return isOutOrMergePipeline
? 'queryPlanner' // $out & $merge only work with queryPlanner
: 'allPlansExecution';
const isOutputStage = (stage: unknown): boolean => {
return (
Object.prototype.hasOwnProperty.call(stage, '$out') ||
Object.prototype.hasOwnProperty.call(stage, '$merge')
);
};

const DEFAULT_MAX_TIME_MS = 60_000;
Expand All @@ -235,11 +225,20 @@ export const openExplainPlanModal = (

try {
if (event.aggregation) {
const { pipeline, collation, maxTimeMS } = event.aggregation;
const explainVerbosity = getAggregationExplainVerbosity(
pipeline,
isDataLake
);
const { collation, maxTimeMS } = event.aggregation;
const explainVerbosity = isDataLake
? 'queryPlannerExtended'
: 'allPlansExecution';

const pipeline = event.aggregation.pipeline.filter((stage) => {
// Getting explain plan for a pipeline with an out / merge stage can
// cause data corruption issues in non-genuine MongoDB servers, for
// example CosmosDB actually executes pipeline and persists data, even
// when the stage is not at the end of the pipeline. To avoid
// introducing branching logic based on MongoDB genuineness, we just
// filter out all output stages here instead
return !isOutputStage(stage);
});

const explainOptions = {
maxTimeMS: capMaxTimeMSAtPreferenceLimit(
Expand Down

0 comments on commit a390314

Please sign in to comment.