Skip to content

Commit a9d2594

Browse files
feat: NATS storage integration
- Added proper SSL support
1 parent 6f3d64e commit a9d2594

File tree

9 files changed

+313
-25
lines changed

9 files changed

+313
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ config.production.json
203203
config.apache-kafka-no-auth.json
204204
config.aws-msk-iam.json
205205
test-config-confluent.json
206+
certs/*
206207
*.key
207208
*.pem
208209
*.crt

README.md

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,20 @@ npx superstream-kafka-analyzer
7070
```bash
7171
# Using a configuration file
7272
npx superstream-kafka-analyzer --config config.json
73+
74+
# Using a configuration file and uploading results to NATS
75+
npx superstream-kafka-analyzer --config config.json --nats-config nats-config.json
7376
```
7477

78+
### NATS Config Mode (Upload to NATS)
79+
80+
```bash
81+
# Run normal analysis but upload results to NATS instead of generating files
82+
npx superstream-kafka-analyzer --config config.json --nats-config nats-config.json
83+
```
84+
85+
This mode works like the normal configuration file mode, but instead of generating local files, it uploads the analysis results to NATS Object Store.
86+
7587
### Configuration File Examples
7688

7789
**Available Examples:**
@@ -95,6 +107,7 @@ The full list is under the `./config-examples/` folder:
95107
- [Generic OAuth](config-examples/config.example.generic-oauth.json) - Generic OAuth provider
96108
- [With Timestamp](config-examples/config.example.with-timestamp.json) - Include timestamp in filenames
97109
- [Without Timestamp](config-examples/config.example.without-timestamp.json) - No timestamp in filenames
110+
- [NATS Config](config-examples/config.example.nats-config.json) - configuration for NATS connectivity
98111

99112
**Basic Configuration** (`config.example.json`):
100113
```json
@@ -139,6 +152,18 @@ The full list is under the `./config-examples/` folder:
139152
}
140153
```
141154

155+
**NATS Only Configuration** (`config.example.nats-config.json`):
156+
```json
157+
{
158+
"servers": ["nats://localhost:4222"],
159+
"jwt": "your_jwt_token_here",
160+
"nkey": "your_nkey_seed_here",
161+
"account": "your_account_name",
162+
"bucket": "kafka-analysis-test",
163+
"objectName": "analysis-result-test"
164+
}
165+
```
166+
142167
**AWS MSK with SCRAM** (`config.example.aws-msk.json`):
143168
```json
144169
{
@@ -218,14 +243,11 @@ The full list is under the `./config-examples/` folder:
218243
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
219244
"clientId": "superstream-analyzer",
220245
"vendor": "aiven",
221-
"useSasl": true,
222-
"sasl": {
223-
"mechanism": "SCRAM-SHA-256",
224-
"username": "avnadmin",
225-
"password": "YOUR_AVNADMIN_PASSWORD"
226-
},
246+
"useSasl": false,
227247
"ssl": {
228-
"ca": "./path/to/ca.pem"
248+
"ca": "path/to/ca.pem",
249+
"cert": "path/to/service.cert",
250+
"key": "path/to/service.key"
229251
}
230252
},
231253
"file": {
@@ -268,6 +290,7 @@ The full list is under the `./config-examples/` folder:
268290
| Option | Description | Default |
269291
|--------|-------------|---------|
270292
| `--config <path>` | Path to configuration file | - |
293+
| `--nats-config <path>` | Path to NATS configuration file (uploads to NATS data store instead of generating files) | - |
271294

272295
## 🔐 Security Protocols
273296

bin/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ program
1313
.description('Interactive utility to analyze Kafka clusters health and configuration')
1414
.version('1.0.0')
1515
.option('-c, --config <path>', 'Path to configuration file')
16+
.option('--nats-config <path>', 'Path to NATS configuration file (uploads to NATS instead of generating files)')
1617
.option('-b, --bootstrap-servers <servers>', 'Comma-separated list of Kafka bootstrap servers')
1718
.option('-v, --verbose', 'Enable verbose logging')
1819
.option('-t, --timeout <seconds>', 'Connection timeout in seconds', '30')

config-examples/config.example.aiven-kafka.json

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
44
"clientId": "superstream-analyzer",
55
"vendor": "aiven",
6-
"useSasl": true,
7-
"sasl": {
8-
"mechanism": "SCRAM-SHA-256",
9-
"username": "avnadmin",
10-
"password": "YOUR_AVNADMIN_PASSWORD"
11-
},
6+
"useSasl": false,
127
"ssl": {
13-
"ca": "/PATH/TO/YOUR/ca.pem"
8+
"ca": "path/to/ca.pem",
9+
"cert": "path/to/service.cert",
10+
"key": "path/to/service.key"
1411
}
1512
},
1613
"file": {
@@ -19,4 +16,4 @@
1916
"includeMetadata": true
2017
},
2118
"email": "user@example.com"
22-
}
19+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"servers": ["nats://localhost:4222"],
3+
"jwt": "nats-jwt-token",
4+
"nkey": "nats-nkey-seed",
5+
"account": "internal",
6+
"bucket": "kafka-analysis-reports",
7+
"objectName": "selected-object-name"
8+
}

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
},
2020
"scripts": {
2121
"start": "node bin/index.js",
22+
"nats-config": "node bin/index.js --config config-examples/config.example.json --nats-config config-examples/config.example.nats-config.json",
2223
"test": "jest",
2324
"lint": "eslint src/",
2425
"format": "prettier --write src/"
@@ -38,6 +39,7 @@
3839
"jsonwebtoken": "^9.0.2",
3940
"jwks-rsa": "^3.1.0",
4041
"kafkajs": "^2.2.4",
42+
"nats": "^2.19.0",
4143
"ora": "^5.4.1",
4244
"simple-oauth2": "^5.1.0",
4345
"superstream-kafka-analyzer": "^1.0.13"

src/cli.js

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ const { Validators } = require('./validators');
1111
const { HealthChecker } = require('./health-checker');
1212
const { displayValidationResults, displayTopicSummary } = require('./utils');
1313
const { SupabaseAnalytics } = require('./analytics');
14+
const { NATSObjectStore } = require('./nats-object-store');
15+
const { log } = require('console');
1416

1517
class CLI {
1618
constructor(options = {}) {
@@ -21,6 +23,7 @@ class CLI {
2123
};
2224
this.kafkaClient = null;
2325
this.fileService = null;
26+
this.natsObjectStore = null;
2427
this.analytics = new SupabaseAnalytics();
2528

2629
// Handle bootstrap-servers option mapping to brokers
@@ -115,6 +118,39 @@ class CLI {
115118
}
116119
}
117120

121+
async loadNATSConfig(natsConfigPath) {
122+
try {
123+
const fullPath = path.resolve(natsConfigPath);
124+
if (!fs.existsSync(fullPath)) {
125+
throw new Error(`NATS config file not found: ${fullPath}`);
126+
}
127+
128+
const configContent = fs.readFileSync(fullPath, 'utf8');
129+
const natsConfig = JSON.parse(configContent);
130+
131+
// Create NATS object store
132+
this.natsObjectStore = new NATSObjectStore(natsConfig);
133+
134+
// Connect to NATS
135+
const connected = await this.natsObjectStore.connect();
136+
if (!connected) {
137+
throw new Error('Failed to connect to NATS');
138+
}
139+
140+
// Setup object store bucket
141+
const bucketReady = await this.natsObjectStore.getOrCreateBucket();
142+
if (!bucketReady) {
143+
throw new Error('Failed to setup Object Store bucket');
144+
}
145+
146+
console.log(chalk.green(`✅ NATS configuration loaded from: ${fullPath}`));
147+
return true;
148+
} catch (error) {
149+
console.error(chalk.red(`❌ Failed to load NATS config file: ${error.message}`));
150+
return false;
151+
}
152+
}
153+
118154
async promptForConfig() {
119155
console.log(chalk.blue('\n🚀 Superstream Kafka Analyzer\n'));
120156
console.log(chalk.gray('Configure your analysis settings:\n'));
@@ -177,6 +213,7 @@ class CLI {
177213

178214
// Authentication configuration based on vendor
179215
let saslConfig = null;
216+
let sslConfig = null;
180217
if (vendorAnswer.vendor === 'aws-msk') {
181218
console.log(chalk.yellow('\n🔐 AWS MSK Authentication'));
182219
const authType = await inquirer.prompt([
@@ -268,27 +305,50 @@ class CLI {
268305
{
269306
type: 'input',
270307
name: 'username',
271-
message: 'Username:',
308+
message: 'Username (leave empty is using SSL only):',
272309
validate: (input) => {
273-
if (!input.trim()) return 'Username is required for Aiven';
274310
return true;
275311
}
276312
},
277313
{
278314
type: 'password',
279315
name: 'password',
280-
message: 'Password:',
316+
message: 'Password (leave empty if using SSL only):',
281317
validate: (input) => {
282-
if (!input.trim()) return 'Password is required for Aiven';
283318
return true;
284319
}
320+
},
321+
{
322+
type: 'input',
323+
name: 'caPath',
324+
message: 'CA Certificate path (optional, e.g., "./certs/ca.pem"):',
325+
default: './certs/ca.pem'
326+
},
327+
{
328+
type: 'input',
329+
name: 'certPath',
330+
message: 'Client Certificate path (optional, eg. "./certs/service.cert"):',
331+
default: './certs/service.cert',
332+
},
333+
{
334+
type: 'input',
335+
name: 'keyPath',
336+
message: 'Client Private Key path (optional, eg. "./certs/service.key"):',
337+
default: './certs/service.key',
285338
}
286339
]);
340+
287341
saslConfig = {
288342
mechanism: 'scram-sha-256',
289343
username: aivenAnswers.username,
290344
password: aivenAnswers.password
291345
};
346+
347+
sslConfig = {
348+
ca: aivenAnswers.caPath,
349+
cert: aivenAnswers.certPath,
350+
key: aivenAnswers.keyPath
351+
};
292352
} else {
293353
// For other vendors, ask if SASL is needed
294354
const useSasl = await inquirer.prompt([
@@ -346,7 +406,8 @@ class CLI {
346406
brokers: kafkaAnswers.brokers.split(',').map(broker => broker.trim()), // Convert string to array
347407
vendor: vendorAnswer.vendor,
348408
useSasl: !!saslConfig,
349-
sasl: saslConfig
409+
sasl: saslConfig,
410+
ssl: sslConfig
350411
};
351412

352413
// File Output Configuration
@@ -417,6 +478,7 @@ class CLI {
417478

418479
// Use Commander.js options
419480
const configPath = this.options.config;
481+
const natsConfigPath = this.options.natsConfig;
420482

421483
if (configPath) {
422484
console.log(chalk.gray(`Debug: Loading config from: ${configPath}`));
@@ -430,6 +492,16 @@ class CLI {
430492
await this.promptForConfig();
431493
}
432494

495+
// Load NATS config if provided
496+
if (natsConfigPath) {
497+
console.log(chalk.gray(`Debug: Loading NATS config from: ${natsConfigPath}`));
498+
const natsConfigLoaded = await this.loadNATSConfig(natsConfigPath);
499+
if (!natsConfigLoaded) {
500+
process.exit(1);
501+
}
502+
console.log(chalk.gray('Debug: NATS config loaded successfully'));
503+
}
504+
433505
// Initialize services without validation
434506
console.log(chalk.yellow('\n⚠️ Validation skipped - proceeding directly to analysis'));
435507
this.kafkaClient = new KafkaClient(this.config.kafka);
@@ -582,6 +654,16 @@ class CLI {
582654
if (healthResults) {
583655
analysisResults.healthChecks = healthResults;
584656
}
657+
658+
// Store results in NATS
659+
if (this.options.natsConfig) {
660+
spinner.text = 'Saving results to NATS...';
661+
spinner.render();
662+
await this.natsObjectStore.storeObject(analysisResults);
663+
spinner.stop();
664+
console.log(chalk.green('\n✅ Analysis completed and stored to NATS!'));
665+
process.exit(0);
666+
}
585667

586668
// Check if email is provided for file generation
587669
if (this.config.email && this.config.email.trim()) {

0 commit comments

Comments
 (0)