Skip to content

Commit ffcc95d

Browse files
committed
refactor: refactor sharded wal to handle coordinator
1 parent 7b6e72c commit ffcc95d

File tree

8 files changed

+1702
-1120
lines changed

8 files changed

+1702
-1120
lines changed

packages/utils/src/lib/errors.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,20 @@ export function stringifyError(
3030
}
3131
return JSON.stringify(error);
3232
}
33+
34+
/**
35+
* Extends an error with a new message and keeps the original as the cause.
36+
* @param error - The error to extend
37+
* @param message - The new message to add to the error
38+
* @returns A new error with the extended message and the original as cause
39+
*/
40+
export function extendError(
41+
error: unknown,
42+
message: string,
43+
{ appendMessage = false } = {},
44+
) {
45+
const errorMessage = appendMessage
46+
? `${message}\n${stringifyError(error)}`
47+
: message;
48+
return new Error(errorMessage, { cause: error });
49+
}

packages/utils/src/lib/errors.unit.test.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import ansis from 'ansis';
22
import { z } from 'zod';
33
import { SchemaValidationError } from '@code-pushup/models';
4-
import { stringifyError } from './errors.js';
4+
import { extendError, stringifyError } from './errors.js';
55

66
describe('stringifyError', () => {
77
it('should use only message from plain Error instance', () => {
@@ -113,3 +113,25 @@ describe('stringifyError', () => {
113113
).toBe(`SchemaValidationError: Invalid ${ansis.bold('User')} […]`);
114114
});
115115
});
116+
117+
describe('extendError', () => {
118+
it('adds message, appends original error, and keeps cause', () => {
119+
const original = new Error('boom');
120+
121+
const extended = extendError(original, 'wrap failed', {
122+
appendMessage: true,
123+
});
124+
125+
expect(extended.message).toBe('wrap failed\nboom');
126+
expect(extended.cause).toBe(original);
127+
});
128+
129+
it('uses only the provided message by default', () => {
130+
const original = new Error('boom');
131+
132+
const extended = extendError(original, 'wrap failed');
133+
134+
expect(extended.message).toBe('wrap failed');
135+
expect(extended.cause).toBe(original);
136+
});
137+
});
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
import fs from 'node:fs';
2+
import path from 'node:path';
3+
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
4+
import { PROFILER_SHARDER_ID_ENV_VAR } from './profiler/constants.js';
5+
import { ShardedWal } from './wal-sharded.js';
6+
import { type WalFormat, type WalRecord, stringCodec } from './wal.js';
7+
8+
describe('ShardedWal Integration', () => {
9+
const testDir = path.join(
10+
process.cwd(),
11+
'tmp',
12+
'int',
13+
'utils',
14+
'wal-sharded',
15+
);
16+
const makeMockFormat = <T extends WalRecord>(
17+
overrides: Partial<WalFormat<T>>,
18+
): WalFormat<T> => {
19+
const {
20+
baseName = 'wal',
21+
walExtension = '.log',
22+
finalExtension = '.json',
23+
codec = stringCodec<T>(),
24+
finalizer = records => `${JSON.stringify(records)}\n`,
25+
} = overrides;
26+
27+
return {
28+
baseName,
29+
walExtension,
30+
finalExtension,
31+
codec,
32+
finalizer,
33+
};
34+
};
35+
let shardedWal: ShardedWal;
36+
37+
beforeEach(() => {
38+
if (fs.existsSync(testDir)) {
39+
fs.rmSync(testDir, { recursive: true, force: true });
40+
}
41+
fs.mkdirSync(testDir, { recursive: true });
42+
});
43+
44+
afterEach(() => {
45+
if (shardedWal) {
46+
shardedWal.cleanupIfCoordinator();
47+
}
48+
if (fs.existsSync(testDir)) {
49+
fs.rmSync(testDir, { recursive: true, force: true });
50+
}
51+
});
52+
53+
it('should create and finalize shards correctly', () => {
54+
shardedWal = new ShardedWal({
55+
debug: false,
56+
dir: testDir,
57+
format: makeMockFormat({
58+
baseName: 'trace',
59+
}),
60+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
61+
groupId: 'create-finalize',
62+
});
63+
64+
const shard1 = shardedWal.shard();
65+
shard1.open();
66+
shard1.append('record1');
67+
shard1.append('record2');
68+
shard1.close();
69+
70+
const shard2 = shardedWal.shard();
71+
shard2.open();
72+
shard2.append('record3');
73+
shard2.close();
74+
75+
shardedWal.finalize();
76+
77+
const finalFile = path.join(
78+
testDir,
79+
shardedWal.groupId,
80+
`trace.create-finalize.json`,
81+
);
82+
expect(fs.existsSync(finalFile)).toBeTrue();
83+
84+
const content = fs.readFileSync(finalFile, 'utf8');
85+
const records = JSON.parse(content.trim());
86+
expect(records).toEqual(['record1', 'record2', 'record3']);
87+
});
88+
89+
it('should merge multiple shards correctly', () => {
90+
shardedWal = new ShardedWal({
91+
debug: false,
92+
dir: testDir,
93+
format: makeMockFormat({
94+
baseName: 'merged',
95+
}),
96+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
97+
groupId: 'merge-shards',
98+
});
99+
100+
// eslint-disable-next-line functional/no-loop-statements
101+
for (let i = 1; i <= 5; i++) {
102+
const shard = shardedWal.shard();
103+
shard.open();
104+
shard.append(`record-from-shard-${i}`);
105+
shard.close();
106+
}
107+
108+
shardedWal.finalize();
109+
110+
const finalFile = path.join(
111+
testDir,
112+
shardedWal.groupId,
113+
`merged.merge-shards.json`,
114+
);
115+
const content = fs.readFileSync(finalFile, 'utf8');
116+
const records = JSON.parse(content.trim());
117+
expect(records).toHaveLength(5);
118+
expect(records[0]).toBe('record-from-shard-1');
119+
expect(records[4]).toBe('record-from-shard-5');
120+
});
121+
122+
it('should handle invalid entries during if debug true', () => {
123+
shardedWal = new ShardedWal({
124+
debug: true,
125+
dir: testDir,
126+
format: makeMockFormat({
127+
baseName: 'test',
128+
}),
129+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
130+
groupId: 'invalid-entries',
131+
});
132+
133+
const shard = shardedWal.shard();
134+
shard.open();
135+
shard.append('valid1');
136+
shard.append('invalid');
137+
shard.append('valid2');
138+
shard.close();
139+
140+
shardedWal.finalize();
141+
// When debug is true, lastRecover should contain recovery results
142+
expect(shardedWal.stats.lastRecover).toHaveLength(1);
143+
expect(shardedWal.stats.lastRecover[0]).toMatchObject({
144+
file: expect.stringContaining('test.'),
145+
result: expect.objectContaining({
146+
records: expect.arrayContaining(['valid1', 'invalid', 'valid2']),
147+
errors: [],
148+
partialTail: null,
149+
}),
150+
});
151+
152+
const finalFile = path.join(
153+
testDir,
154+
shardedWal.groupId,
155+
`test.invalid-entries.json`,
156+
);
157+
const content = fs.readFileSync(finalFile, 'utf8');
158+
const records = JSON.parse(content.trim());
159+
expect(records).toEqual(['valid1', 'invalid', 'valid2']);
160+
});
161+
162+
it('should cleanup shard files after finalization', () => {
163+
shardedWal = new ShardedWal({
164+
debug: false,
165+
dir: testDir,
166+
format: makeMockFormat({
167+
baseName: 'cleanup-test',
168+
}),
169+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
170+
groupId: 'cleanup-test',
171+
});
172+
173+
const shard1 = shardedWal.shard();
174+
shard1.open();
175+
shard1.append('record1');
176+
shard1.close();
177+
178+
const shard2 = shardedWal.shard();
179+
shard2.open();
180+
shard2.append('record2');
181+
shard2.close();
182+
183+
shardedWal.finalize();
184+
185+
const finalFile = path.join(
186+
testDir,
187+
shardedWal.groupId,
188+
`cleanup-test.cleanup-test.json`,
189+
);
190+
expect(fs.existsSync(finalFile)).toBeTrue();
191+
192+
shardedWal.cleanupIfCoordinator();
193+
194+
const groupDir = path.join(testDir, shardedWal.groupId);
195+
const files = fs.readdirSync(groupDir);
196+
expect(files).not.toContain(expect.stringMatching(/cleanup-test.*\.log$/));
197+
expect(files).toContain(`cleanup-test.cleanup-test.json`);
198+
});
199+
200+
it('should use custom options in finalizer', () => {
201+
shardedWal = new ShardedWal({
202+
debug: false,
203+
dir: testDir,
204+
format: makeMockFormat({
205+
baseName: 'custom',
206+
finalizer: (records, opt) =>
207+
`${JSON.stringify({ records, metadata: opt })}\n`,
208+
}),
209+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
210+
groupId: 'custom-finalizer',
211+
});
212+
213+
const shard = shardedWal.shard();
214+
shard.open();
215+
shard.append('record1');
216+
shard.close();
217+
218+
shardedWal.finalize({ version: '2.0', timestamp: Date.now() });
219+
220+
const finalFile = path.join(
221+
testDir,
222+
shardedWal.groupId,
223+
`custom.custom-finalizer.json`,
224+
);
225+
const content = fs.readFileSync(finalFile, 'utf8');
226+
const result = JSON.parse(content.trim());
227+
expect(result.records).toEqual(['record1']);
228+
expect(result.metadata).toEqual({
229+
version: '2.0',
230+
timestamp: expect.any(Number),
231+
});
232+
});
233+
234+
it('should handle empty shards correctly', () => {
235+
shardedWal = new ShardedWal({
236+
debug: false,
237+
dir: testDir,
238+
format: makeMockFormat({
239+
baseName: 'empty',
240+
}),
241+
coordinatorIdEnvVar: PROFILER_SHARDER_ID_ENV_VAR,
242+
groupId: 'empty-shards',
243+
});
244+
245+
const groupDir = path.join(testDir, shardedWal.groupId);
246+
fs.mkdirSync(groupDir, { recursive: true });
247+
248+
shardedWal.finalize();
249+
250+
const finalFile = path.join(
251+
testDir,
252+
shardedWal.groupId,
253+
`empty.${shardedWal.groupId}.json`,
254+
);
255+
expect(fs.existsSync(finalFile)).toBeTrue();
256+
const content = fs.readFileSync(finalFile, 'utf8');
257+
expect(content.trim()).toBe('[]');
258+
});
259+
});

0 commit comments

Comments
 (0)