From 501c775444787ddd683d63e44182b4ce645039da Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Sun, 3 Mar 2024 20:19:44 +0100 Subject: [PATCH] [backend] Improve hub/children synchronization pattern (#6253) Co-authored-by: Julien Richard --- .../src/schema/relay.schema.graphql | 8 +-- .../config/schema/opencti.graphql | 8 +-- .../src/database/file-storage.js | 4 +- .../src/database/middleware.js | 72 +++++++++++-------- .../opencti-graphql/src/domain/connector.js | 2 +- .../src/domain/stixCoreObject.js | 2 +- .../opencti-graphql/src/generated/graphql.ts | 8 +-- .../opencti-graphql/src/graphql/graphql.js | 1 + .../src/manager/syncManager.js | 22 +++--- opencti-worker/src/worker.py | 2 + 10 files changed, 72 insertions(+), 57 deletions(-) diff --git a/opencti-platform/opencti-front/src/schema/relay.schema.graphql b/opencti-platform/opencti-front/src/schema/relay.schema.graphql index 50e86fed3cf82..ae065834d6774 100644 --- a/opencti-platform/opencti-front/src/schema/relay.schema.graphql +++ b/opencti-platform/opencti-front/src/schema/relay.schema.graphql @@ -7522,7 +7522,7 @@ type ExternalReferenceEditMutations { relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship relationDelete(fromId: StixRef!, relationship_type: String!): ExternalReference askEnrichment(connectorId: ID!): Work - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File } type KillChainPhaseEditMutations { @@ -7542,7 +7542,7 @@ type StixCoreObjectEditMutations { restrictionOrganizationAdd(organizationId: ID!): StixCoreObject restrictionOrganizationDelete(organizationId: ID!): StixCoreObject askEnrichment(connectorId: ID!): Work - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] 
exportPush(file: Upload!): Boolean } @@ -7563,7 +7563,7 @@ type StixDomainObjectEditMutations { relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship relationsAdd(input: StixRefRelationshipsAddInput!): StixDomainObject relationDelete(toId: StixRef!, relationship_type: String): StixDomainObject - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] exportPush(file: Upload!): Boolean stixDomainObjectFileEdit(input: StixDomainObjectFileEditInput): StixDomainObject @@ -7796,7 +7796,7 @@ type StixCyberObservableEditMutations { relationsAdd(input: StixRefRelationshipsAddInput!): StixCyberObservable relationDelete(toId: StixRef!, relationship_type: String!): StixCyberObservable promote: StixCyberObservable - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] 
exportPush(file: Upload!): Boolean } diff --git a/opencti-platform/opencti-graphql/config/schema/opencti.graphql b/opencti-platform/opencti-graphql/config/schema/opencti.graphql index 5ee575e82f7a2..155dbf767ac24 100644 --- a/opencti-platform/opencti-graphql/config/schema/opencti.graphql +++ b/opencti-platform/opencti-graphql/config/schema/opencti.graphql @@ -12014,7 +12014,7 @@ type ExternalReferenceEditMutations { relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship @auth(for: [KNOWLEDGE_KNUPDATE]) relationDelete(fromId: StixRef!, relationship_type: String!): ExternalReference @auth(for: [KNOWLEDGE_KNUPDATE]) askEnrichment(connectorId: ID!): Work @auth(for: [KNOWLEDGE_KNENRICHMENT]) - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) } type KillChainPhaseEditMutations { delete: ID @@ -12035,7 +12035,7 @@ type StixCoreObjectEditMutations { restrictionOrganizationAdd(organizationId: ID!): StixCoreObject @auth(for: [KNOWLEDGE_KNUPDATE_KNORGARESTRICT]) restrictionOrganizationDelete(organizationId: ID!): StixCoreObject @auth(for: [KNOWLEDGE_KNUPDATE_KNORGARESTRICT]) askEnrichment(connectorId: ID!): Work @auth(for: [KNOWLEDGE_KNENRICHMENT]) - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] 
@auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT]) exportPush(file: Upload!): Boolean @auth(for: [CONNECTORAPI]) @@ -12060,7 +12060,7 @@ type StixDomainObjectEditMutations { relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship @auth(for: [KNOWLEDGE_KNUPDATE]) relationsAdd(input: StixRefRelationshipsAddInput!): StixDomainObject @auth(for: [KNOWLEDGE_KNUPDATE]) relationDelete(toId: StixRef!, relationship_type: String): StixDomainObject @auth(for: [KNOWLEDGE_KNUPDATE]) - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] @auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT]) exportPush(file: Upload!): Boolean @auth(for: [CONNECTORAPI]) @@ -12290,7 +12290,7 @@ type StixCyberObservableEditMutations { relationsAdd(input: StixRefRelationshipsAddInput!): StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE]) relationDelete(toId: StixRef!, relationship_type: String!): StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE]) promote: StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE]) - importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) + importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD]) exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!] 
@auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT]) exportPush(file: Upload!): Boolean @auth(for: [CONNECTORAPI]) diff --git a/opencti-platform/opencti-graphql/src/database/file-storage.js b/opencti-platform/opencti-graphql/src/database/file-storage.js index edac6e2f3b474..f92f36e456cb3 100644 --- a/opencti-platform/opencti-graphql/src/database/file-storage.js +++ b/opencti-platform/opencti-graphql/src/database/file-storage.js @@ -7,7 +7,7 @@ import { defaultProvider } from '@aws-sdk/credential-provider-node'; import { getDefaultRoleAssumerWithWebIdentity } from '@aws-sdk/client-sts'; import mime from 'mime-types'; import conf, { booleanConf, ENABLED_FILE_INDEX_MANAGER, logApp } from '../config/conf'; -import { now, sinceNowInMinutes, truncate } from '../utils/format'; +import { now, sinceNowInMinutes, truncate, utcDate } from '../utils/format'; import { DatabaseError, FunctionalError, UnsupportedError } from '../config/errors'; import { createWork, deleteWorkForFile, deleteWorkForSource } from '../domain/work'; import { isNotEmptyField } from './utils'; @@ -328,7 +328,7 @@ export const upload = async (context, user, filePath, fileUpload, opts) => { const key = `${filePath}/${truncatedFileName}`; const currentFile = await documentFindById(context, user, key); if (currentFile) { - if (currentFile.metaData?.version === metadata.version) { + if (utcDate(currentFile.metaData.version).isSameOrAfter(utcDate(metadata.version))) { return { upload: currentFile, untouched: true }; } if (errorOnExisting) { diff --git a/opencti-platform/opencti-graphql/src/database/middleware.js b/opencti-platform/opencti-graphql/src/database/middleware.js index c91a40010b411..27dcc91404be3 100644 --- a/opencti-platform/opencti-graphql/src/database/middleware.js +++ b/opencti-platform/opencti-graphql/src/database/middleware.js @@ -1359,8 +1359,9 @@ const innerUpdateAttribute = (instance, rawInput) => { } return input; }; -const prepareAttributesForUpdate = (instance, elements, upsert) => { +const 
prepareAttributesForUpdate = async (context, user, instance, elements, upsert) => { const instanceType = instance.entity_type; + const platformStatuses = await getEntitiesListFromCache(context, user, ENTITY_TYPE_STATUS); return elements.map((input) => { // Dynamic cases, attributes not defined in the schema if (input.key.startsWith(RULE_PREFIX) || input.key.startsWith(REL_INDEX_PREFIX)) { @@ -1371,6 +1372,37 @@ const prepareAttributesForUpdate = (instance, elements, upsert) => { if (!def) { throw UnsupportedError('Cant prepare attribute for update', { type: instance.entity_type, name: input.key }); } + // Specific case for Label + if (input.key === VALUE_FIELD && instanceType === ENTITY_TYPE_LABEL) { + return { + key: input.key, + value: input.value.map((v) => v.toLowerCase()) + }; + } + // Specific case for name in aliased entities + // If name change already inside aliases, name must be kep untouched + if (upsert && input.key === NAME_FIELD && isStixObjectAliased(instanceType)) { + const aliasField = resolveAliasesField(instanceType).name; + const normalizeAliases = instance[aliasField] ? instance[aliasField].map((e) => normalizeName(e)) : []; + const name = normalizeName(input.value.at(0)); + if ((normalizeAliases).includes(name)) { + return null; + } + } + // Aliases can't have the same name as entity name and an already existing normalized alias + if (input.key === ATTRIBUTE_ALIASES || input.key === ATTRIBUTE_ALIASES_OPENCTI) { + const filteredValues = input.value.filter((e) => normalizeName(e) !== normalizeName(instance.name)); + const uniqAliases = R.uniqBy((e) => normalizeName(e), filteredValues); + return { key: input.key, value: uniqAliases }; + } + // For upsert, workflow cant be reset or setup on un-existing workflow + if (input.key === X_WORKFLOW_ID && upsert) { + const workflowId = R.head(input.value); + const workflowStatus = workflowId ? 
platformStatuses.find((p) => p.id === workflowId) : workflowId; + if (isEmptyField(workflowStatus)) { // If workflow is not found, remove the input + return null; + } + } // Check integer if (def.type === 'numeric') { return { @@ -1409,29 +1441,6 @@ const prepareAttributesForUpdate = (instance, elements, upsert) => { }; } } - // Specific case for Label - if (input.key === VALUE_FIELD && instanceType === ENTITY_TYPE_LABEL) { - return { - key: input.key, - value: input.value.map((v) => v.toLowerCase()) - }; - } - // Specific case for name in aliased entities - // If name change already inside aliases, name must be kep untouched - if (upsert && input.key === NAME_FIELD && isStixObjectAliased(instanceType)) { - const aliasField = resolveAliasesField(instanceType).name; - const normalizeAliases = instance[aliasField] ? instance[aliasField].map((e) => normalizeName(e)) : []; - const name = normalizeName(input.value.at(0)); - if ((normalizeAliases).includes(name)) { - return null; - } - } - // Aliases can't have the same name as entity name and an already existing normalized alias - if (input.key === ATTRIBUTE_ALIASES || input.key === ATTRIBUTE_ALIASES_OPENCTI) { - const filteredValues = input.value.filter((e) => normalizeName(e) !== normalizeName(instance.name)); - const uniqAliases = R.uniqBy((e) => normalizeName(e), filteredValues); - return { key: input.key, value: uniqAliases }; - } // No need to rework the input return input; }).filter((i) => isNotEmptyField(i)); @@ -1465,7 +1474,7 @@ const updateAttributeRaw = async (context, user, instance, inputs, opts = {}) => const elements = Array.isArray(inputs) ? 
inputs : [inputs]; const instanceType = instance.entity_type; // Prepare attributes - const preparedElements = prepareAttributesForUpdate(instance, elements, upsert); + const preparedElements = await prepareAttributesForUpdate(context, user, instance, elements, upsert); // region Check date range const inputKeys = elements.map((i) => i.key); if (inputKeys.includes(START_TIME) || inputKeys.includes(STOP_TIME)) { @@ -2356,8 +2365,9 @@ const upsertElement = async (context, user, element, type, basePatch, opts = {}) const updatePatch = { ...basePatch }; // Handle attributes updates if (isNotEmptyField(basePatch.stix_id) || isNotEmptyField(basePatch.x_opencti_stix_ids)) { + const compareIds = [element.standard_id, generateStandardId(type, basePatch)]; const ids = [...(basePatch.x_opencti_stix_ids || [])]; - if (isNotEmptyField(basePatch.stix_id) && basePatch.stix_id !== element.standard_id) { + if (isNotEmptyField(basePatch.stix_id) && !compareIds.includes(basePatch.stix_id)) { ids.push(basePatch.stix_id); } if (ids.length > 0) { @@ -3081,7 +3091,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => { const standardId = resolvedInput.standard_id || generateStandardId(type, resolvedInput); // Check if the entity exists, must be done with SYSTEM USER to really find it. const existingEntities = []; - const existingByIdsPromise = internalFindByIds(context, SYSTEM_USER, participantIds, { type }); + const finderIds = [...participantIds, ...(context.previousStandard ? [context.previousStandard] : [])]; + const existingByIdsPromise = internalFindByIds(context, SYSTEM_USER, finderIds, { type }); // Hash are per definition keys. 
// When creating a hash, we can check all hashes to update or merge the result // Generating multiple standard ids could be a solution but to complex to implements @@ -3151,8 +3162,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => { const concurrentAliases = R.flatten(R.map((c) => [c[key], c.name], concurrentEntities)); const normedAliases = R.uniq(concurrentAliases.map((c) => normalizeName(c))); const filteredAliases = R.filter((i) => !normedAliases.includes(normalizeName(i)), resolvedInput[key] || []); - const inputAliases = { ...resolvedInput, [key]: filteredAliases }; - return upsertElement(context, user, existingByStandard, type, inputAliases, { ...opts, locks: participantIds }); + const resolvedAliases = { ...resolvedInput, [key]: filteredAliases }; + return upsertElement(context, user, existingByStandard, type, resolvedAliases, { ...opts, locks: participantIds }); } if (resolvedInput.update === true) { // The new one is new reference, merge all found entities @@ -3165,7 +3176,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => { } if (resolvedInput.stix_id && !existingEntities.map((n) => getInstanceIds(n)).flat().includes(resolvedInput.stix_id)) { const target = R.head(filteredEntities); - return upsertElement(context, user, target, type, { x_opencti_stix_ids: [...target.x_opencti_stix_ids, resolvedInput.stix_id] }, { ...opts, locks: participantIds }); + const resolvedStixIds = { ...target, x_opencti_stix_ids: [...target.x_opencti_stix_ids, resolvedInput.stix_id] }; + return upsertElement(context, user, target, type, resolvedStixIds, { ...opts, locks: participantIds }); } // If not we dont know what to do, just throw an exception. throw UnsupportedError('Cant upsert entity. 
Too many entities resolved', { input, entityIds }); diff --git a/opencti-platform/opencti-graphql/src/domain/connector.js b/opencti-platform/opencti-graphql/src/domain/connector.js index 1ad0f1bce446f..9f2260b5159dd 100644 --- a/opencti-platform/opencti-graphql/src/domain/connector.js +++ b/opencti-platform/opencti-graphql/src/domain/connector.js @@ -151,7 +151,7 @@ export const createSyncHttpUri = (sync, state, testMode) => { return `${httpBase(uri)}stream/${stream}`; } const from = isEmptyField(state) ? '0-0' : state; - const recover = sync.recover ?? sync.created_at; + const recover = sync.recover ?? now(); let streamUri = `${httpBase(uri)}stream/${stream}?from=${from}&listen-delete=${del}&no-dependencies=${dep}`; if (recover) { streamUri += `&recover=${recover}`; diff --git a/opencti-platform/opencti-graphql/src/domain/stixCoreObject.js b/opencti-platform/opencti-graphql/src/domain/stixCoreObject.js index 8dc6ec8122d73..2728d146a22e3 100644 --- a/opencti-platform/opencti-graphql/src/domain/stixCoreObject.js +++ b/opencti-platform/opencti-graphql/src/domain/stixCoreObject.js @@ -385,7 +385,7 @@ export const stixCoreObjectImportPush = async (context, user, id, file, args = { const isAutoExternal = !entitySetting ? false : entitySetting.platform_entity_files_ref; const filePath = `import/${previous.entity_type}/${internalId}`; // 01. 
Upload the file - const meta = { version: fileVersion }; + const meta = { version: fileVersion?.toISOString() }; if (isAutoExternal) { const key = `${filePath}/${filename}`; meta.external_reference_id = generateStandardId(ENTITY_TYPE_EXTERNAL_REFERENCE, { url: `/storage/get/${key}` }); diff --git a/opencti-platform/opencti-graphql/src/generated/graphql.ts b/opencti-platform/opencti-graphql/src/generated/graphql.ts index f312099972361..3ba19eaee9ae6 100644 --- a/opencti-platform/opencti-graphql/src/generated/graphql.ts +++ b/opencti-platform/opencti-graphql/src/generated/graphql.ts @@ -6759,7 +6759,7 @@ export type ExternalReferenceEditMutationsFieldPatchArgs = { export type ExternalReferenceEditMutationsImportPushArgs = { file: Scalars['Upload']['input']; noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>; - version?: InputMaybe<Scalars['String']['input']>; + version?: InputMaybe<Scalars['DateTime']['input']>; }; @@ -21808,7 +21808,7 @@ export type StixCoreObjectEditMutationsExportPushArgs = { export type StixCoreObjectEditMutationsImportPushArgs = { file: Scalars['Upload']['input']; noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>; - version?: InputMaybe<Scalars['String']['input']>; + version?: InputMaybe<Scalars['DateTime']['input']>; }; @@ -22381,7 +22381,7 @@ export type StixCyberObservableEditMutationsFieldPatchArgs = { export type StixCyberObservableEditMutationsImportPushArgs = { file: Scalars['Upload']['input']; noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>; - version?: InputMaybe<Scalars['String']['input']>; + version?: InputMaybe<Scalars['DateTime']['input']>; }; @@ -22673,7 +22673,7 @@ export type StixDomainObjectEditMutationsFieldPatchArgs = { export type StixDomainObjectEditMutationsImportPushArgs = { file: Scalars['Upload']['input']; noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>; - version?: InputMaybe<Scalars['String']['input']>; + version?: InputMaybe<Scalars['DateTime']['input']>; }; diff --git a/opencti-platform/opencti-graphql/src/graphql/graphql.js b/opencti-platform/opencti-graphql/src/graphql/graphql.js index 657a37f4e714e..9652e2a584653 100644 --- a/opencti-platform/opencti-graphql/src/graphql/graphql.js +++ b/opencti-platform/opencti-graphql/src/graphql/graphql.js @@ -78,6 +78,7 @@ const createApolloServer = () => { executeContext.req = req; 
executeContext.res = res; executeContext.synchronizedUpsert = req.headers['synchronized-upsert'] === 'true'; + executeContext.previousStandard = req.headers['previous-standard']; executeContext.workId = req.headers['opencti-work-id']; try { const user = await authenticateUserFromRequest(executeContext, req, res); diff --git a/opencti-platform/opencti-graphql/src/manager/syncManager.js b/opencti-platform/opencti-graphql/src/manager/syncManager.js index 9ffb5b066ad4e..ca33928aab081 100644 --- a/opencti-platform/opencti-graphql/src/manager/syncManager.js +++ b/opencti-platform/opencti-graphql/src/manager/syncManager.js @@ -1,7 +1,5 @@ -import * as R from 'ramda'; import EventSource from 'eventsource'; import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/fixed'; -import * as jsonpatch from 'fast-json-patch'; import conf, { booleanConf, getPlatformHttpProxyAgent, logApp } from '../config/conf'; import { executionContext, SYSTEM_USER } from '../utils/access'; import { TYPE_LOCK_ERROR } from '../config/errors'; @@ -80,13 +78,9 @@ const syncManagerInstance = (syncId) => { }; const transformDataWithReverseIdAndFilesData = async (sync, httpClient, data, context) => { const { uri } = sync; - let processingData = data; + const processingData = { ...data }; // Reverse patch the id if modified - const idOperations = (context?.reverse_patch ?? []).filter((patch) => patch.path === '/id'); - if (idOperations.length > 0) { - const { newDocument: stixPreviousID } = jsonpatch.applyPatch(R.clone(data), idOperations); - processingData = stixPreviousID; - } + const idOperation = (context?.reverse_patch ?? []).find((patch) => patch.path === '/id'); // Handle file enrichment const entityFiles = processingData.extensions[STIX_EXT_OCTI].files ?? 
[]; for (let index = 0; index < entityFiles.length; index += 1) { @@ -95,7 +89,7 @@ const syncManagerInstance = (syncId) => { const response = await httpClient.get(`${httpBase(uri)}${fileUri.substring(fileUri.indexOf('storage/get'))}`); entityFile.data = Buffer.from(response.data, 'utf-8').toString('base64'); } - return processingData; + return { data: processingData, previous_standard: idOperation?.value }; }; const saveCurrentState = async (context, type, sync, eventId) => { const currentTime = new Date().getTime(); @@ -139,11 +133,17 @@ const syncManagerInstance = (syncId) => { if (eventType === 'heartbeat') { await saveCurrentState(context, eventType, sync, eventId); } else { - const syncData = await transformDataWithReverseIdAndFilesData(sync, httpClient, data, eventContext); + const { data: syncData, previous_standard } = await transformDataWithReverseIdAndFilesData(sync, httpClient, data, eventContext); const enrichedEvent = JSON.stringify({ id: eventId, type: eventType, data: syncData, context: eventContext }); const content = Buffer.from(enrichedEvent, 'utf-8').toString('base64'); // Applicant_id should be a userId coming from synchronizer - await pushToSync({ type: 'event', synchronized, update: true, applicant_id: sync.user_id ?? OPENCTI_SYSTEM_UUID, content }); + await pushToSync({ + type: 'event', + synchronized, + previous_standard, + update: true, + applicant_id: sync.user_id ?? 
OPENCTI_SYSTEM_UUID, + content }); await saveCurrentState(context, 'event', sync, eventId); } } catch (e) { diff --git a/opencti-worker/src/worker.py b/opencti-worker/src/worker.py index eb0fdefc72e7c..a16d289b19c20 100644 --- a/opencti-worker/src/worker.py +++ b/opencti-worker/src/worker.py @@ -247,6 +247,8 @@ def data_handler( # pylint: disable=too-many-statements, too-many-locals work_id = data["work_id"] if "work_id" in data else None synchronized = data["synchronized"] if "synchronized" in data else False self.api.set_synchronized_upsert_header(synchronized) + previous_standard = data.get("previous_standard") + self.api.set_previous_standard_header(previous_standard) # Execute the import self.processing_count += 1 content = "Unparseable"