Skip to content

Commit

Permalink
feat(posthog): add more event for ibis
Browse files Browse the repository at this point in the history
  • Loading branch information
grieve54706 committed Oct 24, 2024
1 parent e1f6d3c commit 185e488
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 27 deletions.
36 changes: 29 additions & 7 deletions wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export interface IIbisAdaptor {
query: string,
options: IbisQueryOptions,
) => Promise<IbisQueryResponse>;
dryRun: (query: string, options: IbisBaseOptions) => Promise<boolean>;
dryRun: (query: string, options: IbisBaseOptions) => Promise<DryRunResponse>;
getTables: (
dataSource: DataSourceName,
connectionInfo: WREN_AI_CONNECTION_INFO,
Expand All @@ -130,12 +130,20 @@ export interface IIbisAdaptor {
) => Promise<ValidationResponse>;
}

export interface IbisQueryResponse {
export interface IbisResponse {
correlationId?: string;
processTime?: string;
}

export interface IbisQueryResponse extends IbisResponse {
columns: string[];
data: any[];
dtypes: Record<string, string>;
}

export interface DryRunResponse extends IbisResponse {
}

export class IbisAdaptor implements IIbisAdaptor {
private ibisServerBaseUrl: string;

Expand Down Expand Up @@ -185,22 +193,29 @@ export class IbisAdaptor implements IIbisAdaptor {
},
},
);
const response = res.data;
return response;
return {
...res.data,
correlationId: res.headers['X-Correlation-ID'],
processTime: res.headers['X-Process-Time'],
};
} catch (e) {
logger.debug(`Got error when querying ibis: ${e.response.data}`);

throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, {
customMessage: e.response.data || 'Error querying ibis server',
originalError: e,
other: {
correlationId: e.response.headers['X-Correlation-ID'],
processTime: e.response.headers['X-Process-Time'],
},
});
}
}

public async dryRun(
query: string,
options: IbisQueryOptions,
): Promise<boolean> {
): Promise<DryRunResponse> {
const { dataSource, mdl } = options;
const connectionInfo = this.updateConnectionInfo(options.connectionInfo);
const ibisConnectionInfo = toIbisConnectionInfo(dataSource, connectionInfo);
Expand All @@ -211,17 +226,24 @@ export class IbisAdaptor implements IIbisAdaptor {
};
logger.debug(`Dry run sql from ibis with body:`);
try {
await axios.post(
const response = await axios.post(
`${this.ibisServerBaseUrl}/${dataSourceUrlMap[dataSource]}/query?dryRun=true`,
body,
);
logger.debug(`Ibis server Dry run success`);
return true;
return {
correlationId: response.headers['X-Correlation-ID'],
processTime: response.headers['X-Process-Time'],
};
} catch (err) {
logger.info(`Got error when dry running ibis`);
throw Errors.create(Errors.GeneralErrorCodes.DRY_RUN_ERROR, {
customMessage: err.response.data,
originalError: err,
other: {
correlationId: err.response.headers['X-Correlation-ID'],
processTime: err.response.headers['X-Process-Time'],
},
});
}
}
Expand Down
126 changes: 125 additions & 1 deletion wren-ui/src/apollo/server/adaptors/tests/ibisAdaptor.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import axios from 'axios';
import { IbisAdaptor, ValidationRules } from '../ibisAdaptor';
import { DryRunResponse, IbisAdaptor, IbisQueryOptions, IbisQueryResponse, ValidationRules } from '../ibisAdaptor';
import { DataSourceName } from '../../types';
import { Manifest } from '../../mdl/type';
import {
Expand Down Expand Up @@ -388,4 +388,128 @@ describe('IbisAdaptor', () => {
},
);
});

it('should get data, correlationId and processTime', async () => {
mockedAxios.post.mockResolvedValue({
data: {
columns: [],
data: [],
dtypes: {},
},
headers: {
"X-Correlation-ID": '123',
"X-Process-Time": '1s',
},
});
mockedEncryptor.prototype.decrypt.mockReturnValue(
JSON.stringify({ password: mockPostgresConnectionInfo.password }),
);

const res: IbisQueryResponse = await ibisAdaptor.query(
"SELECT * FROM test_table",
{
dataSource: DataSourceName.POSTGRES,
connectionInfo: mockPostgresConnectionInfo,
mdl: mockManifest,
limit: 10,
} as IbisQueryOptions,
);

expect(res.data).toEqual([]);
expect(res.correlationId).toEqual('123');
expect(res.processTime).toEqual('1s');
});

it('should throw an exception with correlationId and processTime when query fails', async () => {
mockedAxios.post.mockRejectedValue({
response: {
data: "Error message",
headers: {
"X-Correlation-ID": '123',
"X-Process-Time": '1s',
},
},
});
mockedEncryptor.prototype.decrypt.mockReturnValue(
JSON.stringify({ password: mockPostgresConnectionInfo.password }),
);

await expect(
ibisAdaptor.query(
"SELECT * FROM test_table",
{
dataSource: DataSourceName.POSTGRES,
connectionInfo: mockPostgresConnectionInfo,
mdl: mockManifest,
limit: 10,
}
)
).rejects.toMatchObject({
message: "Error message",
extensions: {
other: {
correlationId: '123',
processTime: '1s',
},
},
});
});

it('should get data, correlationId and processTime when dry run succeeds', async () => {
mockedAxios.post.mockResolvedValue({
headers: {
"X-Correlation-ID": '123',
"X-Process-Time": '1s',
},
});
mockedEncryptor.prototype.decrypt.mockReturnValue(
JSON.stringify({ password: mockPostgresConnectionInfo.password }),
);

const res: DryRunResponse = await ibisAdaptor.dryRun(
"SELECT * FROM test_table",
{
dataSource: DataSourceName.POSTGRES,
connectionInfo: mockPostgresConnectionInfo,
mdl: mockManifest,
} as IbisQueryOptions,
);

expect(res.correlationId).toEqual('123');
expect(res.processTime).toEqual('1s');
});

