Skip to content

Commit

Permalink
IMN-790 Add scaffold for agreement-platformstate-writer (#967)
Browse files Browse the repository at this point in the history
Co-authored-by: Stefano Hu <76391491+shuyec@users.noreply.github.com>
  • Loading branch information
taglioni-r and shuyec authored Oct 24, 2024
1 parent c261f3d commit 8339f68
Show file tree
Hide file tree
Showing 21 changed files with 434 additions and 6 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"start:catalog-platformstate-writer": "turbo start --filter pagopa-interop-catalog-platformstate-writer",
"start:agreement": "turbo start --filter pagopa-interop-agreement-process",
"start:agreement-readmodel-writer": "turbo start --filter pagopa-interop-agreement-readmodel-writer",
"start:agreement-platformstate-writer": "turbo start --filter pagopa-interop-agreement-platformstate-writer",
"start:agreement-email-sender": "turbo start --filter pagopa-interop-agreement-email-sender",
"start:attribute": "turbo start --filter pagopa-interop-attribute-registry-process",
"start:attribute-readmodel-writer": "turbo start --filter pagopa-interop-attribute-registry-readmodel-writer",
Expand Down
12 changes: 12 additions & 0 deletions packages/agreement-platformstate-writer/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LOG_LEVEL=info

KAFKA_CLIENT_ID="agreement"
KAFKA_GROUP_ID="agreement-group-local"
KAFKA_BROKERS="localhost:9092"
KAFKA_DISABLE_AWS_IAM_AUTH="true"
AGREEMENT_TOPIC="event-store.agreement.events"
AWS_CONFIG_FILE=aws.config.local
TOKEN_GENERATION_READMODEL_TABLE_NAME_PLATFORM="platform-states"
TOKEN_GENERATION_READMODEL_TABLE_NAME_TOKEN_GENERATION="token-generation-states"

AWS_REGION="eu-central-1"
44 changes: 44 additions & 0 deletions packages/agreement-platformstate-writer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as build

RUN corepack enable

WORKDIR /app
COPY package.json /app/
COPY pnpm-lock.yaml /app/
COPY pnpm-workspace.yaml /app/

COPY ./packages/agreement-platformstate-writer/package.json /app/packages/agreement-platformstate-writer/package.json
COPY ./packages/commons/package.json /app/packages/commons/package.json
COPY ./packages/models/package.json /app/packages/models/package.json
COPY ./packages/kafka-iam-auth/package.json /app/packages/kafka-iam-auth/package.json

RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile

COPY tsconfig.json /app/
COPY turbo.json /app/
COPY ./packages/agreement-platformstate-writer /app/packages/agreement-platformstate-writer
COPY ./packages/commons /app/packages/commons
COPY ./packages/models /app/packages/models
COPY ./packages/kafka-iam-auth /app/packages/kafka-iam-auth

RUN pnpm build && \
rm -rf /app/node_modules/.modules.yaml && \
rm -rf /app/node_modules/.cache && \
mkdir /out && \
cp -a --parents -t /out \
node_modules packages/agreement-platformstate-writer/node_modules \
package*.json packages/agreement-platformstate-writer/package*.json \
packages/commons \
packages/models \
packages/kafka-iam-auth \
packages/agreement-platformstate-writer/dist && \
find /out -exec touch -h --date=@0 {} \;

FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as final

COPY --from=build /out /app

WORKDIR /app/packages/agreement-platformstate-writer
EXPOSE 3000

CMD ["node", "."]
9 changes: 9 additions & 0 deletions packages/agreement-platformstate-writer/aws.config.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[default]
aws_access_key_id=key
aws_secret_access_key=secret
region=eu-south-1
services=local

[services local]
dynamodb=
endpoint_url=http://localhost:8085
47 changes: 47 additions & 0 deletions packages/agreement-platformstate-writer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"name": "pagopa-interop-agreement-platformstate-writer",
"private": true,
"version": "1.0.0",
"description": "PagoPA Interoperability agreement consumer service that updates the token-generation-read-model",
"main": "dist",
"type": "module",
"scripts": {
"test": "vitest",
"test:it": "vitest integration",
"lint": "eslint . --ext .ts,.tsx",
"lint:autofix": "eslint . --ext .ts,.tsx --fix",
"format:check": "prettier --check src",
"format:write": "prettier --write src",
"start": "node --loader ts-node/esm -r 'dotenv-flow/config' --watch ./src/index.ts",
"build": "tsc",
"check": "tsc --project tsconfig.check.json"
},
"keywords": [],
"author": "",
"license": "Apache-2.0",
"devDependencies": {
"@pagopa/eslint-config": "3.0.0",
"@types/node": "20.14.6",
"@types/uuid": "9.0.8",
"date-fns": "3.6.0",
"pagopa-interop-commons-test": "workspace:*",
"prettier": "2.8.8",
"ts-node": "10.9.2",
"typescript": "5.4.5",
"uuid": "10.0.0",
"vitest": "1.6.0"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "3.637.0",
"@aws-sdk/util-dynamodb": "3.637.0",
"@protobuf-ts/runtime": "2.9.4",
"connection-string": "4.4.0",
"dotenv-flow": "4.1.0",
"kafka-iam-auth": "workspace:*",
"kafkajs": "2.2.4",
"pagopa-interop-commons": "workspace:*",
"pagopa-interop-models": "workspace:*",
"ts-pattern": "5.2.0",
"zod": "3.23.8"
}
}
15 changes: 15 additions & 0 deletions packages/agreement-platformstate-writer/src/config/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import {
AgreementTopicConfig,
PlatformStateWriterConfig,
} from "pagopa-interop-commons";
import { z } from "zod";

