Skip to content

Commit 916998d

Browse files
feat: NATS storage integration
- Added proper SSL support CR fixes
1 parent 6f3d64e commit 916998d

File tree

8 files changed

+295
-25
lines changed

8 files changed

+295
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ config.production.json
203203
config.apache-kafka-no-auth.json
204204
config.aws-msk-iam.json
205205
test-config-confluent.json
206+
certs/*
206207
*.key
207208
*.pem
208209
*.crt

README.md

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ npx superstream-kafka-analyzer
7070
```bash
7171
# Using a configuration file
7272
npx superstream-kafka-analyzer --config config.json
73+
74+
# Using a configuration file and uploading results to NATS
75+
npx superstream-kafka-analyzer --config config.json --nats-config nats-config.json
7376
```
7477

7578
### Configuration File Examples
@@ -95,6 +98,7 @@ The full list is under the `./config-examples/` folder:
9598
- [Generic OAuth](config-examples/config.example.generic-oauth.json) - Generic OAuth provider
9699
- [With Timestamp](config-examples/config.example.with-timestamp.json) - Include timestamp in filenames
97100
- [Without Timestamp](config-examples/config.example.without-timestamp.json) - No timestamp in filenames
101+
- [NATS Config](config-examples/config.example.nats-config.json) - configuration for NATS connectivity
98102

99103
**Basic Configuration** (`config.example.json`):
100104
```json
@@ -139,6 +143,18 @@ The full list is under the `./config-examples/` folder:
139143
}
140144
```
141145

146+
**NATS Only Configuration** (`config.example.nats-config.json`):
147+
```json
148+
{
149+
"servers": ["nats://localhost:4222"],
150+
"jwt": "your_jwt_token_here",
151+
"nkey": "your_nkey_seed_here",
152+
"account": "your_account_name",
153+
"bucket": "kafka-analysis-test",
154+
"objectName": "analysis-result-test"
155+
}
156+
```
157+
142158
**AWS MSK with SCRAM** (`config.example.aws-msk.json`):
143159
```json
144160
{
@@ -218,14 +234,11 @@ The full list is under the `./config-examples/` folder:
218234
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
219235
"clientId": "superstream-analyzer",
220236
"vendor": "aiven",
221-
"useSasl": true,
222-
"sasl": {
223-
"mechanism": "SCRAM-SHA-256",
224-
"username": "avnadmin",
225-
"password": "YOUR_AVNADMIN_PASSWORD"
226-
},
237+
"useSasl": false,
227238
"ssl": {
228-
"ca": "./path/to/ca.pem"
239+
"ca": "path/to/ca.pem",
240+
"cert": "path/to/service.cert",
241+
"key": "path/to/service.key"
229242
}
230243
},
231244
"file": {
@@ -268,6 +281,7 @@ The full list is under the `./config-examples/` folder:
268281
| Option | Description | Default |
269282
|--------|-------------|---------|
270283
| `--config <path>` | Path to configuration file | - |
284+
| `--nats-config <path>` | Path to NATS configuration file (uploads to NATS data store instead of generating files) | - |
271285

272286
## 🔐 Security Protocols
273287

bin/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ program
1313
.description('Interactive utility to analyze Kafka clusters health and configuration')
1414
.version('1.0.0')
1515
.option('-c, --config <path>', 'Path to configuration file')
16+
.option('--nats-config <path>', 'Path to NATS configuration file (uploads to NATS instead of generating files)')
1617
.option('-b, --bootstrap-servers <servers>', 'Comma-separated list of Kafka bootstrap servers')
1718
.option('-v, --verbose', 'Enable verbose logging')
1819
.option('-t, --timeout <seconds>', 'Connection timeout in seconds', '30')

config-examples/config.example.aiven-kafka.json

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
44
"clientId": "superstream-analyzer",
55
"vendor": "aiven",
6-
"useSasl": true,
7-
"sasl": {
8-
"mechanism": "SCRAM-SHA-256",
9-
"username": "avnadmin",
10-
"password": "YOUR_AVNADMIN_PASSWORD"
11-
},
6+
"useSasl": false,
127
"ssl": {
13-
"ca": "/PATH/TO/YOUR/ca.pem"
8+
"ca": "path/to/ca.pem",
9+
"cert": "path/to/service.cert",
10+
"key": "path/to/service.key"
1411
}
1512
},
1613
"file": {
@@ -19,4 +16,4 @@
1916
"includeMetadata": true
2017
},
2118
"email": "user@example.com"
22-
}
19+
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"jsonwebtoken": "^9.0.2",
3939
"jwks-rsa": "^3.1.0",
4040
"kafkajs": "^2.2.4",
41+
"nats": "^2.19.0",
4142
"ora": "^5.4.1",
4243
"simple-oauth2": "^5.1.0",
4344
"superstream-kafka-analyzer": "^1.0.13"

src/cli.js

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ const { Validators } = require('./validators');
1111
const { HealthChecker } = require('./health-checker');
1212
const { displayValidationResults, displayTopicSummary } = require('./utils');
1313
const { SupabaseAnalytics } = require('./analytics');
14+
const { NATSObjectStore } = require('./nats-object-store');
15+
const { log } = require('console');
1416

1517
class CLI {
1618
constructor(options = {}) {
@@ -21,6 +23,7 @@ class CLI {
2123
};
2224
this.kafkaClient = null;
2325
this.fileService = null;
26+
this.natsObjectStore = null;
2427
this.analytics = new SupabaseAnalytics();
2528

2629
// Handle bootstrap-servers option mapping to brokers
@@ -115,6 +118,39 @@ class CLI {
115118
}
116119
}
117120

121+
async loadNATSConfig(natsConfigPath) {
122+
try {
123+
const fullPath = path.resolve(natsConfigPath);
124+
if (!fs.existsSync(fullPath)) {
125+
throw new Error(`NATS config file not found: ${fullPath}`);
126+
}
127+
128+
const configContent = fs.readFileSync(fullPath, 'utf8');
129+
const natsConfig = JSON.parse(configContent);
130+
131+
// Create NATS object store
132+
this.natsObjectStore = new NATSObjectStore(natsConfig);
133+
134+
// Connect to NATS
135+
const connected = await this.natsObjectStore.connect();
136+
if (!connected) {
137+
throw new Error('Failed to connect to NATS');
138+
}
139+
140+
// Setup object store bucket
141+
const bucketReady = await this.natsObjectStore.getOrCreateBucket();
142+
if (!bucketReady) {
143+
throw new Error('Failed to setup Object Store bucket');
144+
}
145+
146+
console.log(chalk.green(`✅ NATS configuration loaded from: ${fullPath}`));
147+
return true;
148+
} catch (error) {
149+
console.error(chalk.red(`❌ Failed to load NATS config file: ${error.message}`));
150+
return false;
151+
}
152+
}
153+
118154
async promptForConfig() {
119155
console.log(chalk.blue('\n🚀 Superstream Kafka Analyzer\n'));
120156
console.log(chalk.gray('Configure your analysis settings:\n'));
@@ -177,6 +213,7 @@ class CLI {
177213

178214
// Authentication configuration based on vendor
179215
let saslConfig = null;
216+
let sslConfig = null;
180217
if (vendorAnswer.vendor === 'aws-msk') {
181218
console.log(chalk.yellow('\n🔐 AWS MSK Authentication'));
182219
const authType = await inquirer.prompt([
@@ -268,27 +305,50 @@ class CLI {
268305
{
269306
type: 'input',
270307
name: 'username',
271-
message: 'Username:',
308+
message: 'Username (leave empty is using SSL only):',
272309
validate: (input) => {
273-
if (!input.trim()) return 'Username is required for Aiven';
274310
return true;
275311
}
276312
},
277313
{
278314
type: 'password',
279315
name: 'password',
280-
message: 'Password:',
316+
message: 'Password (leave empty if using SSL only):',
281317
validate: (input) => {
282-
if (!input.trim()) return 'Password is required for Aiven';
283318
return true;
284319
}
320+
},
321+
{
322+
type: 'input',
323+
name: 'caPath',
324+
message: 'CA Certificate path (optional, e.g., "./certs/ca.pem"):',
325+
default: './certs/ca.pem'
326+
},
327+
{
328+
type: 'input',
329+
name: 'certPath',
330+
message: 'Client Certificate path (optional, eg. "./certs/service.cert"):',
331+
default: './certs/service.cert',
332+
},
333+
{
334+
type: 'input',
335+
name: 'keyPath',
336+
message: 'Client Private Key path (optional, eg. "./certs/service.key"):',
337+
default: './certs/service.key',
285338
}
286339
]);
340+
287341
saslConfig = {
288342
mechanism: 'scram-sha-256',
289343
username: aivenAnswers.username,
290344
password: aivenAnswers.password
291345
};
346+
347+
sslConfig = {
348+
ca: aivenAnswers.caPath,
349+
cert: aivenAnswers.certPath,
350+
key: aivenAnswers.keyPath
351+
};
292352
} else {
293353
// For other vendors, ask if SASL is needed
294354
const useSasl = await inquirer.prompt([
@@ -346,7 +406,8 @@ class CLI {
346406
brokers: kafkaAnswers.brokers.split(',').map(broker => broker.trim()), // Convert string to array
347407
vendor: vendorAnswer.vendor,
348408
useSasl: !!saslConfig,
349-
sasl: saslConfig
409+
sasl: saslConfig,
410+
ssl: sslConfig
350411
};
351412

352413
// File Output Configuration
@@ -417,6 +478,7 @@ class CLI {
417478

418479
// Use Commander.js options
419480
const configPath = this.options.config;
481+
const natsConfigPath = this.options.natsConfig;
420482

421483
if (configPath) {
422484
console.log(chalk.gray(`Debug: Loading config from: ${configPath}`));
@@ -430,6 +492,16 @@ class CLI {
430492
await this.promptForConfig();
431493
}
432494

495+
// Load NATS config if provided
496+
if (natsConfigPath) {
497+
console.log(chalk.gray(`Debug: Loading NATS config from: ${natsConfigPath}`));
498+
const natsConfigLoaded = await this.loadNATSConfig(natsConfigPath);
499+
if (!natsConfigLoaded) {
500+
process.exit(1);
501+
}
502+
console.log(chalk.gray('Debug: NATS config loaded successfully'));
503+
}
504+
433505
// Initialize services without validation
434506
console.log(chalk.yellow('\n⚠️ Validation skipped - proceeding directly to analysis'));
435507
this.kafkaClient = new KafkaClient(this.config.kafka);
@@ -582,6 +654,16 @@ class CLI {
582654
if (healthResults) {
583655
analysisResults.healthChecks = healthResults;
584656
}
657+
658+
// Store results in NATS
659+
if (this.options.natsConfig) {
660+
spinner.text = 'Saving results to NATS...';
661+
spinner.render();
662+
await this.natsObjectStore.storeObject(analysisResults);
663+
spinner.stop();
664+
console.log(chalk.green('\n✅ Analysis completed and stored to NATS!'));
665+
process.exit(0);
666+
}
585667

586668
// Check if email is provided for file generation
587669
if (this.config.email && this.config.email.trim()) {

src/kafka-client.js

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const { Kafka, logLevel } = require('kafkajs');
22
const { Kafka: ConfluentKafka } = require('@confluentinc/kafka-javascript').KafkaJS;
33
const fs = require('fs').promises;
4+
const path = require('path');
45
const chalk = require('chalk');
56
const crypto = require('crypto');
67
const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js');
@@ -35,6 +36,11 @@ class KafkaClient {
3536
}
3637
};
3738

39+
// If SSL keys are provided, disable SASL
40+
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
41+
this.config.useSasl = false;
42+
}
43+
3844
// Handle authentication based on vendor and configuration
3945
if (this.config.useSasl && this.config.sasl) {
4046
const vendor = this.config.vendor;
@@ -175,7 +181,11 @@ class KafkaClient {
175181

176182
case 'aiven':
177183
// Aiven uses SASL_SSL with SCRAM-SHA-256 or OAuth
178-
kafkaConfig.ssl = await this.buildSslConfig();
184+
console.log('🔐 Aiven detected - configuring SSL with certificates...');
185+
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
186+
kafkaConfig.ssl = await this.buildSslConfig();
187+
}
188+
179189
if (mechanism === 'oauthbearer' && useOIDC) {
180190
const oidcProvider = await createOIDCProvider('oidc', {
181191
...this.config.sasl,
@@ -425,7 +435,9 @@ class KafkaClient {
425435
case 'apache':
426436
default:
427437
// Apache Kafka - SSL depends on configuration
428-
if (this.config.ssl !== false) {
438+
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
439+
kafkaConfig.ssl = await this.buildSslConfig();
440+
} else if (this.config.ssl !== false) {
429441
kafkaConfig.ssl = true; // Default to SSL for security
430442
}
431443

@@ -455,6 +467,10 @@ class KafkaClient {
455467
// AWS MSK without SASL - still needs SSL
456468
console.log('🔐 AWS MSK detected - enabling SSL for security');
457469
kafkaConfig.ssl = true;
470+
} else if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
471+
// Any vendor with SSL certificates - use SSL authentication
472+
console.log(`🔐 ${this.config.vendor} detected with SSL certificates - configuring SSL authentication...`);
473+
kafkaConfig.ssl = await this.buildSslConfig();
458474
}
459475

460476
this.admin = new Kafka(kafkaConfig).admin();
@@ -877,28 +893,44 @@ class KafkaClient {
877893
// Load certificate files if provided
878894
if (this.config.ssl?.ca) {
879895
try {
880-
sslConfig.ca = await fs.readFile(this.config.ssl.ca);
896+
const caPath = path.resolve(this.config.ssl.ca);
897+
console.log(`🔐 Loading CA certificate from: ${caPath}`);
898+
sslConfig.ca = await fs.readFile(caPath);
899+
console.log('✅ CA certificate loaded successfully');
881900
} catch (error) {
882901
throw new Error(`Failed to read CA certificate file: ${error.message}`);
883902
}
884903
}
885904

886905
if (this.config.ssl?.cert) {
887906
try {
888-
sslConfig.cert = await fs.readFile(this.config.ssl.cert);
907+
const certPath = path.resolve(this.config.ssl.cert);
908+
console.log(`🔐 Loading client certificate from: ${certPath}`);
909+
sslConfig.cert = await fs.readFile(certPath);
910+
console.log('✅ Client certificate loaded successfully');
889911
} catch (error) {
890912
throw new Error(`Failed to read client certificate file: ${error.message}`);
891913
}
892914
}
893915

894916
if (this.config.ssl?.key) {
895917
try {
896-
sslConfig.key = await fs.readFile(this.config.ssl.key);
918+
const keyPath = path.resolve(this.config.ssl.key);
919+
console.log(`🔐 Loading client private key from: ${keyPath}`);
920+
sslConfig.key = await fs.readFile(keyPath);
921+
console.log('✅ Client private key loaded successfully');
897922
} catch (error) {
898923
throw new Error(`Failed to read client private key file: ${error.message}`);
899924
}
900925
}
901926

927+
// Log SSL configuration summary
928+
console.log('🔐 SSL Configuration:');
929+
console.log(` CA Certificate: ${this.config.ssl?.ca ? 'Loaded' : 'Not provided'}`);
930+
console.log(` Client Certificate: ${this.config.ssl?.cert ? 'Loaded' : 'Not provided'}`);
931+
console.log(` Client Private Key: ${this.config.ssl?.key ? 'Loaded' : 'Not provided'}`);
932+
console.log(` Reject Unauthorized: ${sslConfig.rejectUnauthorized}`);
933+
902934
return sslConfig;
903935
}
904936

0 commit comments

Comments
 (0)