Skip to content

Commit

Permalink
[backend] Improve bulk Stix loader to support 100K+ relations loading…
Browse files Browse the repository at this point in the history
… in one bulk (#6386)
  • Loading branch information
richard-julien authored Mar 20, 2024
1 parent 380ba56 commit a28aa57
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
28 changes: 18 additions & 10 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -1609,17 +1609,25 @@ const buildLocalMustFilter = async (validFilter) => {
const parentKey = arrayKeys.at(0);
const { key: nestedKey, values: nestedValues, operator: nestedOperator = 'eq' } = nestedElement;
const nestedShould = [];
for (let i = 0; i < nestedValues.length; i += 1) {
const nestedFieldKey = `${parentKey}.${nestedKey}`;
const nestedSearchValues = nestedValues[i].toString();
if (nestedOperator === 'wildcard') {
nestedShould.push({ query_string: { query: `${nestedSearchValues}`, fields: [nestedFieldKey] } });
} else if (nestedOperator === 'not_eq') {
nestedMustNot.push({ match_phrase: { [nestedFieldKey]: nestedSearchValues } });
} else if (RANGE_OPERATORS.includes(nestedOperator)) {
nestedShould.push({ range: { [nestedFieldKey]: { [nestedOperator]: nestedSearchValues } } });
const nestedFieldKey = `${parentKey}.${nestedKey}`;
if (nestedKey === ID_INTERNAL) {
if (nestedOperator === 'not_eq') {
nestedMustNot.push({ terms: { [`${nestedFieldKey}.keyword`]: nestedValues } });
} else {
nestedShould.push({ match_phrase: { [nestedFieldKey]: nestedSearchValues } });
nestedShould.push({ terms: { [`${nestedFieldKey}.keyword`]: nestedValues } });
}
} else {
for (let i = 0; i < nestedValues.length; i += 1) {
const nestedSearchValues = nestedValues[i].toString();
if (nestedOperator === 'wildcard') {
nestedShould.push({ query_string: { query: `${nestedSearchValues}`, fields: [nestedFieldKey] } });
} else if (nestedOperator === 'not_eq') {
nestedMustNot.push({ match_phrase: { [nestedFieldKey]: nestedSearchValues } });
} else if (RANGE_OPERATORS.includes(nestedOperator)) {
nestedShould.push({ range: { [nestedFieldKey]: { [nestedOperator]: nestedSearchValues } } });
} else {
nestedShould.push({ match_phrase: { [nestedFieldKey]: nestedSearchValues } });
}
}
}
const should = {
Expand Down
39 changes: 29 additions & 10 deletions opencti-platform/opencti-graphql/src/database/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,13 @@ const loadElementMetaDependencies = async (context, user, elements, args = {}) =
for (let i = 0; i < groupOfWorkingIds.length; i += 1) {
const fromIds = groupOfWorkingIds[i];
const relationFilter = { mode: FilterMode.And, filters: [{ key: ['fromId'], values: fromIds }], filterGroups: [] };
const refsListed = await listAllRelations(context, user, relTypes, { filters: relationFilter });
refsRelations.push(...refsListed);
// All callback to iteratively push the relations to the global ref relations array
// As listAllRelations can bring more than 100K+ relations, we need to split the append
// due to nodejs limitation to 100K function parameters limit
const allRelCallback = async (relations) => {
refsRelations.push(...relations);
};
await listAllRelations(context, user, relTypes, { baseData: true, filters: relationFilter, callback: allRelCallback });
}
const refsPerElements = R.groupBy((r) => r.fromId, refsRelations);
// Parallel resolutions
Expand All @@ -294,16 +299,25 @@ const loadElementMetaDependencies = async (context, user, elements, args = {}) =
const entries = Object.entries(grouped);
for (let index = 0; index < entries.length; index += 1) {
const [key, values] = entries[index];
const inputKey = schemaRelationsRefDefinition.convertDatabaseNameToInputName(element.entity_type, key);
const resolvedElementsWithRelation = R.map((v) => {
const invalidRelations = [];
const resolvedElementsWithRelation = [];
let startProcessingTime = new Date().getTime();
for (let valueIndex = 0; valueIndex < values.length; valueIndex += 1) {
const v = values[valueIndex];
const resolvedElement = toResolvedElements[v.toId];
return resolvedElement ? { ...resolvedElement, i_relation: v } : {};
}, values).filter((d) => isNotEmptyField(d));
const metaRefKey = schemaRelationsRefDefinition.getRelationRef(element.entity_type, inputKey);
if (isEmptyField(metaRefKey)) {
throw UnsupportedError('Schema validation failure when loading dependencies', { key, inputKey, type: element.entity_type });
if (resolvedElement) {
resolvedElementsWithRelation.push({ ...resolvedElement, i_relation: v });
} else {
invalidRelations.push({ relation_id: v.id, target_id: v.toId });
}
// Prevent event loop locking more than MAX_EVENT_LOOP_PROCESSING_TIME
if (new Date().getTime() - startProcessingTime > MAX_EVENT_LOOP_PROCESSING_TIME) {
startProcessingTime = new Date().getTime();
await new Promise((resolve) => {
setImmediate(resolve);
});
}
}
const invalidRelations = values.filter((v) => toResolvedElements[v.toId] === undefined);
if (invalidRelations.length > 0) {
// Some targets can be unresolved in case of potential inconsistency between relation and target
// This kind of situation can happen if:
Expand All @@ -312,6 +326,11 @@ const loadElementMetaDependencies = async (context, user, elements, args = {}) =
const relations = invalidRelations.map((v) => ({ relation_id: v.id, target_id: v.toId }));
logApp.warn('Targets of loadElementMetaDependencies not found', { relations });
}
const inputKey = schemaRelationsRefDefinition.convertDatabaseNameToInputName(element.entity_type, key);
const metaRefKey = schemaRelationsRefDefinition.getRelationRef(element.entity_type, inputKey);
if (isEmptyField(metaRefKey)) {
throw UnsupportedError('Schema validation failure when loading dependencies', { key, inputKey, type: element.entity_type });
}
data[key] = !metaRefKey.multiple ? R.head(resolvedElementsWithRelation)?.internal_id : resolvedElementsWithRelation.map((r) => r.internal_id);
data[inputKey] = !metaRefKey.multiple ? R.head(resolvedElementsWithRelation) : resolvedElementsWithRelation;
}
Expand Down

0 comments on commit a28aa57

Please sign in to comment.