Skip to content

Commit

Permalink
[backend] Improve hub/children synchronization pattern (#6253)
Browse files Browse the repository at this point in the history
Co-authored-by: Julien Richard <julien.richard@filigran.io>
  • Loading branch information
2 people authored and Goumies committed Mar 13, 2024
1 parent f7eb8f5 commit 2178248
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7524,7 +7524,7 @@ type ExternalReferenceEditMutations {
relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship
relationDelete(fromId: StixRef!, relationship_type: String!): ExternalReference
askEnrichment(connectorId: ID!): Work
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File
}

type KillChainPhaseEditMutations {
Expand All @@ -7544,7 +7544,7 @@ type StixCoreObjectEditMutations {
restrictionOrganizationAdd(organizationId: ID!): StixCoreObject
restrictionOrganizationDelete(organizationId: ID!): StixCoreObject
askEnrichment(connectorId: ID!): Work
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File
exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String, contentMaxMarkings: [String], fileMaxMarkings: [String]): [File!]
exportPush(file: Upload!): Boolean
}
Expand All @@ -7565,7 +7565,7 @@ type StixDomainObjectEditMutations {
relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship
relationsAdd(input: StixRefRelationshipsAddInput!): StixDomainObject
relationDelete(toId: StixRef!, relationship_type: String): StixDomainObject
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File
exportAsk(format: String!, exportType: String!, contentMaxMarkings: [String], fileMaxMarkings: [String]): [File!]
exportPush(file: Upload!, file_max_markings: [String]!): Boolean
stixDomainObjectFileEdit(input: StixDomainObjectFileEditInput): StixDomainObject
Expand Down Expand Up @@ -7798,7 +7798,7 @@ type StixCyberObservableEditMutations {
relationsAdd(input: StixRefRelationshipsAddInput!): StixCyberObservable
relationDelete(toId: StixRef!, relationship_type: String!): StixCyberObservable
promote: StixCyberObservable
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File
exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!]
exportPush(file: Upload!): Boolean
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12016,7 +12016,7 @@ type ExternalReferenceEditMutations {
relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship @auth(for: [KNOWLEDGE_KNUPDATE])
relationDelete(fromId: StixRef!, relationship_type: String!): ExternalReference @auth(for: [KNOWLEDGE_KNUPDATE])
askEnrichment(connectorId: ID!): Work @auth(for: [KNOWLEDGE_KNENRICHMENT])
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
}
type KillChainPhaseEditMutations {
delete: ID
Expand All @@ -12037,7 +12037,7 @@ type StixCoreObjectEditMutations {
restrictionOrganizationAdd(organizationId: ID!): StixCoreObject @auth(for: [KNOWLEDGE_KNUPDATE_KNORGARESTRICT])
restrictionOrganizationDelete(organizationId: ID!): StixCoreObject @auth(for: [KNOWLEDGE_KNUPDATE_KNORGARESTRICT])
askEnrichment(connectorId: ID!): Work @auth(for: [KNOWLEDGE_KNENRICHMENT])
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String, contentMaxMarkings: [String], fileMaxMarkings: [String]): [File!]
@auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT])
exportPush(file: Upload!): Boolean @auth(for: [CONNECTORAPI])
Expand All @@ -12062,7 +12062,7 @@ type StixDomainObjectEditMutations {
relationAdd(input: StixRefRelationshipAddInput!): StixRefRelationship @auth(for: [KNOWLEDGE_KNUPDATE])
relationsAdd(input: StixRefRelationshipsAddInput!): StixDomainObject @auth(for: [KNOWLEDGE_KNUPDATE])
relationDelete(toId: StixRef!, relationship_type: String): StixDomainObject @auth(for: [KNOWLEDGE_KNUPDATE])
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
exportAsk(format: String!, exportType: String!, contentMaxMarkings: [String], fileMaxMarkings: [String]): [File!]
@auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT])
exportPush(file: Upload!, file_max_markings: [String]!): Boolean @auth(for: [CONNECTORAPI])
Expand Down Expand Up @@ -12292,7 +12292,7 @@ type StixCyberObservableEditMutations {
relationsAdd(input: StixRefRelationshipsAddInput!): StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE])
relationDelete(toId: StixRef!, relationship_type: String!): StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE])
promote: StixCyberObservable @auth(for: [KNOWLEDGE_KNUPDATE])
importPush(file: Upload!, version: String, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
importPush(file: Upload!, version: DateTime, noTriggerImport: Boolean): File @auth(for: [KNOWLEDGE_KNUPLOAD])
exportAsk(format: String!, exportType: String!, maxMarkingDefinition: String): [File!]
@auth(for: [KNOWLEDGE_KNGETEXPORT_KNASKEXPORT])
exportPush(file: Upload!): Boolean @auth(for: [CONNECTORAPI])
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/database/file-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { defaultProvider } from '@aws-sdk/credential-provider-node';
import { getDefaultRoleAssumerWithWebIdentity } from '@aws-sdk/client-sts';
import mime from 'mime-types';
import conf, { booleanConf, ENABLED_FILE_INDEX_MANAGER, logApp } from '../config/conf';
import { now, sinceNowInMinutes, truncate } from '../utils/format';
import { now, sinceNowInMinutes, truncate, utcDate } from '../utils/format';
import { DatabaseError, FunctionalError, UnsupportedError } from '../config/errors';
import { createWork, deleteWorkForFile, deleteWorkForSource } from '../domain/work';
import { isNotEmptyField } from './utils';
Expand Down Expand Up @@ -328,7 +328,7 @@ export const upload = async (context, user, filePath, fileUpload, opts) => {
const key = `${filePath}/${truncatedFileName}`;
const currentFile = await documentFindById(context, user, key);
if (currentFile) {
if (currentFile.metaData?.version === metadata.version) {
if (utcDate(currentFile.metaData.version).isSameOrAfter(utcDate(metadata.version))) {
return { upload: currentFile, untouched: true };
}
if (errorOnExisting) {
Expand Down
72 changes: 42 additions & 30 deletions opencti-platform/opencti-graphql/src/database/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -1359,8 +1359,9 @@ const innerUpdateAttribute = (instance, rawInput) => {
}
return input;
};
const prepareAttributesForUpdate = (instance, elements, upsert) => {
const prepareAttributesForUpdate = async (context, user, instance, elements, upsert) => {
const instanceType = instance.entity_type;
const platformStatuses = await getEntitiesListFromCache(context, user, ENTITY_TYPE_STATUS);
return elements.map((input) => {
// Dynamic cases, attributes not defined in the schema
if (input.key.startsWith(RULE_PREFIX) || input.key.startsWith(REL_INDEX_PREFIX)) {
Expand All @@ -1371,6 +1372,37 @@ const prepareAttributesForUpdate = (instance, elements, upsert) => {
if (!def) {
throw UnsupportedError('Cant prepare attribute for update', { type: instance.entity_type, name: input.key });
}
// Specific case for Label
if (input.key === VALUE_FIELD && instanceType === ENTITY_TYPE_LABEL) {
return {
key: input.key,
value: input.value.map((v) => v.toLowerCase())
};
}
// Specific case for name in aliased entities
// If name change already inside aliases, name must be kep untouched
if (upsert && input.key === NAME_FIELD && isStixObjectAliased(instanceType)) {
const aliasField = resolveAliasesField(instanceType).name;
const normalizeAliases = instance[aliasField] ? instance[aliasField].map((e) => normalizeName(e)) : [];
const name = normalizeName(input.value.at(0));
if ((normalizeAliases).includes(name)) {
return null;
}
}
// Aliases can't have the same name as entity name and an already existing normalized alias
if (input.key === ATTRIBUTE_ALIASES || input.key === ATTRIBUTE_ALIASES_OPENCTI) {
const filteredValues = input.value.filter((e) => normalizeName(e) !== normalizeName(instance.name));
const uniqAliases = R.uniqBy((e) => normalizeName(e), filteredValues);
return { key: input.key, value: uniqAliases };
}
// For upsert, workflow cant be reset or setup on un-existing workflow
if (input.key === X_WORKFLOW_ID && upsert) {
const workflowId = R.head(input.value);
const workflowStatus = workflowId ? platformStatuses.find((p) => p.id === workflowId) : workflowId;
if (isEmptyField(workflowStatus)) { // If workflow is not found, remove the input
return null;
}
}
// Check integer
if (def.type === 'numeric') {
return {
Expand Down Expand Up @@ -1409,29 +1441,6 @@ const prepareAttributesForUpdate = (instance, elements, upsert) => {
};
}
}
// Specific case for Label
if (input.key === VALUE_FIELD && instanceType === ENTITY_TYPE_LABEL) {
return {
key: input.key,
value: input.value.map((v) => v.toLowerCase())
};
}
// Specific case for name in aliased entities
// If name change already inside aliases, name must be kep untouched
if (upsert && input.key === NAME_FIELD && isStixObjectAliased(instanceType)) {
const aliasField = resolveAliasesField(instanceType).name;
const normalizeAliases = instance[aliasField] ? instance[aliasField].map((e) => normalizeName(e)) : [];
const name = normalizeName(input.value.at(0));
if ((normalizeAliases).includes(name)) {
return null;
}
}
// Aliases can't have the same name as entity name and an already existing normalized alias
if (input.key === ATTRIBUTE_ALIASES || input.key === ATTRIBUTE_ALIASES_OPENCTI) {
const filteredValues = input.value.filter((e) => normalizeName(e) !== normalizeName(instance.name));
const uniqAliases = R.uniqBy((e) => normalizeName(e), filteredValues);
return { key: input.key, value: uniqAliases };
}
// No need to rework the input
return input;
}).filter((i) => isNotEmptyField(i));
Expand Down Expand Up @@ -1465,7 +1474,7 @@ const updateAttributeRaw = async (context, user, instance, inputs, opts = {}) =>
const elements = Array.isArray(inputs) ? inputs : [inputs];
const instanceType = instance.entity_type;
// Prepare attributes
const preparedElements = prepareAttributesForUpdate(instance, elements, upsert);
const preparedElements = await prepareAttributesForUpdate(context, user, instance, elements, upsert);
// region Check date range
const inputKeys = elements.map((i) => i.key);
if (inputKeys.includes(START_TIME) || inputKeys.includes(STOP_TIME)) {
Expand Down Expand Up @@ -2356,8 +2365,9 @@ const upsertElement = async (context, user, element, type, basePatch, opts = {})
const updatePatch = { ...basePatch };
// Handle attributes updates
if (isNotEmptyField(basePatch.stix_id) || isNotEmptyField(basePatch.x_opencti_stix_ids)) {
const compareIds = [element.standard_id, generateStandardId(type, basePatch)];
const ids = [...(basePatch.x_opencti_stix_ids || [])];
if (isNotEmptyField(basePatch.stix_id) && basePatch.stix_id !== element.standard_id) {
if (isNotEmptyField(basePatch.stix_id) && !compareIds.includes(basePatch.stix_id)) {
ids.push(basePatch.stix_id);
}
if (ids.length > 0) {
Expand Down Expand Up @@ -3081,7 +3091,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => {
const standardId = resolvedInput.standard_id || generateStandardId(type, resolvedInput);
// Check if the entity exists, must be done with SYSTEM USER to really find it.
const existingEntities = [];
const existingByIdsPromise = internalFindByIds(context, SYSTEM_USER, participantIds, { type });
const finderIds = [...participantIds, ...(context.previousStandard ? [context.previousStandard] : [])];
const existingByIdsPromise = internalFindByIds(context, SYSTEM_USER, finderIds, { type });
// Hash are per definition keys.
// When creating a hash, we can check all hashes to update or merge the result
// Generating multiple standard ids could be a solution but to complex to implements
Expand Down Expand Up @@ -3151,8 +3162,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => {
const concurrentAliases = R.flatten(R.map((c) => [c[key], c.name], concurrentEntities));
const normedAliases = R.uniq(concurrentAliases.map((c) => normalizeName(c)));
const filteredAliases = R.filter((i) => !normedAliases.includes(normalizeName(i)), resolvedInput[key] || []);
const inputAliases = { ...resolvedInput, [key]: filteredAliases };
return upsertElement(context, user, existingByStandard, type, inputAliases, { ...opts, locks: participantIds });
const resolvedAliases = { ...resolvedInput, [key]: filteredAliases };
return upsertElement(context, user, existingByStandard, type, resolvedAliases, { ...opts, locks: participantIds });
}
if (resolvedInput.update === true) {
// The new one is new reference, merge all found entities
Expand All @@ -3165,7 +3176,8 @@ const createEntityRaw = async (context, user, rawInput, type, opts = {}) => {
}
if (resolvedInput.stix_id && !existingEntities.map((n) => getInstanceIds(n)).flat().includes(resolvedInput.stix_id)) {
const target = R.head(filteredEntities);
return upsertElement(context, user, target, type, { x_opencti_stix_ids: [...target.x_opencti_stix_ids, resolvedInput.stix_id] }, { ...opts, locks: participantIds });
const resolvedStixIds = { ...target, x_opencti_stix_ids: [...target.x_opencti_stix_ids, resolvedInput.stix_id] };
return upsertElement(context, user, target, type, resolvedStixIds, { ...opts, locks: participantIds });
}
// If not we dont know what to do, just throw an exception.
throw UnsupportedError('Cant upsert entity. Too many entities resolved', { input, entityIds });
Expand Down
2 changes: 1 addition & 1 deletion opencti-platform/opencti-graphql/src/domain/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export const createSyncHttpUri = (sync, state, testMode) => {
return `${httpBase(uri)}stream/${stream}`;
}
const from = isEmptyField(state) ? '0-0' : state;
const recover = sync.recover ?? sync.created_at;
const recover = sync.recover ?? now();
let streamUri = `${httpBase(uri)}stream/${stream}?from=${from}&listen-delete=${del}&no-dependencies=${dep}`;
if (recover) {
streamUri += `&recover=${recover}`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ export const stixCoreObjectImportPush = async (context, user, id, file, args = {
const isAutoExternal = !entitySetting ? false : entitySetting.platform_entity_files_ref;
const filePath = `import/${previous.entity_type}/${internalId}`;
// 01. Upload the file
const meta = { version: fileVersion };
const meta = { version: fileVersion?.toISOString() };
if (isAutoExternal) {
const key = `${filePath}/${filename}`;
meta.external_reference_id = generateStandardId(ENTITY_TYPE_EXTERNAL_REFERENCE, { url: `/storage/get/${key}` });
Expand Down
8 changes: 4 additions & 4 deletions opencti-platform/opencti-graphql/src/generated/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6759,7 +6759,7 @@ export type ExternalReferenceEditMutationsFieldPatchArgs = {
export type ExternalReferenceEditMutationsImportPushArgs = {
file: Scalars['Upload']['input'];
noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>;
version?: InputMaybe<Scalars['String']['input']>;
version?: InputMaybe<Scalars['DateTime']['input']>;
};


Expand Down Expand Up @@ -21812,7 +21812,7 @@ export type StixCoreObjectEditMutationsExportPushArgs = {
export type StixCoreObjectEditMutationsImportPushArgs = {
file: Scalars['Upload']['input'];
noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>;
version?: InputMaybe<Scalars['String']['input']>;
version?: InputMaybe<Scalars['DateTime']['input']>;
};


Expand Down Expand Up @@ -22385,7 +22385,7 @@ export type StixCyberObservableEditMutationsFieldPatchArgs = {
export type StixCyberObservableEditMutationsImportPushArgs = {
file: Scalars['Upload']['input'];
noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>;
version?: InputMaybe<Scalars['String']['input']>;
version?: InputMaybe<Scalars['DateTime']['input']>;
};


Expand Down Expand Up @@ -22679,7 +22679,7 @@ export type StixDomainObjectEditMutationsFieldPatchArgs = {
export type StixDomainObjectEditMutationsImportPushArgs = {
file: Scalars['Upload']['input'];
noTriggerImport?: InputMaybe<Scalars['Boolean']['input']>;
version?: InputMaybe<Scalars['String']['input']>;
version?: InputMaybe<Scalars['DateTime']['input']>;
};


Expand Down
1 change: 1 addition & 0 deletions opencti-platform/opencti-graphql/src/graphql/graphql.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const createApolloServer = () => {
executeContext.req = req;
executeContext.res = res;
executeContext.synchronizedUpsert = req.headers['synchronized-upsert'] === 'true';
executeContext.previousStandard = req.headers['previous-standard'];
executeContext.workId = req.headers['opencti-work-id'];
try {
const user = await authenticateUserFromRequest(executeContext, req, res);
Expand Down
22 changes: 11 additions & 11 deletions opencti-platform/opencti-graphql/src/manager/syncManager.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import * as R from 'ramda';
import EventSource from 'eventsource';
import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/fixed';
import * as jsonpatch from 'fast-json-patch';
import conf, { booleanConf, getPlatformHttpProxyAgent, logApp } from '../config/conf';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { TYPE_LOCK_ERROR } from '../config/errors';
Expand Down Expand Up @@ -80,13 +78,9 @@ const syncManagerInstance = (syncId) => {
};
const transformDataWithReverseIdAndFilesData = async (sync, httpClient, data, context) => {
const { uri } = sync;
let processingData = data;
const processingData = { ...data };
// Reverse patch the id if modified
const idOperations = (context?.reverse_patch ?? []).filter((patch) => patch.path === '/id');
if (idOperations.length > 0) {
const { newDocument: stixPreviousID } = jsonpatch.applyPatch(R.clone(data), idOperations);
processingData = stixPreviousID;
}
const idOperation = (context?.reverse_patch ?? []).find((patch) => patch.path === '/id');
// Handle file enrichment
const entityFiles = processingData.extensions[STIX_EXT_OCTI].files ?? [];
for (let index = 0; index < entityFiles.length; index += 1) {
Expand All @@ -95,7 +89,7 @@ const syncManagerInstance = (syncId) => {
const response = await httpClient.get(`${httpBase(uri)}${fileUri.substring(fileUri.indexOf('storage/get'))}`);
entityFile.data = Buffer.from(response.data, 'utf-8').toString('base64');
}
return processingData;
return { data: processingData, previous_standard: idOperation?.value };
};
const saveCurrentState = async (context, type, sync, eventId) => {
const currentTime = new Date().getTime();
Expand Down Expand Up @@ -139,11 +133,17 @@ const syncManagerInstance = (syncId) => {
if (eventType === 'heartbeat') {
await saveCurrentState(context, eventType, sync, eventId);
} else {
const syncData = await transformDataWithReverseIdAndFilesData(sync, httpClient, data, eventContext);
const { data: syncData, previous_standard } = await transformDataWithReverseIdAndFilesData(sync, httpClient, data, eventContext);
const enrichedEvent = JSON.stringify({ id: eventId, type: eventType, data: syncData, context: eventContext });
const content = Buffer.from(enrichedEvent, 'utf-8').toString('base64');
// Applicant_id should be a userId coming from synchronizer
await pushToSync({ type: 'event', synchronized, update: true, applicant_id: sync.user_id ?? OPENCTI_SYSTEM_UUID, content });
await pushToSync({
type: 'event',
synchronized,
previous_standard,
update: true,
applicant_id: sync.user_id ?? OPENCTI_SYSTEM_UUID,
content });
await saveCurrentState(context, 'event', sync, eventId);
}
} catch (e) {
Expand Down
Loading

0 comments on commit 2178248

Please sign in to comment.