Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
4923115
feat(policy): cache entitlement policy in authz v2
jakedoublev May 28, 2025
c86d9d7
hook to run after services are started
jakedoublev May 28, 2025
12e2c4d
policy config
jakedoublev May 28, 2025
9096218
consume policy cache
jakedoublev May 28, 2025
0bfd2e3
lint fixes
jakedoublev May 28, 2025
603fb63
fix cache utilization in auth service and lower defaults
jakedoublev May 28, 2025
cc99503
lint fixes
jakedoublev May 28, 2025
5747db1
test: sleep to ensure roundtrip tests hit cached policy
jakedoublev May 28, 2025
281f41d
put back rttests
jakedoublev May 28, 2025
a5859e1
put back authz doing any caching
jakedoublev May 28, 2025
a14f9ae
set cache on relevant policy services and refactor to servicesStarted…
jakedoublev May 28, 2025
a668dd7
improve log
jakedoublev May 28, 2025
9505dd0
working cache
jakedoublev May 29, 2025
3ba044c
lint fixes
jakedoublev May 29, 2025
b1ec09a
fix limit/offset
jakedoublev May 29, 2025
eae73f6
rm debounce on write-triggered refresh
jakedoublev May 29, 2025
f1d2fbb
Merge branch 'main' into feat/authz-v2-cache-policy
jakedoublev May 29, 2025
5abe1fa
Merge branch 'main' into feat/authz-v2-cache-policy
jakedoublev May 30, 2025
ffa528c
rm mutation cache refreshes
jakedoublev May 30, 2025
e18c04a
disable caching by default
jakedoublev May 30, 2025
0103264
fixes
jakedoublev May 30, 2025
f0a56a6
ensure close is called to shut down db clients and caches
jakedoublev May 30, 2025
184cfff
lint fixes
jakedoublev May 30, 2025
f864a33
lint fixes
jakedoublev May 30, 2025
07d6849
Merge remote-tracking branch 'origin' into feat/authz-v2-cache-policy
jakedoublev Jun 2, 2025
a1f8ac6
ristretto cache/go-cache implementation
jakedoublev Jun 2, 2025
b7cd66c
lint fix
jakedoublev Jun 2, 2025
57873d8
fix shutdown panic
jakedoublev Jun 2, 2025
2253bf5
feat(authz): cache entitlement policy in auth service v2
jakedoublev Jun 2, 2025
1441506
update documented config
jakedoublev Jun 2, 2025
a275978
rm extraneous panic
jakedoublev Jun 2, 2025
f9b7618
lint fix
jakedoublev Jun 2, 2025
a554eda
lint fixes
jakedoublev Jun 2, 2025
4e178d3
rm extraneous log
jakedoublev Jun 2, 2025
7171195
Merge branch 'main' into feat/authz-v2-cache-authz
jakedoublev Jun 3, 2025
8434881
cache ERS responses
jakedoublev Jun 3, 2025
8178db9
Merge remote-tracking branch 'origin' into feat/authz-v2-cache-authz
jakedoublev Jun 4, 2025
67890ea
sample configs
jakedoublev Jun 5, 2025
040ea89
Merge remote-tracking branch 'origin' into feat/authz-v2-cache-authz
jakedoublev Jun 5, 2025
5d0e8e2
auth config
jakedoublev Jun 5, 2025
0715924
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 17, 2025
7682258
checkpoint consuming new cache implementation
jakedoublev Jun 17, 2025
5f6ff8b
restore policy config
jakedoublev Jun 17, 2025
fd53190
checkpoint no generics
jakedoublev Jun 17, 2025
b01c4b9
Merge branch 'main' into feat/DSPX-1268-cache
jakedoublev Jun 17, 2025
a006315
rm caching from ERS
jakedoublev Jun 17, 2025
21ffa7c
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 17, 2025
7a9b5dc
cleanup
jakedoublev Jun 17, 2025
258c27d
example configs
jakedoublev Jun 17, 2025
11391ea
lint fix
jakedoublev Jun 18, 2025
fac5e87
comment
jakedoublev Jun 18, 2025
e6265be
improvements
jakedoublev Jun 18, 2025
a3d8af4
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 18, 2025
a858d95
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 18, 2025
b09fda6
working
jakedoublev Jun 18, 2025
1ac03ef
do not log request containing entity identifier
jakedoublev Jun 18, 2025
d3a1160
tweaks
jakedoublev Jun 18, 2025
b1071e8
cleanup
jakedoublev Jun 18, 2025
91e491e
tweaks
jakedoublev Jun 18, 2025
455a155
working caching without services started hook
jakedoublev Jun 18, 2025
c3a0f28
rm deprecated services started hook
jakedoublev Jun 18, 2025
b25b50c
put back extra lines
jakedoublev Jun 18, 2025
1a179f5
put back extra lines
jakedoublev Jun 18, 2025
c4a9686
cleanup
jakedoublev Jun 18, 2025
a70e401
fill cache on first call if it's empty
jakedoublev Jun 18, 2025
e350d71
cleanup
jakedoublev Jun 18, 2025
d402fb4
put attributes back
jakedoublev Jun 18, 2025
facda6d
suggestion
jakedoublev Jun 20, 2025
bc41811
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 20, 2025
ffc4ffe
cache behavior suggestion
jakedoublev Jun 20, 2025
bae127d
unit tests WIP
jakedoublev Jun 23, 2025
a746029
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 23, 2025
2f19fa4
cache initialization improvement
jakedoublev Jun 23, 2025
557fdd0
sanity check authz cache tests
jakedoublev Jun 23, 2025
65c268f
lint fixes
jakedoublev Jun 23, 2025
80aaad2
feedback on expiry of test cache
jakedoublev Jun 23, 2025
7813c8b
restructure config and more tests
jakedoublev Jun 23, 2025
d546a61
lint fixes
jakedoublev Jun 23, 2025
7295181
lint fixes
jakedoublev Jun 23, 2025
8ec4d27
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 23, 2025
dad639b
Merge remote-tracking branch 'origin' into feat/DSPX-1268-cache
jakedoublev Jun 23, 2025
1dcd7f8
lint fix
jakedoublev Jun 23, 2025
984329c
fix test flake
jakedoublev Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions opentdf-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ services:
# policy is enabled by default in mode 'all'
# policy:
# enabled: true
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# authorization:
# entitlement_policy_cache:
# enabled: false
# refresh_interval: 30s
server:
public_hostname: localhost
tls:
Expand Down
8 changes: 6 additions & 2 deletions opentdf-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ services:
# policy is enabled by default in mode 'all'
# policy:
# enabled: true
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# list_request_limit_default: 1000
# list_request_limit_max: 2500
# authorization:
# entitlement_policy_cache:
# enabled: false
# refresh_interval: 30s
server:
auth:
enabled: true
Expand Down
92 changes: 74 additions & 18 deletions service/authorization/v2/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package authorization
import (
"context"
"errors"
"fmt"
"log/slog"
"time"

"connectrpc.com/connect"
"github.com/creasty/defaults"
"github.com/go-viper/mapstructure/v2"
authzV2 "github.com/opentdf/platform/protocol/go/authorization/v2"
authzV2Connect "github.com/opentdf/platform/protocol/go/authorization/v2/authorizationv2connect"
otdf "github.com/opentdf/platform/sdk"
"github.com/opentdf/platform/service/internal/access/v2"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -23,41 +28,93 @@ type Service struct {
config *Config
logger *logger.Logger
trace.Tracer
cache *EntitlementPolicyCache
}

type Config struct{}

func NewRegistration() *serviceregistry.Service[authzV2Connect.AuthorizationServiceHandler] {
as := new(Service)

return &serviceregistry.Service[authzV2Connect.AuthorizationServiceHandler]{
Close: as.Close,
ServiceOptions: serviceregistry.ServiceOptions[authzV2Connect.AuthorizationServiceHandler]{
Namespace: "authorization",
Version: "v2",
ServiceDesc: &authzV2.AuthorizationService_ServiceDesc,
ConnectRPCFunc: authzV2Connect.NewAuthorizationServiceHandler,
RegisterFunc: func(srp serviceregistry.RegistrationParams) (authzV2Connect.AuthorizationServiceHandler, serviceregistry.HandlerServer) {
authZCfg := new(Config)

logger := srp.Logger
l := srp.Logger

as.sdk = srp.SDK
as.logger = logger
as.logger = l
as.config = authZCfg
as.Tracer = srp.Tracer

err := defaults.Set(authZCfg)
if err != nil {
l.Error("failed to set defaults for authorization service config", slog.Any("error", err))
panic(fmt.Errorf("failed to set defaults for authorization service config: %w", err))
}

// Only decode config if it exists
if srp.Config != nil {
if err := mapstructure.Decode(srp.Config, &authZCfg); err != nil {
l.Error("failed to decode authorization service config", slog.Any("error", err))
panic(fmt.Errorf("invalid authorization svc config [%v] %w", srp.Config, err))
}
}

if err := authZCfg.Validate(); err != nil {
l.Error("invalid authorization service config",
slog.Any("config", authZCfg.LogValue()),
slog.Any("error", err),
)
panic(fmt.Errorf("invalid authorization svc config %w", err))
}
l.Debug("authorization service config", slog.Any("config", authZCfg.LogValue()))

if !authZCfg.Cache.Enabled {
l.Debug("entitlement policy cache is disabled")
return as, nil
}

cacheClient, err := srp.NewCacheClient(cache.Options{})
if err != nil || cacheClient == nil {
l.Error("failed to create platform cache client", slog.Any("error", err))
panic(fmt.Errorf("failed to create platform cache client: %w", err))
}

refreshInterval, err := time.ParseDuration(authZCfg.Cache.RefreshInterval)
if err != nil {
l.Error("failed to parse entitlement policy cache refresh interval", slog.Any("error", err))
panic(fmt.Errorf("failed to parse entitlement policy cache refresh interval [%s]: %w", authZCfg.Cache.RefreshInterval, err))
}

retriever := access.NewEntitlementPolicyRetriever(as.sdk)
as.cache, err = NewEntitlementPolicyCache(context.Background(), l, retriever, cacheClient, refreshInterval)
if err != nil {
l.Error("failed to create entitlement policy cache", slog.Any("error", err))
panic(fmt.Errorf("failed to create entitlement policy cache: %w", err))
}

// if err := srp.RegisterReadinessCheck("authorization", as.IsReady); err != nil {
// logger.Error("failed to register authorization readiness check", slog.String("error", err.Error()))
// }

as.config = authZCfg
as.Tracer = srp.Tracer
logger.Debug("authorization v2 service register func")

return as, nil
},
},
}
}

// Close gracefully shuts down the authorization service, closing the entitlement policy cache.
func (as *Service) Close() {
as.logger.Info("gracefully shutting down authorization service")
if as.cache != nil {
as.cache.Stop()
}
}

// TODO: uncomment after v1 is deprecated, as cannot have more than one readiness check under a namespace
// func (as Service) IsReady(ctx context.Context) error {
// as.logger.TraceContext(ctx, "checking readiness of authorization service")
Expand All @@ -79,7 +136,7 @@ func (as *Service) GetEntitlements(ctx context.Context, req *connect.Request[aut
withComprehensiveHierarchy := req.Msg.GetWithComprehensiveHierarchy()

// When authorization service can consume cached policy, switch to the other PDP (process based on policy passed in)
pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk)
pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache)
if err != nil {
as.logger.ErrorContext(ctx, "failed to create JIT PDP", slog.Any("error", err))
return nil, connect.NewError(connect.CodeInternal, err)
Expand All @@ -91,7 +148,6 @@ func (as *Service) GetEntitlements(ctx context.Context, req *connect.Request[aut
as.logger.ErrorContext(ctx, "failed to get entitlements", slog.Any("error", err))
return nil, connect.NewError(connect.CodeInternal, err)
}

rsp := &authzV2.GetEntitlementsResponse{
Entitlements: entitlements,
}
Expand All @@ -110,7 +166,7 @@ func (as *Service) GetDecision(ctx context.Context, req *connect.Request[authzV2
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk)
pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache)
if err != nil {
as.logger.ErrorContext(ctx, "failed to create JIT PDP", slog.Any("error", err))
return nil, connect.NewError(connect.CodeInternal, err)
Expand Down Expand Up @@ -148,7 +204,7 @@ func (as *Service) GetDecisionMultiResource(ctx context.Context, req *connect.Re
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk)
pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to create JIT PDP"), err))
}
Expand All @@ -159,12 +215,12 @@ func (as *Service) GetDecisionMultiResource(ctx context.Context, req *connect.Re

decisions, allPermitted, err := pdp.GetDecision(ctx, entityIdentifier, action, resources)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to get decision"), err), slog.Any("request", request))
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to get decision"), err))
}

resourceDecisions, err := rollupMultiResourceDecisions(decisions)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to rollup multi-resource decision"), err), slog.Any("request", request))
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to rollup multi-resource decision"), err))
}

resp := &authzV2.GetDecisionMultiResourceResponse{
Expand All @@ -188,7 +244,7 @@ func (as *Service) GetDecisionBulk(ctx context.Context, req *connect.Request[aut
propagator := otel.GetTextMapPropagator()
ctx = propagator.Extract(ctx, propagation.HeaderCarrier(req.Header()))

pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk)
pdp, err := access.NewJustInTimePDP(ctx, as.logger, as.sdk, as.cache)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to create JIT PDP"), err))
}
Expand All @@ -205,12 +261,12 @@ func (as *Service) GetDecisionBulk(ctx context.Context, req *connect.Request[aut

decisions, allPermitted, err := pdp.GetDecision(ctx, entityIdentifier, action, resources)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to get bulk decision"), err), slog.Any("request", request))
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to get bulk decision"), err))
}

resourceDecisions, err := rollupMultiResourceDecisions(decisions)
if err != nil {
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to rollup bulk multi-resource decision"), err), slog.Any("request", request), slog.Int("index", idx))
return nil, statusifyError(ctx, as.logger, errors.Join(errors.New("failed to rollup bulk multi-resource decision"), err), slog.Int("index", idx))
}

decisionResponse := &authzV2.GetDecisionMultiResourceResponse{
Expand Down
Loading
Loading