Skip to content

Commit 5d65c88

Browse files
add support for on_failure parameter
1 parent edaf515 commit 5d65c88

File tree

6 files changed

+52
-9
lines changed

6 files changed

+52
-9
lines changed

x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ describe('pipeline_serialization', () => {
2020
},
2121
},
2222
],
23+
on_failure: [
24+
{
25+
set: {
26+
field: 'error.message',
27+
value: '{{ failure_message }}',
28+
},
29+
},
30+
],
2331
},
2432
pipeline2: {
2533
description: 'pipeline2 description',
@@ -39,6 +47,14 @@ describe('pipeline_serialization', () => {
3947
},
4048
},
4149
],
50+
onFailure: [
51+
{
52+
set: {
53+
field: 'error.message',
54+
value: '{{ failure_message }}',
55+
},
56+
},
57+
],
4258
},
4359
{
4460
name: 'pipeline2',

x-pack/plugins/ingest_pipelines/common/lib/pipeline_serialization.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,25 @@ import { PipelinesByName, Pipeline } from '../types';
99
export function deserializePipelines(pipelinesByName: PipelinesByName): Pipeline[] {
1010
const pipelineNames: string[] = Object.keys(pipelinesByName);
1111

12-
const deserializedTemplates = pipelineNames.map((name: string) => {
13-
const { description, version, processors } = pipelinesByName[name];
12+
const deserializedPipelines = pipelineNames.map((name: string) => {
13+
const { description, version, processors, on_failure } = pipelinesByName[name];
1414

15-
return {
15+
const pipeline = {
1616
name,
1717
description,
1818
version,
1919
processors,
20+
onFailure: on_failure,
2021
};
22+
23+
// Remove any undefined values
24+
return Object.entries(pipeline).reduce((pipelineDefinition: any, [key, value]) => {
25+
if (value !== undefined) {
26+
pipelineDefinition[key] = value;
27+
}
28+
return pipelineDefinition;
29+
}, {});
2130
});
2231

23-
return deserializedTemplates;
32+
return deserializedPipelines;
2433
}

x-pack/plugins/ingest_pipelines/common/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@ export interface Pipeline {
1515
description: string;
1616
version?: number;
1717
processors: Processor[];
18+
onFailure?: Processor[];
1819
}
1920

2021
export interface PipelinesByName {
21-
[key: string]: Omit<Pipeline, 'name'>;
22+
[key: string]: {
23+
description: string;
24+
version?: number;
25+
processors: Processor[];
26+
on_failure?: Processor[];
27+
};
2228
}

x-pack/plugins/ingest_pipelines/server/routes/api/create.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ import { RouteDependencies } from '../../types';
1313
const bodySchema = schema.object({
1414
name: schema.string(),
1515
description: schema.string(),
16-
processors: schema.arrayOf(schema.any()), // todo fix
16+
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
1717
version: schema.maybe(schema.number()),
18+
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
1819
});
1920

2021
export const registerCreateRoute = ({
@@ -33,7 +34,7 @@ export const registerCreateRoute = ({
3334
const { callAsCurrentUser } = ctx.core.elasticsearch.dataClient;
3435
const pipeline = req.body as Pipeline;
3536

36-
const { name, description, processors, version } = pipeline;
37+
const { name, description, processors, version, onFailure } = pipeline;
3738

3839
try {
3940
// Check that a pipeline with the same name doesn't already exist
@@ -62,6 +63,7 @@ export const registerCreateRoute = ({
6263
description,
6364
processors,
6465
version,
66+
on_failure: onFailure,
6567
},
6668
});
6769

x-pack/plugins/ingest_pipelines/server/routes/api/update.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import { RouteDependencies } from '../../types';
1111

1212
const bodySchema = schema.object({
1313
description: schema.string(),
14-
processors: schema.arrayOf(schema.any()), // todo fix
14+
processors: schema.arrayOf(schema.recordOf(schema.string(), schema.any())),
1515
version: schema.maybe(schema.number()),
16+
onFailure: schema.maybe(schema.arrayOf(schema.recordOf(schema.string(), schema.any()))),
1617
});
1718

1819
const paramsSchema = schema.object({
@@ -37,7 +38,7 @@ export const registerUpdateRoute = ({
3738
const { name } = req.params as typeof paramsSchema.type;
3839
const pipeline = req.body as Pipeline;
3940

40-
const { description, processors, version } = pipeline;
41+
const { description, processors, version, onFailure } = pipeline;
4142

4243
try {
4344
// Verify pipeline exists; ES will throw 404 if it doesn't
@@ -49,6 +50,7 @@ export const registerUpdateRoute = ({
4950
description,
5051
processors,
5152
version,
53+
on_failure: onFailure,
5254
},
5355
});
5456

x-pack/test/api_integration/apis/management/ingest_pipelines/ingest_pipelines.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,14 @@ export default function({ getService }: FtrProviderContext) {
3535
},
3636
},
3737
],
38+
onFailure: [
39+
{
40+
set: {
41+
field: 'error.message',
42+
value: '{{ failure_message }}',
43+
},
44+
},
45+
],
3846
version: 1,
3947
})
4048
.expect(200);

0 commit comments

Comments
 (0)