This repository was archived by the owner on Sep 18, 2025. It is now read-only.

Commit cf2e95d

Merge pull request #1 from recursethis/fork/main
Added Protobuf Support
2 parents f4a1a74 + c01b697

File tree

11 files changed: +200 −10 lines changed


README.md

Lines changed: 7 additions & 3 deletions
@@ -1,6 +1,6 @@
 # Datagen CLI
 
-This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON or Avro format or to Postgres.
+This command line interface application allows you to take schemas defined in JSON (`.json`), Avro (`.avsc`), or SQL (`.sql`) and produce believable fake data to Kafka in JSON, Avro, or Protobuf format, or to Postgres.
 
 The benefits of using this datagen tool are:
 - You can specify what values are generated using the expansive [FakerJS API](https://fakerjs.dev/api/) to craft data that more faithfully imitates your use case. This allows you to more easily apply business logic downstream.
@@ -85,7 +85,7 @@ Fake Data Generator
 Options:
   -V, --version        output the version number
   -s, --schema <char>  Schema file to use
-  -f, --format <char>  The format of the produced data (choices: "json", "avro", "postgres", "webhook", default: "json")
+  -f, --format <char>  The format of the produced data (choices: "json", "avro", "postgres", "webhook", "proto", default: "json")
   -n, --number <char>  Number of records to generate. For infinite records, use -1 (default: "10")
   -c, --clean          Clean (delete) Kafka topics and schema subjects previously created
   -dr, --dry-run       Dry run (no data will be produced to Kafka)
@@ -213,7 +213,11 @@ Here is the general syntax for a JSON input schema:
 {
     "_meta": {
         "topic": "<my kafka topic>",
-        "key": "<field to be used for kafka record key>" ,
+        "key": "<field to be used for kafka record key>" ,
+        "proto": {
+            "dir": "<directory with protobuf schemas>",
+            "schema": "<protobuf message schema name>"
+        },
         "relationships": [
             {
                 "topic": "<topic for dependent dataset>",

datagen.ts

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ program
     .requiredOption('-s, --schema <char>', 'Schema file to use')
     .addOption(
         new Option('-f, --format <char>', 'The format of the produced data')
-            .choices(['json', 'avro', 'postgres', 'webhook'])
+            .choices(['json', 'avro', 'postgres', 'webhook', 'proto'])
             .default('json')
     )
     .addOption(

package.json

Lines changed: 4 additions & 2 deletions
@@ -39,7 +39,7 @@
   },
   "dependencies": {
     "@avro/types": "^1.0.25",
-    "@faker-js/faker": "^7.6.0",
+    "@faker-js/faker": "^8.0.0",
     "@kafkajs/confluent-schema-registry": "^3.3.0",
     "@types/node": "^18.14.6",
     "arg": "^5.0.2",
@@ -53,7 +53,9 @@
     "dotenv": "^16.0.2",
     "kafkajs": "^2.2.3",
     "node-sql-parser": "^4.6.1",
-    "pg": "^8.11.0"
+    "pg": "^8.11.0",
+    "protobufjs": "^6.11.4",
+    "glob": "10.3.10"
   },
   "devDependencies": {
     "@types/jest": "^29.4.0",

src/formats/protoFormat.ts

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+import { OutputFormat } from "./outputFormat";
+import protobuf from "protobufjs";
+import alert from "cli-alerts";
+import { globSync } from "glob";
+
+export class ProtoFormat implements OutputFormat {
+    private schemas: any = {};
+    private schemaFiles: Set<string>;
+
+    static async getProtoSchemas(megaRecord: any, protoSchemaFiles: string[]) {
+
+        if (!protoSchemaFiles || protoSchemaFiles.length === 0) {
+            protoSchemaFiles = [];
+            protoSchemaFiles.push(...(await ProtoFormat.getProtoSchemaFiles(megaRecord)));
+        }
+
+        const protoSchemas = {};
+        const protoRoot = protobuf.loadSync(protoSchemaFiles);
+        for (const topic in megaRecord) {
+
+            const protoSchema = {};
+            try {
+                protoSchema["messageType"] = protoRoot.lookupType(megaRecord[topic].schema);
+                protoSchema["name"] = topic;
+                protoSchema["namespace"] = megaRecord[topic].schema;
+
+                if (global.debug) {
+                    alert({
+                        type: `success`,
+                        name: `Proto Schema for topic ${topic}:`,
+                        msg: `\n ${JSON.stringify(protoSchema, null, 2)}`
+                    });
+                }
+
+                protoSchemas[topic] = protoSchema;
+            } catch (error) {
+                alert({
+                    type: `error`,
+                    name: `protobuf lookup type error for schema ${megaRecord[topic].schema}`,
+                    msg: `${error}`
+                });
+                process.exit(1);
+
+            }
+        }
+
+        return protoSchemas;
+    }
+
+    static async getProtoSchemaFiles(megaRecord: any) {
+        const protoFiles = new Set<string>();
+        for (const topic in megaRecord) {
+            (await ProtoFormat.getProtoSchemaFilesSync(megaRecord[topic].schemaDir)).forEach(file => protoFiles.add(file));
+        }
+        return protoFiles;
+    }
+
+    static async getProtoSchemaFilesSync(directory: string) {
+        if (!directory) {
+            return [];
+        }
+        return globSync(directory + (directory.endsWith("/") ? "" : "/") + "**/*.proto");
+    }
+
+    async register(megaRecord: any): Promise<void> {
+        this.schemaFiles = await ProtoFormat.getProtoSchemaFiles(megaRecord);
+        this.schemas = await ProtoFormat.getProtoSchemas(megaRecord, Array.from(this.schemaFiles));
+    }
+
+    async encode(record: any, topic: string): Promise<Buffer> {
+        const messageType = this.schemas[topic]['messageType'];
+
+        // check if the message is valid
+        const error = messageType.verify(record);
+        if (global.debug && error) {
+            alert({
+                type: `warning`,
+                name: `${record} with ${this.schemas[topic]['namespace']} is not valid`,
+                msg: `${error}`
+            });
+        }
+        // if the message is not valid, convert the plain object
+        const message = error ? messageType.fromObject(record) : messageType.create(record);
+
+        return messageType.encode(message).finish();
+    }
+}
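
A minimal sketch of how this new class is driven (the megaRecord shape is inferred from kafkaDataGenerator.ts and generateMegaRecord.ts below; the topic, message name, and record values are hypothetical):

import { ProtoFormat } from "./formats/protoFormat.js";

// Each topic entry carries the proto schema directory and fully qualified message name
// that generateMegaRecord copies out of the schema's _meta.proto block.
const megaRecord = {
    air_quality: {
        schemaDir: "./tests",            // *.proto files under this directory are loaded
        schema: "datagen.AirQuality",    // hypothetical message name for illustration
        key: "id",
        records: [{ id: 1, pm25: 12.4 }]
    }
};

const format = new ProtoFormat();
await format.register(megaRecord);   // discovers .proto files and caches a message type per topic
const bytes = await format.encode(megaRecord.air_quality.records[0], "air_quality");
// `bytes` is the Protobuf-encoded payload the Kafka producer would send for this record.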

src/kafkaDataGenerator.ts

Lines changed: 12 additions & 3 deletions
@@ -6,6 +6,9 @@ import { AvroFormat } from './formats/avroFormat.js';
 import { JsonFormat } from './formats/jsonFormat.js';
 import sleep from './utils/sleep.js';
 import asyncGenerator from './utils/asyncGenerator.js';
+import { accessRecordKey } from './utils/recordKey.js';
+import { ProtoFormat } from "./formats/protoFormat.js";
+
 
 export default async function kafkaDataGenerator({
     format,
@@ -26,6 +29,8 @@ export default async function kafkaDataGenerator({
         outputFormat = await AvroFormat.create();
     } else if (format === 'json') {
         outputFormat = new JsonFormat();
+    } else if (format === 'proto') {
+        outputFormat = new ProtoFormat();
     }
 
     producer = await KafkaProducer.create(outputFormat);
@@ -37,14 +42,18 @@ export default async function kafkaDataGenerator({
 
         if (iteration === 0) {
             await producer?.prepare(megaRecord);
-            if (global.debug && global.dryRun && format === 'avro') {
-                await AvroFormat.getAvroSchemas(megaRecord);
+            if (global.debug && global.dryRun) {
+                if (format === 'avro') {
+                    await AvroFormat.getAvroSchemas(megaRecord);
+                } else if (format === 'proto') {
+                    await ProtoFormat.getProtoSchemas(megaRecord, []);
+                }
             }
         }
 
         for (const topic in megaRecord) {
             for await (const record of megaRecord[topic].records) {
-                let key = null;
+                let key = accessRecordKey(megaRecord[topic].key, record);
                 if (record[megaRecord[topic].key]) {
                     key = record[megaRecord[topic].key];
                 }
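
The `format === 'proto'` branch above plugs `ProtoFormat` into the same `OutputFormat` abstraction used by `AvroFormat` and `JsonFormat`. That interface is not part of this diff; based on how `register` and `encode` are declared in protoFormat.ts, it presumably looks roughly like this sketch (assumed, not the repository's actual file):

// Sketch of the OutputFormat contract implied by the classes in this commit (assumed).
export interface OutputFormat {
    // Inspect the generated megaRecord and prepare per-topic schemas/serializers.
    register(megaRecord: any): Promise<void>;
    // Serialize a single record destined for the given topic.
    encode(record: any, topic: string): Promise<Buffer>;
}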

src/schemas/generateMegaRecord.ts

Lines changed: 6 additions & 0 deletions
@@ -82,6 +82,12 @@ export async function generateMegaRecord(schema: any) {
             });
         }
 
+        // specify the proto field for the topic
+        if ("proto" in _meta) {
+            megaRecord[topic]["schemaDir"] = _meta.proto.dir;
+            megaRecord[topic]["schema"] = _meta.proto.schema;
+        }
+
         // for records that already exist, generate values
         // for every field that doesn't already have a value.
         megaRecord[topic]["key"] = _meta.key
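
With the `_meta.proto` block from `tests/schema.json` below, the addition above leaves the topic's entry in `megaRecord` looking roughly like this (a sketch; the generated records are elided):

// Approximate shape of megaRecord["mz_datagen_users"] after generateMegaRecord runs.
const usersEntry = {
    key: "id",
    schemaDir: "./tests",       // copied from _meta.proto.dir
    schema: "datagen.User",     // copied from _meta.proto.schema
    records: [/* generated fake records */]
};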

src/utils/recordKey.ts

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+export function accessRecordKey(path: string, record: any): any {
+    return path?.split('.').reduce((level, key) => level && level[key], record);
+}
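
This helper lets `_meta.key` point at a nested field using dot notation; a quick illustration (the sample record is hypothetical):

import { accessRecordKey } from './utils/recordKey.js';

// Hypothetical record used only to demonstrate the dotted-path lookup.
const record = { user: { id: 42, name: "Jane" }, ts: "2024-01-01" };

accessRecordKey("user.id", record);       // => 42
accessRecordKey("ts", record);            // => "2024-01-01"
accessRecordKey("user.missing", record);  // => undefined (the reduce short-circuits on a falsy level)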

tests/datagen.test.ts

Lines changed: 25 additions & 0 deletions
@@ -41,6 +41,13 @@ describe('Schema Parsing Tests', () => {
         expect(output).toContain('Dry run: Skipping record production...');
         expect(output).toContain('Stopping the data generator');
     });
+    test('should parse json schema with proto definitions', () => {
+        const schema = './tests/schema.json';
+        const output = datagen(`-s ${schema} -n 2 -f proto`);
+        expect(output).toContain('Parsing JSON schema...');
+        expect(output).toContain('Dry run: Skipping record production...');
+        expect(output).toContain('Stopping the data generator');
+    });
 });
 
 
@@ -54,6 +61,24 @@ describe('Test missing schema file', () => {
             expect(error.status).toBe(1);
         }
     });
+    test('should return error if proto schema is not defined', () => {
+        const schema = './tests/iterationIndex.json'
+        try {
+            const output = datagen(`-s ${schema} -n 2 -f proto`);
+        } catch (error) {
+            expect(error.stdout.toString()).toContain(`Error: no such type`);
+            expect(error.status).toBe(1);
+        }
+    });
+    test('should return error if proto schema is not defined', () => {
+        const schema = './tests/iterationIndex.json'
+        try {
+            const output = datagen(`-s ${schema} -n 2 -f proto`);
+        } catch (error) {
+            expect(error.stdout.toString()).toContain(`Error: no such type`);
+            expect(error.status).toBe(1);
+        }
+    });
 });
 
 describe('Test record size', () => {

tests/iterationIndex.json

Lines changed: 5 additions & 1 deletion
@@ -1,7 +1,11 @@
 {
     "_meta": {
         "topic": "air_quality",
-        "key": "id"
+        "key": "id",
+        "proto": {
+            "dir": "./tests",
+            "schema": "datagen.dne"
+        }
     },
     "id": "iteration.index",
     "timestamp": "faker.date.between('2020-01-01T00:00:00.000Z', '2030-01-01T00:00:00.000Z')",

tests/schema.json

Lines changed: 12 additions & 0 deletions
@@ -3,6 +3,10 @@
         "_meta": {
             "topic": "mz_datagen_users",
             "key": "id",
+            "proto": {
+                "dir": "./tests",
+                "schema": "datagen.User"
+            },
             "relationships": [
                 {
                     "topic": "mz_datagen_posts",
@@ -28,6 +32,10 @@
         "_meta": {
             "topic": "mz_datagen_posts",
             "key": "id",
+            "proto": {
+                "dir": "./tests",
+                "schema": "datagen.Post"
+            },
             "relationships": [
                 {
                     "topic": "mz_datagen_comments",
@@ -46,6 +54,10 @@
         "_meta": {
             "topic": "mz_datagen_comments",
             "key": "id",
+            "proto": {
+                "dir": "./tests",
+                "schema": "datagen.Comment"
+            },
             "relationships": [
                 {
                     "topic": "mz_datagen_users",
