Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/action/triggers/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
} from '../../output/github-checks.js';
import { logGroup, logGroupEnd } from '../workflow/base.js';
import { DEFAULT_FILE_CONCURRENCY } from '../../sdk/types.js';
import type { Semaphore } from '../../utils/index.js';
import { Verbosity } from '../../cli/output/verbosity.js';

/** Log-mode output for CI: no TTY, no color. */
Expand Down Expand Up @@ -54,6 +55,8 @@ export interface TriggerExecutorDeps {
globalRequestChanges?: boolean;
/** Global fail-check from action inputs (trigger-specific takes precedence) */
globalFailCheck?: boolean;
/** Global semaphore for limiting concurrent file analyses across triggers */
semaphore?: Semaphore;
}

/**
Expand Down Expand Up @@ -138,7 +141,8 @@ export async function executeTrigger(
};

const callbacks = createDefaultCallbacks([taskOptions], CI_OUTPUT_MODE, Verbosity.Normal);
const result = await runSkillTask(taskOptions, DEFAULT_FILE_CONCURRENCY, callbacks);
const fileConcurrency = deps.semaphore ? Number.MAX_SAFE_INTEGER : DEFAULT_FILE_CONCURRENCY;
const result = await runSkillTask(taskOptions, fileConcurrency, callbacks, deps.semaphore);
const report = result.report;

if (!report) {
Expand Down
40 changes: 19 additions & 21 deletions src/action/workflow/pr-workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import { evaluateFixAttempts } from '../fix-evaluation/index.js';
import { setFailed } from './base.js';
import { runPRWorkflow } from './pr-workflow.js';
import { clearSkillsCache } from '../../skills/loader.js';
import { Semaphore } from '../../utils/index.js';

// Type the mocks
const mockRunSkillTask = vi.mocked(runSkillTask);
Expand Down Expand Up @@ -311,12 +312,14 @@ describe('runPRWorkflow', () => {
await runPRWorkflow(mockOctokit, createDefaultInputs(), 'pull_request', EVENT_PAYLOAD_PATH, FIXTURES_DIR);

expect(mockRunSkillTask).toHaveBeenCalledTimes(1);
// runSkillTask(options, concurrency, callbacks)
expect(mockRunSkillTask).toHaveBeenCalledWith(
expect.objectContaining({ name: expect.any(String) }),
expect.any(Number),
expect.any(Object)
);
const [taskOptions, fileConcurrency, _callbacks, semaphore] = mockRunSkillTask.mock.calls[0]!;
expect(taskOptions).toEqual(expect.objectContaining({
name: 'test-skill',
displayName: 'test-skill',
}));
// When a semaphore is provided, fileConcurrency is unlimited (semaphore is the gate)
expect(fileConcurrency).toBe(Number.MAX_SAFE_INTEGER);
expect(semaphore).toBeInstanceOf(Semaphore);
});

it('records trigger failure and updates check before failing', async () => {
Expand Down Expand Up @@ -467,23 +470,18 @@ describe('runPRWorkflow', () => {

await runPRWorkflow(mockOctokit, createDefaultInputs(), 'pull_request', EVENT_PAYLOAD_PATH, FIXTURES_DIR);

// runSkillTask receives options with context embedded
expect(mockRunSkillTask).toHaveBeenCalledWith(
expect.objectContaining({
context: expect.objectContaining({
pullRequest: expect.objectContaining({
files: expect.arrayContaining([
expect.objectContaining({
filename: 'src/custom.ts',
status: 'added',
}),
]),
}),
// runSkillTask receives options with context containing the custom files
const [taskOptions, fileConcurrency, _callbacks, semaphore] = mockRunSkillTask.mock.calls[0]!;
expect(taskOptions.context.pullRequest?.files).toEqual(
expect.arrayContaining([
expect.objectContaining({
filename: 'src/custom.ts',
status: 'added',
}),
}),
expect.any(Number),
expect.any(Object)
])
);
expect(fileConcurrency).toBe(Number.MAX_SAFE_INTEGER);
expect(semaphore).toBeInstanceOf(Semaphore);
});
});

Expand Down
11 changes: 8 additions & 3 deletions src/action/workflow/pr-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { fetchExistingComments } from '../../output/dedup.js';
import type { ExistingComment } from '../../output/dedup.js';
import { buildAnalyzedScope, findStaleComments, resolveStaleComments } from '../../output/stale.js';
import type { EventContext, SkillReport, Finding } from '../../types/index.js';
import { processInBatches } from '../../utils/index.js';
import { runPool, Semaphore } from '../../utils/index.js';
import { evaluateFixAttempts, postThreadReply } from '../fix-evaluation/index.js';
import type { FixEvaluation } from '../fix-evaluation/index.js';
import { logAction, warnAction } from '../../cli/output/tty.js';
Expand Down Expand Up @@ -241,8 +241,13 @@ async function executeAllTriggers(
const concurrency = config.runner?.concurrency ?? inputs.parallel;
const claudePath = await findClaudeCodeExecutable();

return processInBatches(
// Global semaphore gates file-level work across all triggers.
// All triggers launch immediately; the semaphore limits concurrent file analyses.
const semaphore = new Semaphore(concurrency);

return runPool(
matchedTriggers,
matchedTriggers.length,
(trigger) =>
executeTrigger(trigger, {
octokit,
Expand All @@ -255,8 +260,8 @@ async function executeAllTriggers(
globalMaxFindings: inputs.maxFindings,
globalRequestChanges: inputs.requestChanges,
globalFailCheck: inputs.failCheck,
semaphore,
}),
concurrency
);
}

Expand Down
36 changes: 14 additions & 22 deletions src/cli/output/ink-runner.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
type FileState,
} from './tasks.js';
import { formatDuration, formatCost, truncate, countBySeverity, formatSeverityDot } from './formatters.js';
import { runPool } from '../../utils/index.js';
import { runPool, Semaphore } from '../../utils/index.js';
import { Verbosity } from './verbosity.js';
import { ICON_CHECK, ICON_SKIPPED, ICON_PENDING, ICON_ERROR, SPINNER_FRAMES } from './icons.js';
import figures from 'figures';
Expand Down Expand Up @@ -226,13 +226,12 @@ export async function runSkillTasksWithInk(
const { verbosity, concurrency } = options;

if (tasks.length === 0 || verbosity === Verbosity.Quiet) {
// No tasks or quiet mode - run without UI
const results: SkillTaskResult[] = [];
for (const task of tasks) {
if (task.runnerOptions?.abortController?.signal.aborted) break;
const result = await runSkillTask(task, 5, noopCallbacks);
results.push(result);
}
// No tasks or quiet mode - run without UI using global semaphore
const semaphore = new Semaphore(concurrency);
const results = await runPool(tasks, tasks.length,
(task) => runSkillTask(task, Number.MAX_SAFE_INTEGER, noopCallbacks, semaphore),
{ shouldAbort: () => tasks[0]?.runnerOptions?.abortController?.signal.aborted ?? false }
);
return results;
}

Expand Down Expand Up @@ -335,21 +334,14 @@ export async function runSkillTasksWithInk(
: undefined,
};

const fileConcurrency = 5;
const results: SkillTaskResult[] = [];
// Global semaphore gates file-level work across all skills.
const semaphore = new Semaphore(concurrency);

if (concurrency <= 1) {
for (const task of tasks) {
if (task.runnerOptions?.abortController?.signal.aborted) break;
const result = await runSkillTask(task, fileConcurrency, callbacks);
results.push(result);
}
} else {
results.push(...await runPool(tasks, concurrency,
(task) => runSkillTask(task, fileConcurrency, callbacks),
{ shouldAbort: () => tasks[0]?.runnerOptions?.abortController?.signal.aborted ?? false }
));
}
// Launch all skills in parallel; the semaphore is the sole concurrency gate.
const results = await runPool(tasks, tasks.length,
(task) => runSkillTask(task, Number.MAX_SAFE_INTEGER, callbacks, semaphore),
{ shouldAbort: () => tasks[0]?.runnerOptions?.abortController?.signal.aborted ?? false }
);

// Cleanup - set unmounted flag before unmount to prevent pending setImmediate
// callbacks from calling rerender on the unmounted Ink instance
Expand Down
33 changes: 33 additions & 0 deletions src/cli/output/tasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Verbosity } from './verbosity.js';
import type { OutputMode } from './tty.js';
import type { SkillReport, Finding } from '../../types/index.js';
import type { SkillTaskOptions } from './tasks.js';
import { Semaphore, runPool } from '../../utils/index.js';

function makeFinding(overrides: Partial<Finding> = {}): Finding {
return {
Expand Down Expand Up @@ -479,3 +480,35 @@ describe('runSkillTasks', () => {
expect(resolveSkill).not.toHaveBeenCalled();
});
});

describe('Semaphore integration with runPool', () => {
it('limits concurrent file analyses across skills to the semaphore size', async () => {
// Track concurrent active file analyses
let active = 0;
let maxActive = 0;
const concurrencyLimit = 2;
const semaphore = new Semaphore(concurrencyLimit);

// Simulate 3 skills each with 3 files (9 total file analyses).
// runPool gets unlimited concurrency (like skills launching in parallel),
// but the semaphore gates how many run simultaneously.
const fileWork = Array.from({ length: 9 }, (_, i) => i);

const results = await runPool(fileWork, fileWork.length, async (item) => {
await semaphore.acquire();
try {
active++;
maxActive = Math.max(maxActive, active);
await new Promise((resolve) => setTimeout(resolve, 5));
active--;
return item;
} finally {
semaphore.release();
}
});

expect(results).toHaveLength(9);
expect(maxActive).toBeLessThanOrEqual(concurrencyLimit);
expect(maxActive).toBe(concurrencyLimit);
});
});
60 changes: 36 additions & 24 deletions src/cli/output/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import type { OutputMode } from './tty.js';
import { ICON_CHECK, ICON_SKIPPED } from './icons.js';
import { timestamp } from './tty.js';
import { formatDuration, formatCost, formatLocation, formatSeverityPlain, formatFindingCountsPlain, countBySeverity, pluralize } from './formatters.js';
import { runPool } from '../../utils/index.js';
import { runPool, Semaphore } from '../../utils/index.js';

/**
* Result from processing a single file within a skill task.
Expand Down Expand Up @@ -157,7 +157,8 @@ export interface SkillProgressCallbacks {
export async function runSkillTask(
options: SkillTaskOptions,
fileConcurrency: number,
callbacks: SkillProgressCallbacks
callbacks: SkillProgressCallbacks,
semaphore?: Semaphore
): Promise<SkillTaskResult> {
const { name, displayName = name, failOn, resolveSkill, context, runnerOptions = {} } = options;

Expand Down Expand Up @@ -316,16 +317,36 @@ export async function runSkillTask(
};
};

// Return an empty result for files skipped due to abort
const processSkippedFile = (index: number): FileProcessResult => {
const localState = fileStates[index];
if (localState) localState.status = 'skipped';
const filename = preparedFiles[index]?.filename ?? 'unknown';
callbacks.onFileUpdate(name, filename, { status: 'skipped' });
return { findings: [], durationMs: 0, failedHunks: 0, failedExtractions: 0 };
};

// Process files with sliding-window concurrency pool
const batchDelayMs = runnerOptions.batchDelayMs ?? 0;
const shouldAbort = () => runnerOptions.abortController?.signal.aborted ?? false;
// The effective concurrency for batch delay: when a semaphore gates work,
// use its permit count (the actual concurrency limit) rather than fileConcurrency.
const effectiveConcurrency = semaphore ? semaphore.initialPermits : fileConcurrency;
const allResults = await runPool(preparedFiles, fileConcurrency,
async (file, index) => {
// Rate-limit: delay items beyond the first concurrent wave
if (index >= fileConcurrency && batchDelayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, batchDelayMs));
if (semaphore) await semaphore.acquire();
try {
// Check abort after acquiring the semaphore -- the file may have
// been queued behind others and a SIGINT could have arrived while waiting.
if (shouldAbort()) return processSkippedFile(index);
// Rate-limit: delay items beyond the first concurrent wave
if (index >= effectiveConcurrency && batchDelayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, batchDelayMs));
}
return await processFile(file, index);
} finally {
if (semaphore) semaphore.release();
}
return processFile(file, index);
},
{ shouldAbort }
);
Expand Down Expand Up @@ -546,8 +567,10 @@ export async function runSkillTasks(
): Promise<SkillTaskResult[]> {
const { mode, verbosity, concurrency } = options;

// File-level concurrency (within each skill)
const fileConcurrency = 5;
// Global semaphore gates file-level work across all skills.
// All skills launch immediately so the UI shows them as "running",
// but only `concurrency` files will be analysed at any time.
const semaphore = new Semaphore(concurrency);

const effectiveCallbacks = callbacks ?? createDefaultCallbacks(tasks, mode, verbosity);

Expand All @@ -564,22 +587,11 @@ export async function runSkillTasks(
}, { once: true });
}

const results: SkillTaskResult[] = [];

if (concurrency <= 1) {
// Sequential execution
for (const task of tasks) {
if (task.runnerOptions?.abortController?.signal.aborted) break;
const result = await runSkillTask(task, fileConcurrency, effectiveCallbacks);
results.push(result);
}
} else {
// Parallel execution with sliding-window concurrency pool
results.push(...await runPool(tasks, concurrency,
(task) => runSkillTask(task, fileConcurrency, effectiveCallbacks),
{ shouldAbort: () => tasks[0]?.runnerOptions?.abortController?.signal.aborted ?? false }
));
}
// Launch all skills in parallel; the semaphore is the sole concurrency gate.
const results = await runPool(tasks, tasks.length,
(task) => runSkillTask(task, Number.MAX_SAFE_INTEGER, effectiveCallbacks, semaphore),
{ shouldAbort: () => tasks[0]?.runnerOptions?.abortController?.signal.aborted ?? false }
);

return results;
}
2 changes: 1 addition & 1 deletion src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export type SkillConfig = z.infer<typeof SkillConfigSchema>;

// Runner configuration
export const RunnerConfigSchema = z.object({
/** Max concurrent trigger executions (default: 4) */
/** Max concurrent file analyses across all skills (default: 4) */
concurrency: z.number().int().positive().optional(),
});
export type RunnerConfig = z.infer<typeof RunnerConfigSchema>;
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface AuxiliaryUsageEntry {
usage: UsageStats;
}

/** Default concurrency for file-level parallel processing */
/** Default concurrency for file-level parallel processing (standalone SDK usage only) */
export const DEFAULT_FILE_CONCURRENCY = 5;

/** Threshold in characters above which to warn about large prompts (~25k tokens) */
Expand Down
Loading