Skip to content

Feature/acl #209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions pkg/apis/acl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package apis

import (
"context"
"strings"
"time"
)

// ResourceType represents the type of Kafka resource
type ResourceType string

const (
ResourceTypeTopic ResourceType = "TOPIC"
ResourceTypeGroup ResourceType = "GROUP"
ResourceTypeCluster ResourceType = "CLUSTER"
ResourceTypeTransactionalID ResourceType = "TRANSACTIONAL_ID"
)

// Operation represents the type of operation allowed/denied
type Operation string

const (
OperationRead Operation = "READ"
OperationWrite Operation = "WRITE"
OperationCreate Operation = "CREATE"
OperationDelete Operation = "DELETE"
OperationAlter Operation = "ALTER"
OperationDescribe Operation = "DESCRIBE"
OperationClusterAction Operation = "CLUSTER_ACTION"
OperationAll Operation = "ALL"
)

// PermissionType represents whether the ACL allows or denies access
type PermissionType string

const (
PermissionAllow PermissionType = "ALLOW"
PermissionDeny PermissionType = "DENY"
)

// ACLEntry represents a single ACL rule
type ACLEntry struct {
Principal string `json:"principal"` // The user or group
ResourceType ResourceType `json:"resourceType"` // Type of resource
ResourceName string `json:"resourceName"` // Name of resource
PatternType string `json:"patternType"` // LITERAL or PREFIXED
Operation Operation `json:"operation"` // Type of operation
PermissionType PermissionType `json:"permissionType"` // Allow or Deny
Host string `json:"host"` // Host from which access is allowed
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}

// ACLCollection represents a collection of ACL entries
type ACLCollection struct {
ACLs []ACLEntry
}

// ACLChecker interface for ACL plugins.
type ACLChecker interface {
// CheckACL checks if a given request is allowed based on configured ACL rules.
CheckACL(ctx context.Context, APIKey int, topics []string) (bool, []string, error)
}

// ACLCheckerFactory interface for creating ACL checkers.
type ACLCheckerFactory interface {
New(ACLCollection) (ACLChecker, error)
}

// ACLDecision represents the result of ACL evaluation
type ACLDecision struct {
Allowed bool
Reason string
}

// ACLRequest represents a request to check permissions
type ACLRequest struct {
Principal string
ResourceType ResourceType
ResourceName string
Operation Operation
Host string
}

// EvaluateAccess checks if the requested operation is allowed
func (ac *ACLCollection) EvaluateAccess(req ACLRequest) ACLDecision {
// Quick check if there are no ACLs
if len(ac.ACLs) == 0 {
return ACLDecision{
Allowed: false,
Reason: "no ACLs defined",
}
}

// First pass: Look for explicit DENY rules (these take precedence)
for _, acl := range ac.ACLs {
if isDenyMatch(acl, req) {
return ACLDecision{
Allowed: false,
Reason: "explicitly denied by ACL",
}
}
}

// Second pass: Look for ALLOW rules
for _, acl := range ac.ACLs {
if isAllowMatch(acl, req) {
return ACLDecision{
Allowed: true,
Reason: "explicitly allowed by ACL",
}
}
}

// Default deny if no matching rules found
return ACLDecision{
Allowed: false,
Reason: "no matching allow rules found",
}
}

// isDenyMatch checks if an ACL entry explicitly denies access
func isDenyMatch(acl ACLEntry, req ACLRequest) bool {
if acl.PermissionType != PermissionDeny {
return false
}

return isMatch(acl, req)
}

// isAllowMatch checks if an ACL entry allows access
func isAllowMatch(acl ACLEntry, req ACLRequest) bool {
if acl.PermissionType != PermissionAllow {
return false
}

return isMatch(acl, req)
}

// isMatch performs the actual matching logic
func isMatch(acl ACLEntry, req ACLRequest) bool {
// Check Principal (using exact match or wildcard)
if acl.Principal != "*" && acl.Principal != req.Principal {
return false
}

// Check Resource Type
if acl.ResourceType != req.ResourceType {
return false
}

// Check Resource Name (using pattern matching)
switch acl.PatternType {
case "LITERAL":
if acl.ResourceName != req.ResourceName {
return false
}
case "PREFIXED":
if !strings.HasPrefix(req.ResourceName, acl.ResourceName) {
return false
}
}

// Check Operation (including ALL operation)
if acl.Operation != OperationAll && acl.Operation != req.Operation {
return false
}

// Check Host (using exact match or wildcard)
if acl.Host != "*" && acl.Host != req.Host {
return false
}

return true
}
64 changes: 64 additions & 0 deletions pkg/libs/acl-plugin/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package aclplugin

import (
"flag"
"fmt"
"strings"

"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/grepplabs/kafka-proxy/pkg/registry"
)

func init() {
registry.NewComponentInterface(new(apis.ACLCheckerFactory))
registry.Register(new(Factory), "acl-plugin")
}

type pluginMeta struct {
rules []string
}

type Factory struct{}

func (f *Factory) New(params []string) (apis.ACLChecker, error) {
meta := &pluginMeta{}
fs := flag.NewFlagSet("acl-plugin settings", flag.ContinueOnError)
fs.Var(&stringArrayValue{&meta.rules}, "rule", "ACL rule (Operation,TopicPattern,Allow)")

if err := fs.Parse(params); err != nil {
return nil, err
}

rules, err := parseRules(meta.rules)
if err != nil {
return nil, err
}

return NewACLChecker(rules)
}

