diff --git a/wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts b/wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts index 67b9c8e8a..3a1a7d2af 100644 --- a/wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts +++ b/wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts @@ -110,7 +110,7 @@ export interface IIbisAdaptor { query: string, options: IbisQueryOptions, ) => Promise; - dryRun: (query: string, options: IbisBaseOptions) => Promise; + dryRun: (query: string, options: IbisBaseOptions) => Promise; getTables: ( dataSource: DataSourceName, connectionInfo: WREN_AI_CONNECTION_INFO, @@ -130,12 +130,20 @@ export interface IIbisAdaptor { ) => Promise; } -export interface IbisQueryResponse { +export interface IbisResponse { + correlationId?: string; + processTime?: string; +} + +export interface IbisQueryResponse extends IbisResponse { columns: string[]; data: any[]; dtypes: Record; } +export interface DryRunResponse extends IbisResponse { +} + export class IbisAdaptor implements IIbisAdaptor { private ibisServerBaseUrl: string; @@ -185,14 +193,21 @@ export class IbisAdaptor implements IIbisAdaptor { }, }, ); - const response = res.data; - return response; + return { + ...res.data, + correlationId: res.headers['X-Correlation-ID'], + processTime: res.headers['X-Process-Time'], + }; } catch (e) { logger.debug(`Got error when querying ibis: ${e.response.data}`); throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, { customMessage: e.response.data || 'Error querying ibis server', originalError: e, + other: { + correlationId: e.response.headers['X-Correlation-ID'], + processTime: e.response.headers['X-Process-Time'], + }, }); } } @@ -200,7 +215,7 @@ export class IbisAdaptor implements IIbisAdaptor { public async dryRun( query: string, options: IbisQueryOptions, - ): Promise { + ): Promise { const { dataSource, mdl } = options; const connectionInfo = this.updateConnectionInfo(options.connectionInfo); const ibisConnectionInfo = toIbisConnectionInfo(dataSource, connectionInfo); @@ -211,17 +226,24 @@ export class IbisAdaptor implements IIbisAdaptor { }; logger.debug(`Dry run sql from ibis with body:`); try { - await axios.post( + const response = await axios.post( `${this.ibisServerBaseUrl}/${dataSourceUrlMap[dataSource]}/query?dryRun=true`, body, ); logger.debug(`Ibis server Dry run success`); - return true; + return { + correlationId: response.headers['X-Correlation-ID'], + processTime: response.headers['X-Process-Time'], + }; } catch (err) { logger.info(`Got error when dry running ibis`); throw Errors.create(Errors.GeneralErrorCodes.DRY_RUN_ERROR, { customMessage: err.response.data, originalError: err, + other: { + correlationId: err.response.headers['X-Correlation-ID'], + processTime: err.response.headers['X-Process-Time'], + }, }); } } diff --git a/wren-ui/src/apollo/server/adaptors/tests/ibisAdaptor.test.ts b/wren-ui/src/apollo/server/adaptors/tests/ibisAdaptor.test.ts index e6ad398b6..ecaa03307 100644 --- a/wren-ui/src/apollo/server/adaptors/tests/ibisAdaptor.test.ts +++ b/wren-ui/src/apollo/server/adaptors/tests/ibisAdaptor.test.ts @@ -1,5 +1,5 @@ import axios from 'axios'; -import { IbisAdaptor, ValidationRules } from '../ibisAdaptor'; +import { DryRunResponse, IbisAdaptor, IbisQueryOptions, IbisQueryResponse, ValidationRules } from '../ibisAdaptor'; import { DataSourceName } from '../../types'; import { Manifest } from '../../mdl/type'; import { @@ -388,4 +388,128 @@ describe('IbisAdaptor', () => { }, ); }); + + it('should get data, correlationId and processTime', async () => { + mockedAxios.post.mockResolvedValue({ + data: { + columns: [], + data: [], + dtypes: {}, + }, + headers: { + "X-Correlation-ID": '123', + "X-Process-Time": '1s', + }, + }); + mockedEncryptor.prototype.decrypt.mockReturnValue( + JSON.stringify({ password: mockPostgresConnectionInfo.password }), + ); + + const res: IbisQueryResponse = await ibisAdaptor.query( + "SELECT * FROM test_table", + { + dataSource: DataSourceName.POSTGRES, + connectionInfo: mockPostgresConnectionInfo, + mdl: mockManifest, + limit: 10, + } as IbisQueryOptions, + ); + + expect(res.data).toEqual([]); + expect(res.correlationId).toEqual('123'); + expect(res.processTime).toEqual('1s'); + }); + + it('should throw an exception with correlationId and processTime when query fails', async () => { + mockedAxios.post.mockRejectedValue({ + response: { + data: "Error message", + headers: { + "X-Correlation-ID": '123', + "X-Process-Time": '1s', + }, + }, + }); + mockedEncryptor.prototype.decrypt.mockReturnValue( + JSON.stringify({ password: mockPostgresConnectionInfo.password }), + ); + + await expect( + ibisAdaptor.query( + "SELECT * FROM test_table", + { + dataSource: DataSourceName.POSTGRES, + connectionInfo: mockPostgresConnectionInfo, + mdl: mockManifest, + limit: 10, + } + ) + ).rejects.toMatchObject({ + message: "Error message", + extensions: { + other: { + correlationId: '123', + processTime: '1s', + }, + }, + }); + }); + + it('should get data, correlationId and processTime when dry run succeeds', async () => { + mockedAxios.post.mockResolvedValue({ + headers: { + "X-Correlation-ID": '123', + "X-Process-Time": '1s', + }, + }); + mockedEncryptor.prototype.decrypt.mockReturnValue( + JSON.stringify({ password: mockPostgresConnectionInfo.password }), + ); + + const res: DryRunResponse = await ibisAdaptor.dryRun( + "SELECT * FROM test_table", + { + dataSource: DataSourceName.POSTGRES, + connectionInfo: mockPostgresConnectionInfo, + mdl: mockManifest, + } as IbisQueryOptions, + ); + + expect(res.correlationId).toEqual('123'); + expect(res.processTime).toEqual('1s'); + }); + + it('should throw an exception with correlationId and processTime when dry run fails', async () => { + mockedAxios.post.mockRejectedValue({ + response: { + data: "Error message", + headers: { + "X-Correlation-ID": '123', + "X-Process-Time": '1s', + }, + }, + }); + mockedEncryptor.prototype.decrypt.mockReturnValue( + JSON.stringify({ password: mockPostgresConnectionInfo.password }), + ); + + await expect( + ibisAdaptor.dryRun( + "SELECT * FROM test_table", + { + dataSource: DataSourceName.POSTGRES, + connectionInfo: mockPostgresConnectionInfo, + mdl: mockManifest, + } + ) + ).rejects.toMatchObject({ + message: "Error message", + extensions: { + other: { + correlationId: '123', + processTime: '1s', + }, + }, + }); + }); }); diff --git a/wren-ui/src/apollo/server/services/queryService.ts b/wren-ui/src/apollo/server/services/queryService.ts index 7fbd744a0..1b2b06451 100644 --- a/wren-ui/src/apollo/server/services/queryService.ts +++ b/wren-ui/src/apollo/server/services/queryService.ts @@ -6,9 +6,14 @@ import { IIbisAdaptor, IbisQueryResponse, ValidationRules, + IbisResponse, } from '../adaptors/ibisAdaptor'; import { getLogger } from '@server/utils'; import { Project } from '../repositories'; +import { + PostHogTelemetry, + TelemetryEvent, +} from '../telemetry/telemetry'; const logger = getLogger('QueryService'); logger.level = 'debug'; @@ -21,6 +26,8 @@ export interface ColumnMetadata { } export interface PreviewDataResponse { + correlationId?: string; + processTime?: string; columns: ColumnMetadata[]; data: any[][]; } @@ -71,16 +78,20 @@ export interface IQueryService { export class QueryService implements IQueryService { private readonly ibisAdaptor: IIbisAdaptor; private readonly wrenEngineAdaptor: IWrenEngineAdaptor; + private readonly telemetry: PostHogTelemetry; constructor({ ibisAdaptor, wrenEngineAdaptor, + telemetry, }: { ibisAdaptor: IIbisAdaptor; wrenEngineAdaptor: IWrenEngineAdaptor; + telemetry: PostHogTelemetry; }) { this.ibisAdaptor = ibisAdaptor; this.wrenEngineAdaptor = wrenEngineAdaptor; + this.telemetry = telemetry; } public async preview( @@ -106,23 +117,14 @@ export class QueryService implements IQueryService { this.checkDataSourceIsSupported(dataSource); logger.debug('Use ibis adaptor to preview'); if (dryRun) { - await this.ibisAdaptor.dryRun(sql, { - dataSource, - connectionInfo, - mdl, - }); + await this.ibisDryRun(sql, dataSource, connectionInfo, mdl); + return true; } else { - const data = await this.ibisAdaptor.query(sql, { - dataSource, - connectionInfo, - mdl, - limit, - }); - return this.transformDataType(data); + return await this.ibisQuery(sql, dataSource, connectionInfo, mdl, limit); } } } - + public async describeStatement( sql: string, options: PreviewOptions, @@ -163,6 +165,57 @@ export class QueryService implements IQueryService { } } + private checkDataSourceIsSupported(dataSource: DataSourceName) { + if ( + !Object.prototype.hasOwnProperty.call(SupportedDataSource, dataSource) + ) { + throw new Error(`Unsupported datasource for ibis: "${dataSource}"`); + } + } + + private async ibisDryRun( + sql: string, + dataSource: DataSourceName, + connectionInfo: any, + mdl: Manifest + ) { + const event = TelemetryEvent.IBIS_DRY_RUN; + try { + const res = await this.ibisAdaptor.dryRun(sql, { + dataSource, + connectionInfo, + mdl, + }); + this.sendIbisEvent(event, res, { dataSource, sql }); + } catch (err: any) { + this.sendIbisFailedEvent(event, err, { dataSource, sql }); + throw err; + } + } + + private async ibisQuery( + sql: string, + dataSource: DataSourceName, + connectionInfo: any, + mdl: Manifest, + limit: number + ): Promise { + const event = TelemetryEvent.IBIS_QUERY; + try { + const res = await this.ibisAdaptor.query(sql, { + dataSource, + connectionInfo, + mdl, + limit, + }); + this.sendIbisEvent(event, res, { dataSource, sql }); + return this.transformDataType(res); + } catch (err: any) { + this.sendIbisFailedEvent(event, err, { dataSource, sql }); + throw err; + } + } + private transformDataType(data: IbisQueryResponse): PreviewDataResponse { const columns = data.columns; const dtypes = data.dtypes; @@ -188,11 +241,25 @@ export class QueryService implements IQueryService { } as PreviewDataResponse; } - private checkDataSourceIsSupported(dataSource: DataSourceName) { - if ( - !Object.prototype.hasOwnProperty.call(SupportedDataSource, dataSource) - ) { - throw new Error(`Unsupported datasource for ibis: "${dataSource}"`); - } + private sendIbisEvent(event: TelemetryEvent, res: IbisResponse, others: Record) { + this.telemetry.sendEvent(event, { + correlationId: res.correlationId, + processTime: res.processTime, + ...others, + }); + } + + private sendIbisFailedEvent(event: TelemetryEvent, err: any, others: Record) { + this.telemetry.sendEvent( + event, + { + correlationId: err.extensions.other.correlationId, + processTime: err.extensions.other.processTime, + error: err.message, + ...others + }, + err.extensions?.service, + false, + ); } } diff --git a/wren-ui/src/apollo/server/services/tests/queryService.test.ts b/wren-ui/src/apollo/server/services/tests/queryService.test.ts new file mode 100644 index 000000000..c2671b1ad --- /dev/null +++ b/wren-ui/src/apollo/server/services/tests/queryService.test.ts @@ -0,0 +1,176 @@ +import { TelemetryEvent } from "../../telemetry/telemetry"; +import { DataSourceName } from "../../types"; +import { QueryService } from "../queryService"; + +describe('QueryService', () => { + let mockIbisAdaptor; + let mockWrenEngineAdaptor; + let mockTelemetry; + let queryService; + + beforeEach(() => { + mockIbisAdaptor = { + query: jest.fn(), + dryRun: jest.fn(), + }; + mockWrenEngineAdaptor = {}; + mockTelemetry = new MockTelemetry(); + + queryService = new QueryService({ + ibisAdaptor: mockIbisAdaptor, + wrenEngineAdaptor: mockWrenEngineAdaptor, + telemetry: mockTelemetry, + }); + }); + + afterEach(() => { + mockTelemetry.records = []; + jest.clearAllMocks(); + }); + + it('should return true and send event when previewing via ibis dry run succeeds', async () => { + mockIbisAdaptor.dryRun.mockResolvedValue({ + correlationId: '123', + processTime: '1s', + }); + + const res = await queryService.preview('SELECT * FROM test', { + project: { type: DataSourceName.POSTGRES, connectionInfo: {} }, + manifest: {}, + dryRun: true, + }); + + expect(res).toBe(true); + expect(mockTelemetry.records).toHaveLength(1); + expect(mockTelemetry.records[0]).toEqual({ + event: TelemetryEvent.IBIS_DRY_RUN, + properties: { + correlationId: '123', + processTime: '1s', + sql: 'SELECT * FROM test', + dataSource: DataSourceName.POSTGRES, + }, + actionSuccess: true, + }); + }); + + it('should send event when previewing via ibis dry run fails', async () => { + mockIbisAdaptor.dryRun.mockRejectedValue({ + message: 'Error message', + extensions: { + other: { + correlationId: '123', + processTime: '1s', + } + } + }); + + try { + await queryService.preview('SELECT * FROM test', { + project: { type: DataSourceName.POSTGRES, connectionInfo: {} }, + manifest: {}, + dryRun: true, + }); + } catch (e) { + expect(e.message).toEqual('Error message'); + expect(e.extensions.other.correlationId).toEqual('123'); + expect(e.extensions.other.processTime).toEqual('1s'); + } + + expect(mockTelemetry.records).toHaveLength(1); + expect(mockTelemetry.records[0]).toEqual({ + event: TelemetryEvent.IBIS_DRY_RUN, + properties: { + correlationId: '123', + processTime: '1s', + sql: 'SELECT * FROM test', + dataSource: DataSourceName.POSTGRES, + error: 'Error message', + }, + actionSuccess: false, + service: undefined, + }); + }); + + it('should return data and send event when previewing via ibis query succeeds', async () => { + mockIbisAdaptor.query.mockResolvedValue({ + data: [], + columns: [], + dtypes: [], + correlationId: '123', + processTime: '1s', + }); + + const res = await queryService.preview('SELECT * FROM test', { + project: { type: DataSourceName.POSTGRES, connectionInfo: {} }, + manifest: {}, + limit: 10, + }); + + expect(res.data).toEqual([]); + expect(mockTelemetry.records).toHaveLength(1); + expect(mockTelemetry.records[0]).toEqual({ + event: TelemetryEvent.IBIS_QUERY, + properties: { + correlationId: '123', + processTime: '1s', + sql: 'SELECT * FROM test', + dataSource: DataSourceName.POSTGRES, + }, + actionSuccess: true, + }); + }); + + it('should send event when previewing via ibis query fails', async () => { + mockIbisAdaptor.query.mockRejectedValue({ + message: 'Error message', + extensions: { + other: { + correlationId: '123', + processTime: '1s', + } + } + }); + + await expect( + queryService.preview('SELECT * FROM test', { + project: { type: DataSourceName.POSTGRES, connectionInfo: {} }, + manifest: {}, + }) + ).rejects.toMatchObject({ + message: 'Error message', + extensions: { + other: { + correlationId: '123', + processTime: '1s', + } + } + }); + + expect(mockTelemetry.records).toHaveLength(1); + expect(mockTelemetry.records[0]).toEqual({ + event: TelemetryEvent.IBIS_QUERY, + properties: { + correlationId: '123', + processTime: '1s', + sql: 'SELECT * FROM test', + dataSource: DataSourceName.POSTGRES, + error: 'Error message', + }, + actionSuccess: false, + service: undefined, + }); + }); +}); + +class MockTelemetry { + records: any[] = []; + sendEvent( + event: TelemetryEvent, + properties: Record = {}, + service: any, + actionSuccess: boolean = true, + ) { + this.records.push({ event, properties, service, actionSuccess }); + } +} \ No newline at end of file diff --git a/wren-ui/src/apollo/server/telemetry/telemetry.ts b/wren-ui/src/apollo/server/telemetry/telemetry.ts index eece995dc..f8a18cc57 100644 --- a/wren-ui/src/apollo/server/telemetry/telemetry.ts +++ b/wren-ui/src/apollo/server/telemetry/telemetry.ts @@ -55,6 +55,10 @@ export enum TelemetryEvent { // settings event SETTING_RESET_PROJECT = 'setting_reset_project', + // ibis event + IBIS_DRY_RUN = 'ibis_dry_run', + IBIS_QUERY = 'ibis_query', + // Default error GRAPHQL_ERROR = 'graphql_error', } diff --git a/wren-ui/src/apollo/server/utils/error.ts b/wren-ui/src/apollo/server/utils/error.ts index 3e4e779ab..ebb75330d 100644 --- a/wren-ui/src/apollo/server/utils/error.ts +++ b/wren-ui/src/apollo/server/utils/error.ts @@ -101,6 +101,7 @@ export const create = ( customMessage?: string; originalError?: Error; service?: WrenService; + other?: any; }, ): GraphQLError => { const { customMessage, originalError, service } = options || {}; @@ -124,6 +125,7 @@ export const create = ( shortMessage: shortMessages[code] || shortMessages[GeneralErrorCodes.INTERNAL_SERVER_ERROR], + other: options?.other, }, }); diff --git a/wren-ui/src/pages/api/graphql.ts b/wren-ui/src/pages/api/graphql.ts index 29016747f..a9a482f78 100644 --- a/wren-ui/src/pages/api/graphql.ts +++ b/wren-ui/src/pages/api/graphql.ts @@ -104,6 +104,7 @@ const bootstrapServer = async () => { const queryService = new QueryService({ ibisAdaptor, wrenEngineAdaptor, + telemetry, }); const modelService = new ModelService({ projectService,