Skip to content

Commit 5a8fabd

Browse files
authored
[Monitoring] De-duplicate pipeline ids based on the ephemeral_id changing (#49978)
* De-duplicate pipeline ids based on the ephemeral_id changing * Add tests
1 parent 063d13b commit 5a8fabd

File tree

6 files changed

+1662
-2
lines changed

6 files changed

+1662
-2
lines changed

x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66
import moment from 'moment';
7-
import { get } from 'lodash';
7+
import { get, uniq } from 'lodash';
88
import { createQuery } from '../create_query';
99
import { LogstashMetric } from '../metrics';
1010

@@ -74,5 +74,6 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, { cluste
7474

7575
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
7676
const response = await callWithRequest(req, 'search', params);
77-
return get(response, 'aggregations.nested_context.composite_data.buckets', []).map(bucket => bucket.key);
77+
const data = get(response, 'aggregations.nested_context.composite_data.buckets', []).map(bucket => bucket.key);
78+
return uniq(data, item => item.id);
7879
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"pipelines":[{"id":"eight","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"eighteen","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"eleven","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"fifteen","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"five","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"four","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"fourteen","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"nine","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"nineteen","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1},{"id":"one","metrics":{"throughput":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.pipelines.events.out","label":"Pipeline Throughput","description":"Number of events emitted per second by the Logstash pipeline at the outputs stage.","units":"e/s","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,0],[1572882260000,0],[1572882270000,0]]},"nodesCount":{"bucket_size":"10 seconds","timeRange":{"min":1572882044855,"max":1572882638667},"metric":{"app":"logstash","field":"logstash_stats.logstash.uuid","label":"Pipeline Node Count","description":"Number of nodes on which the Logstash pipeline is running.","units":"","format":"0,0.[00]","hasCalculation":true,"isDerivative":false},"data":[[1572882220000,1],[1572882260000,1],[1572882270000,1]]}},"latestThroughput":0,"latestNodesCount":1}],"clusterStatus":{"node_count":1,"events_in_total":312,"events_out_total":234,"avg_memory":1037959168,"avg_memory_used":205063840,"max_uptime":40598,"pipeline_count":26,"queue_types":{"memory":26,"persisted":0},"versions":["8.0.0"]},"totalPipelineCount":26}

x-pack/test/api_integration/apis/monitoring/logstash/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export default function ({ loadTestFile }) {
1010
loadTestFile(require.resolve('./nodes'));
1111
loadTestFile(require.resolve('./node_detail'));
1212
loadTestFile(require.resolve('./multicluster_pipelines'));
13+
loadTestFile(require.resolve('./pipelines'));
1314
});
1415
}
1516

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import expect from '@kbn/expect';
8+
import pipelinesFixture from './fixtures/pipelines';
9+
10+
export default function ({ getService }) {
11+
const supertest = getService('supertest');
12+
const esArchiver = getService('esArchiver');
13+
14+
describe('pipelines', () => {
15+
const archive = 'monitoring/logstash/changing_pipelines';
16+
const timeRange = {
17+
min: '2019-11-04T15:40:44.855Z',
18+
max: '2019-11-04T15:50:38.667Z'
19+
};
20+
const pagination = {
21+
size: 10,
22+
index: 0
23+
};
24+
const sort = {
25+
field: 'id',
26+
direction: 'asc'
27+
};
28+
29+
before('load archive', () => {
30+
return esArchiver.load(archive);
31+
});
32+
33+
after('unload archive', () => {
34+
return esArchiver.unload(archive);
35+
});
36+
37+
it('should return paginated pipelines', async () => {
38+
const { body } = await supertest
39+
.post('/api/monitoring/v1/clusters/TUjQLdHNTh2SB9Wy0gOtWg/logstash/pipelines')
40+
.set('kbn-xsrf', 'xxx')
41+
.send({ timeRange, pagination, sort })
42+
.expect(200);
43+
44+
expect(body).to.eql(pipelinesFixture);
45+
});
46+
47+
it('should get one of each after enough pagination', async () => {
48+
async function getIds(page) {
49+
const { body } = await supertest
50+
.post('/api/monitoring/v1/clusters/TUjQLdHNTh2SB9Wy0gOtWg/logstash/pipelines')
51+
.set('kbn-xsrf', 'xxx')
52+
.send({ timeRange, pagination: { ...pagination, index: page }, sort })
53+
.expect(200);
54+
55+
return body.pipelines.map(pipeline => pipeline.id);
56+
}
57+
58+
const ids = [
59+
...await getIds(0),
60+
...await getIds(1),
61+
...await getIds(2),
62+
];
63+
expect(ids.length).to.be(26);
64+
});
65+
});
66+
}
Binary file not shown.

0 commit comments

Comments
 (0)