Skip to content

Commit

Permalink
[sample data] handle index aliases when installing/uninstalling datas…
Browse files Browse the repository at this point in the history
…ets (elastic#122689)

* refactor install/uninstall routes

* only skip failing tests

* check for aliases when uninstalling sample datasets

* fix return value

* add unit tests

* factorize installer creation

* add tests for the alias scenario

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
pgayvallet and kibanamachine authored Jan 17, 2022
1 parent b0f4e88 commit 683ab10
Show file tree
Hide file tree
Showing 10 changed files with 740 additions and 209 deletions.
13 changes: 13 additions & 0 deletions src/plugins/home/server/services/sample_data/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export class SampleDataInstallError extends Error {
constructor(message: string, public readonly httpCode: number) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { IScopedClusterClient, Logger } from 'kibana/server';
import type { DataIndexSchema } from './sample_dataset_registry_types';
import {
translateTimeRelativeToDifference,
translateTimeRelativeToWeek,
} from './translate_timestamp';
import { loadData } from './load_data';

export const insertDataIntoIndex = ({
dataIndexConfig,
logger,
esClient,
index,
nowReference,
}: {
dataIndexConfig: DataIndexSchema;
index: string;
nowReference: string;
esClient: IScopedClusterClient;
logger: Logger;
}) => {
const updateTimestamps = (doc: any) => {
dataIndexConfig.timeFields
.filter((timeFieldName: string) => doc[timeFieldName])
.forEach((timeFieldName: string) => {
doc[timeFieldName] = dataIndexConfig.preserveDayOfWeekTimeOfDay
? translateTimeRelativeToWeek(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
)
: translateTimeRelativeToDifference(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
);
});
return doc;
};

const bulkInsert = async (docs: unknown[]) => {
const insertCmd = { index: { _index: index } };
const bulk: unknown[] = [];
docs.forEach((doc: unknown) => {
bulk.push(insertCmd);
bulk.push(updateTimestamps(doc));
});

const { body: resp } = await esClient.asCurrentUser.bulk({
body: bulk,
});

if (resp.errors) {
const errMsg = `sample_data install errors while bulk inserting. Elasticsearch response: ${JSON.stringify(
resp,
null,
''
)}`;
logger.warn(errMsg);
return Promise.reject(
new Error(`Unable to load sample data into index "${index}", see kibana logs for details`)
);
}
};
return loadData(dataIndexConfig.dataPath, bulkInsert); // this returns a Promise
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import { createUnzip } from 'zlib';

const BULK_INSERT_SIZE = 500;

export function loadData(path: any, bulkInsert: (docs: any[]) => Promise<void>) {
export function loadData(
path: string,
bulkInsert: (docs: unknown[]) => Promise<void>
): Promise<number> {
return new Promise((resolve, reject) => {
let count: number = 0;
let docs: any[] = [];
Expand Down
161 changes: 26 additions & 135 deletions src/plugins/home/server/services/sample_data/routes/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,73 +6,12 @@
* Side Public License, v 1.
*/

import { Readable } from 'stream';
import { schema } from '@kbn/config-schema';
import { IRouter, Logger, IScopedClusterClient } from 'src/core/server';
import { IRouter, Logger } from 'src/core/server';
import { SampleDatasetSchema } from '../lib/sample_dataset_registry_types';
import { createIndexName } from '../lib/create_index_name';
import {
dateToIso8601IgnoringTime,
translateTimeRelativeToDifference,
translateTimeRelativeToWeek,
} from '../lib/translate_timestamp';
import { loadData } from '../lib/load_data';
import { SampleDataUsageTracker } from '../usage/usage';
import { getSavedObjectsClient } from './utils';
import { getUniqueObjectTypes } from '../lib/utils';

const insertDataIntoIndex = (
dataIndexConfig: any,
index: string,
nowReference: string,
esClient: IScopedClusterClient,
logger: Logger
) => {
function updateTimestamps(doc: any) {
dataIndexConfig.timeFields
.filter((timeFieldName: string) => doc[timeFieldName])
.forEach((timeFieldName: string) => {
doc[timeFieldName] = dataIndexConfig.preserveDayOfWeekTimeOfDay
? translateTimeRelativeToWeek(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
)
: translateTimeRelativeToDifference(
doc[timeFieldName],
dataIndexConfig.currentTimeMarker,
nowReference
);
});
return doc;
}

const bulkInsert = async (docs: any) => {
const insertCmd = { index: { _index: index } };
const bulk: any[] = [];
docs.forEach((doc: any) => {
bulk.push(insertCmd);
bulk.push(updateTimestamps(doc));
});

const { body: resp } = await esClient.asCurrentUser.bulk({
body: bulk,
});

if (resp.errors) {
const errMsg = `sample_data install errors while bulk inserting. Elasticsearch response: ${JSON.stringify(
resp,
null,
''
)}`;
logger.warn(errMsg);
return Promise.reject(
new Error(`Unable to load sample data into index "${index}", see kibana logs for details`)
);
}
};
return loadData(dataIndexConfig.dataPath, bulkInsert); // this returns a Promise
};
import { getSampleDataInstaller } from './utils';
import { SampleDataInstallError } from '../errors';

export function createInstallRoute(
router: IRouter,
Expand All @@ -95,86 +34,38 @@ export function createInstallRoute(
if (!sampleDataset) {
return res.notFound();
}

// @ts-ignore Custom query validation used
const now = query.now ? new Date(query.now) : new Date();
const nowReference = dateToIso8601IgnoringTime(now);
const counts = {};
for (let i = 0; i < sampleDataset.dataIndices.length; i++) {
const dataIndexConfig = sampleDataset.dataIndices[i];
const index = createIndexName(sampleDataset.id, dataIndexConfig.id);

// clean up any old installation of dataset
try {
await context.core.elasticsearch.client.asCurrentUser.indices.delete({
index,
});
} catch (err) {
// ignore delete errors
}
const sampleDataInstaller = getSampleDataInstaller({
datasetId: sampleDataset.id,
sampleDatasets,
logger,
context,
});

try {
await context.core.elasticsearch.client.asCurrentUser.indices.create({
index,
try {
const installResult = await sampleDataInstaller.install(params.id, now);
// track the usage operation in a non-blocking way
usageTracker.addInstall(params.id);
return res.ok({
body: {
elasticsearchIndicesCreated: installResult.createdDocsPerIndex,
kibanaSavedObjectsLoaded: installResult.createdSavedObjects,
},
});
} catch (e) {
if (e instanceof SampleDataInstallError) {
return res.customError({
body: {
settings: { index: { number_of_shards: 1, auto_expand_replicas: '0-1' } },
mappings: { properties: dataIndexConfig.fields },
message: e.message,
},
statusCode: e.httpCode,
});
} catch (err) {
const errMsg = `Unable to create sample data index "${index}", error: ${err.message}`;
logger.warn(errMsg);
return res.customError({ body: errMsg, statusCode: err.status });
}

try {
const count = await insertDataIntoIndex(
dataIndexConfig,
index,
nowReference,
context.core.elasticsearch.client,
logger
);
(counts as any)[index] = count;
} catch (err) {
const errMsg = `sample_data install errors while loading data. Error: ${err}`;
throw new Error(errMsg);
}
throw e;
}

const { getImporter } = context.core.savedObjects;
const objectTypes = getUniqueObjectTypes(sampleDataset.savedObjects);
const savedObjectsClient = getSavedObjectsClient(context, objectTypes);
const importer = getImporter(savedObjectsClient);

const savedObjects = sampleDataset.savedObjects.map(({ version, ...obj }) => obj);
const readStream = Readable.from(savedObjects);

try {
const { errors = [] } = await importer.import({
readStream,
overwrite: true,
createNewCopies: false,
});
if (errors.length > 0) {
const errMsg = `sample_data install errors while loading saved objects. Errors: ${JSON.stringify(
errors.map(({ type, id, error }) => ({ type, id, error })) // discard other fields
)}`;
logger.warn(errMsg);
return res.customError({ body: errMsg, statusCode: 500 });
}
} catch (err) {
const errMsg = `import failed, error: ${err.message}`;
throw new Error(errMsg);
}
usageTracker.addInstall(params.id);

// FINALLY
return res.ok({
body: {
elasticsearchIndicesCreated: counts,
kibanaSavedObjectsLoaded: sampleDataset.savedObjects.length,
},
});
}
);
}
76 changes: 22 additions & 54 deletions src/plugins/home/server/services/sample_data/routes/uninstall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
* Side Public License, v 1.
*/

import { isBoom } from '@hapi/boom';
import { schema } from '@kbn/config-schema';
import type { IRouter, Logger } from 'src/core/server';
import { SampleDatasetSchema } from '../lib/sample_dataset_registry_types';
import { createIndexName } from '../lib/create_index_name';
import { SampleDataUsageTracker } from '../usage/usage';
import { findSampleObjects } from '../lib/find_sample_objects';
import { getUniqueObjectTypes } from '../lib/utils';
import { getSavedObjectsClient } from './utils';
import { getSampleDataInstaller } from './utils';
import { SampleDataInstallError } from '../errors';

export function createUninstallRoute(
router: IRouter,
Expand All @@ -31,62 +28,33 @@ export function createUninstallRoute(
},
async (context, request, response) => {
const sampleDataset = sampleDatasets.find(({ id }) => id === request.params.id);

if (!sampleDataset) {
return response.notFound();
}

for (let i = 0; i < sampleDataset.dataIndices.length; i++) {
const dataIndexConfig = sampleDataset.dataIndices[i];
const index = createIndexName(sampleDataset.id, dataIndexConfig.id);

try {
// TODO: don't delete the index if sample data exists in other spaces (#116677)
await context.core.elasticsearch.client.asCurrentUser.indices.delete({ index });
} catch (err) {
// if the index doesn't exist, ignore the error and proceed
if (err.body.status !== 404) {
return response.customError({
statusCode: err.body.status,
body: {
message: `Unable to delete sample data index "${index}", error: ${err.body.error.type}`,
},
});
}
}
}

const objects = sampleDataset.savedObjects.map(({ type, id }) => ({ type, id }));
const objectTypes = getUniqueObjectTypes(objects);
const client = getSavedObjectsClient(context, objectTypes);
const findSampleObjectsResult = await findSampleObjects({ client, logger, objects });

const objectsToDelete = findSampleObjectsResult.filter(({ foundObjectId }) => foundObjectId);
const deletePromises = objectsToDelete.map(({ type, foundObjectId }) =>
client.delete(type, foundObjectId!).catch((err) => {
// if the object doesn't exist, ignore the error and proceed
if (isBoom(err) && err.output.statusCode === 404) {
return;
}
throw err;
})
);
const sampleDataInstaller = getSampleDataInstaller({
datasetId: sampleDataset.id,
sampleDatasets,
logger,
context,
});

try {
await Promise.all(deletePromises);
} catch (err) {
return response.customError({
statusCode: err.body.status,
body: {
message: `Unable to delete sample dataset saved objects, error: ${err.body.error.type}`,
},
});
await sampleDataInstaller.uninstall(request.params.id);
// track the usage operation in a non-blocking way
usageTracker.addUninstall(request.params.id);
return response.noContent();
} catch (e) {
if (e instanceof SampleDataInstallError) {
return response.customError({
body: {
message: e.message,
},
statusCode: e.httpCode,
});
}
throw e;
}

// track the usage operation in a non-blocking way
usageTracker.addUninstall(request.params.id);

return response.noContent();
}
);
}
Loading

0 comments on commit 683ab10

Please sign in to comment.