export const AgreementPlatformStateWriterConfig =
PlatformStateWriterConfig.and(AgreementTopicConfig);

export type AgreementPlatformStateWriterConfig = z.infer<
typeof AgreementPlatformStateWriterConfig
>;

export const config: AgreementPlatformStateWriterConfig =
AgreementPlatformStateWriterConfig.parse(process.env);
24 changes: 24 additions & 0 deletions packages/agreement-platformstate-writer/src/consumerServiceV1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { match } from "ts-pattern";
import { AgreementEventEnvelopeV1 } from "pagopa-interop-models";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";

export async function handleMessageV1(
message: AgreementEventEnvelopeV1,
_dynamoDBClient: DynamoDBClient
): Promise<void> {
await match(message)
.with(
{ type: "AgreementAdded" },
{ type: "AgreementActivated" },
{ type: "AgreementSuspended" },
{ type: "AgreementDeactivated" },
{ type: "AgreementDeleted" },
{ type: "VerifiedAttributeUpdated" },
{ type: "AgreementUpdated" },
{ type: "AgreementConsumerDocumentAdded" },
{ type: "AgreementConsumerDocumentRemoved" },
{ type: "AgreementContractAdded" },
async () => Promise.resolve()
)
.exhaustive();
}
33 changes: 33 additions & 0 deletions packages/agreement-platformstate-writer/src/consumerServiceV2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { AgreementEventEnvelopeV2 } from "pagopa-interop-models";
import { match } from "ts-pattern";

export async function handleMessageV2(
message: AgreementEventEnvelopeV2,
_dynamoDBClient: DynamoDBClient
): Promise<void> {
await match(message)
.with(
{ type: "AgreementAdded" },
{ type: "AgreementDeleted" },
{ type: "DraftAgreementUpdated" },
{ type: "AgreementSubmitted" },
{ type: "AgreementActivated" },
{ type: "AgreementUnsuspendedByProducer" },
{ type: "AgreementUnsuspendedByConsumer" },
{ type: "AgreementUnsuspendedByPlatform" },
{ type: "AgreementArchivedByConsumer" },
{ type: "AgreementArchivedByUpgrade" },
{ type: "AgreementUpgraded" },
{ type: "AgreementSuspendedByProducer" },
{ type: "AgreementSuspendedByConsumer" },
{ type: "AgreementSuspendedByPlatform" },
{ type: "AgreementRejected" },
{ type: "AgreementConsumerDocumentAdded" },
{ type: "AgreementConsumerDocumentRemoved" },
{ type: "AgreementSetDraftByPlatform" },
{ type: "AgreementSetMissingCertifiedAttributesByPlatform" },
() => Promise.resolve()
)
.exhaustive();
}
43 changes: 43 additions & 0 deletions packages/agreement-platformstate-writer/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { EachMessagePayload } from "kafkajs";
import { logger, decodeKafkaMessage } from "pagopa-interop-commons";
import { runConsumer } from "kafka-iam-auth";
import {
AgreementEvent,
CorrelationId,
generateId,
unsafeBrandId,
} from "pagopa-interop-models";
import { match } from "ts-pattern";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { handleMessageV1 } from "./consumerServiceV1.js";
import { handleMessageV2 } from "./consumerServiceV2.js";
import { config } from "./config/config.js";

const dynamoDBClient = new DynamoDBClient({});
async function processMessage({
message,
partition,
}: EachMessagePayload): Promise<void> {
const decodedMessage = decodeKafkaMessage(message, AgreementEvent);

const loggerInstance = logger({
serviceName: "agreement-platformstate-writer",
eventType: decodedMessage.type,
eventVersion: decodedMessage.event_version,
streamId: decodedMessage.stream_id,
correlationId: decodedMessage.correlation_id
? unsafeBrandId<CorrelationId>(decodedMessage.correlation_id)
: generateId<CorrelationId>(),
});

await match(decodedMessage)
.with({ event_version: 1 }, (msg) => handleMessageV1(msg, dynamoDBClient))
.with({ event_version: 2 }, (msg) => handleMessageV2(msg, dynamoDBClient))
.exhaustive();

loggerInstance.info(
`Token-generation read model was updated. Partition number: ${partition}. Offset: ${message.offset}`
);
}

