Skip to content

Commit cbf4da2

Browse files
feat: NATS storage integration
Added proper SSL support
1 parent 6f3d64e commit cbf4da2

File tree

9 files changed

+331
-25
lines changed

9 files changed

+331
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ config.production.json
203203
config.apache-kafka-no-auth.json
204204
config.aws-msk-iam.json
205205
test-config-confluent.json
206+
certs/*
206207
*.key
207208
*.pem
208209
*.crt

README.md

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,20 @@ npx superstream-kafka-analyzer
7070
```bash
7171
# Using a configuration file
7272
npx superstream-kafka-analyzer --config config.json
73+
74+
# Using a configuration file and uploading results to NATS
75+
npx superstream-kafka-analyzer --config config.json --nats-config nats-config.json
7376
```
7477

78+
### NATS Config Mode (Upload to NATS)
79+
80+
```bash
81+
# Run normal analysis but upload results to NATS instead of generating files
82+
npx superstream-kafka-analyzer --config config.json --nats-config nats-config.json
83+
```
84+
85+
This mode works like the normal configuration file mode, but instead of generating local files, it uploads the analysis results to NATS Object Store.
86+
7587
### Configuration File Examples
7688

7789
**Available Examples:**
@@ -95,6 +107,7 @@ The full list is under the `./config-examples/` folder:
95107
- [Generic OAuth](config-examples/config.example.generic-oauth.json) - Generic OAuth provider
96108
- [With Timestamp](config-examples/config.example.with-timestamp.json) - Include timestamp in filenames
97109
- [Without Timestamp](config-examples/config.example.without-timestamp.json) - No timestamp in filenames
110+
- [NATS Config](config-examples/config.example.nats-config.json) - configuration for NATS connectivity
98111

99112
**Basic Configuration** (`config.example.json`):
100113
```json
@@ -139,6 +152,18 @@ The full list is under the `./config-examples/` folder:
139152
}
140153
```
141154

155+
**NATS Only Configuration** (`config.example.nats-config.json`):
156+
```json
157+
{
158+
"servers": ["nats://localhost:4222"],
159+
"jwt": "your_jwt_token_here",
160+
"nkey": "your_nkey_seed_here",
161+
"account": "your_account_name",
162+
"bucket": "kafka-analysis-test",
163+
"objectName": "analysis-result-test"
164+
}
165+
```
166+
142167
**AWS MSK with SCRAM** (`config.example.aws-msk.json`):
143168
```json
144169
{
@@ -218,14 +243,11 @@ The full list is under the `./config-examples/` folder:
218243
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
219244
"clientId": "superstream-analyzer",
220245
"vendor": "aiven",
221-
"useSasl": true,
222-
"sasl": {
223-
"mechanism": "SCRAM-SHA-256",
224-
"username": "avnadmin",
225-
"password": "YOUR_AVNADMIN_PASSWORD"
226-
},
246+
"useSasl": false,
227247
"ssl": {
228-
"ca": "./path/to/ca.pem"
248+
"ca": "path/to/ca.pem",
249+
"cert": "path/to/service.cert",
250+
"key": "path/to/service.key"
229251
}
230252
},
231253
"file": {
@@ -268,6 +290,7 @@ The full list is under the `./config-examples/` folder:
268290
| Option | Description | Default |
269291
|--------|-------------|---------|
270292
| `--config <path>` | Path to configuration file | - |
293+
| `--nats-config <path>` | Path to NATS configuration file (uploads to NATS data store instead of generating files) | - |
271294

272295
## 🔐 Security Protocols
273296

bin/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ program
1313
.description('Interactive utility to analyze Kafka clusters health and configuration')
1414
.version('1.0.0')
1515
.option('-c, --config <path>', 'Path to configuration file')
16+
.option('--nats-config <path>', 'Path to NATS configuration file (uploads to NATS instead of generating files)')
1617
.option('-b, --bootstrap-servers <servers>', 'Comma-separated list of Kafka bootstrap servers')
1718
.option('-v, --verbose', 'Enable verbose logging')
1819
.option('-t, --timeout <seconds>', 'Connection timeout in seconds', '30')

config-examples/config.example.aiven-kafka.json

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
44
"clientId": "superstream-analyzer",
55
"vendor": "aiven",
6-
"useSasl": true,
7-
"sasl": {
8-
"mechanism": "SCRAM-SHA-256",
9-
"username": "avnadmin",
10-
"password": "YOUR_AVNADMIN_PASSWORD"
11-
},
6+
"useSasl": false,
127
"ssl": {
13-
"ca": "/PATH/TO/YOUR/ca.pem"
8+
"ca": "path/to/ca.pem",
9+
"cert": "path/to/service.cert",
10+
"key": "path/to/service.key"
1411
}
1512
},
1613
"file": {
@@ -19,4 +16,4 @@
1916
"includeMetadata": true
2017
},
2118
"email": "user@example.com"
22-
}
19+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"servers": ["nats://localhost:4222"],
3+
"jwt": "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJLVlZMSUJaUkpFNTNHSFZSQ0JWQUtJMjRRQ0hFQTJJSllKTEpCU0xWMzdZTFpZQlNTUTNBIiwiaWF0IjoxNzU0Mjc4MjUxLCJpc3MiOiJBQVlXTUI0S0RQUUtPNTdJS1M3MlhBWVVCV1hWTDZMWlhHTkczRjQzT1UzUlg1WVpFSU9LUlZSMiIsIm5hbWUiOiIyMjM2NzE5OTAiLCJzdWIiOiJVQ01JQ0dMNEMyVTdFWkhTQU9aWkVIWE5OTkhORjVaRFpYT0lWU0JVWkdDNVgzT0U0NkJTSjdOSCIsIm5hdHMiOnsicHViIjp7fSwic3ViIjp7fSwic3VicyI6LTEsImRhdGEiOi0xLCJwYXlsb2FkIjotMSwidHlwZSI6InVzZXIiLCJ2ZXJzaW9uIjoyfX0.s_FEjZ2b4ww_4mRcwpvGJBo6Oou_Migp32nqcgfOogzGYLOS6Zcp3ocCZAtHoPB4cBapOW2grXB8nEsPEhq2BA",
4+
"nkey": "SUALTRX34EJFH2DTLSJQD2VRYRZYZVEPMECWCJTPJ3334KNUX2ZAWOZC3M",
5+
"account": "internal",
6+
"bucket": "kafka-analysis-reports",
7+
"objectName": "analysis-223671990-1"
8+
}

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
},
2020
"scripts": {
2121
"start": "node bin/index.js",
22+
"nats-config": "node bin/index.js --config config-examples/config.example.json --nats-config config-examples/config.example.nats-config.json",
2223
"test": "jest",
2324
"lint": "eslint src/",
2425
"format": "prettier --write src/"
@@ -38,6 +39,7 @@
3839
"jsonwebtoken": "^9.0.2",
3940
"jwks-rsa": "^3.1.0",
4041
"kafkajs": "^2.2.4",
42+
"nats": "^2.19.0",
4143
"ora": "^5.4.1",
4244
"simple-oauth2": "^5.1.0",
4345
"superstream-kafka-analyzer": "^1.0.13"

src/cli.js

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ const { Validators } = require('./validators');
1111
const { HealthChecker } = require('./health-checker');
1212
const { displayValidationResults, displayTopicSummary } = require('./utils');
1313
const { SupabaseAnalytics } = require('./analytics');
14+
const { NATSObjectStore } = require('./nats-object-store');
15+
const { log } = require('console');
1416

1517
class CLI {
1618
constructor(options = {}) {
@@ -21,12 +23,31 @@ class CLI {
2123
};
2224
this.kafkaClient = null;
2325
this.fileService = null;
26+
this.natsObjectStore = null;
2427
this.analytics = new SupabaseAnalytics();
2528

2629
// Handle bootstrap-servers option mapping to brokers
2730
if (options.bootstrapServers && !options.brokers) {
2831
this.options.brokers = options.bootstrapServers;
2932
}
33+
34+
// Setup graceful shutdown
35+
this.setupGracefulShutdown();
36+
}
37+
38+
setupGracefulShutdown() {
39+
process.on('SIGINT', async () => {
40+
console.log(chalk.yellow('\n\n🛑 Received SIGINT, shutting down gracefully...'));
41+
42+
// Stop NATS monitoring if active
43+
if (this.natsObjectStore) {
44+
this.natsObjectStore.stopMonitoring();
45+
await this.natsObjectStore.disconnect();
46+
}
47+
48+
console.log(chalk.green('✅ Graceful shutdown completed'));
49+
process.exit(0);
50+
});
3051
}
3152

3253
async loadConfigFromFile(configPath) {
@@ -115,6 +136,39 @@ class CLI {
115136
}
116137
}
117138

139+
async loadNATSConfig(natsConfigPath) {
140+
try {
141+
const fullPath = path.resolve(natsConfigPath);
142+
if (!fs.existsSync(fullPath)) {
143+
throw new Error(`NATS config file not found: ${fullPath}`);
144+
}
145+
146+
const configContent = fs.readFileSync(fullPath, 'utf8');
147+
const natsConfig = JSON.parse(configContent);
148+
149+
// Create NATS object store
150+
this.natsObjectStore = new NATSObjectStore(natsConfig);
151+
152+
// Connect to NATS
153+
const connected = await this.natsObjectStore.connect();
154+
if (!connected) {
155+
throw new Error('Failed to connect to NATS');
156+
}
157+
158+
// Setup object store bucket
159+
const bucketReady = await this.natsObjectStore.getOrCreateBucket();
160+
if (!bucketReady) {
161+
throw new Error('Failed to setup Object Store bucket');
162+
}
163+
164+
console.log(chalk.green(`✅ NATS configuration loaded from: ${fullPath}`));
165+
return true;
166+
} catch (error) {
167+
console.error(chalk.red(`❌ Failed to load NATS config file: ${error.message}`));
168+
return false;
169+
}
170+
}
171+
118172
async promptForConfig() {
119173
console.log(chalk.blue('\n🚀 Superstream Kafka Analyzer\n'));
120174
console.log(chalk.gray('Configure your analysis settings:\n'));
@@ -177,6 +231,7 @@ class CLI {
177231

178232
// Authentication configuration based on vendor
179233
let saslConfig = null;
234+
let sslConfig = null;
180235
if (vendorAnswer.vendor === 'aws-msk') {
181236
console.log(chalk.yellow('\n🔐 AWS MSK Authentication'));
182237
const authType = await inquirer.prompt([
@@ -268,27 +323,50 @@ class CLI {
268323
{
269324
type: 'input',
270325
name: 'username',
271-
message: 'Username:',
326+
message: 'Username (leave empty is using SSL only):',
272327
validate: (input) => {
273-
if (!input.trim()) return 'Username is required for Aiven';
274328
return true;
275329
}
276330
},
277331
{
278332
type: 'password',
279333
name: 'password',
280-
message: 'Password:',
334+
message: 'Password (leave empty if using SSL only):',
281335
validate: (input) => {
282-
if (!input.trim()) return 'Password is required for Aiven';
283336
return true;
284337
}
338+
},
339+
{
340+
type: 'input',
341+
name: 'caPath',
342+
message: 'CA Certificate path (optional, e.g., "./certs/ca.pem"):',
343+
default: './certs/ca.pem'
344+
},
345+
{
346+
type: 'input',
347+
name: 'certPath',
348+
message: 'Client Certificate path (optional, eg. "./certs/service.cert"):',
349+
default: './certs/service.cert',
350+
},
351+
{
352+
type: 'input',
353+
name: 'keyPath',
354+
message: 'Client Private Key path (optional, eg. "./certs/service.key"):',
355+
default: './certs/service.key',
285356
}
286357
]);
358+
287359
saslConfig = {
288360
mechanism: 'scram-sha-256',
289361
username: aivenAnswers.username,
290362
password: aivenAnswers.password
291363
};
364+
365+
sslConfig = {
366+
ca: aivenAnswers.caPath,
367+
cert: aivenAnswers.certPath,
368+
key: aivenAnswers.keyPath
369+
};
292370
} else {
293371
// For other vendors, ask if SASL is needed
294372
const useSasl = await inquirer.prompt([
@@ -346,7 +424,8 @@ class CLI {
346424
brokers: kafkaAnswers.brokers.split(',').map(broker => broker.trim()), // Convert string to array
347425
vendor: vendorAnswer.vendor,
348426
useSasl: !!saslConfig,
349-
sasl: saslConfig
427+
sasl: saslConfig,
428+
ssl: sslConfig
350429
};
351430

352431
// File Output Configuration
@@ -417,6 +496,7 @@ class CLI {
417496

418497
// Use Commander.js options
419498
const configPath = this.options.config;
499+
const natsConfigPath = this.options.natsConfig;
420500

421501
if (configPath) {
422502
console.log(chalk.gray(`Debug: Loading config from: ${configPath}`));
@@ -430,6 +510,16 @@ class CLI {
430510
await this.promptForConfig();
431511
}
432512

513+
// Load NATS config if provided
514+
if (natsConfigPath) {
515+
console.log(chalk.gray(`Debug: Loading NATS config from: ${natsConfigPath}`));
516+
const natsConfigLoaded = await this.loadNATSConfig(natsConfigPath);
517+
if (!natsConfigLoaded) {
518+
process.exit(1);
519+
}
520+
console.log(chalk.gray('Debug: NATS config loaded successfully'));
521+
}
522+
433523
// Initialize services without validation
434524
console.log(chalk.yellow('\n⚠️ Validation skipped - proceeding directly to analysis'));
435525
this.kafkaClient = new KafkaClient(this.config.kafka);
@@ -582,6 +672,16 @@ class CLI {
582672
if (healthResults) {
583673
analysisResults.healthChecks = healthResults;
584674
}
675+
676+
// Store results in NATS
677+
if (this.options.natsConfig) {
678+
spinner.text = 'Saving results to NATS...';
679+
spinner.render();
680+
await this.natsObjectStore.storeObject(analysisResults);
681+
spinner.stop();
682+
console.log(chalk.green('\n✅ Analysis completed and stored to NATS!'));
683+
process.exit(0);
684+
}
585685

586686
// Check if email is provided for file generation
587687
if (this.config.email && this.config.email.trim()) {

0 commit comments

Comments
 (0)