it('should throw an exception with correlationId and processTime when dry run fails', async () => {
mockedAxios.post.mockRejectedValue({
response: {
data: "Error message",
headers: {
"X-Correlation-ID": '123',
"X-Process-Time": '1s',
},
},
});
mockedEncryptor.prototype.decrypt.mockReturnValue(
JSON.stringify({ password: mockPostgresConnectionInfo.password }),
);

await expect(
ibisAdaptor.dryRun(
"SELECT * FROM test_table",
{
dataSource: DataSourceName.POSTGRES,
connectionInfo: mockPostgresConnectionInfo,
mdl: mockManifest,
}
)
).rejects.toMatchObject({
message: "Error message",
extensions: {
other: {
correlationId: '123',
processTime: '1s',
},
},
});
});
});
105 changes: 86 additions & 19 deletions wren-ui/src/apollo/server/services/queryService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import {
IIbisAdaptor,
IbisQueryResponse,
ValidationRules,
IbisResponse,
} from '../adaptors/ibisAdaptor';
import { getLogger } from '@server/utils';
import { Project } from '../repositories';
import {
PostHogTelemetry,
TelemetryEvent,
} from '../telemetry/telemetry';

const logger = getLogger('QueryService');
logger.level = 'debug';
Expand All @@ -21,6 +26,8 @@ export interface ColumnMetadata {
}

export interface PreviewDataResponse {
correlationId?: string;
processTime?: string;
columns: ColumnMetadata[];
data: any[][];
}
Expand Down Expand Up @@ -71,16 +78,20 @@ export interface IQueryService {
export class QueryService implements IQueryService {
private readonly ibisAdaptor: IIbisAdaptor;
private readonly wrenEngineAdaptor: IWrenEngineAdaptor;
private readonly telemetry: PostHogTelemetry;

constructor({
ibisAdaptor,
wrenEngineAdaptor,
telemetry,
}: {
ibisAdaptor: IIbisAdaptor;
wrenEngineAdaptor: IWrenEngineAdaptor;
telemetry: PostHogTelemetry;
}) {
this.ibisAdaptor = ibisAdaptor;
this.wrenEngineAdaptor = wrenEngineAdaptor;
this.telemetry = telemetry;
}

public async preview(
Expand All @@ -106,23 +117,14 @@ export class QueryService implements IQueryService {
this.checkDataSourceIsSupported(dataSource);
logger.debug('Use ibis adaptor to preview');
if (dryRun) {
await this.ibisAdaptor.dryRun(sql, {
dataSource,
connectionInfo,
mdl,
});
await this.ibisDryRun(sql, dataSource, connectionInfo, mdl);
return true;
} else {
const data = await this.ibisAdaptor.query(sql, {
dataSource,
connectionInfo,
mdl,
limit,
});
return this.transformDataType(data);
return await this.ibisQuery(sql, dataSource, connectionInfo, mdl, limit);
}
}
}

public async describeStatement(
sql: string,
options: PreviewOptions,
Expand Down Expand Up @@ -163,6 +165,57 @@ export class QueryService implements IQueryService {
}
}

private checkDataSourceIsSupported(dataSource: DataSourceName) {
if (
!Object.prototype.hasOwnProperty.call(SupportedDataSource, dataSource)
) {
throw new Error(`Unsupported datasource for ibis: "${dataSource}"`);
}
}

private async ibisDryRun(
sql: string,
dataSource: DataSourceName,
connectionInfo: any,
mdl: Manifest
) {
const event = TelemetryEvent.IBIS_DRY_RUN;
try {
const res = await this.ibisAdaptor.dryRun(sql, {
dataSource,
connectionInfo,
mdl,
});
this.sendIbisEvent(event, res, { dataSource, sql });
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
}
}

private async ibisQuery(
sql: string,
dataSource: DataSourceName,
connectionInfo: any,
mdl: Manifest,
limit: number
): Promise<PreviewDataResponse> {
const event = TelemetryEvent.IBIS_QUERY;
try {
const res = await this.ibisAdaptor.query(sql, {
dataSource,
connectionInfo,
mdl,
limit,
});
this.sendIbisEvent(event, res, { dataSource, sql });
return this.transformDataType(res);
} catch (err: any) {
this.sendIbisFailedEvent(event, err, { dataSource, sql });
throw err;
}
}

private transformDataType(data: IbisQueryResponse): PreviewDataResponse {
const columns = data.columns;
const dtypes = data.dtypes;
Expand All @@ -188,11 +241,25 @@ export class QueryService implements IQueryService {
} as PreviewDataResponse;
}

private checkDataSourceIsSupported(dataSource: DataSourceName) {
if (
!Object.prototype.hasOwnProperty.call(SupportedDataSource, dataSource)
) {
throw new Error(`Unsupported datasource for ibis: "${dataSource}"`);
}
private sendIbisEvent(event: TelemetryEvent, res: IbisResponse, others: Record<string, any>) {
this.telemetry.sendEvent(event, {
correlationId: res.correlationId,
processTime: res.processTime,
...others,
});
}

private sendIbisFailedEvent(event: TelemetryEvent, err: any, others: Record<string, any>) {
this.telemetry.sendEvent(
event,
{
correlationId: err.extensions.other.correlationId,
processTime: err.extensions.other.processTime,
error: err.message,
...others
},
err.extensions?.service,
false,
);
}
}
Loading

0 comments on commit 185e488

Please sign in to comment.