[Stack monitoring] logstash - read from correct aggregation #122443

Merged 16 commits on Jan 17, 2022
@@ -17,7 +17,7 @@ export function StatementSection({ iconType, headingText, elements, onShowVertexDetails }) {
}

return (
-    <div>
+    <div data-test-subj={`pipelineViewerSection_${headingText}`}>
<StatementListHeading iconType={iconType} title={headingText} />
<EuiSpacer size="s" />
<StatementList elements={elements} onShowVertexDetails={onShowVertexDetails} />
19 changes: 13 additions & 6 deletions x-pack/plugins/monitoring/server/lib/logstash/get_pipeline.ts
@@ -76,6 +76,17 @@ export function _enrichStateWithStatsAggregation(
statsAggregation: any,
timeseriesIntervalInSeconds: number
) {
// we could have data in both legacy and metricbeat collection, we pick the bucket most filled
Contributor: Wonder if we should consider plotting both? I'd guess this will mean a break in the graph when moving from internal collection to metricbeat. Since the pipeline stack monitoring is beta though, maybe that's okay.

Contributor (author): That could be an option. We could also merge the aggregations and ensure we only consider a single datasource per node. Since a pipeline can run on multiple nodes, if we receive both legacy and metricbeat monitoring for a pipeline-node tuple there is duplication that we should ignore.

Contributor: That's a really good point. I opened #123191 to capture the improvement for later.
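The merge idea from this thread could be sketched roughly as follows. This is a hypothetical helper, not code from the PR; the `NodeBucket` shape, the `source` field, and the `dedupeByNode` name are all assumptions made for illustration:

```typescript
// Hypothetical sketch: when a pipeline-node tuple reports through both legacy
// and metricbeat collection, keep only one datasource per node.
interface NodeBucket {
  node: string;
  source: 'legacy' | 'metricbeat';
  docCount: number;
}

function dedupeByNode(buckets: NodeBucket[]): NodeBucket[] {
  const byNode = new Map<string, NodeBucket>();
  for (const bucket of buckets) {
    const existing = byNode.get(bucket.node);
    // Prefer the better-filled bucket, mirroring the "most filled" heuristic
    // the PR applies at the top-level pipelines aggregation.
    if (!existing || bucket.docCount > existing.docCount) {
      byNode.set(bucket.node, bucket);
    }
  }
  return [...byNode.values()];
}
```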

const bucketCount = (aggregationKey: string) =>
get(
statsAggregation.aggregations,
`${aggregationKey}.scoped.total_processor_duration_stats.count`
);

const pipelineBucket =
bucketCount('pipelines_mb') > bucketCount('pipelines')
? statsAggregation.aggregations.pipelines_mb
: statsAggregation.aggregations.pipelines;
const logstashState = stateDocument.logstash_state || stateDocument.logstash?.node?.state;
const vertices = logstashState?.pipeline?.representation?.graph?.vertices ?? [];

@@ -85,14 +96,10 @@
vertex.stats = {};
});

-  const totalDurationStats =
-    statsAggregation.aggregations.pipelines.scoped.total_processor_duration_stats;
+  const totalDurationStats = pipelineBucket.scoped.total_processor_duration_stats;
const totalProcessorsDurationInMillis = totalDurationStats.max - totalDurationStats.min;

-  const verticesWithStatsBuckets =
-    statsAggregation.aggregations?.pipelines.scoped.vertices?.vertex_id.buckets ??
-    statsAggregation.aggregations?.pipelines_mb.scoped.vertices?.vertex_id.buckets ??
-    [];
+  const verticesWithStatsBuckets = pipelineBucket.scoped.vertices?.vertex_id.buckets ?? [];
verticesWithStatsBuckets.forEach((vertexStatsBucket: any) => {
// Each vertexStats bucket contains a list of stats for a single vertex within a single timeseries interval
const vertexId = vertexStatsBucket.key;
2 changes: 2 additions & 0 deletions x-pack/test/functional/apps/monitoring/index.js
@@ -41,6 +41,8 @@ export default function ({ loadTestFile }) {
loadTestFile(require.resolve('./logstash/nodes_mb'));
loadTestFile(require.resolve('./logstash/pipelines'));
loadTestFile(require.resolve('./logstash/pipelines_mb'));
loadTestFile(require.resolve('./logstash/pipeline_viewer'));
loadTestFile(require.resolve('./logstash/pipeline_viewer_mb'));
loadTestFile(require.resolve('./logstash/node_detail'));
loadTestFile(require.resolve('./logstash/node_detail_mb'));
loadTestFile(require.resolve('./beats/cluster'));
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import { getLifecycleMethods } from '../_get_lifecycle_methods';

export default function ({ getService, getPageObjects }) {
const overview = getService('monitoringClusterOverview');
const pipelinesList = getService('monitoringLogstashPipelines');
const pipelineViewer = getService('monitoringLogstashPipelineViewer');

describe('Logstash pipeline viewer', () => {
const { setup, tearDown } = getLifecycleMethods(getService, getPageObjects);

before(async () => {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
});

await overview.closeAlertsModal();

// go to nginx_logs pipeline view
await overview.clickLsPipelines();
expect(await pipelinesList.isOnListing()).to.be(true);
await pipelinesList.clickPipeline('nginx_logs');
expect(await pipelineViewer.isOnPipelineViewer()).to.be(true);
});

after(async () => {
await tearDown();
});

    it('displays pipelines inputs, filters and outputs', async () => {
const { inputs, filters, outputs } = await pipelineViewer.getPipelineDefinition();

expect(inputs).to.eql([{ name: 'generator', metrics: ['mygen01', '62.5 e/s emitted'] }]);
expect(filters).to.eql([
{ name: 'sleep', metrics: ['1%', '94.86 ms/e', '62.5 e/s received'] },
]);
expect(outputs).to.eql([{ name: 'stdout', metrics: ['0%', '0 ms/e', '62.5 e/s received'] }]);
});
});
}
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import { getLifecycleMethods } from '../_get_lifecycle_methods';

export default function ({ getService, getPageObjects }) {
const overview = getService('monitoringClusterOverview');
const pipelinesList = getService('monitoringLogstashPipelines');
const pipelineViewer = getService('monitoringLogstashPipelineViewer');

describe('Logstash pipeline viewer mb', () => {
const { setup, tearDown } = getLifecycleMethods(getService, getPageObjects);

before(async () => {
await setup('x-pack/test/functional/es_archives/monitoring/logstash_pipelines_mb', {
from: 'Jan 22, 2018 @ 09:10:00.000',
to: 'Jan 22, 2018 @ 09:41:00.000',
useCreate: true,
});

await overview.closeAlertsModal();

// go to nginx_logs pipeline view
await overview.clickLsPipelines();
expect(await pipelinesList.isOnListing()).to.be(true);
await pipelinesList.clickPipeline('nginx_logs');
expect(await pipelineViewer.isOnPipelineViewer()).to.be(true);
});

after(async () => {
await tearDown();
});

    it('displays pipelines inputs, filters and outputs', async () => {
const { inputs, filters, outputs } = await pipelineViewer.getPipelineDefinition();

expect(inputs).to.eql([{ name: 'generator', metrics: ['mygen01', '62.5 e/s emitted'] }]);
expect(filters).to.eql([
{ name: 'sleep', metrics: ['1%', '94.86 ms/e', '62.5 e/s received'] },
]);
expect(outputs).to.eql([{ name: 'stdout', metrics: ['0%', '0 ms/e', '62.5 e/s received'] }]);
});
});
}
Binary file not shown.
2 changes: 2 additions & 0 deletions x-pack/test/functional/services/index.ts
@@ -31,6 +31,7 @@ import {
MonitoringLogstashNodesProvider,
MonitoringLogstashNodeDetailProvider,
MonitoringLogstashPipelinesProvider,
MonitoringLogstashPipelineViewerProvider,
MonitoringLogstashSummaryStatusProvider,
MonitoringKibanaOverviewProvider,
MonitoringKibanaInstancesProvider,
@@ -98,6 +99,7 @@ export const services = {
monitoringLogstashNodes: MonitoringLogstashNodesProvider,
monitoringLogstashNodeDetail: MonitoringLogstashNodeDetailProvider,
monitoringLogstashPipelines: MonitoringLogstashPipelinesProvider,
monitoringLogstashPipelineViewer: MonitoringLogstashPipelineViewerProvider,
monitoringLogstashSummaryStatus: MonitoringLogstashSummaryStatusProvider,
monitoringKibanaOverview: MonitoringKibanaOverviewProvider,
monitoringKibanaInstances: MonitoringKibanaInstancesProvider,
1 change: 1 addition & 0 deletions x-pack/test/functional/services/monitoring/index.js
@@ -24,6 +24,7 @@ export { MonitoringLogstashOverviewProvider } from './logstash_overview';
export { MonitoringLogstashNodesProvider } from './logstash_nodes';
export { MonitoringLogstashNodeDetailProvider } from './logstash_node_detail';
export { MonitoringLogstashPipelinesProvider } from './logstash_pipelines';
export { MonitoringLogstashPipelineViewerProvider } from './logstash_pipeline_viewer';
export { MonitoringLogstashSummaryStatusProvider } from './logstash_summary_status';
export { MonitoringKibanaOverviewProvider } from './kibana_overview';
export { MonitoringKibanaInstancesProvider } from './kibana_instances';
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export function MonitoringLogstashPipelineViewerProvider({ getService }) {
const testSubjects = getService('testSubjects');
const retry = getService('retry');
const find = getService('find');

const PIPELINE_VIEWER_SELECTOR = '.monPipelineViewer';
const SUBJ_PIPELINE_SECTION_PREFIX = 'pipelineViewerSection_';
const PIPELINE_SECTION_ITEM_CLS = 'monPipelineViewer__listItem';

return new (class LogstashPipelineViewer {
Contributor: The return new class pattern is really interesting. Not sure why we do it that way, but indeed this fits with the rest of the monitoring test services.

isOnPipelineViewer() {
return retry.try(() => find.existsByCssSelector(PIPELINE_VIEWER_SELECTOR));
}

async getPipelineDefinition() {
const getSectionItems = async (section) => {
const items = await section.findAllByClassName(PIPELINE_SECTION_ITEM_CLS);

return Promise.all(
items.map(async (item) => {
const [name, ...metrics] = await item.getVisibleText().then((text) => text.split('\n'));
return { name, metrics };
})
);
};

const [inputs, filters, outputs] = await Promise.all([
testSubjects.find(SUBJ_PIPELINE_SECTION_PREFIX + 'Inputs').then(getSectionItems),
testSubjects.find(SUBJ_PIPELINE_SECTION_PREFIX + 'Filters').then(getSectionItems),
testSubjects.find(SUBJ_PIPELINE_SECTION_PREFIX + 'Outputs').then(getSectionItems),
]);

return { inputs, filters, outputs };
}
})();
}
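The `return new (class ...)()` pattern discussed in the review comment above can be shown in isolation: the provider function returns a single instance of an anonymous class whose methods close over the provider's local constants. A minimal standalone sketch, with illustrative names that are not from the Kibana codebase:

```typescript
// Sketch of the "return new (anonymous class)" provider pattern used by the
// monitoring test services: one instance, with private setup held in a closure.
function makeCounterProvider(start: number) {
  const STEP = 2; // captured by closure, like PIPELINE_VIEWER_SELECTOR above

  return new (class Counter {
    private value = start;

    next(): number {
      this.value += STEP;
      return this.value;
    }
  })();
}
```

Compared to returning a plain object literal, the methods are defined once on a prototype and the class body gives a conventional place for private members; functionally the two are close to equivalent, which may be why it mainly survives for consistency across the monitoring services.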
@@ -64,6 +64,18 @@ export function MonitoringLogstashPipelinesProvider({ getService, getPageObjects }) {
}, []);
}

async clickPipeline(id) {
const anchors = await testSubjects.findAll(SUBJ_PIPELINES_IDS);
for (let i = 0; i < anchors.length; i++) {
const anchor = anchors[i];
if ((await anchor.getVisibleText()) === id) {
return anchor.click();
}
}

throw new Error(`pipeline with id ${id} not found`);
Contributor: Just nitpicking, but this seems like the pipeline "name" rather than "id". Pretty sure pipelines get a unique id number as well as a string name.

}
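Following up on the name-vs-id nit: if pipeline rows do carry both a unique id and a display name, the lookup could accept either. A hypothetical variant — the `PipelineRow` shape and `findPipeline` name are assumptions for illustration, not part of the PR:

```typescript
interface PipelineRow {
  id: string;
  name: string;
}

// Hypothetical: resolve a pipeline row by either its unique id or its display name.
function findPipeline(rows: PipelineRow[], idOrName: string): PipelineRow {
  const match = rows.find((row) => row.id === idOrName || row.name === idOrName);
  if (!match) {
    throw new Error(`pipeline with id or name "${idOrName}" not found`);
  }
  return match;
}
```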

async clickIdCol() {
const headerCell = await testSubjects.find(SUBJ_TABLE_SORT_ID_COL);
const button = await headerCell.findByTagName('button');