Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ config.production.json
config.apache-kafka-no-auth.json
config.aws-msk-iam.json
test-config-confluent.json
certs/*
*.key
*.pem
*.crt
Expand Down
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,11 @@ The full list is under the `./config-examples/` folder:
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
"clientId": "superstream-analyzer",
"vendor": "aiven",
"useSasl": true,
"sasl": {
"mechanism": "SCRAM-SHA-256",
"username": "avnadmin",
"password": "YOUR_AVNADMIN_PASSWORD"
},
"useSasl": false,
"ssl": {
"ca": "./path/to/ca.pem"
"ca": "path/to/ca.pem",
"cert": "path/to/service.cert",
"key": "path/to/service.key"
}
},
"file": {
Expand Down
1 change: 1 addition & 0 deletions bin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ program
.description('Interactive utility to analyze Kafka clusters health and configuration')
.version('1.0.0')
.option('-c, --config <path>', 'Path to configuration file')
.option('--nats-config <path>', 'Path to NATS configuration file (uploads to NATS instead of generating files)')
.option('-b, --bootstrap-servers <servers>', 'Comma-separated list of Kafka bootstrap servers')
.option('-v, --verbose', 'Enable verbose logging')
.option('-t, --timeout <seconds>', 'Connection timeout in seconds', '30')
Expand Down
13 changes: 5 additions & 8 deletions config-examples/config.example.aiven-kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@
"brokers": ["kafka-xxxxx-aiven-kafka.aivencloud.com:12345"],
"clientId": "superstream-analyzer",
"vendor": "aiven",
"useSasl": true,
"sasl": {
"mechanism": "SCRAM-SHA-256",
"username": "avnadmin",
"password": "YOUR_AVNADMIN_PASSWORD"
},
"useSasl": false,
"ssl": {
"ca": "/PATH/TO/YOUR/ca.pem"
"ca": "path/to/ca.pem",
"cert": "path/to/service.cert",
"key": "path/to/service.key"
}
},
"file": {
Expand All @@ -19,4 +16,4 @@
"includeMetadata": true
},
"email": "user@example.com"
}
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"jsonwebtoken": "^9.0.2",
"jwks-rsa": "^3.1.0",
"kafkajs": "^2.2.4",
"nats": "^2.19.0",
"ora": "^5.4.1",
"simple-oauth2": "^5.1.0",
"superstream-kafka-analyzer": "^1.0.13"
Expand Down
92 changes: 87 additions & 5 deletions src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const { Validators } = require('./validators');
const { HealthChecker } = require('./health-checker');
const { displayValidationResults, displayTopicSummary } = require('./utils');
const { SupabaseAnalytics } = require('./analytics');
const { NATSObjectStore } = require('./nats-object-store');
const { log } = require('console');

class CLI {
constructor(options = {}) {
Expand All @@ -21,6 +23,7 @@ class CLI {
};
this.kafkaClient = null;
this.fileService = null;
this.natsObjectStore = null;
this.analytics = new SupabaseAnalytics();

// Handle bootstrap-servers option mapping to brokers
Expand Down Expand Up @@ -115,6 +118,39 @@ class CLI {
}
}

async loadNATSConfig(natsConfigPath) {
try {
const fullPath = path.resolve(natsConfigPath);
if (!fs.existsSync(fullPath)) {
throw new Error(`NATS config file not found: ${fullPath}`);
}

const configContent = fs.readFileSync(fullPath, 'utf8');
const natsConfig = JSON.parse(configContent);

// Create NATS object store
this.natsObjectStore = new NATSObjectStore(natsConfig);

// Connect to NATS
const connected = await this.natsObjectStore.connect();
if (!connected) {
throw new Error('Failed to connect to NATS');
}

// Setup object store bucket
const bucketReady = await this.natsObjectStore.getOrCreateBucket();
if (!bucketReady) {
throw new Error('Failed to setup Object Store bucket');
}

console.log(chalk.green(`✅ NATS configuration loaded from: ${fullPath}`));
return true;
} catch (error) {
console.error(chalk.red(`❌ Failed to load NATS config file: ${error.message}`));
return false;
}
}

async promptForConfig() {
console.log(chalk.blue('\n🚀 Superstream Kafka Analyzer\n'));
console.log(chalk.gray('Configure your analysis settings:\n'));
Expand Down Expand Up @@ -177,6 +213,7 @@ class CLI {

// Authentication configuration based on vendor
let saslConfig = null;
let sslConfig = null;
if (vendorAnswer.vendor === 'aws-msk') {
console.log(chalk.yellow('\n🔐 AWS MSK Authentication'));
const authType = await inquirer.prompt([
Expand Down Expand Up @@ -268,27 +305,50 @@ class CLI {
{
type: 'input',
name: 'username',
message: 'Username:',
message: 'Username (leave empty is using SSL only):',
validate: (input) => {
if (!input.trim()) return 'Username is required for Aiven';
return true;
}
},
{
type: 'password',
name: 'password',
message: 'Password:',
message: 'Password (leave empty if using SSL only):',
validate: (input) => {
if (!input.trim()) return 'Password is required for Aiven';
return true;
}
},
{
type: 'input',
name: 'caPath',
message: 'CA Certificate path (optional, e.g., "./certs/ca.pem"):',
default: './certs/ca.pem'
},
{
type: 'input',
name: 'certPath',
message: 'Client Certificate path (optional, eg. "./certs/service.cert"):',
default: './certs/service.cert',
},
{
type: 'input',
name: 'keyPath',
message: 'Client Private Key path (optional, eg. "./certs/service.key"):',
default: './certs/service.key',
}
]);

saslConfig = {
mechanism: 'scram-sha-256',
username: aivenAnswers.username,
password: aivenAnswers.password
};

sslConfig = {
ca: aivenAnswers.caPath,
cert: aivenAnswers.certPath,
key: aivenAnswers.keyPath
};
} else {
// For other vendors, ask if SASL is needed
const useSasl = await inquirer.prompt([
Expand Down Expand Up @@ -346,7 +406,8 @@ class CLI {
brokers: kafkaAnswers.brokers.split(',').map(broker => broker.trim()), // Convert string to array
vendor: vendorAnswer.vendor,
useSasl: !!saslConfig,
sasl: saslConfig
sasl: saslConfig,
ssl: sslConfig
};

// File Output Configuration
Expand Down Expand Up @@ -417,6 +478,7 @@ class CLI {

// Use Commander.js options
const configPath = this.options.config;
const natsConfigPath = this.options.natsConfig;

if (configPath) {
console.log(chalk.gray(`Debug: Loading config from: ${configPath}`));
Expand All @@ -430,6 +492,16 @@ class CLI {
await this.promptForConfig();
}

// Load NATS config if provided
if (natsConfigPath) {
console.log(chalk.gray(`Debug: Loading NATS config from: ${natsConfigPath}`));
const natsConfigLoaded = await this.loadNATSConfig(natsConfigPath);
if (!natsConfigLoaded) {
process.exit(1);
}
console.log(chalk.gray('Debug: NATS config loaded successfully'));
}

// Initialize services without validation
console.log(chalk.yellow('\n⚠️ Validation skipped - proceeding directly to analysis'));
this.kafkaClient = new KafkaClient(this.config.kafka);
Expand Down Expand Up @@ -582,6 +654,16 @@ class CLI {
if (healthResults) {
analysisResults.healthChecks = healthResults;
}

// Store results in NATS
if (this.options.natsConfig) {
spinner.text = 'Saving results to NATS...';
spinner.render();
await this.natsObjectStore.storeObject(analysisResults);
spinner.stop();
console.log(chalk.green('\n✅ Analysis completed and stored to NATS!'));
process.exit(0);
}

// Check if email is provided for file generation
if (this.config.email && this.config.email.trim()) {
Expand Down
42 changes: 37 additions & 5 deletions src/kafka-client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { Kafka, logLevel } = require('kafkajs');
const { Kafka: ConfluentKafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const fs = require('fs').promises;
const path = require('path');
const chalk = require('chalk');
const crypto = require('crypto');
const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js');
Expand Down Expand Up @@ -35,6 +36,11 @@ class KafkaClient {
}
};

// If SSL keys are provided, disable SASL
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
this.config.useSasl = false;
}

// Handle authentication based on vendor and configuration
if (this.config.useSasl && this.config.sasl) {
const vendor = this.config.vendor;
Expand Down Expand Up @@ -175,7 +181,11 @@ class KafkaClient {

case 'aiven':
// Aiven uses SASL_SSL with SCRAM-SHA-256 or OAuth
kafkaConfig.ssl = await this.buildSslConfig();
console.log('🔐 Aiven detected - configuring SSL with certificates...');
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
kafkaConfig.ssl = await this.buildSslConfig();
}

if (mechanism === 'oauthbearer' && useOIDC) {
const oidcProvider = await createOIDCProvider('oidc', {
...this.config.sasl,
Expand Down Expand Up @@ -425,7 +435,9 @@ class KafkaClient {
case 'apache':
default:
// Apache Kafka - SSL depends on configuration
if (this.config.ssl !== false) {
if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
kafkaConfig.ssl = await this.buildSslConfig();
} else if (this.config.ssl !== false) {
kafkaConfig.ssl = true; // Default to SSL for security
}

Expand Down Expand Up @@ -455,6 +467,10 @@ class KafkaClient {
// AWS MSK without SASL - still needs SSL
console.log('🔐 AWS MSK detected - enabling SSL for security');
kafkaConfig.ssl = true;
} else if (this.config.ssl && (this.config.ssl.ca || this.config.ssl.cert || this.config.ssl.key)) {
// Any vendor with SSL certificates - use SSL authentication
console.log(`🔐 ${this.config.vendor} detected with SSL certificates - configuring SSL authentication...`);
kafkaConfig.ssl = await this.buildSslConfig();
}

this.admin = new Kafka(kafkaConfig).admin();
Expand Down Expand Up @@ -877,28 +893,44 @@ class KafkaClient {
// Load certificate files if provided
if (this.config.ssl?.ca) {
try {
sslConfig.ca = await fs.readFile(this.config.ssl.ca);
const caPath = path.resolve(this.config.ssl.ca);
console.log(`🔐 Loading CA certificate from: ${caPath}`);
sslConfig.ca = await fs.readFile(caPath);
console.log('✅ CA certificate loaded successfully');
} catch (error) {
throw new Error(`Failed to read CA certificate file: ${error.message}`);
}
}

if (this.config.ssl?.cert) {
try {
sslConfig.cert = await fs.readFile(this.config.ssl.cert);
const certPath = path.resolve(this.config.ssl.cert);
console.log(`🔐 Loading client certificate from: ${certPath}`);
sslConfig.cert = await fs.readFile(certPath);
console.log('✅ Client certificate loaded successfully');
} catch (error) {
throw new Error(`Failed to read client certificate file: ${error.message}`);
}
}

if (this.config.ssl?.key) {
try {
sslConfig.key = await fs.readFile(this.config.ssl.key);
const keyPath = path.resolve(this.config.ssl.key);
console.log(`🔐 Loading client private key from: ${keyPath}`);
sslConfig.key = await fs.readFile(keyPath);
console.log('✅ Client private key loaded successfully');
} catch (error) {
throw new Error(`Failed to read client private key file: ${error.message}`);
}
}

// Log SSL configuration summary
console.log('🔐 SSL Configuration:');
console.log(` CA Certificate: ${this.config.ssl?.ca ? 'Loaded' : 'Not provided'}`);
console.log(` Client Certificate: ${this.config.ssl?.cert ? 'Loaded' : 'Not provided'}`);
console.log(` Client Private Key: ${this.config.ssl?.key ? 'Loaded' : 'Not provided'}`);
console.log(` Reject Unauthorized: ${sslConfig.rejectUnauthorized}`);

return sslConfig;
}

Expand Down
Loading