Skip to content

Commit

Permalink
[ML] Support to set destination ingest pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
walterra committed Jan 27, 2022
1 parent 61ef26d commit 1896cbb
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 1 deletion.
5 changes: 5 additions & 0 deletions x-pack/plugins/transform/common/api_schemas/type_guards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

import type { EsIndex } from '../types/es_index';
import type { EsIngestPipeline } from '../types/es_ingest_pipeline';
import { isPopulatedObject } from '../shared_imports';

// To be able to use the type guards on the client side, we need to make sure we don't import
Expand Down Expand Up @@ -60,6 +61,10 @@ export const isEsIndices = (arg: unknown): arg is EsIndex[] => {
return Array.isArray(arg);
};

export const isEsIngestPipelines = (arg: unknown): arg is EsIngestPipeline[] => {
return Array.isArray(arg) && arg.every((d) => isPopulatedObject(d, ['name']));
};

export const isEsSearchResponse = (arg: unknown): arg is estypes.SearchResponse => {
return isPopulatedObject(arg, ['hits']);
};
Expand Down
13 changes: 13 additions & 0 deletions x-pack/plugins/transform/common/types/es_ingest_pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

// This interface doesn't cover a full ingest pipeline spec,
// just what's necessary to make it work in the transform creation wizard.
// The full interface can be found in x-pack/plugins/ingest_pipelines/common/types.ts
export interface EsIngestPipeline {
name: string;
}
4 changes: 4 additions & 0 deletions x-pack/plugins/transform/public/app/common/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ export const getCreateTransformRequestBody = (
: {}),
dest: {
index: transformDetailsState.destinationIndex,
// conditionally add optional ingest pipeline
...(transformDetailsState.destinationIngestPipeline !== ''
? { pipeline: transformDetailsState.destinationIngestPipeline }
: {}),
},
// conditionally add continuous mode config
...(transformDetailsState.isContinuousModeEnabled
Expand Down
8 changes: 8 additions & 0 deletions x-pack/plugins/transform/public/app/hooks/use_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import type { GetTransformsStatsResponseSchema } from '../../../common/api_schem
import { TransformId } from '../../../common/types/transform';
import { API_BASE_PATH } from '../../../common/constants';
import { EsIndex } from '../../../common/types/es_index';
import { EsIngestPipeline } from '../../../common/types/es_ingest_pipeline';

import { useAppDependencies } from '../app_dependencies';

Expand Down Expand Up @@ -202,6 +203,13 @@ export const useApi = () => {
return e;
}
},
async getEsIngestPipelines(): Promise<EsIngestPipeline[] | HttpFetchError> {
try {
return await http.get('/api/ingest_pipelines');
} catch (e) {
return e;
}
},
async getHistogramsForFields(
indexPatternTitle: string,
fields: FieldHistogramRequestConfig[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import type { TransformConfigUnion, TransformId } from '../../../../../../common/types/transform';

export type EsIndexName = string;
export type EsIngestPipelineName = string;
export type IndexPatternTitle = string;

export interface StepDetailsExposedState {
continuousModeDateField: string;
continuousModeDelay: string;
createIndexPattern: boolean;
destinationIndex: EsIndexName;
destinationIngestPipeline: EsIngestPipelineName;
isContinuousModeEnabled: boolean;
isRetentionPolicyEnabled: boolean;
retentionPolicyDateField: string;
Expand Down Expand Up @@ -48,6 +50,7 @@ export function getDefaultStepDetailsState(): StepDetailsExposedState {
transformFrequency: defaultTransformFrequency,
transformSettingsMaxPageSearchSize: defaultTransformSettingsMaxPageSearchSize,
destinationIndex: '',
destinationIngestPipeline: '',
touched: false,
valid: false,
indexPatternTimeField: undefined,
Expand All @@ -73,6 +76,11 @@ export function applyTransformConfigToDetailsState(
state.transformDescription = transformConfig.description;
}

// Ingest Pipeline
if (transformConfig.dest.pipeline !== undefined) {
state.destinationIngestPipeline = transformConfig.dest.pipeline;
}

// Frequency
if (transformConfig.frequency !== undefined) {
state.transformFrequency = transformConfig.frequency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { toMountPoint } from '../../../../../../../../../src/plugins/kibana_reac

import {
isEsIndices,
isEsIngestPipelines,
isPostTransformsPreviewResponseSchema,
} from '../../../../../../common/api_schemas/type_guards';
import { TransformId } from '../../../../../../common/types/transform';
Expand Down Expand Up @@ -82,8 +83,12 @@ export const StepDetailsForm: FC<StepDetailsFormProps> = React.memo(
const [destinationIndex, setDestinationIndex] = useState<EsIndexName>(
defaults.destinationIndex
);
const [destinationIngestPipeline, setDestinationIngestPipeline] = useState<string>(
defaults.destinationIngestPipeline
);
const [transformIds, setTransformIds] = useState<TransformId[]>([]);
const [indexNames, setIndexNames] = useState<EsIndexName[]>([]);
const [ingestPipelineNames, setIngestPipelineNames] = useState<string[]>([]);

const canCreateDataView = useMemo(
() =>
Expand Down Expand Up @@ -180,7 +185,10 @@ export const StepDetailsForm: FC<StepDetailsFormProps> = React.memo(
setTransformIds(resp.transforms.map((transform) => transform.id));
}

const indices = await api.getEsIndices();
const [indices, ingestPipelines] = await Promise.all([
api.getEsIndices(),
api.getEsIngestPipelines(),
]);

if (isEsIndices(indices)) {
setIndexNames(indices.map((index) => index.name));
Expand All @@ -200,6 +208,24 @@ export const StepDetailsForm: FC<StepDetailsFormProps> = React.memo(
});
}

if (isEsIngestPipelines(ingestPipelines)) {
setIngestPipelineNames(ingestPipelines.map(({ name }) => name));
} else {
toastNotifications.addDanger({
title: i18n.translate('xpack.transform.stepDetailsForm.errorGettingIngestPipelines', {
defaultMessage: 'An error occurred getting the existing ingest pipeline names:',
}),
text: toMountPoint(
<ToastNotificationText
overlays={overlays}
theme={theme}
text={getErrorMessage(ingestPipelines)}
/>,
{ theme$: theme.theme$ }
),
});
}

try {
setIndexPatternTitles(await deps.data.indexPatterns.getTitles());
} catch (e) {
Expand Down Expand Up @@ -311,6 +337,7 @@ export const StepDetailsForm: FC<StepDetailsFormProps> = React.memo(
transformSettingsMaxPageSearchSize,
transformSettingsDocsPerSecond,
destinationIndex,
destinationIngestPipeline,
touched: true,
valid,
indexPatternTimeField,
Expand Down Expand Up @@ -443,6 +470,25 @@ export const StepDetailsForm: FC<StepDetailsFormProps> = React.memo(
/>
</EuiFormRow>

{ingestPipelineNames.length > 0 && (
<EuiFormRow
label={i18n.translate(
'xpack.transform.stepDetailsForm.destinationIngestPipelineLabel',
{
defaultMessage: 'Destination ingest pipeline',
}
)}
>
<EuiSelect
options={ingestPipelineNames.map((text: string) => ({ text, value: text }))}
value={destinationIngestPipeline}
onChange={(e) => setDestinationIngestPipeline(e.target.value)}
hasNoInitialSelection={defaults.destinationIngestPipeline === ''}
data-test-subj="transformDestinationPipelineSelect"
/>
</EuiFormRow>
)}

{stepDefineState.transformFunction === TRANSFORM_FUNCTION.LATEST ? (
<>
<EuiSpacer size={'m'} />
Expand Down

0 comments on commit 1896cbb

Please sign in to comment.