Skip to content

Commit 1554360

Browse files
sdangoldreamorosi
andauthored
feat(parser): implemented a helper function Base64Encoded to decode base64 encoded payloads (#4413)
Co-authored-by: Andrea Amorosi <dreamorosi@gmail.com>
1 parent 51b344f commit 1554360

File tree

3 files changed

+199
-31
lines changed

3 files changed

+199
-31
lines changed

packages/parser/src/helpers/index.ts

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
1+
import { gunzipSync } from 'node:zlib';
2+
import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
13
import { type ZodType, z } from 'zod';
24

5+
const decoder = new TextDecoder();
6+
7+
const decompress = (data: string): string => {
8+
try {
9+
return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8'));
10+
} catch {
11+
return data;
12+
}
13+
};
14+
315
/**
416
* A helper function to parse a JSON string and validate it against a schema.
517
*
@@ -54,4 +66,53 @@ const JSONStringified = <T extends ZodType>(schema: T) =>
5466
})
5567
.pipe(schema);
5668

57-
export { JSONStringified };
69+
/**
70+
* A helper function to decode a Base64 string and validate it against a schema.
71+
*
72+
*
73+
* Use it for built-in schemas like `KinesisDataStreamRecordPayload` that have fields that are base64 encoded
74+
* and extend them with your custom schema.
75+
*
76+
* For example, if you have an event with a base64 encoded body similar to the following:
77+
*
78+
* ```json
79+
* {
80+
* // ... other fields
81+
* "data": "e3Rlc3Q6ICJ0ZXN0In0=",
82+
* }
83+
* ```
84+
*
85+
* You can extend any built-in schema with your custom schema using the `Base64Encoded` helper function.
86+
*
87+
* @example
88+
* ```typescript
89+
* import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
90+
* import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis';
91+
* import { z } from 'zod';
92+
*
93+
* const extendedSchema = KinesisDataStreamRecordPayload.extend({
94+
* data: Base64Encoded(z.object({
95+
* test: z.string(),
96+
* }))
97+
* });
98+
* type _ExtendedKinesisDataStream = z.infer<typeof extendedSchema>;
99+
* ```
100+
*
101+
* @param schema - The schema to validate the Base 64 decoded value against
102+
*/
103+
const Base64Encoded = <T extends ZodType>(schema: T) =>
104+
z
105+
.string()
106+
.transform((data) => {
107+
const decompressed = decompress(data);
108+
const decoded = decoder.decode(fromBase64(data, 'base64'));
109+
try {
110+
// If data was not compressed, try to parse it as JSON otherwise it must be string
111+
return decompressed === data ? JSON.parse(decoded) : decompressed;
112+
} catch {
113+
return decoded;
114+
}
115+
})
116+
.pipe(schema);
117+
118+
export { JSONStringified, Base64Encoded };

packages/parser/src/schemas/kinesis.ts

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,16 @@
1-
import { gunzipSync } from 'node:zlib';
2-
import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
31
import { z } from 'zod';
2+
import { Base64Encoded } from '../helpers/index.js';
43
import type { KinesisDataStreamEvent } from '../types/schema.js';
54
import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';
65

7-
const decoder = new TextDecoder();
8-
96
const KinesisDataStreamRecordPayload = z.object({
107
kinesisSchemaVersion: z.string(),
118
partitionKey: z.string(),
129
sequenceNumber: z.string(),
1310
approximateArrivalTimestamp: z.number(),
14-
data: z.string().transform((data) => {
15-
const decompressed = decompress(data);
16-
const decoded = decoder.decode(fromBase64(data, 'base64'));
17-
try {
18-
// If data was not compressed, try to parse it as JSON otherwise it must be string
19-
return decompressed === data ? JSON.parse(decoded) : decompressed;
20-
} catch {
21-
return decoded;
22-
}
23-
}),
11+
data: Base64Encoded(z.any()),
2412
});
2513

26-
const decompress = (data: string): string => {
27-
try {
28-
return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8'));
29-
} catch {
30-
return data;
31-
}
32-
};
33-
3414
const KinesisDataStreamRecord = z.object({
3515
eventSource: z.literal('aws:kinesis'),
3616
eventVersion: z.string(),
@@ -46,13 +26,7 @@ const KinesisDynamoDBStreamSchema = z.object({
4626
Records: z.array(
4727
KinesisDataStreamRecord.extend({
4828
kinesis: KinesisDataStreamRecordPayload.extend({
49-
data: z
50-
.string()
51-
.transform((data) => {
52-
const decoded = decoder.decode(fromBase64(data, 'base64'));
53-
return JSON.parse(decoded);
54-
})
55-
.pipe(DynamoDBStreamToKinesisRecord),
29+
data: Base64Encoded(z.any()).pipe(DynamoDBStreamToKinesisRecord),
5630
}),
5731
})
5832
),

packages/parser/tests/unit/helpers.test.ts

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import { gzipSync } from 'node:zlib';
2+
import {
3+
KinesisDataStreamRecord,
4+
KinesisDataStreamRecordPayload,
5+
KinesisDataStreamSchema,
6+
} from 'src/schemas/kinesis.js';
17
import { describe, expect, it } from 'vitest';
28
import { z } from 'zod';
39
import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js';
4-
import { JSONStringified } from '../../src/helpers/index.js';
10+
import { Base64Encoded, JSONStringified } from '../../src/helpers/index.js';
511
import { AlbSchema } from '../../src/schemas/alb.js';
612
import {
713
DynamoDBStreamRecord,
@@ -14,6 +20,7 @@ import {
1420
import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js';
1521
import type {
1622
DynamoDBStreamEvent,
23+
KinesisDataStreamEvent,
1724
SnsEvent,
1825
SqsEvent,
1926
} from '../../src/types/schema.js';
@@ -277,3 +284,129 @@ describe('Helper: DynamoDBMarshalled', () => {
277284
expect(() => extendedSchema.parse(event)).toThrow();
278285
});
279286
});
287+
288+
describe('Helper: Base64Encoded', () => {
289+
it('returns a valid base64 decoded object when passed an encoded object', () => {
290+
// Prepare
291+
const data = {
292+
body: Buffer.from(JSON.stringify(structuredClone(basePayload))).toString(
293+
'base64'
294+
),
295+
};
296+
297+
// Act
298+
const extendedSchema = envelopeSchema.extend({
299+
body: Base64Encoded(bodySchema),
300+
});
301+
302+
// Assess
303+
expect(extendedSchema.parse(data)).toStrictEqual({
304+
body: basePayload,
305+
});
306+
});
307+
308+
it('returns a valid base64 decoded object when passed a compressed object', () => {
309+
// Prepare
310+
const data = {
311+
body: Buffer.from(
312+
gzipSync(JSON.stringify(structuredClone(basePayload)))
313+
).toString('base64'),
314+
};
315+
316+
// Act
317+
const extendedSchema = envelopeSchema.extend({
318+
body: Base64Encoded(bodySchema),
319+
});
320+
321+
// Assess
322+
expect(extendedSchema.parse(data)).toStrictEqual({
323+
body: basePayload,
324+
});
325+
});
326+
327+
it('throws an error if the payload is does not match the schema', () => {
328+
// Prepare
329+
const data = {
330+
body: Buffer.from(
331+
JSON.stringify({ ...basePayload, email: 'invalid' })
332+
).toString('base64'),
333+
};
334+
335+
// Act
336+
const extendedSchema = envelopeSchema.extend({
337+
body: Base64Encoded(bodySchema),
338+
});
339+
340+
// Assess
341+
expect(() => extendedSchema.parse(data)).toThrow();
342+
});
343+
344+
it('throws an error if the payload is malformed', () => {
345+
// Prepare
346+
const data = {
347+
body: Buffer.from('{"foo": 1, }').toString('base64'),
348+
};
349+
350+
// Act
351+
const extendedSchema = envelopeSchema.extend({
352+
body: Base64Encoded(bodySchema),
353+
});
354+
355+
// Assess
356+
expect(() => extendedSchema.parse(data)).toThrow();
357+
});
358+
359+
it('throws an error if the base64 payload is malformed', () => {
360+
// Prepare
361+
const data = {
362+
body: 'invalid-base64-string',
363+
};
364+
365+
// Act
366+
const extendedSchema = envelopeSchema.extend({
367+
body: Base64Encoded(bodySchema),
368+
});
369+
370+
// Assess
371+
expect(() => extendedSchema.parse(data)).toThrow();
372+
});
373+
374+
it('parses extended KinesisDataStreamSchema', () => {
375+
// Prepare
376+
const testEvent = getTestEvent<KinesisDataStreamEvent>({
377+
eventsPath: 'kinesis',
378+
filename: 'stream',
379+
});
380+
const stringifiedBody = JSON.stringify(basePayload);
381+
testEvent.Records[0].kinesis.data =
382+
Buffer.from(stringifiedBody).toString('base64');
383+
testEvent.Records[1].kinesis.data =
384+
Buffer.from(stringifiedBody).toString('base64');
385+
386+
// Act
387+
const extendedSchema = KinesisDataStreamSchema.extend({
388+
Records: z.array(
389+
KinesisDataStreamRecord.extend({
390+
kinesis: KinesisDataStreamRecordPayload.extend({
391+
data: Base64Encoded(bodySchema),
392+
}),
393+
})
394+
),
395+
});
396+
397+
// Assess
398+
expect(extendedSchema.parse(testEvent)).toStrictEqual({
399+
...testEvent,
400+
Records: [
401+
{
402+
...testEvent.Records[0],
403+
kinesis: { ...testEvent.Records[0].kinesis, data: basePayload },
404+
},
405+
{
406+
...testEvent.Records[1],
407+
kinesis: { ...testEvent.Records[1].kinesis, data: basePayload },
408+
},
409+
],
410+
});
411+
});
412+
});

0 commit comments

Comments
 (0)