type stringArrayValue struct {
target *[]string
}

func (s *stringArrayValue) String() string {
return strings.Join(*s.target, ",")
}

func (s *stringArrayValue) Set(value string) error {
*s.target = append(*s.target, value)
return nil
}

func parseRules(ruleStrings []string) ([]apis.ACLRule, error) {
rules := make([]apis.ACLRule, len(ruleStrings))
for i, ruleStr := range ruleStrings {
parts := strings.Split(ruleStr, ",")
if len(parts) != 3 {
return nil, fmt.Errorf("invalid rule format: %s", ruleStr)
}
allow := parts[2] == "true"
rules[i] = apis.ACLRule{Operation: apis.Operation(parts[0]), Topic: parts[1], Allow: allow}
}
return rules, nil
}
62 changes: 62 additions & 0 deletions pkg/libs/acl-plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package aclplugin

import (
"context"
"regexp"

"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/grepplabs/kafka-proxy/proxy/protocol"
)

type ACLCheckerImpl struct {
rules []apis.ACLRule
}

func NewACLChecker(rules []apis.ACLRule) (apis.ACLChecker, error) {
compiledRules := make([]apis.ACLRule, len(rules))
for i, rule := range rules {
if rule.Topic != "" {
re, err := regexp.Compile(rule.Topic)
if err != nil {
return nil, err
}
compiledRules[i] = apis.ACLRule{Operation: rule.Operation, Topic: rule.Topic, Allow: rule.Allow, Re: re}
} else {
compiledRules[i] = rule
}
}
return &ACLCheckerImpl{rules: compiledRules}, nil
}

func (a *ACLCheckerImpl) CheckACL(ctx context.Context, req *protocol.RequestKeyVersion, topics []string) (bool, []string, error) {
op := getOperationFromKey(req.ApiKey)
for _, topic := range topics {
anyMatched := false
for _, rule := range a.rules {
if (rule.Operation == apis.OperationAll || rule.Operation == op) &&
(rule.Topic == "" || (topic != "" && rule.Re.MatchString(topic))) {
anyMatched = true
if !rule.Allow {
return false, nil, nil
}
}
}
if !anyMatched {
return false, nil, nil
}
}
return true, nil, nil
}

func getOperationFromKey(apiKey int16) apis.Operation {
switch apiKey {
case 0:
return apis.OperationProduce
case 1:
return apis.OperationFetch
case 3:
return apis.OperationMetadata
default:
return apis.OperationAll
}
}
25 changes: 21 additions & 4 deletions proxy/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package proxy

import (
"errors"
"time"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/grepplabs/kafka-proxy/proxy/protocol"
"time"
)

const (
Expand All @@ -16,9 +18,16 @@ const (
defaultReadTimeout = 30 * time.Second
minOpenRequests = 16

apiKeyProduce = int16(0)
apiKeySaslHandshake = int16(17)
apiKeyApiApiVersions = int16(18)
apiKeyProduce = int16(0)
apiKeyFetch = int16(1)
apiKeyListOffsets = int16(2)
apiKeyCreateTopics = int16(19)
apiKeyDeleteTopics = int16(20)
apiKeyDeleteRecords = int16(21)
apiKeySaslHandshake = int16(17)
apiKeyApiApiVersions = int16(18)
apiKeyAddPartitionsToTxn = int16(24)
apiKeyCreatePartitions = int16(37)

minRequestApiKey = int16(0) // 0 - Produce
maxRequestApiKey = int16(120) // so far 67 is the last (reserve some for the feature)
Expand Down Expand Up @@ -63,6 +72,8 @@ type processor struct {
brokerAddress string
// producer will never send request with acks=0
producerAcks0Disabled bool

acl *apis.ACLCollection
}

func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor {
Expand Down Expand Up @@ -130,6 +141,7 @@ func (p *processor) RequestsLoop(dst DeadlineWriter, src DeadlineReaderWriter) (
localSasl: p.localSasl,
localSaslDone: false, // sequential processing - mutex is required
producerAcks0Disabled: p.producerAcks0Disabled,
acl: p.acl,
}

return ctx.requestsLoop(dst, src)
Expand All @@ -148,6 +160,9 @@ type RequestsLoopContext struct {
localSasl *LocalSasl
localSaslDone bool

aclChecker apis.ACLChecker
acl *apis.ACLCollection

producerAcks0Disabled bool
}

Expand Down Expand Up @@ -218,6 +233,7 @@ func (p *processor) ResponsesLoop(dst DeadlineWriter, src DeadlineReader) (readE
timeout: p.readTimeout,
brokerAddress: p.brokerAddress,
buf: make([]byte, p.responseBufferSize),
acl: p.acl,
}
return ctx.responsesLoop(dst, src)
}
Expand All @@ -229,6 +245,7 @@ type ResponsesLoopContext struct {
timeout time.Duration
brokerAddress string
buf []byte // bufSize
acl *apis.ACLCollection
}

type ResponseHandler interface {
Expand Down
Loading