Skip to content

Commit

Permalink
[WIP] Async Queries Followup (#6966)
Browse files Browse the repository at this point in the history
Stops index pattern from being created when selecting external datasource
    Checks to see if datasource.name exists before creating dataframe
    Handles PPL error in request

Issues Resolved
#6957

Signed-off-by: Sean Li <lnse@amazon.com>
  • Loading branch information
sejli authored Jun 7, 2024
1 parent 328e08e commit c86e90c
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor {
const { id, ...searchRequest } = request;
const path = trimEnd('/api/sqlasyncql/jobs');

const fetchDataFrame = (queryString: string | undefined, df = null) => {
const fetchDataFrame = (queryString: string | undefined, dataSource: string, df = null) => {
const body = stringify({
query: { qs: queryString, format: 'jdbc' },
dataSource,
df,
});
return from(
Expand All @@ -53,14 +54,15 @@ export class SQLAsyncQlSearchInterceptor extends SearchInterceptor {
};

const rawDataFrame = getRawDataFrame(searchRequest);
const dataFrame = fetchDataFrame(getRawQueryString(searchRequest), rawDataFrame);
const dataFrame = fetchDataFrame(
getRawQueryString(searchRequest),
searchRequest.params.index,
rawDataFrame
);

// subscribe to dataFrame to see if an error is returned, display a toast message if so
dataFrame.subscribe((df) => {
// TODO: MQL Async: clean later
// if (!df.body.error) {
// console.log('SEARCH INTERCEPTOR:', df);
// }
if (!df.body.error) return;
const jsError = new Error(df.body.error.response);
this.deps.toasts.addError(jsError, {
Expand Down
1 change: 1 addition & 0 deletions plugins-extra/query_enhancements/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export function defineRoutes(
format: schema.string(),
}),
df: schema.any(),
dataSource: schema.string(),
}),
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export class SQLAsyncFacet {
const params = {
body: {
query: request.body.query.qs,
datasource: df?.name,
datasource: request.body.dataSource,
lang: 'sql',
sessionId: df?.meta?.sessionId,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export const pplSearchStrategyProvider = (
? `source=${source} | head`
: requestParams.search;
const rawResponse: any = await pplFacet.describeQuery(request);
if (!rawResponse.success) {
return {
type: 'data_frame',
body: { error: rawResponse.data },
took: rawResponse.took,
};
}

const dataFrame = createDataFrame({
name: source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,13 @@ export async function getExternalSearchParamsFromRequest(
// However, TODO: need to fix the df creation to not get mappings for fields for index when creating the temp index pattern if not 'default'
const dataFrame =
getDataFrame() ??
getDataFrameBySource(dataSource.name) ??
(await setDataFrame(createDataFrame({ name: dataSource.name, fields: [] })));

(dataSource?.name ? getDataFrameBySource(dataSource.name) : null) ??
(await setDataFrame(createDataFrame({ name: dataSource?.name ?? indexTitle, fields: [] })));
return {
index: indexTitle,
index: dataSource ? dataSource.name : indexTitle,
body: {
...searchRequest.body,
...(dataFrame ? { df: dataFrame } : {}),
df: dataFrame,
},
...searchParams,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ export class SearchSource {
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
} else if ((response as IDataFrameResponse).type === DATA_FRAME_TYPES.POLLING) {
const dataFrameResponse = response as IDataFrameResponse;
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
if (!response.body?.error) {
await this.setDataFrame(dataFrameResponse.body as IDataFrame);
}
return onResponse(searchRequest, convertResult(response as IDataFrameResponse));
}
// TODO: MQL else if data_frame_polling then poll for the data frame updating the df fields only
Expand Down
32 changes: 18 additions & 14 deletions src/plugins/data/public/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
if (dataFrame?.name && dataFrame?.meta?.sessionId) {
this.sessionCache.set(dataFrame.name, dataFrame.meta.sessionId);
}
const existingIndexPattern = indexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await indexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);
// save to cache by title because the id is not unique for temporary index pattern created
indexPatterns.saveToCache(dataSet.title, dataSet);
if (dataFrame?.schema) {
const existingIndexPattern = indexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await indexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);
// save to cache by title because the id is not unique for temporary index pattern created
indexPatterns.saveToCache(dataSet.title, dataSet);
}
},
clear: () => {
if (this.dfCache.get() === undefined) return;
Expand Down Expand Up @@ -208,13 +210,15 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
if (dataFrame?.name && dataFrame?.meta?.sessionId) {
this.sessionCache.set(dataFrame.name, dataFrame.meta.sessionId);
}
const existingIndexPattern = indexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await indexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);
// save to cache by title because the id is not unique for temporary index pattern created
indexPatterns.saveToCache(dataSet.title, dataSet);
if (dataFrame?.schema) {
const existingIndexPattern = indexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await indexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);
// save to cache by title because the id is not unique for temporary index pattern created
indexPatterns.saveToCache(dataSet.title, dataSet);
}
},
clear: (name: string) => {
if (this.dfsCache.get(name) === undefined) return;
Expand Down
48 changes: 32 additions & 16 deletions src/plugins/data/server/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,41 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
};

const dfsService: DataFramesService = {
get: () => this.dfCache.get(),
get: (name: string) => {
const df = this.dfsCache.get(name);
if (df) {
const sessionId = this.sessionCache.get(df?.name);
if (sessionId) {
df.meta = { ...df.meta, sessionId };
}
}
return df;
},
set: async (dataFrame: IDataFrame) => {
if (this.dfCache.get() && this.dfCache.get()?.name !== dataFrame.name) {
scopedIndexPatterns.clearCache(this.dfCache.get()!.name, false);
// this.dfsCache.get(dataFrame.name!, dataFrame);
this.dfsCache.set(dataFrame.name!, dataFrame);
// if (this.dfCache.get() && this.dfCache.get()?.name !== dataFrame.name) {
// scopedIndexPatterns.clearCache(this.dfCache.get()!.name, false);
// }
// this.dfCache.set(dataFrame);
if (dataFrame?.name && dataFrame?.meta?.sessionId) {
this.sessionCache.set(dataFrame.name, dataFrame.meta.sessionId);
}
if (dataFrame?.schema) {
const existingIndexPattern = scopedIndexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await scopedIndexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);
// save to cache by title because the id is not unique for temporary index pattern created
scopedIndexPatterns.saveToCache(dataSet.title, dataSet);
}
this.dfCache.set(dataFrame);
const existingIndexPattern = scopedIndexPatterns.getByTitle(dataFrame.name!, true);
const dataSet = await scopedIndexPatterns.create(
dataFrameToSpec(dataFrame, existingIndexPattern?.id),
!existingIndexPattern?.id
);

// save to cache by title because the id is not unique for temporary index pattern created
scopedIndexPatterns.saveToCache(dataSet.title, dataSet);
},
clear: () => {
if (this.dfCache.get() === undefined) return;
clear: (name: string) => {
if (this.dfsCache.get(name) === undefined) return;
// name because the id is not unique for temporary index pattern created
scopedIndexPatterns.clearCache(this.dfCache.get()!.name, false);
this.dfCache.clear();
scopedIndexPatterns.clearCache(this.dfsCache.get(name)!.name, false);
this.dfsCache.clear(name);
},
};

Expand Down Expand Up @@ -308,6 +323,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
loadingCount$: new BehaviorSubject(0),
},
df: dfService,
dfs: dfsService,
session: sessionService,
};

Expand Down

0 comments on commit c86e90c

Please sign in to comment.