Skip to content

Commit

Permalink
IMN-798 Add events service v2 in catalog-platformstate-writer (#942)
Browse files Browse the repository at this point in the history
Co-authored-by: Roberto Taglioni <roberto.taglioni@pagopa.it>
  • Loading branch information
shuyec and taglioni-r authored Oct 4, 2024
1 parent 749d53d commit 5ca0bc4
Show file tree
Hide file tree
Showing 16 changed files with 2,590 additions and 108 deletions.
197 changes: 191 additions & 6 deletions packages/catalog-platformstate-writer/src/consumerServiceV2.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,183 @@
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { EServiceEventEnvelopeV2 } from "pagopa-interop-models";
import {
Descriptor,
DescriptorId,
descriptorState,
EService,
EServiceEventEnvelopeV2,
EServiceV2,
fromEServiceV2,
makeGSIPKEServiceIdDescriptorId,
makePlatformStatesEServiceDescriptorPK,
missingKafkaMessageDataError,
PlatformStatesCatalogEntry,
unsafeBrandId,
} from "pagopa-interop-models";
import { match } from "ts-pattern";
import {
deleteCatalogEntry,
descriptorStateToItemState,
readCatalogEntry,
updateDescriptorStateInPlatformStatesEntry,
updateDescriptorStateInTokenGenerationStatesTable,
writeCatalogEntry,
} from "./utils.js";

export async function handleMessageV2(
message: EServiceEventEnvelopeV2,
_dynamoDBClient: DynamoDBClient
dynamoDBClient: DynamoDBClient
): Promise<void> {
await match(message)
.with({ type: "EServiceDescriptorPublished" }, async (msg) => {
const { eservice, descriptor } = parseEServiceAndDescriptor(
msg.data.eservice,
unsafeBrandId(msg.data.descriptorId),
message.type
);
const previousDescriptor = eservice.descriptors.find(
(d) => d.version === (Number(descriptor.version) - 1).toString()
);

// flow for current descriptor
const processCurrentDescriptor = async (): Promise<void> => {
const primaryKeyCurrent = makePlatformStatesEServiceDescriptorPK({
eserviceId: eservice.id,
descriptorId: descriptor.id,
});
const existingCatalogEntryCurrent = await readCatalogEntry(
primaryKeyCurrent,
dynamoDBClient
);
if (existingCatalogEntryCurrent) {
if (existingCatalogEntryCurrent.version > msg.version) {
// Stops processing if the message is older than the catalog entry
return Promise.resolve();
} else {
await updateDescriptorStateInPlatformStatesEntry(
dynamoDBClient,
primaryKeyCurrent,
descriptorStateToItemState(descriptor.state),
msg.version
);
}
} else {
const catalogEntry: PlatformStatesCatalogEntry = {
PK: primaryKeyCurrent,
state: descriptorStateToItemState(descriptor.state),
descriptorAudience: descriptor.audience,
descriptorVoucherLifespan: descriptor.voucherLifespan,
version: msg.version,
updatedAt: new Date().toISOString(),
};

await writeCatalogEntry(catalogEntry, dynamoDBClient);
}

// token-generation-states
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId: eservice.id,
descriptorId: descriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
};

await processCurrentDescriptor();

// flow for previous descriptor

if (
!previousDescriptor ||
previousDescriptor.state !== descriptorState.archived
) {
return Promise.resolve();
} else {
const primaryKeyPrevious = makePlatformStatesEServiceDescriptorPK({
eserviceId: eservice.id,
descriptorId: previousDescriptor.id,
});

await deleteCatalogEntry(primaryKeyPrevious, dynamoDBClient);

// token-generation-states
const eserviceId_descriptorId_previous =
makeGSIPKEServiceIdDescriptorId({
eserviceId: eservice.id,
descriptorId: previousDescriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId_previous,
descriptorStateToItemState(previousDescriptor.state),
dynamoDBClient
);
}
})
.with(
{ type: "EServiceDescriptorActivated" },
{ type: "EServiceDescriptorSuspended" },
async (msg) => {
const { eservice, descriptor } = parseEServiceAndDescriptor(
msg.data.eservice,
unsafeBrandId(msg.data.descriptorId),
message.type
);
const primaryKey = makePlatformStatesEServiceDescriptorPK({
eserviceId: eservice.id,
descriptorId: descriptor.id,
});
const catalogEntry = await readCatalogEntry(primaryKey, dynamoDBClient);

if (!catalogEntry || catalogEntry.version > msg.version) {
return Promise.resolve();
} else {
await updateDescriptorStateInPlatformStatesEntry(
dynamoDBClient,
primaryKey,
descriptorStateToItemState(descriptor.state),
msg.version
);

// token-generation-states
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId: eservice.id,
descriptorId: descriptor.id,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
}
}
)
.with({ type: "EServiceDescriptorArchived" }, async (msg) => {
const { eservice, descriptor } = parseEServiceAndDescriptor(
msg.data.eservice,
unsafeBrandId<DescriptorId>(msg.data.descriptorId),
msg.type
);

const primaryKey = makePlatformStatesEServiceDescriptorPK({
eserviceId: eservice.id,
descriptorId: unsafeBrandId<DescriptorId>(msg.data.descriptorId),
});
await deleteCatalogEntry(primaryKey, dynamoDBClient);

// token-generation-states
const descriptorId = unsafeBrandId<DescriptorId>(msg.data.descriptorId);
const eserviceId_descriptorId = makeGSIPKEServiceIdDescriptorId({
eserviceId: eservice.id,
descriptorId,
});
await updateDescriptorStateInTokenGenerationStatesTable(
eserviceId_descriptorId,
descriptorStateToItemState(descriptor.state),
dynamoDBClient
);
})
.with(
{ type: "EServiceDeleted" },
{ type: "EServiceAdded" },
Expand All @@ -16,10 +187,6 @@ export async function handleMessageV2(
{ type: "EServiceDraftDescriptorDeleted" },
{ type: "EServiceDraftDescriptorUpdated" },
{ type: "EServiceDescriptorQuotasUpdated" },
{ type: "EServiceDescriptorActivated" },
{ type: "EServiceDescriptorArchived" },
{ type: "EServiceDescriptorPublished" },
{ type: "EServiceDescriptorSuspended" },
{ type: "EServiceDescriptorInterfaceAdded" },
{ type: "EServiceDescriptorDocumentAdded" },
{ type: "EServiceDescriptorInterfaceUpdated" },
Expand All @@ -34,3 +201,21 @@ export async function handleMessageV2(
)
.exhaustive();
}

export const parseEServiceAndDescriptor = (
eserviceV2: EServiceV2 | undefined,
descriptorId: DescriptorId,
eventType: string
): { eservice: EService; descriptor: Descriptor } => {
if (!eserviceV2) {
throw missingKafkaMessageDataError("eservice", eventType);
}

const eservice = fromEServiceV2(eserviceV2);

const descriptor = eservice.descriptors.find((d) => d.id === descriptorId);
if (!descriptor) {
throw missingKafkaMessageDataError("descriptor", eventType);
}
return { eservice, descriptor };
};
Loading

0 comments on commit 5ca0bc4

Please sign in to comment.