Skip to content
Closed
180 changes: 83 additions & 97 deletions package-lock.json

Large diffs are not rendered by default.

533 changes: 156 additions & 377 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

34 changes: 29 additions & 5 deletions src/app/TaskRequestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import { Logger } from '../services/logger';
import { WorkerTaskExecutor } from './WorkerTaskExecutor';
import { TaskAssignmentValidator, TaskReassignmentCheck } from '../services/worker/task-assignment-validator';
import { BaseBranchExtractor } from '../services/git';
import { StateManager } from '../services/state-manager';

export class TaskRequestHandler {
private readonly workerTaskExecutor: WorkerTaskExecutor;
private readonly taskAssignmentValidator: TaskAssignmentValidator;
private readonly stateManager: StateManager;

constructor(
private readonly workerPoolManager: WorkerPoolManager,
Expand All @@ -35,6 +37,8 @@ export class TaskRequestHandler {
logger: this.logger || console as any,
workspaceManager: this.workerPoolManager.getWorkspaceManager()
});
// WorkerPoolManager에서 StateManager 가져오기
this.stateManager = this.workerPoolManager.getStateManager();
}

async handleTaskRequest(request: TaskRequest): Promise<TaskResponse> {
Expand Down Expand Up @@ -96,14 +100,18 @@ export class TaskRequestHandler {
};
}

// StateManager에서 Task의 lastSyncTime 가져오기
const taskLastSyncTime = await this.stateManager.getTaskLastSyncTime(request.taskId);

// PRD 요구사항에 맞는 전체 작업 정보 생성
const repositoryId = this.getRepositoryIdFromRequest(request);
const workerTask = await this.enrichTaskWithBaseBranch({
taskId: request.taskId,
action: WorkerAction.START_NEW_TASK,
boardItem: request.boardItem,
repositoryId,
assignedAt: new Date()
assignedAt: new Date(),
...(taskLastSyncTime && { lastSyncTime: taskLastSyncTime })
});

// 작업 할당 및 즉시 실행 (Planner가 결과를 감지하도록 WorkerTaskExecutor 사용)
Expand Down Expand Up @@ -179,14 +187,18 @@ export class TaskRequestHandler {
// 새 워커에 피드백 작업 할당
const repositoryId = this.getRepositoryIdFromRequest(request);

// StateManager에서 Task의 lastSyncTime 가져오기
const taskLastSyncTime = await this.stateManager.getTaskLastSyncTime(request.taskId);

const feedbackTask = await this.enrichTaskWithBaseBranch({
taskId: request.taskId,
action: WorkerAction.PROCESS_FEEDBACK,
boardItem: request.boardItem,
...(request.pullRequestUrl && { pullRequestUrl: request.pullRequestUrl }),
...(request.comments && { comments: request.comments }),
repositoryId,
assignedAt: new Date()
assignedAt: new Date(),
...(taskLastSyncTime && { lastSyncTime: taskLastSyncTime })
});
await this.workerPoolManager.assignWorkerTask(workerId, feedbackTask);
} else {
Expand All @@ -203,14 +215,18 @@ export class TaskRequestHandler {
};
}

// StateManager에서 Task의 lastSyncTime 가져오기
const taskLastSyncTime = await this.stateManager.getTaskLastSyncTime(request.taskId);

// 기존 작업에 피드백 정보 추가
let feedbackTask: WorkerTask = {
...worker.currentTask,
...(request.boardItem && { boardItem: request.boardItem }),
action: WorkerAction.PROCESS_FEEDBACK,
...(request.pullRequestUrl && { pullRequestUrl: request.pullRequestUrl }),
...(request.comments && { comments: request.comments }),
assignedAt: new Date()
assignedAt: new Date(),
...(taskLastSyncTime && { lastSyncTime: taskLastSyncTime })
};

feedbackTask = await this.enrichTaskWithBaseBranch(feedbackTask);
Expand Down Expand Up @@ -292,6 +308,9 @@ export class TaskRequestHandler {
});
}

