Skip to content

Commit 207b558

Browse files
author
Matthias Klein
committed
Merge remote-tracking branch 'upstream/master' into master
2 parents 3766e8e + 7befa1f commit 207b558

File tree

11 files changed

+529
-119
lines changed

11 files changed

+529
-119
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
88
GOPKGS = $(shell go list ./... | grep -v /vendor/)
99
BUILD_FLAGS ?=
1010
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
11-
TAG ?= "v0.2.5"
11+
TAG ?= "v0.2.6"
1212
GOARCH ?= amd64
1313
GOOS ?= linux
1414

README.md

Lines changed: 112 additions & 82 deletions
Large diffs are not rendered by default.

cmd/kafka-proxy/server.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package server
22

33
import (
44
"fmt"
5+
56
"github.com/grepplabs/kafka-proxy/config"
67
"github.com/grepplabs/kafka-proxy/proxy"
78
"github.com/oklog/run"
@@ -20,13 +21,14 @@ import (
2021
"time"
2122

2223
"errors"
24+
"strings"
25+
2326
"github.com/grepplabs/kafka-proxy/pkg/apis"
2427
localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
2528
tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
2629
tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
2730
"github.com/hashicorp/go-hclog"
2831
"github.com/hashicorp/go-plugin"
29-
"strings"
3032

3133
"github.com/grepplabs/kafka-proxy/pkg/registry"
3234
// built-in plugins
@@ -105,6 +107,14 @@ func initFlags() {
105107
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCipherSuites, "proxy-listener-cipher-suites", []string{}, "List of supported cipher suites")
106108
Server.Flags().StringSliceVar(&c.Proxy.TLS.ListenerCurvePreferences, "proxy-listener-curve-preferences", []string{}, "List of curve preferences")
107109

110+
Server.Flags().BoolVar(&c.Proxy.TLS.ClientCert.ValidateSubject, "proxy-listener-tls-client-cert-validate-subject", false, "Whether to validate client certificate subject")
111+
Server.Flags().StringVar(&c.Proxy.TLS.ClientCert.Subject.CommonName, "proxy-listener-tls-required-client-subject-common-name", "", "Required client certificate subject common name")
112+
Server.Flags().StringSliceVar(&c.Proxy.TLS.ClientCert.Subject.Country, "proxy-listener-tls-required-client-subject-country", []string{}, "Required client certificate subject country")
113+
Server.Flags().StringSliceVar(&c.Proxy.TLS.ClientCert.Subject.Province, "proxy-listener-tls-required-client-subject-province", []string{}, "Required client certificate subject province")
114+
Server.Flags().StringSliceVar(&c.Proxy.TLS.ClientCert.Subject.Locality, "proxy-listener-tls-required-client-subject-locality", []string{}, "Required client certificate subject locality")
115+
Server.Flags().StringSliceVar(&c.Proxy.TLS.ClientCert.Subject.Organization, "proxy-listener-tls-required-client-subject-organization", []string{}, "Required client certificate subject organization")
116+
Server.Flags().StringSliceVar(&c.Proxy.TLS.ClientCert.Subject.OrganizationalUnit, "proxy-listener-tls-required-client-subject-organizational-unit", []string{}, "Required client certificate subject organizational unit")
117+
108118
// local authentication plugin
109119
Server.Flags().BoolVar(&c.Auth.Local.Enable, "auth-local-enable", false, "Enable local SASL/PLAIN authentication performed by listener - SASL handshake will not be passed to kafka brokers")
110120
Server.Flags().StringVar(&c.Auth.Local.Command, "auth-local-command", "", "Path to authentication plugin binary")

cmd/plugin-auth-ldap/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,12 @@ func main() {
252252
os.Exit(1)
253253
}
254254
if pluginMeta.bindDN != "" {
255-
logrus.Infof("user-search-base='%s',user-filter='%s'", pluginMeta.userSearchBase,pluginMeta.userFilter)
255+
logrus.Infof("user-search-base='%s',user-filter='%s'", pluginMeta.userSearchBase, pluginMeta.userFilter)
256256

257257
if pluginMeta.userSearchBase == "" {
258258
logrus.Errorf("user-search-base is required")
259259
}
260-
if !strings.Contains(pluginMeta.userFilter,UsernamePlaceholder) {
260+
if !strings.Contains(pluginMeta.userFilter, UsernamePlaceholder) {
261261
logrus.Errorf("user-filter must contain '%s' as username placeholder", UsernamePlaceholder)
262262
}
263263

config/config.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package config
22

33
import (
44
"fmt"
5-
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
6-
"github.com/pkg/errors"
75
"net"
86
"net/url"
97
"strings"
108
"time"
9+
10+
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
11+
"github.com/pkg/errors"
1112
)
1213

1314
const defaultClientID = "kafka-proxy"
@@ -70,6 +71,17 @@ type Config struct {
7071
CAChainCertFile string
7172
ListenerCipherSuites []string
7273
ListenerCurvePreferences []string
74+
ClientCert struct {
75+
ValidateSubject bool
76+
Subject struct {
77+
CommonName string
78+
Country []string
79+
Province []string
80+
Locality []string
81+
Organization []string
82+
OrganizationalUnit []string
83+
}
84+
}
7385
}
7486
}
7587
Auth struct {

proxy/protocol/real_decoder.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -250,19 +250,14 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
250250
}
251251

252252
func (rd *realDecoder) getStringArray() ([]string, error) {
253-
if rd.remaining() < 4 {
254-
rd.off = len(rd.raw)
255-
return nil, ErrInsufficientData
256-
}
257-
n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
258-
rd.off += 4
253+
n, err := rd.getArrayLength()
259254

260-
if n == 0 {
261-
return nil, nil
255+
if err != nil {
256+
return nil, err
262257
}
263258

264-
if n < 0 {
265-
return nil, errInvalidArrayLength
259+
if n == -1 {
260+
return nil, nil
266261
}
267262

268263
ret := make([]string, n)

proxy/protocol/request_key_version.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,10 @@ func (r *RequestKeyVersion) ResponseHeaderVersion() int16 {
258258
return 0
259259
case 49: // AlterClientQuotas
260260
return 0
261+
case 50: // DescribeUserScramCredentials
262+
return 1
263+
case 51: // AlterUserScramCredentials
264+
return 1
261265
default:
262266
// throw new UnsupportedVersionException("Unsupported API key " + apiKey);
263267
return -1

proxy/proxy.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package proxy
33
import (
44
"crypto/tls"
55
"fmt"
6+
"net"
7+
"sync"
8+
69
"github.com/grepplabs/kafka-proxy/config"
710
"github.com/grepplabs/kafka-proxy/pkg/libs/util"
811
"github.com/sirupsen/logrus"
9-
"net"
10-
"sync"
1112
)
1213

1314
type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error)

proxy/tls.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"crypto/tls"
55
"crypto/x509"
66
"encoding/pem"
7+
"fmt"
78
"io/ioutil"
89
"net"
10+
"sort"
911
"strings"
1012
"time"
1113

@@ -14,6 +16,22 @@ import (
1416
"github.com/pkg/errors"
1517
)
1618

19+
type clientCertSubjectField string
20+
21+
const (
22+
clientCertSubjectCommonName = "CN"
23+
clientCertSubjectCountry = "C"
24+
clientCertSubjectProvince = "S"
25+
clientCertSubjectLocality = "L"
26+
clientCertSubjectOrganization = "O"
27+
clientCertSubjectOrganizationalUnit = "OU"
28+
)
29+
30+
type clientCertExpectedData struct {
31+
fields map[clientCertSubjectField]string
32+
parts []string
33+
}
34+
1735
var (
1836
defaultCurvePreferences = []tls.CurveID{
1937
tls.CurveP256,
@@ -120,9 +138,142 @@ func newTLSListenerConfig(conf *config.Config) (*tls.Config, error) {
120138
cfg.ClientCAs = clientCAs
121139
cfg.ClientAuth = tls.RequireAndVerifyClientCert
122140
}
141+
142+
cfg.VerifyPeerCertificate = tlsClientCertVerificationFunc(conf)
143+
123144
return cfg, nil
124145
}
125146

147+
func tlsClientCertVerificationFunc(conf *config.Config) func([][]byte, [][]*x509.Certificate) error {
148+
expectedData := getClientCertExpectedData(conf)
149+
return func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
150+
if conf.Proxy.TLS.ClientCert.ValidateSubject {
151+
152+
if len(expectedData.fields) == 0 {
153+
return nil // nothing to validate
154+
}
155+
156+
for _, chain := range verifiedChains {
157+
for _, cert := range chain {
158+
159+
certificateAcceptable := true
160+
161+
for k, v := range expectedData.fields {
162+
switch k {
163+
case clientCertSubjectCommonName:
164+
if v != cert.Subject.CommonName {
165+
certificateAcceptable = false
166+
break
167+
}
168+
case clientCertSubjectCountry:
169+
currentValues := cert.Subject.Country
170+
sort.Strings(currentValues)
171+
if fmt.Sprintf("%v", currentValues) != v {
172+
certificateAcceptable = false
173+
break
174+
}
175+
case clientCertSubjectProvince:
176+
currentValues := cert.Subject.Province
177+
sort.Strings(currentValues)
178+
if fmt.Sprintf("%v", currentValues) != v {
179+
certificateAcceptable = false
180+
break
181+
}
182+
case clientCertSubjectLocality:
183+
currentValues := cert.Subject.Locality
184+
sort.Strings(currentValues)
185+
if fmt.Sprintf("%v", currentValues) != v {
186+
certificateAcceptable = false
187+
break
188+
}
189+
case clientCertSubjectOrganization:
190+
currentValues := cert.Subject.Organization
191+
sort.Strings(currentValues)
192+
if fmt.Sprintf("%v", currentValues) != v {
193+
certificateAcceptable = false
194+
break
195+
}
196+
case clientCertSubjectOrganizationalUnit:
197+
currentValues := cert.Subject.OrganizationalUnit
198+
sort.Strings(currentValues)
199+
if fmt.Sprintf("%v", currentValues) != v {
200+
certificateAcceptable = false
201+
break
202+
}
203+
}
204+
}
205+
206+
if certificateAcceptable {
207+
return nil
208+
}
209+
210+
}
211+
}
212+
213+
return fmt.Errorf("tls: no client certificate presented required subject '%s'", strings.Join(expectedData.parts, "/"))
214+
215+
}
216+
return nil
217+
}
218+
}
219+
220+
func getClientCertExpectedData(conf *config.Config) *clientCertExpectedData {
221+
222+
expectedFields := map[clientCertSubjectField]string{}
223+
expectedParts := []string{"s:"} // these are calculated here because the order is relevant to us
224+
values := []string{}
225+
226+
if conf.Proxy.TLS.ClientCert.Subject.CommonName != "" {
227+
expectedFields[clientCertSubjectCommonName] = conf.Proxy.TLS.ClientCert.Subject.CommonName
228+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectCommonName, expectedFields[clientCertSubjectCommonName]))
229+
}
230+
values = removeEmptyStrings(conf.Proxy.TLS.ClientCert.Subject.Country)
231+
if len(values) > 0 {
232+
sort.Strings(values)
233+
expectedFields[clientCertSubjectCountry] = fmt.Sprintf("%v", values)
234+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectCountry, expectedFields[clientCertSubjectCountry]))
235+
}
236+
values = removeEmptyStrings(conf.Proxy.TLS.ClientCert.Subject.Province)
237+
if len(values) > 0 {
238+
sort.Strings(values)
239+
expectedFields[clientCertSubjectProvince] = fmt.Sprintf("%v", values)
240+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectProvince, expectedFields[clientCertSubjectProvince]))
241+
}
242+
values = removeEmptyStrings(conf.Proxy.TLS.ClientCert.Subject.Locality)
243+
if len(values) > 0 {
244+
sort.Strings(values)
245+
expectedFields[clientCertSubjectLocality] = fmt.Sprintf("%v", values)
246+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectLocality, expectedFields[clientCertSubjectLocality]))
247+
}
248+
values = removeEmptyStrings(conf.Proxy.TLS.ClientCert.Subject.Organization)
249+
if len(values) > 0 {
250+
sort.Strings(values)
251+
expectedFields[clientCertSubjectOrganization] = fmt.Sprintf("%v", values)
252+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectOrganization, expectedFields[clientCertSubjectOrganization]))
253+
}
254+
values = removeEmptyStrings(conf.Proxy.TLS.ClientCert.Subject.OrganizationalUnit)
255+
if len(values) > 0 {
256+
sort.Strings(values)
257+
expectedFields[clientCertSubjectOrganizationalUnit] = fmt.Sprintf("%v", values)
258+
expectedParts = append(expectedParts, fmt.Sprintf("%s=%s", clientCertSubjectOrganizationalUnit, expectedFields[clientCertSubjectOrganizationalUnit]))
259+
}
260+
return &clientCertExpectedData{
261+
parts: expectedParts,
262+
fields: expectedFields,
263+
}
264+
}
265+
266+
func removeEmptyStrings(input []string) []string {
267+
output := []string{}
268+
for _, value := range input {
269+
if value == "" {
270+
continue
271+
}
272+
output = append(output, value)
273+
}
274+
return output
275+
}
276+
126277
func getCipherSuites(enabledCipherSuites []string) ([]uint16, error) {
127278
suites := make([]uint16, 0)
128279
for _, suite := range enabledCipherSuites {

0 commit comments

Comments
 (0)