Skip to content

Commit 937bae1

Browse files
[Fleet] Fix duplicate data streams being shown in UI (#89812) (#90059)
* Add API integration tests for data streams list, including one that is expected to fail due to reliance on number of backing indices * Use ES data streams API as source of truth for list of data streams, and only query against backing indices afterwards * Get package name from data stream meta info * Increate retry timeout * Move initial info requests inside Promise.all Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
1 parent e5ff183 commit 937bae1

File tree

6 files changed

+344
-127
lines changed

6 files changed

+344
-127
lines changed

x-pack/plugins/fleet/common/types/models/data_stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export interface DataStream {
1111
type: string;
1212
package: string;
1313
package_version: string;
14-
last_activity: string;
14+
last_activity_ms: number;
1515
size_in_bytes: number;
1616
dashboards: Array<{
1717
id: string;

x-pack/plugins/fleet/public/applications/fleet/sections/data_stream/list_page/index.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => {
121121
},
122122
},
123123
{
124-
field: 'last_activity',
124+
field: 'last_activity_ms',
125125
sortable: true,
126126
width: '25%',
127127
dataType: 'date',
128128
name: i18n.translate('xpack.fleet.dataStreamList.lastActivityColumnTitle', {
129129
defaultMessage: 'Last activity',
130130
}),
131-
render: (date: DataStream['last_activity']) => {
131+
render: (date: DataStream['last_activity_ms']) => {
132132
try {
133133
const formatter = fieldFormats.getInstance('date');
134134
return formatter.convert(date);

x-pack/plugins/fleet/server/routes/data_streams/handlers.ts

Lines changed: 153 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -4,157 +4,187 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66
import { RequestHandler, SavedObjectsClientContract } from 'src/core/server';
7+
import { keyBy, keys, merge } from 'lodash';
78
import { DataStream } from '../../types';
89
import { GetDataStreamsResponse, KibanaAssetType, KibanaSavedObjectType } from '../../../common';
910
import { getPackageSavedObjects, getKibanaSavedObject } from '../../services/epm/packages/get';
1011
import { defaultIngestErrorHandler } from '../../errors';
1112

1213
const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*';
1314

15+
interface ESDataStreamInfoResponse {
16+
data_streams: Array<{
17+
name: string;
18+
timestamp_field: {
19+
name: string;
20+
};
21+
indices: Array<{ index_name: string; index_uuid: string }>;
22+
generation: number;
23+
_meta?: {
24+
package?: {
25+
name: string;
26+
};
27+
managed_by?: string;
28+
managed?: boolean;
29+
[key: string]: any;
30+
};
31+
status: string;
32+
template: string;
33+
ilm_policy: string;
34+
hidden: boolean;
35+
}>;
36+
}
37+
38+
interface ESDataStreamStatsResponse {
39+
data_streams: Array<{
40+
data_stream: string;
41+
backing_indices: number;
42+
store_size_bytes: number;
43+
maximum_timestamp: number;
44+
}>;
45+
}
46+
1447
export const getListHandler: RequestHandler = async (context, request, response) => {
1548
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
49+
const body: GetDataStreamsResponse = {
50+
data_streams: [],
51+
};
1652

1753
try {
18-
// Get stats (size on disk) of all potentially matching indices
19-
const { indices: indexStats } = await callCluster('indices.stats', {
20-
index: DATA_STREAM_INDEX_PATTERN,
21-
metric: ['store'],
22-
});
54+
// Get matching data streams, their stats, and package SOs
55+
const [
56+
{ data_streams: dataStreamsInfo },
57+
{ data_streams: dataStreamStats },
58+
packageSavedObjects,
59+
] = await Promise.all([
60+
callCluster('transport.request', {
61+
method: 'GET',
62+
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
63+
}) as Promise<ESDataStreamInfoResponse>,
64+
callCluster('transport.request', {
65+
method: 'GET',
66+
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
67+
}) as Promise<ESDataStreamStatsResponse>,
68+
getPackageSavedObjects(context.core.savedObjects.client),
69+
]);
70+
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');
71+
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');
72+
73+
// Combine data stream info
74+
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
75+
const dataStreamNames = keys(dataStreams);
76+
77+
// Map package SOs
78+
const packageSavedObjectsByName = keyBy(packageSavedObjects.saved_objects, 'id');
79+
const packageMetadata: any = {};
80+
81+
// Query additional information for each data stream
82+
const dataStreamPromises = dataStreamNames.map(async (dataStreamName) => {
83+
const dataStream = dataStreams[dataStreamName];
84+
const dataStreamResponse: DataStream = {
85+
index: dataStreamName,
86+
dataset: '',
87+
namespace: '',
88+
type: '',
89+
package: dataStream._meta?.package?.name || '',
90+
package_version: '',
91+
last_activity_ms: dataStream.maximum_timestamp,
92+
size_in_bytes: dataStream.store_size_bytes,
93+
dashboards: [],
94+
};
2395

24-
// Get all matching indices and info about each
25-
// This returns the top 100,000 indices (as buckets) by last activity
26-
const { aggregations } = await callCluster('search', {
27-
index: DATA_STREAM_INDEX_PATTERN,
28-
body: {
29-
size: 0,
30-
query: {
31-
bool: {
32-
must: [
33-
{
34-
exists: {
35-
field: 'data_stream.namespace',
96+
// Query backing indices to extract data stream dataset, namespace, and type values
97+
const {
98+
aggregations: { dataset, namespace, type },
99+
} = await callCluster('search', {
100+
index: dataStream.indices.map((index) => index.index_name),
101+
body: {
102+
size: 0,
103+
query: {
104+
bool: {
105+
must: [
106+
{
107+
exists: {
108+
field: 'data_stream.namespace',
109+
},
36110
},
37-
},
38-
{
39-
exists: {
40-
field: 'data_stream.dataset',
111+
{
112+
exists: {
113+
field: 'data_stream.dataset',
114+
},
41115
},
42-
},
43-
],
116+
],
117+
},
44118
},
45-
},
46-
aggs: {
47-
index: {
48-
terms: {
49-
field: '_index',
50-
size: 100000,
51-
order: {
52-
last_activity: 'desc',
119+
aggs: {
120+
dataset: {
121+
terms: {
122+
field: 'data_stream.dataset',
123+
size: 1,
53124
},
54125
},
55-
aggs: {
56-
dataset: {
57-
terms: {
58-
field: 'data_stream.dataset',
59-
size: 1,
60-
},
61-
},
62-
namespace: {
63-
terms: {
64-
field: 'data_stream.namespace',
65-
size: 1,
66-
},
126+
namespace: {
127+
terms: {
128+
field: 'data_stream.namespace',
129+
size: 1,
67130
},
68-
type: {
69-
terms: {
70-
field: 'data_stream.type',
71-
size: 1,
72-
},
73-
},
74-
last_activity: {
75-
max: {
76-
field: '@timestamp',
77-
},
131+
},
132+
type: {
133+
terms: {
134+
field: 'data_stream.type',
135+
size: 1,
78136
},
79137
},
80138
},
81139
},
82-
},
83-
});
84-
85-
const body: GetDataStreamsResponse = {
86-
data_streams: [],
87-
};
88-
89-
if (!(aggregations && aggregations.index && aggregations.index.buckets)) {
90-
return response.ok({
91-
body,
92140
});
93-
}
94141

95-
const {
96-
index: { buckets: indexResults },
97-
} = aggregations;
98-
99-
const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client);
100-
const packageMetadata: any = {};
101-
102-
const dataStreamsPromises = (indexResults as any[]).map(async (result) => {
103-
const {
104-
key: indexName,
105-
dataset: { buckets: datasetBuckets },
106-
namespace: { buckets: namespaceBuckets },
107-
type: { buckets: typeBuckets },
108-
last_activity: { value_as_string: lastActivity },
109-
} = result;
110-
111-
// We don't have a reliable way to associate index with package ID, so
112-
// this is a hack to extract the package ID from the first part of the dataset name
113-
// with fallback to extraction from index name
114-
const pkg = datasetBuckets.length
115-
? datasetBuckets[0].key.split('.')[0]
116-
: indexName.split('-')[1].split('.')[0];
117-
const pkgSavedObject = packageSavedObjects.saved_objects.filter((p) => p.id === pkg);
118-
119-
// if
120-
// - the datastream is associated with a package
121-
// - and the package has been installed through EPM
122-
// - and we didn't pick the metadata in an earlier iteration of this map()
123-
if (pkg !== '' && pkgSavedObject.length > 0 && !packageMetadata[pkg]) {
124-
// then pick the dashboards from the package saved object
125-
const dashboards =
126-
pkgSavedObject[0].attributes?.installed_kibana?.filter(
127-
(o) => o.type === KibanaSavedObjectType.dashboard
128-
) || [];
129-
// and then pick the human-readable titles from the dashboard saved objects
130-
const enhancedDashboards = await getEnhancedDashboards(
131-
context.core.savedObjects.client,
132-
dashboards
133-
);
134-
135-
packageMetadata[pkg] = {
136-
version: pkgSavedObject[0].attributes?.version || '',
137-
dashboards: enhancedDashboards,
138-
};
142+
// Set values from backing indices query
143+
dataStreamResponse.dataset = dataset.buckets[0]?.key || '';
144+
dataStreamResponse.namespace = namespace.buckets[0]?.key || '';
145+
dataStreamResponse.type = type.buckets[0]?.key || '';
146+
147+
// Find package saved object
148+
const pkgName = dataStreamResponse.package;
149+
const pkgSavedObject = pkgName ? packageSavedObjectsByName[pkgName] : null;
150+
151+
if (pkgSavedObject) {
152+
// if
153+
// - the data stream is associated with a package
154+
// - and the package has been installed through EPM
155+
// - and we didn't pick the metadata in an earlier iteration of this map()
156+
if (!packageMetadata[pkgName]) {
157+
// then pick the dashboards from the package saved object
158+
const dashboards =
159+
pkgSavedObject.attributes?.installed_kibana?.filter(
160+
(o) => o.type === KibanaSavedObjectType.dashboard
161+
) || [];
162+
// and then pick the human-readable titles from the dashboard saved objects
163+
const enhancedDashboards = await getEnhancedDashboards(
164+
context.core.savedObjects.client,
165+
dashboards
166+
);
167+
168+
packageMetadata[pkgName] = {
169+
version: pkgSavedObject.attributes?.version || '',
170+
dashboards: enhancedDashboards,
171+
};
172+
}
173+
174+
// Set values from package information
175+
dataStreamResponse.package = pkgName;
176+
dataStreamResponse.package_version = packageMetadata[pkgName].version;
177+
dataStreamResponse.dashboards = packageMetadata[pkgName].dashboards;
139178
}
140179

141-
return {
142-
index: indexName,
143-
dataset: datasetBuckets.length ? datasetBuckets[0].key : '',
144-
namespace: namespaceBuckets.length ? namespaceBuckets[0].key : '',
145-
type: typeBuckets.length ? typeBuckets[0].key : '',
146-
package: pkgSavedObject.length ? pkg : '',
147-
package_version: packageMetadata[pkg] ? packageMetadata[pkg].version : '',
148-
last_activity: lastActivity,
149-
size_in_bytes: indexStats[indexName] ? indexStats[indexName].total.store.size_in_bytes : 0,
150-
dashboards: packageMetadata[pkg] ? packageMetadata[pkg].dashboards : [],
151-
};
180+
return dataStreamResponse;
152181
});
153182

154-
const dataStreams: DataStream[] = await Promise.all(dataStreamsPromises);
155-
156-
body.data_streams = dataStreams;
157-
183+
// Return final data streams objects sorted by last activity, decending
184+
// After filtering out data streams that are missing dataset/namespace/type fields
185+
body.data_streams = (await Promise.all(dataStreamPromises))
186+
.filter(({ dataset, namespace, type }) => dataset && namespace && type)
187+
.sort((a, b) => b.last_activity_ms - a.last_activity_ms);
158188
return response.ok({
159189
body,
160190
});
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
export default function loadTests({ loadTestFile }) {
8+
describe('Data Stream Endpoints', () => {
9+
loadTestFile(require.resolve('./list'));
10+
});
11+
}

0 commit comments

Comments
 (0)