// StateManager에서 Task의 lastSyncTime 가져오기
const taskLastSyncTime = await this.stateManager.getTaskLastSyncTime(request.taskId);

// 병합 요청을 위한 작업 정보 생성
const repositoryId = this.getRepositoryIdFromRequest(request);
const mergeTask: WorkerTask = {
Expand All @@ -300,7 +319,8 @@ export class TaskRequestHandler {
...(request.pullRequestUrl && { pullRequestUrl: request.pullRequestUrl }),
...(request.boardItem && { boardItem: request.boardItem }),
repositoryId,
assignedAt: new Date()
assignedAt: new Date(),
...(taskLastSyncTime && { lastSyncTime: taskLastSyncTime })
};

// Worker에 병합 작업 할당
Expand Down Expand Up @@ -428,14 +448,18 @@ export class TaskRequestHandler {
});
}

// StateManager에서 Task의 lastSyncTime 가져오기
const taskLastSyncTime = await this.stateManager.getTaskLastSyncTime(request.taskId);

// 작업 재할당 (RESUME_TASK 액션으로)
const repositoryId = this.getRepositoryIdFromRequest(request);
let resumeTask: WorkerTask = {
taskId: request.taskId,
action: WorkerAction.RESUME_TASK,
boardItem: request.boardItem,
repositoryId,
assignedAt: new Date()
assignedAt: new Date(),
...(taskLastSyncTime && { lastSyncTime: taskLastSyncTime })
};

// Base branch 추출
Expand Down
8 changes: 8 additions & 0 deletions src/services/manager/worker-pool-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,4 +765,12 @@ export class WorkerPoolManager implements WorkerPoolManagerInterface {
getWorkspaceManager(): WorkspaceManagerInterface | undefined {
return this.dependencies.workspaceManager;
}

/**
* StateManager 인스턴스를 반환합니다.
* TaskRequestHandler에서 Task의 lastSyncTime을 가져오기 위해 사용됩니다.
*/
getStateManager(): StateManager {
return this.dependencies.stateManager;
}
}
96 changes: 80 additions & 16 deletions src/services/planner/review-task-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,37 @@ export class ReviewTaskHandler {
});

// 작업별 lastSyncTime 가져오기 (Worker의 currentTask에서 조회)
const taskLastSyncTime = await this.dependencies.stateManager.getTaskLastSyncTime(item.id);
// since는 항상 Date 객체가 되도록 보장
const since = taskLastSyncTime ? new Date(taskLastSyncTime) : new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
let taskLastSyncTime: Date | null = null;
try {
taskLastSyncTime = await this.dependencies.stateManager.getTaskLastSyncTime(item.id);
} catch (error) {
this.logger.warn('Failed to get task lastSyncTime, using default', {
taskId: item.id,
error: error instanceof Error ? error.message : String(error)
});
}

// since는 항상 Date 객체가 되도록 보장하고, 미래 시간 방지
const now = Date.now();
const sevenDaysAgo = new Date(now - 7 * 24 * 60 * 60 * 1000);
let since: Date;

if (taskLastSyncTime) {
const syncTime = new Date(taskLastSyncTime);
// 미래 시간인 경우 현재 시간으로 제한
if (syncTime.getTime() > now) {
this.logger.warn('Task lastSyncTime is in the future, using current time', {
taskId: item.id,
futureTime: syncTime.toISOString(),
currentTime: new Date(now).toISOString()
});
since = new Date(now);
} else {
since = syncTime;
}
} else {
since = sevenDaysAgo;
}

