Skip to content

Commit 3fdb93d

Browse files
authored
feat(kafka): add support for custom certificates (#3321)
Hello there! In our company we use custom certificates to authenticate to our Kafka instances. Unfortunately PeerDB doesn't support uploading custom certificates for Kafka peers. This PR adds the possibility. The PR reuses the same logic as used for Clickhouse peers. Thank you for your work, review, feedback & eventual merge ❤️🙏 <img width="1320" height="1338" alt="image" src="https://github.com/user-attachments/assets/9c422c5f-8f75-4352-917e-74c4d0dc2390" />
1 parent 248bf3a commit 3fdb93d

File tree

5 files changed

+77
-2
lines changed

5 files changed

+77
-2
lines changed

flow/connectors/kafka/kafka.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package connkafka
33
import (
44
"context"
55
"crypto/tls"
6+
"crypto/x509"
67
"errors"
78
"fmt"
89
"log/slog"
@@ -75,7 +76,26 @@ func NewKafkaConnector(
7576
kgo.WithLogger(kgoLogger(logger)),
7677
)
7778
if !config.DisableTls {
78-
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS13}))
79+
tlsSetting := &tls.Config{MinVersion: tls.VersionTLS12}
80+
if config.Certificate != nil || config.PrivateKey != nil {
81+
if config.Certificate == nil || config.PrivateKey == nil {
82+
return nil, errors.New("both certificate and private key must be provided if using certificate-based authentication")
83+
}
84+
cert, err := tls.X509KeyPair([]byte(*config.Certificate), []byte(*config.PrivateKey))
85+
if err != nil {
86+
return nil, fmt.Errorf("failed to parse provided certificate: %w", err)
87+
}
88+
tlsSetting.Certificates = []tls.Certificate{cert}
89+
}
90+
if config.RootCa != nil {
91+
caPool := x509.NewCertPool()
92+
if !caPool.AppendCertsFromPEM([]byte(*config.RootCa)) {
93+
return nil, errors.New("failed to parse provided root CA")
94+
}
95+
tlsSetting.RootCAs = caPool
96+
}
97+
98+
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(tlsSetting))
7999
}
80100
switch config.Partitioner {
81101
case "LeastBackup":

nexus/analyzer/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,9 @@ fn parse_db_options(db_type: DbType, with_options: &[SqlOption]) -> anyhow::Resu
859859
.get("disable_tls")
860860
.and_then(|s| s.parse::<bool>().ok())
861861
.unwrap_or_default(),
862+
certificate: opts.get("certificate").map(|s| s.to_string()),
863+
private_key: opts.get("private_key").map(|s| s.to_string()),
864+
root_ca: opts.get("root_ca").map(|s| s.to_string()),
862865
};
863866
Config::KafkaConfig(kafka_config)
864867
}

protos/peers.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ message KafkaConfig {
237237
string sasl = 4;
238238
bool disable_tls = 5;
239239
string partitioner = 6;
240+
optional string certificate = 7 [(peerdb_redacted) = true];
241+
optional string private_key = 8 [(peerdb_redacted) = true];
242+
optional string root_ca = 9 [(peerdb_redacted) = true];
240243
}
241244

242245
enum ElasticsearchAuthType {

ui/app/peers/create/[peerType]/helpers/ka.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,54 @@ export const kaSetting: PeerSetting[] = [
6161
tips: 'If you are using a non-TLS connection for Kafka server, check this box.',
6262
optional: true,
6363
},
64+
{
65+
label: 'Certificate',
66+
stateHandler: (value, setter) => {
67+
if (!value) {
68+
// remove key from state if empty
69+
setter((curr) => {
70+
const newCurr = { ...curr } as KafkaConfig;
71+
delete newCurr.certificate;
72+
return newCurr;
73+
});
74+
} else setter((curr) => ({ ...curr, certificate: value as string }));
75+
},
76+
type: 'file',
77+
optional: true,
78+
tips: 'This is only needed if the user is authenticated via certificate.',
79+
},
80+
{
81+
label: 'Private Key',
82+
stateHandler: (value, setter) => {
83+
if (!value) {
84+
// remove key from state if empty
85+
setter((curr) => {
86+
const newCurr = { ...curr } as KafkaConfig;
87+
delete newCurr.privateKey;
88+
return newCurr;
89+
});
90+
} else setter((curr) => ({ ...curr, privateKey: value as string }));
91+
},
92+
type: 'file',
93+
optional: true,
94+
tips: 'This is only needed if the user is authenticated via certificate.',
95+
},
96+
{
97+
label: 'Root Certificate',
98+
stateHandler: (value, setter) => {
99+
if (!value) {
100+
// remove key from state if empty
101+
setter((curr) => {
102+
const newCurr = { ...curr } as KafkaConfig;
103+
delete newCurr.rootCa;
104+
return newCurr;
105+
});
106+
} else setter((curr) => ({ ...curr, rootCa: value as string }));
107+
},
108+
type: 'file',
109+
optional: true,
110+
tips: 'If not provided, host CA roots will be used.',
111+
},
64112
];
65113

66114
export const blankKafkaSetting: KafkaConfig = {

ui/components/PeerForms/KafkaConfig.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { Switch } from '@/lib/Switch/Switch';
99
import { TextField } from '@/lib/TextField';
1010
import { Tooltip } from '@/lib/Tooltip';
1111
import ReactSelect from 'react-select';
12+
import { handleFieldChange } from "@/components/PeerForms/common";
1213

1314
interface KafkaProps {
1415
setter: PeerSetter;
@@ -99,7 +100,7 @@ export default function KafkaForm({ setter }: KafkaProps) {
99100
type={setting.type}
100101
defaultValue={setting.default}
101102
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
102-
setting.stateHandler(e.target.value, setter)
103+
handleFieldChange(e, setting, setter)
103104
}
104105
/>
105106
{setting.tips && (

0 commit comments

Comments
 (0)