Skip to content

Commit e677d4f

Browse files
authored
feat(utils): add WriteAheadLog classes (#1210)
1 parent ea27cdc commit e677d4f

18 files changed

+2654
-104
lines changed

packages/utils/eslint.config.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ export default tseslint.config(
1212
},
1313
},
1414
},
15+
{
16+
files: ['packages/utils/src/lib/**/wal*.ts'],
17+
rules: {
18+
'n/no-sync': 'off',
19+
},
20+
},
1521
{
1622
files: ['**/*.json'],
1723
rules: {

packages/utils/mocks/sink.mock.ts

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,61 @@
1-
import type { Sink } from '../src/lib/sink-source.type';
1+
import type { AppendableSink, Codec } from '../src/lib/wal.js';
22

3-
export class MockSink implements Sink<string, string> {
3+
export class MockFileSink implements AppendableSink<string> {
44
private writtenItems: string[] = [];
55
private closed = false;
66

7+
constructor(options?: { file?: string; codec?: Codec<string> }) {
8+
const file = options?.file || '/tmp/mock-sink.log';
9+
const codec = options?.codec || {
10+
encode: (input: string) => input,
11+
decode: (data: string) => data,
12+
};
13+
}
14+
15+
#fd: number | null = null;
16+
17+
get path(): string {
18+
return '/tmp/mock-sink.log';
19+
}
20+
21+
getPath(): string {
22+
return this.path;
23+
}
24+
725
open(): void {
8-
this.closed = false;
26+
this.#fd = 1; // Mock file descriptor
927
}
1028

11-
write(input: string): void {
12-
this.writtenItems.push(input);
29+
append(v: string): void {
30+
this.writtenItems.push(v);
1331
}
1432

1533
close(): void {
34+
this.#fd = null;
1635
this.closed = true;
1736
}
1837

1938
isClosed(): boolean {
20-
return this.closed;
39+
return this.#fd === null;
40+
}
41+
42+
recover(): any {
43+
return {
44+
records: this.writtenItems,
45+
errors: [],
46+
partialTail: null,
47+
};
2148
}
2249

23-
encode(input: string): string {
24-
return `${input}-${this.constructor.name}-encoded`;
50+
repack(): void {
51+
// Mock implementation - do nothing
2552
}
2653

2754
getWrittenItems(): string[] {
2855
return [...this.writtenItems];
2956
}
57+
58+
clearWrittenItems(): void {
59+
this.writtenItems = [];
60+
}
3061
}

packages/utils/src/lib/clock-epoch.unit.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { defaultClock, epochClock } from './clock-epoch.js';
33
describe('epochClock', () => {
44
it('should create epoch clock with defaults', () => {
55
const c = epochClock();
6-
expect(c.timeOriginMs).toBe(500_000);
6+
expect(c.timeOriginMs).toBe(1_700_000_000_000);
77
expect(c.tid).toBe(2);
88
expect(c.pid).toBe(10_001);
99
expect(c.fromEpochMs).toBeFunction();
@@ -32,8 +32,8 @@ describe('epochClock', () => {
3232

3333
it('should support performance clock by default for epochNowUs', () => {
3434
const c = epochClock();
35-
expect(c.timeOriginMs).toBe(500_000);
36-
expect(c.epochNowUs()).toBe(500_000_000);
35+
expect(c.timeOriginMs).toBe(1_700_000_000_000);
36+
expect(c.epochNowUs()).toBe(1_700_000_000_000_000);
3737
});
3838

3939
it.each([
@@ -55,8 +55,8 @@ describe('epochClock', () => {
5555
});
5656

5757
it.each([
58-
[0, 500_000_000],
59-
[1000, 501_000_000],
58+
[0, 1_700_000_000_000_000],
59+
[1000, 1_700_000_001_000_000],
6060
])(
6161
'should convert performance milliseconds to microseconds',
6262
(perfMs, expected) => {

packages/utils/src/lib/performance-observer.int.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
import { type PerformanceEntry, performance } from 'node:perf_hooks';
22
import type { MockedFunction } from 'vitest';
3-
import { MockSink } from '../../mocks/sink.mock';
3+
import { MockFileSink } from '../../mocks/sink.mock';
44
import {
55
type PerformanceObserverOptions,
66
PerformanceObserverSink,
77
} from './performance-observer.js';
88

99
describe('PerformanceObserverSink', () => {
1010
let encode: MockedFunction<(entry: PerformanceEntry) => string[]>;
11-
let sink: MockSink;
11+
let sink: MockFileSink;
1212
let options: PerformanceObserverOptions<string>;
1313

1414
const awaitObserverCallback = () =>
1515
new Promise(resolve => setTimeout(resolve, 10));
1616

1717
beforeEach(() => {
18-
sink = new MockSink();
18+
sink = new MockFileSink();
1919
encode = vi.fn((entry: PerformanceEntry) => [
2020
`${entry.name}:${entry.entryType}`,
2121
]);

packages/utils/src/lib/performance-observer.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,24 @@ import {
44
type PerformanceObserverEntryList,
55
performance,
66
} from 'node:perf_hooks';
7-
import type { Buffered, Encoder, Observer, Sink } from './sink-source.type';
7+
import type { AppendableSink } from './wal.js';
88

99
const OBSERVED_TYPES = ['mark', 'measure'] as const;
1010
type ObservedEntryType = 'mark' | 'measure';
1111
export const DEFAULT_FLUSH_THRESHOLD = 20;
1212

1313
export type PerformanceObserverOptions<T> = {
14-
sink: Sink<T, unknown>;
14+
sink: AppendableSink<T>;
1515
encode: (entry: PerformanceEntry) => T[];
1616
buffered?: boolean;
1717
flushThreshold?: number;
1818
};
1919

20-
export class PerformanceObserverSink<T>
21-
implements Observer, Buffered, Encoder<PerformanceEntry, T[]>
22-
{
20+
export class PerformanceObserverSink<T> {
2321
#encode: (entry: PerformanceEntry) => T[];
2422
#buffered: boolean;
2523
#flushThreshold: number;
26-
#sink: Sink<T, unknown>;
24+
#sink: AppendableSink<T>;
2725
#observer: PerformanceObserver | undefined;
2826

2927
#pendingCount = 0;
@@ -84,7 +82,7 @@ export class PerformanceObserverSink<T>
8482
try {
8583
fresh
8684
.flatMap(entry => this.encode(entry))
87-
.forEach(item => this.#sink.write(item));
85+
.forEach(item => this.#sink.append(item));
8886

8987
this.#written.set(t, written + fresh.length);
9088
} catch (error) {

packages/utils/src/lib/performance-observer.unit.test.ts

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,28 @@
11
import { type PerformanceEntry, performance } from 'node:perf_hooks';
22
import type { MockedFunction } from 'vitest';
33
import { MockPerformanceObserver } from '@code-pushup/test-utils';
4-
import { MockSink } from '../../mocks/sink.mock';
4+
import { MockFileSink } from '../../mocks/sink.mock';
55
import {
66
type PerformanceObserverOptions,
77
PerformanceObserverSink,
88
} from './performance-observer.js';
9+
import type { Codec } from './wal.js';
910

1011
describe('PerformanceObserverSink', () => {
1112
let encode: MockedFunction<(entry: PerformanceEntry) => string[]>;
12-
let sink: MockSink;
13+
let sink: MockFileSink;
1314
let options: PerformanceObserverOptions<string>;
1415

1516
beforeEach(() => {
1617
vi.clearAllMocks();
17-
sink = new MockSink();
18+
sink = new MockFileSink();
1819
encode = vi.fn((entry: PerformanceEntry) => [
1920
`${entry.name}:${entry.entryType}`,
2021
]);
2122
options = {
2223
sink,
2324
encode,
24-
// we test buffered behavior separately
25+
2526
flushThreshold: 1,
2627
};
2728

@@ -43,24 +44,21 @@ describe('PerformanceObserverSink', () => {
4344
}),
4445
).not.toThrow();
4546
expect(MockPerformanceObserver.instances).toHaveLength(0);
46-
// Instance creation covers the default flushThreshold assignment
4747
});
4848

4949
it('automatically flushes when pendingCount reaches flushThreshold', () => {
5050
const observer = new PerformanceObserverSink({
5151
sink,
5252
encode,
53-
flushThreshold: 2, // Set threshold to 2
53+
flushThreshold: 2,
5454
});
5555
observer.subscribe();
5656

5757
const mockObserver = MockPerformanceObserver.lastInstance();
5858

59-
// Emit 1 entry - should not trigger flush yet (pendingCount = 1 < 2)
6059
mockObserver?.emitMark('first-mark');
6160
expect(sink.getWrittenItems()).toStrictEqual([]);
6261

63-
// Emit 1 more entry - should trigger flush (pendingCount = 2 >= 2)
6462
mockObserver?.emitMark('second-mark');
6563
expect(sink.getWrittenItems()).toStrictEqual([
6664
'first-mark:mark',
@@ -135,7 +133,7 @@ describe('PerformanceObserverSink', () => {
135133
it('internal PerformanceObserver should process observed entries', () => {
136134
const observer = new PerformanceObserverSink({
137135
...options,
138-
flushThreshold: 20, // Disable automatic flushing for this test
136+
flushThreshold: 20,
139137
});
140138
observer.subscribe();
141139

@@ -248,12 +246,22 @@ describe('PerformanceObserverSink', () => {
248246
});
249247

250248
it('flush wraps sink write errors with descriptive error message', () => {
251-
const failingSink = {
252-
write: vi.fn(() => {
249+
const failingCodec: Codec<string> = {
250+
encode: () => {
253251
throw new Error('Sink write failed');
254-
}),
252+
},
253+
decode: (data: string) => data,
255254
};
256255

256+
const failingSink = new MockFileSink({
257+
file: '/test/path',
258+
codec: failingCodec,
259+
});
260+
261+
vi.spyOn(failingSink, 'append').mockImplementation(() => {
262+
throw new Error('Sink write failed');
263+
});
264+
257265
const observer = new PerformanceObserverSink({
258266
sink: failingSink as any,
259267
encode,
@@ -298,4 +306,26 @@ describe('PerformanceObserverSink', () => {
298306
}),
299307
);
300308
});
309+
310+
it('accepts custom sinks with append method', () => {
311+
const collectedItems: string[] = [];
312+
const customSink = {
313+
// eslint-disable-next-line functional/immutable-data
314+
append: (item: string) => collectedItems.push(item),
315+
};
316+
317+
const observer = new PerformanceObserverSink({
318+
sink: customSink,
319+
encode: (entry: PerformanceEntry) => [`${entry.name}:${entry.duration}`],
320+
});
321+
322+
observer.subscribe();
323+
324+
const mockObserver = MockPerformanceObserver.lastInstance();
325+
mockObserver?.emitMark('test-mark');
326+
327+
observer.flush();
328+
329+
expect(collectedItems).toContain('test-mark:0');
330+
});
301331
});
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,6 @@
11
export const PROFILER_ENABLED_ENV_VAR = 'CP_PROFILING';
2+
export const PROFILER_COORDINATOR_FLAG_ENV_VAR = 'CP_PROFILER_COORDINATOR';
3+
export const PROFILER_ORIGIN_PID_ENV_VAR = 'CP_PROFILER_ORIGIN_PID';
4+
export const PROFILER_DIRECTORY_ENV_VAR = 'CP_PROFILER_DIR';
5+
export const PROFILER_BASE_NAME = 'trace';
6+
export const PROFILER_DIRECTORY = './tmp/profiles';

packages/utils/src/lib/profiler/profiler.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import process from 'node:process';
2+
import { threadId } from 'node:worker_threads';
23
import { isEnvVarEnabled } from '../env.js';
34
import {
45
type ActionTrackConfigs,
@@ -16,6 +17,14 @@ import type {
1617
} from '../user-timing-extensibility-api.type.js';
1718
import { PROFILER_ENABLED_ENV_VAR } from './constants.js';
1819

20+
/**
21+
* Generates a unique profiler ID based on performance time origin, process ID, thread ID, and instance count.
22+
*/
23+
export function getProfilerId() {
24+
// eslint-disable-next-line functional/immutable-data
25+
return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++Profiler.instanceCount}`;
26+
}
27+
1928
/**
2029
* Configuration options for creating a Profiler instance.
2130
*
@@ -59,6 +68,8 @@ export type ProfilerOptions<T extends ActionTrackConfigs = ActionTrackConfigs> =
5968
*
6069
*/
6170
export class Profiler<T extends ActionTrackConfigs> {
71+
static instanceCount = 0;
72+
readonly id = getProfilerId();
6273
#enabled: boolean;
6374
readonly #defaults: ActionTrackEntryPayload;
6475
readonly tracks: Record<keyof T, ActionTrackEntryPayload> | undefined;

packages/utils/src/lib/profiler/profiler.unit.test.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
import { performance } from 'node:perf_hooks';
2+
import { threadId } from 'node:worker_threads';
23
import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js';
3-
import { Profiler, type ProfilerOptions } from './profiler.js';
4+
import { Profiler, type ProfilerOptions, getProfilerId } from './profiler.js';
5+
6+
describe('getProfilerId', () => {
7+
it('should generate a unique id per process', () => {
8+
expect(getProfilerId()).toBe(
9+
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.1`,
10+
);
11+
expect(getProfilerId()).toBe(
12+
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.2`,
13+
);
14+
});
15+
});
416

517
describe('Profiler', () => {
618
const getProfiler = (overrides?: Partial<ProfilerOptions>) =>

0 commit comments

Comments
 (0)