this.logger.debug('Using sync time for comment filtering', {
taskId: item.id,
Expand All @@ -286,26 +314,50 @@ export class ReviewTaskHandler {
filterOptions
);

// 이미 처리된 코멘트 필터링
// processedCommentIds를 사용하는 이유:
// 1. lastSyncTime 업데이트가 실패한 경우의 안전망
// 2. Worker가 중간에 실패하여 lastSyncTime은 업데이트되었지만
// 실제로는 코멘트 처리가 완료되지 않은 경우 대비
// 3. 동시에 여러 인스턴스가 실행되는 경우의 동시성 문제 방지
let processedCommentIds: ReadonlyArray<string> = [];
try {
const ids = await this.dependencies.stateManager.getProcessedCommentsForTask(item.id);
// null 또는 undefined 처리
processedCommentIds = ids || [];
} catch (error) {
this.logger.warn('Failed to get processed comment IDs, assuming none processed', {
taskId: item.id,
error: error instanceof Error ? error.message : String(error)
});
}

const unprocessedComments = newComments.filter(
(comment: PullRequestComment) => !processedCommentIds.includes(comment.id)
);

this.logger.debug('Comment check result', {
taskId: item.id,
since: since.toISOString(),
newCommentCount: newComments.length,
commentDetails: newComments.map((c: PullRequestComment) => ({
totalNewComments: newComments.length,
processedCommentIds: processedCommentIds.length,
unprocessedComments: unprocessedComments.length,
commentDetails: unprocessedComments.map((c: PullRequestComment) => ({
id: c.id,
author: c.author,
createdAt: c.createdAt.toISOString(),
type: c.metadata?.type
}))
});

if (newComments.length > 0) {
this.logger.info('Found new comments for processing', {
if (unprocessedComments.length > 0) {
this.logger.info('Found new unprocessed comments for processing', {
taskId: item.id,
commentCount: newComments.length
commentCount: unprocessedComments.length
});
await this.handleNewComments(item, prUrl, newComments);
await this.handleNewComments(item, prUrl, unprocessedComments);
} else {
this.logger.debug('No new comments found since last sync', {
this.logger.debug('No new unprocessed comments found since last sync', {
taskId: item.id,
lastSyncTime: since.toISOString()
});
Expand Down Expand Up @@ -333,35 +385,47 @@ export class ReviewTaskHandler {
const response = await this.dependencies.managerCommunicator.sendTaskToManager(request);

if (response.status === ResponseStatus.ACCEPTED) {
// 처리된 코멘트로 기록
for (const comment of newComments) {
this.workflowStateManager.getState().processedComments.add(comment.id);
}
// 처리된 코멘트로 기록 (StateManager의 task에 저장)
const commentIds = newComments.map((comment: PullRequestComment) => comment.id);
await this.dependencies.stateManager.addProcessedCommentsToTask(item.id, commentIds);

// 작업별 lastSyncTime 업데이트
const currentTime = new Date();
await this.dependencies.stateManager.updateTaskLastSyncTime(item.id, currentTime);

// WorkflowStateManager에도 기록 (호환성 유지)
for (const comment of newComments) {
this.workflowStateManager.getState().processedComments.add(comment.id);
}
this.workflowStateManager.updateActiveTaskStatus(item.id, 'IN_REVIEW');

this.logger.info('Feedback processed', {
this.logger.info('Feedback processed and recorded', {
taskId: item.id,
commentCount: newComments.length,
processedCommentIds: commentIds,
updatedLastSyncTime: currentTime.toISOString()
});
} else if (response.status === ResponseStatus.COMPLETED && response.pullRequestUrl) {
// 피드백 처리 완료 시 새로운 PR URL 추가
await this.dependencies.projectBoardService.addPullRequestToItem(item.id, response.pullRequestUrl);

// 처리된 코멘트로 기록
// 처리된 코멘트로 기록 (이미 위의 ACCEPTED 경로와 동일하게 처리)
const commentIds = newComments.map((comment: PullRequestComment) => comment.id);
await this.dependencies.stateManager.addProcessedCommentsToTask(item.id, commentIds);

// 작업별 lastSyncTime 업데이트
const currentTime = new Date();
await this.dependencies.stateManager.updateTaskLastSyncTime(item.id, currentTime);

// WorkflowStateManager에도 기록 (호환성 유지)
for (const comment of newComments) {
this.workflowStateManager.getState().processedComments.add(comment.id);
}

this.logger.info('Feedback processing completed with new PR', {
taskId: item.id,
newPullRequestUrl: response.pullRequestUrl,
processedCommentIds: commentIds,
updatedLastSyncTime: currentTime.toISOString()
});
} else if (response.status === ResponseStatus.ERROR) {
Expand Down
29 changes: 28 additions & 1 deletion src/services/state-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ export class StateManager {
}

async getTaskLastSyncTime(taskId: string): Promise<Date | null> {
// 먼저 Task에서 직접 lastSyncTime을 가져옴
const task = this.tasks.get(taskId);
if (task?.lastSyncTime) {
// 문자열로 저장된 경우 Date 객체로 변환
if (typeof task.lastSyncTime === 'string') {
return new Date(task.lastSyncTime);
}
// 이미 Date 객체인 경우 그대로 반환
if (task.lastSyncTime instanceof Date) {
return task.lastSyncTime;
}
}

// Task에 없으면 Worker에서 가져옴 (호환성 유지)
const worker = await this.getWorkerByTaskId(taskId);
const lastSyncTime = worker?.currentTask?.lastSyncTime;

Expand All @@ -204,6 +218,19 @@ export class StateManager {

async updateTaskLastSyncTime(taskId: string, lastSyncTime: Date): Promise<void> {
await this.withLock(async () => {
// Task에 직접 lastSyncTime 저장
const task = this.tasks.get(taskId);
if (task) {
const updatedTask: Task = {
...task,
lastSyncTime,
updatedAt: new Date()
};
this.tasks.set(taskId, updatedTask);
await this.persistTasks();
}

// Worker에도 업데이트 (호환성 유지)
for (const [workerId, worker] of this.workers.entries()) {
if (worker.currentTask?.taskId === taskId) {
const updatedWorker: Worker = {
Expand Down Expand Up @@ -543,7 +570,7 @@ export class StateManager {
}

private dateReviver(key: string, value: unknown): unknown {
if (typeof value === 'string' && (key.endsWith('At') || key.endsWith('Date'))) {
if (typeof value === 'string' && (key.endsWith('At') || key.endsWith('Date') || key.endsWith('Time'))) {
return new Date(value);
}
return value;
Expand Down
1 change: 1 addition & 0 deletions src/types/task.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface Task {
readonly retryCount?: number;
readonly lastRetryAt?: Date;
readonly failureReasons?: ReadonlyArray<string>;
readonly lastSyncTime?: Date; // 이 작업에 대한 마지막 동기화 시간 (PR 코멘트 확인 시점)
}

export interface TaskUpdate {
Expand Down
19 changes: 14 additions & 5 deletions tests/helpers/mock-builders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,22 @@ export class MockWorkerPoolManagerBuilder {
this.methods.set('shutdown', jest.fn());
this.methods.set('storeTaskResult', jest.fn());
this.methods.set('getTaskResult', jest.fn());
this.methods.set('clearTaskResult', jest.fn());
this.methods.set('getStateManager', jest.fn(() => ({
saveWorkerState: jest.fn(),
getWorkerState: jest.fn(),
getAllWorkerStates: jest.fn(),
deleteWorkerState: jest.fn(),
saveTaskResult: jest.fn(),
getTaskResult: jest.fn(),
getTaskLastSyncTime: jest.fn(),
saveTaskLastSyncTime: jest.fn()
})));
this.methods.set('getWorkspaceManager', jest.fn(() => ({
// Mock WorkspaceManager
getWorkspaceInfo: jest.fn(),
createWorkspace: jest.fn(),
cleanupWorkspace: jest.fn()
prepareWorkspace: jest.fn(),
cleanupWorkspace: jest.fn(),
getWorkspaceInfo: jest.fn()
})));
this.methods.set('clearTaskResult', jest.fn());
}

withWorker(worker: Worker): this {
Expand Down
Loading
Loading