Skip to content

Commit

Permalink
Merge pull request #3 from eventpop/dtinth/refactor-main
Browse files Browse the repository at this point in the history
Refactor main.ts to extract specific functionalities
  • Loading branch information
dtinth authored May 17, 2024
2 parents 67c6542 + 7a4e2e4 commit f3a2ec1
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 204 deletions.
17 changes: 17 additions & 0 deletions src/createContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as dynamodb from "@aws-sdk/client-dynamodb";
import * as sqs from "@aws-sdk/client-sqs";
import os from "os";

interface Context {
sqsClient: sqs.SQSClient;
dynamodbClient: dynamodb.DynamoDBClient;
}

function createContext(): Context {
return {
dynamodbClient: new dynamodb.DynamoDBClient({}),
sqsClient: new sqs.SQSClient({}),
};
}

export { Context, createContext };
11 changes: 11 additions & 0 deletions src/createDurationTracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
function createDurationTracker() {
const start = Date.now();
return {
formatDuration: () => {
const end = Date.now();
return ((end - start) / 1000).toFixed(2);
},
};
}

export { createDurationTracker };
131 changes: 131 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import * as dynamodb from "@aws-sdk/client-dynamodb";
import os from "os";
import { z } from "zod";
import { Context } from "./createContext";
import { env } from "./env";
import { Task } from "./schema";

async function ensureDynamodbTableCreated({ dynamodbClient }: Context) {
try {
await dynamodbClient.send(
new dynamodb.CreateTableCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
KeySchema: [
{
AttributeName: "TaskListId",
KeyType: "HASH",
},
{
AttributeName: "TaskId",
KeyType: "RANGE",
},
],
AttributeDefinitions: [
{
AttributeName: "TaskListId",
AttributeType: "S",
},
{
AttributeName: "TaskId",
AttributeType: "S",
},
],
BillingMode: "PAY_PER_REQUEST",
})
);
} catch (error) {
if (
error instanceof dynamodb.ResourceInUseException ||
error instanceof dynamodb.TableAlreadyExistsException
) {
return;
}
throw error;
}
}

async function getPreviouslyRunTaskStatuses(
{ dynamodbClient }: Context,
jobId: string
) {
const response = await dynamodbClient.send(
new dynamodb.QueryCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
KeyConditionExpression: "TaskListId = :taskListId",
ExpressionAttributeValues: {
":taskListId": { S: jobId },
},
})
);
return response;
}

async function checkTaskCompletionStatus(
{ dynamodbClient }: Context,
jobId: string,
taskId: string
): Promise<string> {
const response = await dynamodbClient.send(
new dynamodb.GetItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: taskId },
},
})
);
return response.Item?.Status?.S || "PENDING";
}

async function updateTaskStatusInDynamoDB(
{ dynamodbClient }: Context,
jobId: string,
task: z.infer<typeof Task>,
status: string,
isStart: boolean
) {
const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname();
const timestamp = new Date().toISOString();
let updateExpression =
"set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId";
let expressionAttributeNames: Record<string, string> = {
"#status": "Status",
"#taskDisplayName": "TaskDisplayName",
"#timestamp": isStart ? "StartedAt" : "FinishedAt",
"#workerId": "WorkerId",
};
let expressionAttributeValues: Record<string, dynamodb.AttributeValue> = {
":status": { S: status },
":taskDisplayName": { S: task.displayName },
":timestamp": { S: timestamp },
":workerId": { S: workerId },
};

if (isStart) {
updateExpression +=
", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc";
expressionAttributeNames["#attemptCount"] = "AttemptCount";
expressionAttributeValues[":zero"] = { N: "0" };
expressionAttributeValues[":inc"] = { N: "1" };
}

await dynamodbClient.send(
new dynamodb.UpdateItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: task.id },
},
UpdateExpression: updateExpression,
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
);
}

export {
checkTaskCompletionStatus,
ensureDynamodbTableCreated,
getPreviouslyRunTaskStatuses,
updateTaskStatusInDynamoDB,
};
10 changes: 10 additions & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { z } from "zod";
import { Env } from "@(-.-)/env";

const envSchema = z.object({
PARALLELIZER_DYNAMODB_TABLE: z.string().default("parallelizer"),
});

const env = Env(envSchema);

export { env };
Loading

0 comments on commit f3a2ec1

Please sign in to comment.