Skip to content

fix: Possible race condition on Orchestrator API initialization #9713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/cubejs-server-core/src/core/OrchestratorStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export class OrchestratorStorage {
});
}

protected readonly initializers: Map<string, Promise<OrchestratorApi>> = new Map();

public has(orchestratorId: string) {
return this.storage.has(orchestratorId);
}
Expand All @@ -24,6 +26,29 @@ export class OrchestratorStorage {
return this.storage.set(orchestratorId, orchestratorApi);
}

public async getOrInit(orchestratorId: string, init: () => Promise<OrchestratorApi>): Promise<OrchestratorApi> {
if (this.storage.has(orchestratorId)) {
return this.storage.get(orchestratorId);
}

if (this.initializers.has(orchestratorId)) {
return this.initializers.get(orchestratorId);
}

try {
const initPromise = init();
this.initializers.set(orchestratorId, initPromise);

const instance = await initPromise;

this.storage.set(orchestratorId, instance);

return instance;
} finally {
this.initializers.delete(orchestratorId);
}
}

public clear() {
this.storage.clear();
}
Expand Down
168 changes: 81 additions & 87 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,88 +569,49 @@ export class CubejsServerCore {
public async getOrchestratorApi(context: RequestContext): Promise<OrchestratorApi> {
const orchestratorId = await this.contextToOrchestratorId(context);

if (this.orchestratorStorage.has(orchestratorId)) {
return this.orchestratorStorage.get(orchestratorId);
}

/**
* Hash table to store promises which will be resolved with the
* datasource drivers. DriverFactoryByDataSource function is closure
* this constant.
*/
const driverPromise: Record<string, Promise<BaseDriver>> = {};

let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;

const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
const externalDbType = this.contextToExternalDbType(context);

// orchestrator options can be empty, if user didn't define it.
// so we are adding default and configuring queues concurrency.
const orchestratorOptions =
Copy link
Member Author

@ovr ovr Jun 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you can see, there is an await point here at (await this.orchestratorOptions(context)) || {},. it's defined between first get/set and set as init point.

With this, it's possible to have two orchestrator API instances, if it's called immediately at one time.

this.optsHandler.getOrchestratorInitializedOptions(
context,
(await this.orchestratorOptions(context)) || {},
);

const orchestratorApi = this.createOrchestratorApi(
return this.orchestratorStorage.getOrInit(orchestratorId, async () => {
/**
* Driver factory function `DriverFactoryByDataSource`.
* Hash table to store promises which will be resolved with the
* datasource drivers. DriverFactoryByDataSource function is a closure
* this constant.
*/
async (dataSource = 'default') => {
if (driverPromise[dataSource]) {
return driverPromise[dataSource];
}
const driverPromise: Record<string, Promise<BaseDriver>> = {};

// eslint-disable-next-line no-return-assign
return driverPromise[dataSource] = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.resolveDriver(
{
...context,
dataSource,
},
orchestratorOptions,
);

if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
}
let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;

await driver.testConnection();
const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
const externalDbType = this.contextToExternalDbType(context);

return driver;
}

throw new Error(
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
);
} catch (e) {
driverPromise[dataSource] = null;

if (driver) {
await driver.release();
}
// orchestrator options can be empty, if user didn't define it.
// so we are adding default and configuring queues concurrency.
const orchestratorOptions =
this.optsHandler.getOrchestratorInitializedOptions(
context,
(await this.orchestratorOptions(context)) || {},
);

throw e;
}
})();
},
{
externalDriverFactory: this.options.externalDriverFactory && (async () => {
if (externalPreAggregationsDriverPromise) {
return externalPreAggregationsDriverPromise;
return this.createOrchestratorApi(
/**
* Driver factory function `DriverFactoryByDataSource`.
*/
async (dataSource = 'default') => {
if (driverPromise[dataSource]) {
return driverPromise[dataSource];
}

// eslint-disable-next-line no-return-assign
return externalPreAggregationsDriverPromise = (async () => {
return driverPromise[dataSource] = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.options.externalDriverFactory(context);
driver = await this.resolveDriver(
{
...context,
dataSource,
},
orchestratorOptions,
);

if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
Expand All @@ -662,10 +623,10 @@ export class CubejsServerCore {
}

throw new Error(
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
);
} catch (e) {
externalPreAggregationsDriverPromise = null;
driverPromise[dataSource] = null;

if (driver) {
await driver.release();
Expand All @@ -674,23 +635,56 @@ export class CubejsServerCore {
throw e;
}
})();
}),
contextToDbType: async (dataSource) => contextToDbType({
...context,
dataSource
}),
// speedup with cache
contextToExternalDbType: () => externalDbType,
redisPrefix: orchestratorId,
skipExternalCacheAndQueue: externalDbType === 'cubestore',
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
...orchestratorOptions,
}
);
},
{
externalDriverFactory: this.options.externalDriverFactory && (async () => {
if (externalPreAggregationsDriverPromise) {
return externalPreAggregationsDriverPromise;
}

// eslint-disable-next-line no-return-assign
return externalPreAggregationsDriverPromise = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.options.externalDriverFactory(context);
if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
}

this.orchestratorStorage.set(orchestratorId, orchestratorApi);
await driver.testConnection();

return driver;
}

return orchestratorApi;
throw new Error(
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
);
} catch (e) {
externalPreAggregationsDriverPromise = null;

if (driver) {
await driver.release();
}

throw e;
}
})();
}),
contextToDbType: async (dataSource) => contextToDbType({
...context,
dataSource
}),
// speedup with cache
contextToExternalDbType: () => externalDbType,
redisPrefix: orchestratorId,
skipExternalCacheAndQueue: externalDbType === 'cubestore',
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
...orchestratorOptions,
}
);
});
}

protected createCompilerApi(repository, options: Record<string, any> = {}) {
Expand Down
Loading