await runConsumer(config, [config.agreementTopic], processMessage);
95 changes: 95 additions & 0 deletions packages/agreement-platformstate-writer/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import {
genericInternalError,
PlatformStatesAgreementEntry,
PlatformStatesAgreementPK,
} from "pagopa-interop-models";
import {
DeleteItemCommand,
DeleteItemInput,
DynamoDBClient,
GetItemCommand,
GetItemCommandOutput,
GetItemInput,
PutItemCommand,
PutItemInput,
} from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { config } from "./config/config.js";

export const writeAgreementEntry = async (
agreementEntry: PlatformStatesAgreementEntry,
dynamoDBClient: DynamoDBClient
): Promise<void> => {
const input: PutItemInput = {
Item: {
PK: {
S: agreementEntry.PK,
},
state: {
S: agreementEntry.state,
},
version: {
N: agreementEntry.version.toString(),
},
updatedAt: {
S: agreementEntry.updatedAt,
},
GSIPK_consumerId_eserviceId: {
S: agreementEntry.GSIPK_consumerId_eserviceId,
},
GSI_agreementTimestamp: {
S: agreementEntry.GSISK_agreementTimestamp,
},
agreementDescriptorId: {
S: agreementEntry.agreementDescriptorId,
},
},
TableName: config.tokenGenerationReadModelTableNamePlatform,
};
const command = new PutItemCommand(input);
await dynamoDBClient.send(command);
};

export const readAgreementEntry = async (
primaryKey: PlatformStatesAgreementPK,
dynamoDBClient: DynamoDBClient
): Promise<PlatformStatesAgreementEntry | undefined> => {
const input: GetItemInput = {
Key: {
PK: { S: primaryKey },
},
TableName: config.tokenGenerationReadModelTableNamePlatform,
};
const command = new GetItemCommand(input);
const data: GetItemCommandOutput = await dynamoDBClient.send(command);

if (!data.Item) {
return undefined;
} else {
const unmarshalled = unmarshall(data.Item);
const agreementEntry = PlatformStatesAgreementEntry.safeParse(unmarshalled);

if (!agreementEntry.success) {
throw genericInternalError(
`Unable to parse agreement entry item: result ${JSON.stringify(
agreementEntry
)} - data ${JSON.stringify(data)} `
);
}
return agreementEntry.data;
}
};

export const deleteAgreementEntry = async (
primaryKey: PlatformStatesAgreementPK,
dynamoDBClient: DynamoDBClient
): Promise<void> => {
const input: DeleteItemInput = {
Key: {
PK: { S: primaryKey },
},
TableName: config.tokenGenerationReadModelTableNamePlatform,
};
const command = new DeleteItemCommand(input);
await dynamoDBClient.send(command);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { describe, expect, it } from "vitest";

describe("sample", () => {
it("test", () => {
expect(1).toBe(1);
});
});
4 changes: 4 additions & 0 deletions packages/agreement-platformstate-writer/test/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "../tsconfig.json",
"include": ["."]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { setupTestContainersVitestGlobal } from "pagopa-interop-commons-test/index.js";

export default setupTestContainersVitestGlobal();
7 changes: 7 additions & 0 deletions packages/agreement-platformstate-writer/tsconfig.check.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"noEmit": true,
},
"include": ["src", "test"]
}
9 changes: 9 additions & 0 deletions packages/agreement-platformstate-writer/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src"
]
}
11 changes: 11 additions & 0 deletions packages/agreement-platformstate-writer/vitest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defineConfig } from "vitest/config";

export default defineConfig({
test: {
globalSetup: ["./test/vitestGlobalSetup.ts"],
testTimeout: 60000,
hookTimeout: 60000,
fileParallelism: false,
pool: "forks",
},
});
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import {
toReadModelAgreement,
} from "pagopa-interop-models";
import { describe, expect, it } from "vitest";
import { handleMessageV1 } from "../src/consumerServiceV1.js";
import {
toAgreementDocumentV1,
toAgreementV1,
} from "./protobufConverterToV1.js";
} from "pagopa-interop-commons-test";
import { handleMessageV1 } from "../src/consumerServiceV1.js";
import { agreements } from "./utils.js";

describe("events V1", async () => {
Expand Down
Loading

0 comments on commit 8339f68

Please sign in to comment.