@@ -99,7 +99,7 @@ const controller = ({

logger.info(
LOG_MODULE,
`Request Delete '${entityName ? entityName : 'all'}' records on '{${actionType}}' Active Submission '${submissionId}'`,
`Request Delete '${entityName ? entityName : 'all'}' records on '${actionType}' Active Submission '${submissionId}'`,
);

const submission = await service.getSubmissionById(submissionId);
@@ -184,8 +184,7 @@ const controller = ({
const username = user?.username || '';

const editSubmittedDataResult = await dataService.editSubmittedData({
records: payload,
entityName,
data: { [entityName]: payload },
categoryId,
organization,
username,
@@ -312,8 +311,7 @@ const controller = ({
const username = user?.username || '';

const resultSubmission = await service.submit({
records: payload,
entityName,
data: { [entityName]: payload },
categoryId,
organization,
username,
@@ -1,11 +1,11 @@
import { eq } from 'drizzle-orm/sql';
import { ListAllCategoriesResponse } from 'src/utils/types.js';

import { Dictionary as SchemasDictionary } from '@overture-stack/lectern-client';
import { Category, Dictionary, dictionaryCategories, NewCategory } from '@overture-stack/lyric-data-model/models';

import { BaseDependencies } from '../config/config.js';
import { ServiceUnavailable } from '../utils/errors.js';
import { ListAllCategoriesResponse } from '../utils/types.js';

const repository = (dependencies: BaseDependencies) => {
const LOG_MODULE = 'CATEGORY_REPOSITORY';
189 changes: 101 additions & 88 deletions packages/data-provider/src/services/submission/processor.ts
@@ -1,6 +1,6 @@
import * as _ from 'lodash-es';

import { type DataRecord, DictionaryValidationRecordErrorDetails, type Schema } from '@overture-stack/lectern-client';
import { type DataRecord, DictionaryValidationRecordErrorDetails } from '@overture-stack/lectern-client';
import {
Submission,
SubmissionData,
@@ -13,11 +13,12 @@ import {
import { BaseDependencies } from '../../config/config.js';
import submissionRepository from '../../repository/activeSubmissionRepository.js';
import categoryRepository from '../../repository/categoryRepository.js';
import dictionaryRepository from '../../repository/dictionaryRepository.js';
import submittedRepository from '../../repository/submittedRepository.js';
import { getDictionarySchemaRelations, type SchemaChildNode } from '../../utils/dictionarySchemaRelations.js';
import { validateSchemas } from '../../utils/dictionaryUtils.js';
import { BadRequest } from '../../utils/errors.js';
import { convertRecordToString } from '../../utils/formatUtils.js';
import { mergeDeleteRecords, mergeInsertsRecords, mergeUpdatesBySystemId } from '../../utils/mergeRecords.js';
import { parseRecordsToEdit, parseRecordsToInsert } from '../../utils/recordsParser.js';
import {
extractSchemaDataFromMergedDataRecords,
filterDeletesFromUpdates,
@@ -26,12 +27,7 @@
groupSchemaErrorsByEntity,
mapGroupedUpdateSubmissionData,
mergeAndReferenceEntityData,
mergeDeleteRecords,
mergeInsertsRecords,
mergeUpdatesBySystemId,
parseToSchema,
segregateFieldChangeRecords,
validateSchemas,
} from '../../utils/submissionUtils.js';
import {
computeDataDiff,
@@ -44,9 +40,10 @@
} from '../../utils/submittedDataUtils.js';
import {
CommitSubmissionParams,
type EntityData,
type SchemasDictionary,
SUBMISSION_STATUS,
type SubmittedDataResponse,
type ValidateFilesParams,
} from '../../utils/types.js';
import searchDataRelations from '../submittedData/searchDataRelations.js';

@@ -431,14 +428,9 @@ const processor = (dependencies: BaseDependencies) => {
logger.info(LOG_MODULE, `Errors detected in data submission:${errorMessage}`);
}

// Update Active Submission
return await updateActiveSubmission({
// Update validation results for the active submission
return await updateValidationResultForSubmission({
idActiveSubmission: originalSubmission.id,
submissionData: {
inserts: submissionData.inserts,
deletes: submissionData.deletes,
updates: submissionData.updates,
},
schemaErrors: submissionSchemaErrors,
dictionaryId: currentDictionary.id,
username,
@@ -448,44 +440,55 @@
/**
* Void function to process and validate uploaded records on an Active Submission.
* Performs the schema data validation of data to be edited combined with all Submitted Data.
* @param records Records to be processed
* @param records A map of entity names to arrays of raw records to be processed.
* @param params
* @param params.schema Schema to parse data with
* @param params.submission A `Submission` object representing the Active Submission
* @param params.schemasDictionary A dictionary of schema definitions used for record validation.
* @param params.submissionId Submission ID
* @param params.username User who performs the action
*/
const processEditRecordsAsync = async (
records: Record<string, unknown>[],
records: EntityData,
{
schema,
submission,
schemasDictionary,
submissionId,
username,
}: {
schema: Schema;
submission: Submission;
schemasDictionary: SchemasDictionary;
submissionId: number;
username: string;
},
): Promise<void> => {
const { getDictionaryById } = dictionaryRepository(dependencies);
const { getSubmissionById, update } = submissionRepository(dependencies);

try {
// Parse file data
const recordsParsed = records.map(convertRecordToString).map(parseToSchema(schema));
const mapRecordsParsed = parseRecordsToEdit(records, schemasDictionary);

const filesDataProcessed = await compareUpdatedData(recordsParsed);

const currentDictionary = await getDictionaryById(submission.dictionaryId);
if (!currentDictionary) {
throw new BadRequest(`Dictionary in category '${submission.dictionaryCategoryId}' not found`);
if (Object.keys(mapRecordsParsed).length === 0) {
throw new Error('No entities to edit on this submission');
}

const mapDataProcessed = Object.fromEntries(
await Promise.all(
Object.entries(mapRecordsParsed).map(async ([schemaName, dataRecords]) => {
const filesDataProcessed = await compareUpdatedData(dataRecords);
return [schemaName, filesDataProcessed];
}),
),
);

// get dictionary relations
const dictionaryRelations = getDictionarySchemaRelations(currentDictionary.dictionary);
const dictionaryRelations = getDictionarySchemaRelations(schemasDictionary.schemas);

// Get Active Submission from database
const activeSubmission = await getSubmissionById(submissionId);
if (!activeSubmission) {
throw new Error(`Submission '${activeSubmission}' not found`);
}

const foundDependentUpdates = await findUpdateDependents({
dictionaryRelations,
organization: submission.organization,
submissionUpdateData: { [schema.name]: filesDataProcessed },
organization: activeSubmission.organization,
submissionUpdateData: mapDataProcessed,
});

const systemIdsWithDependents: string[] = [];
@@ -511,14 +514,14 @@

// Identify what requested updates involves ID and nonID field changes
const { idFieldChangeRecord, nonIdFieldChangeRecord } = segregateFieldChangeRecords(
{ [schema.name]: filesDataProcessed },
mapDataProcessed,
dictionaryRelations,
);

// Aggregates all Update changes on Submission
// Note: We do not include records involving primary ID fields changes in here. We would rather do a DELETE and an INSERT
const updatedActiveSubmissionData: Record<string, SubmissionUpdateData[]> = mergeUpdatesBySystemId(
submission.data.updates ?? {},
activeSubmission.data.updates ?? {},
totalDependants,
nonIdFieldChangeRecord,
);
@@ -527,30 +530,33 @@
const additions = await handleIdFieldChanges(idFieldChangeRecord);

// Merge Active Submission Inserts with Edit generated new Inserts
const mergedInserts = mergeInsertsRecords(submission.data.inserts ?? {}, additions.inserts);
const mergedInserts = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, additions.inserts);

// Merge Active Submission Deletes with Edit generated new Deletes
const mergedDeletes = mergeDeleteRecords(submission.data.deletes ?? {}, additions.deletes);
const mergedDeletes = mergeDeleteRecords(activeSubmission.data.deletes ?? {}, additions.deletes);

// filter out delete records found on update records
const filteredDeletes = filterDeletesFromUpdates(mergedDeletes, updatedActiveSubmissionData);

// Result merge Active Submission data with incoming TSV file data processed
const mergedSubmissionData: SubmissionData = {
inserts: mergedInserts,
deletes: filteredDeletes,
updates: updatedActiveSubmissionData,
};
await update(activeSubmission.id, {
data: mergedSubmissionData,
updatedBy: username,
});

// Perform Schema Data validation Async.
performDataValidation({
originalSubmission: submission,
submissionData: {
inserts: mergedInserts,
deletes: filteredDeletes,
updates: updatedActiveSubmissionData,
},
originalSubmission: activeSubmission,
submissionData: mergedSubmissionData,
username,
});
} catch (error) {
logger.error(
LOG_MODULE,
`There was an error processing records on entity '${schema.name}'`,
JSON.stringify(error),
);
logger.error(LOG_MODULE, `There was an error processing records on this submission`, JSON.stringify(error));
}
logger.info(LOG_MODULE, `Finished validating files`);
};
@@ -594,29 +600,27 @@ const processor = (dependencies: BaseDependencies) => {
};

/**
* Update Active Submission in database
* Store validation results for the active submission in the database.
* IMPORTANT: Submission data is not updated
* @param {Object} input
* @param {number} input.dictionaryId The Dictionary ID of the Submission
* @param {SubmissionData} input.submissionData Data to be submitted grouped on inserts, updates and deletes
* @param {number} input.idActiveSubmission ID of the Active Submission
* @param {Record<string, Record<string, DictionaryValidationRecordErrorDetails[]>>} input.schemaErrors Array of schemaErrors
* @param {string} input.username User updating the active submission
* @returns {Promise<Submission>} An Active Submission updated
*/
const updateActiveSubmission = async (input: {
const updateValidationResultForSubmission = async (input: {
dictionaryId: number;
submissionData: SubmissionData;
idActiveSubmission: number;
schemaErrors: Record<string, Record<string, DictionaryValidationRecordErrorDetails[]>>;
username: string;
}): Promise<Submission> => {
const { dictionaryId, submissionData, idActiveSubmission, schemaErrors, username } = input;
const { dictionaryId, idActiveSubmission, schemaErrors, username } = input;
const { update } = submissionRepository(dependencies);
const newStatusSubmission =
Object.keys(schemaErrors).length > 0 ? SUBMISSION_STATUS.INVALID : SUBMISSION_STATUS.VALID;
// Update with new data
// Update validation results only — submission data has already been updated.
const updatedActiveSubmission = await update(idActiveSubmission, {
data: submissionData,
status: newStatusSubmission,
dictionaryId: dictionaryId,
updatedBy: username,
@@ -631,54 +635,63 @@
};

/**
* Void function to process and validate records on an Active Submission.
* Performs the schema data validation combined with all Submitted Data.
* @param {Record<string, unknown>} records Records to be processed
* @param {Object} params
* @param {number} params.categoryId Category Identifier
* @param {string} params.organization Organization name
* @param {Schema} params.schema Schema to validate records with
* @param {string} params.username User who performs the action
* @returns {void}
* Processes and validates a batch of incoming records for an active submission.
* This function updates the submission by merging the new records with the existing submission data.
* Performs a full schema data validation against the combined dataset.
* @param params
* @param params.records A map of entity names to arrays of raw records to be processed.
* @param params.schemasDictionary A dictionary of schema definitions used for record validation.
* @param params.submissionId Submission ID
* @param params.username User who performs the action
* @returns
*/
const validateRecordsAsync = async (records: Record<string, unknown>[], params: ValidateFilesParams) => {
const { getActiveSubmission } = submissionRepository(dependencies);

const { categoryId, organization, username, schema } = params;
const processInsertRecordsAsync = async ({
records,
schemasDictionary,
submissionId,
username,
}: {
records: EntityData;
schemasDictionary: SchemasDictionary;
submissionId: number;
username: string;
}) => {
const { getSubmissionById, update } = submissionRepository(dependencies);

try {
// Get Active Submission from database
const activeSubmission = await getActiveSubmission({ categoryId, username, organization });
const activeSubmission = await getSubmissionById(submissionId);
if (!activeSubmission) {
throw new BadRequest(`Submission '${activeSubmission}' not found`);
throw new Error(`Submission '${activeSubmission}' not found`);
}

const recordsParsed = records.map(convertRecordToString).map(parseToSchema(schema));
const insertRecords = parseRecordsToInsert(records, schemasDictionary);

const insertRecords: Record<string, SubmissionInsertData> = {
[schema.name]: {
batchName: schema.name,
records: recordsParsed,
},
// Merge Active Submission insert records with incoming TSV file data processed
const insertActiveSubmissionData = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, insertRecords);

// Result merged submission Data
const mergedSubmissionData: SubmissionData = {
inserts: insertActiveSubmissionData,
deletes: activeSubmission.data.deletes,
updates: activeSubmission.data.updates,
};

// Merge Active Submission data with incoming TSV file data processed
const insertActiveSubmissionData = mergeInsertsRecords(activeSubmission.data.inserts ?? {}, insertRecords);
await update(activeSubmission.id, {
data: mergedSubmissionData,
updatedBy: username,
});
Comment on lines +680 to +683 (Contributor Author):
When new data is added to the submission, it is merged with the existing data in the active submission and then persisted to the database. This ensures that the submission contains all data, merged and complete, before the validation occurs afterward.


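A minimal illustrative sketch of the ordering this comment describes (merge first, persist, then validate). The types and helpers below (EntityRecords, mergeEntityRecords, persist, validate) are hypothetical stand-ins, not the repository's actual API:

```ts
// Hypothetical sketch of the merge -> persist -> validate ordering; all names are illustrative.
type EntityRecords = Record<string, unknown[]>;

interface ActiveSubmissionLike {
  id: number;
  data: EntityRecords;
}

// Merge incoming records into the existing submission data, entity by entity.
const mergeEntityRecords = (existing: EntityRecords, incoming: EntityRecords): EntityRecords => {
  const merged: EntityRecords = { ...existing };
  for (const [entityName, records] of Object.entries(incoming)) {
    merged[entityName] = [...(merged[entityName] ?? []), ...records];
  }
  return merged;
};

const addRecordsToSubmission = async (
  submission: ActiveSubmissionLike,
  incoming: EntityRecords,
  persist: (id: number, data: EntityRecords) => Promise<void>,
  validate: (id: number, data: EntityRecords) => Promise<void>,
): Promise<void> => {
  // 1. Merge the new records with whatever the submission already holds.
  const merged = mergeEntityRecords(submission.data, incoming);
  // 2. Persist the merged data so the stored submission is complete before validation.
  await persist(submission.id, merged);
  // 3. Validate afterward, against the fully merged dataset.
  await validate(submission.id, merged);
};
```

The benefit of this ordering is that validation always runs against the complete, already-persisted dataset, rather than against a partially stored submission.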
// Perform Schema Data validation Async.
await performDataValidation({
originalSubmission: activeSubmission,
submissionData: {
inserts: insertActiveSubmissionData,
deletes: activeSubmission.data.deletes,
updates: activeSubmission.data.updates,
},
submissionData: mergedSubmissionData,
username,
});
} catch (error) {
logger.error(
LOG_MODULE,
`There was an error processing records on entity '${schema.name}'`,
`There was an error processing records on submission '${submissionId}'`,
JSON.stringify(error),
);
}
@@ -689,8 +702,8 @@
processEditRecordsAsync,
performCommitSubmissionAsync,
performDataValidation,
updateActiveSubmission,
validateRecordsAsync,
updateValidationResultForSubmission,
processInsertRecordsAsync,
};
};

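For orientation, a hedged usage sketch of the renamed processor entry points, based only on the signatures visible in this diff. The default export, import paths, the `dependencies` and `schemasDictionary` values, and the example record contents are assumptions about the surrounding service layer, not part of this PR:

```ts
import type { BaseDependencies } from '../../config/config.js';
import type { SchemasDictionary } from '../../utils/types.js';
import processor from './processor.js'; // assumed default export

// Assumed to be provided by the surrounding service layer (not shown in this diff).
declare const dependencies: BaseDependencies;
declare const schemasDictionary: SchemasDictionary;

const { processInsertRecordsAsync, processEditRecordsAsync } = processor(dependencies);

// Inserts: `records` is a map of entity name -> raw records from the uploaded payload.
await processInsertRecordsAsync({
  records: { sample: [{ submitter_sample_id: 'SA-001', tissue: 'blood' }] },
  schemasDictionary,
  submissionId: 12,
  username: 'jdoe',
});

// Edits: the same kind of record map is passed as the first positional argument.
await processEditRecordsAsync(
  { sample: [{ submitter_sample_id: 'SA-001', tissue: 'plasma' }] },
  { schemasDictionary, submissionId: 12, username: 'jdoe' },
);
```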