-
Notifications
You must be signed in to change notification settings - Fork 946
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[api] Add migration to purge indicates indexing
- Loading branch information
Samuel Hassine
committed
Feb 3, 2021
1 parent
7a60041
commit 2a242ec
Showing
3 changed files
with
43 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
opencti-platform/opencti-graphql/src/migrations/1612381566895-clear_indicates_indexation.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import * as R from 'ramda'; | ||
import { Promise } from 'bluebird'; | ||
import { READ_DATA_INDICES } from '../database/utils'; | ||
import { ENTITY_TYPE_INDICATOR } from '../schema/stixDomainObject'; | ||
import { BULK_TIMEOUT, elBulk, elList, ES_MAX_CONCURRENCY, MAX_SPLIT } from '../database/elasticSearch'; | ||
import { logger } from '../config/conf'; | ||
import { SYSTEM_USER } from '../domain/user'; | ||
import { ABSTRACT_STIX_CORE_OBJECT, ABSTRACT_STIX_CORE_RELATIONSHIP } from '../schema/general'; | ||
|
||
export const up = async (next) => { | ||
const start = new Date().getTime(); | ||
logger.info(`[MIGRATION] Cleaning indicates for all entities and relationships`); | ||
const bulkOperations = []; | ||
const callback = (entities) => { | ||
const op = entities | ||
.filter((n) => n.entity_type !== ENTITY_TYPE_INDICATOR) | ||
.map((att) => { | ||
return [{ update: { _index: att._index, _id: att.id } }, { doc: { 'rel_indicates.internal_id': null } }]; | ||
}) | ||
.flat(); | ||
bulkOperations.push(...op); | ||
}; | ||
const filters = [{ key: 'rel_indicates.internal_id', values: ['EXISTS'] }]; | ||
const opts = { types: [ABSTRACT_STIX_CORE_OBJECT, ABSTRACT_STIX_CORE_RELATIONSHIP], filters, callback }; | ||
await elList(SYSTEM_USER, READ_DATA_INDICES, opts); | ||
// Apply operations. | ||
let currentProcessing = 0; | ||
const groupsOfOperations = R.splitEvery(MAX_SPLIT, bulkOperations); | ||
const concurrentUpdate = async (bulk) => { | ||
await elBulk({ refresh: true, timeout: BULK_TIMEOUT, body: bulk }); | ||
currentProcessing += bulk.length; | ||
logger.info(`[OPENCTI] Cleaning indicates indexation: ${currentProcessing} / ${bulkOperations.length}`); | ||
}; | ||
await Promise.map(groupsOfOperations, concurrentUpdate, { concurrency: ES_MAX_CONCURRENCY }); | ||
logger.info(`[MIGRATION] Cleaning indicates done in ${new Date() - start} ms`); | ||
next(); | ||
}; | ||
|
||
export const down = async (next) => { | ||
next(); | ||
}; |