Skip to content

Commit

Permalink
[5756 Identity Management Overhaul] Placement (dapr#6179)
Browse files Browse the repository at this point in the history
* Update placement to use security pkg

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds `mtls.sentryAddres` to helm values file

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting

Signed-off-by: joshvanl <me@joshvanl.dev>

* Review comment nits

Signed-off-by: joshvanl <me@joshvanl.dev>

* Use correct port for sentry port

Signed-off-by: joshvanl <me@joshvanl.dev>

* Revert sentry service port to 80 to maintain backwards compat

Signed-off-by: joshvanl <me@joshvanl.dev>

* Use legacy Server authenticator to allow placement to connect to legacy
(pre v1.12) daprd clients.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Return connection error in daprd actor gRPC client

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds `cluster.local` DNS to placement server certificate

Signed-off-by: joshvanl <me@joshvanl.dev>

* Remove block and returning connection error for actor placement client

Signed-off-by: joshvanl <me@joshvanl.dev>

* Increase timeout for security file change test

Signed-off-by: joshvanl <me@joshvanl.dev>

* Use the `dapr.io/control-plane` annotation on the placement pod instead
of `dapr.io/app-id`

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
  • Loading branch information
JoshVanL and dapr-bot authored Aug 15, 2023
1 parent 2007395 commit 15283ed
Show file tree
Hide file tree
Showing 16 changed files with 753 additions and 89 deletions.
1 change: 1 addition & 0 deletions charts/dapr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The Helm chart has the follow configuration options that can be supplied:
| `global.mtls.workloadCertTTL` | TTL for workload cert | `24h` |
| `global.mtls.allowedClockSkew` | Allowed clock skew for workload cert rotation | `15m` |
| `global.mtls.controlPlaneTrustDomain ` | Trust domain for control plane | `cluster.local` |
| `global.mtls.sentryAddress` | Sentry address for control plane | `dapr-sentry.{{ .ReleaseNamespace }}.svc:443` |
| `global.dnsSuffix` | Kuberentes DNS suffix | `.cluster.local` |
| `global.daprControlPlaneOs` | Operating System for Dapr control plane | `linux` |
| `global.daprControlPlaneArch` | CPU Architecture for Dapr control plane | `amd64` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ spec:
{{- with .Values.global.labels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if eq .Values.global.prometheus.enabled true }}
annotations:
dapr.io/control-plane: placement
{{- if eq .Values.global.prometheus.enabled true }}
prometheus.io/scrape: "{{ .Values.global.prometheus.enabled }}"
prometheus.io/port: "{{ .Values.global.prometheus.port }}"
prometheus.io/path: "/"
Expand Down Expand Up @@ -70,9 +71,11 @@ spec:
resources:
{{ toYaml .Values.resources | indent 10 }}
volumeMounts:
- name: credentials
mountPath: /var/run/dapr/credentials
- name: dapr-trust-bundle
mountPath: /var/run/secrets/dapr.io/tls
readOnly: true
- name: dapr-identity-token
mountPath: /var/run/secrets/dapr.io/sentrytoken
{{- if or (eq .Values.global.ha.enabled true) (eq .Values.ha true) }}
{{- if eq .Values.cluster.forceInMemoryLog false }}
- name: raft-log
Expand Down Expand Up @@ -142,14 +145,9 @@ spec:
- "--enable-metrics=false"
{{- end }}
- "--tls-enabled"
{{- with .Values.global.issuerFilenames }}
- "--issuer-ca-filename"
- "{{ .ca }}"
- "--issuer-certificate-filename"
- "{{ .cert }}"
- "--issuer-key-filename"
- "{{ .key }}"
{{- end }}
- "--trust-domain={{ .Values.global.mtls.controlPlaneTrustDomain }}"
- "--trust-anchors-file=/var/run/secrets/dapr.io/tls/ca.crt"
- "--sentry-address={{ if .Values.global.mtls.sentryAddress }}{{ .Values.global.mtls.sentryAddress }}{{ else }}dapr-sentry.{{ .Release.Namespace }}.svc:80{{ end }}"
{{- if eq .Values.global.daprControlPlaneOs "linux" }}
securityContext:
{{- if eq .Values.cluster.forceInMemoryLog true }}
Expand Down Expand Up @@ -180,9 +178,16 @@ spec:
fieldPath: metadata.namespace
serviceAccountName: dapr-placement
volumes:
- name: credentials
secret:
secretName: dapr-trust-bundle
- name: dapr-trust-bundle
configMap:
name: dapr-trust-bundle
- name: dapr-identity-token
projected:
sources:
- serviceAccountToken:
path: token
expirationSeconds: 600
audience: "spiffe://{{ .Values.global.mtls.controlPlaneTrustDomain }}/ns/{{ .Release.Namespace }}/dapr-sentry"
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
Expand Down
2 changes: 2 additions & 0 deletions charts/dapr/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ global:
workloadCertTTL: 24h
allowedClockSkew: 15m
controlPlaneTrustDomain: "cluster.local"
# Used to override `dapr-sentry.{{ .Release.Namespace }}.svc:80`
#sentryAddress:
actors:
enabled: true
daprControlPlaneOs: linux
Expand Down
38 changes: 20 additions & 18 deletions cmd/placement/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"github.com/dapr/dapr/cmd/placement/options"
"github.com/dapr/dapr/pkg/buildinfo"
"github.com/dapr/dapr/pkg/concurrency"
"github.com/dapr/dapr/pkg/credentials"
"github.com/dapr/dapr/pkg/health"
"github.com/dapr/dapr/pkg/metrics"
"github.com/dapr/dapr/pkg/placement"
"github.com/dapr/dapr/pkg/placement/hashing"
"github.com/dapr/dapr/pkg/placement/monitoring"
"github.com/dapr/dapr/pkg/placement/raft"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/dapr/pkg/signals"
"github.com/dapr/kit/logger"
)
Expand Down Expand Up @@ -63,30 +63,28 @@ func main() {
log.Fatal("Failed to create raft server.")
}

var certChain *credentials.CertChain
if opts.TLSEnabled {
tlsCreds := credentials.NewTLSCredentials(opts.CertChainPath)

certChain, err = credentials.LoadFromDisk(tlsCreds.RootCertPath(), tlsCreds.CertPath(), tlsCreds.KeyPath())
if err != nil {
log.Fatal(err)
}

log.Info("TLS certificates loaded successfully")
}

// Start Placement gRPC server.
hashing.SetReplicationFactor(opts.ReplicationFactor)
apiServer, err := placement.NewPlacementService(raftServer, certChain)
ctx := signals.Context()
secProvider, err := security.New(ctx, security.Options{
SentryAddress: opts.SentryAddress,
ControlPlaneTrustDomain: opts.TrustDomain,
ControlPlaneNamespace: security.CurrentNamespace(),
TrustAnchorsFile: opts.TrustAnchorsFile,
AppID: "dapr-placement",
MTLSEnabled: opts.TLSEnabled,
})
if err != nil {
log.Fatal(err)
}

hashing.SetReplicationFactor(opts.ReplicationFactor)
apiServer := placement.NewPlacementService(raftServer)

err = concurrency.NewRunnerManager(
func(ctx context.Context) error {
return raftServer.StartRaft(ctx, nil)
},
metricsExporter.Run,
secProvider.Start,
apiServer.MonitorLeadership,
func(ctx context.Context) error {
var metadataOptions []health.RouterOptions
Expand All @@ -101,9 +99,13 @@ func main() {
return nil
},
func(ctx context.Context) error {
return apiServer.Run(ctx, strconv.Itoa(opts.PlacementPort))
sec, sErr := secProvider.Handler(ctx)
if sErr != nil {
return sErr
}
return apiServer.Run(ctx, strconv.Itoa(opts.PlacementPort), sec)
},
).Run(signals.Context())
).Run(ctx)
if err != nil {
log.Fatal(err)
}
Expand Down
28 changes: 21 additions & 7 deletions cmd/placement/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ package options

import (
"flag"
"fmt"
"os"
"strings"

"github.com/dapr/dapr/utils"
"github.com/dapr/kit/logger"

"github.com/dapr/dapr/pkg/credentials"
"github.com/dapr/dapr/pkg/metrics"
"github.com/dapr/dapr/pkg/placement/raft"
"github.com/dapr/dapr/pkg/security"
)

const (
Expand All @@ -46,17 +47,22 @@ type Options struct {
// Placement server configurations
PlacementPort int
HealthzPort int
CertChainPath string
TLSEnabled bool
MetadataEnabled bool

TLSEnabled bool
TrustDomain string
TrustAnchorsFile string
SentryAddress string

ReplicationFactor int

// Log and metrics configurations
Logger logger.Options
Metrics *metrics.Options
}

var log = logger.NewLogger("dapr.placement.options")

func New() *Options {
// Default options
var opts Options
Expand All @@ -67,14 +73,18 @@ func New() *Options {
flag.StringVar(&opts.RaftLogStorePath, "raft-logstore-path", "", "raft log store path.")
flag.IntVar(&opts.PlacementPort, "port", defaultPlacementPort, "sets the gRPC port for the placement service")
flag.IntVar(&opts.HealthzPort, "healthz-port", defaultHealthzPort, "sets the HTTP port for the healthz server")
flag.StringVar(&opts.CertChainPath, "certchain", defaultCredentialsPath, "Path to the credentials directory holding the cert chain")
flag.BoolVar(&opts.TLSEnabled, "tls-enabled", false, "Should TLS be enabled for the placement gRPC server")
flag.BoolVar(&opts.MetadataEnabled, "metadata-enabled", opts.MetadataEnabled, "Expose the placement tables on the healthz server")
flag.IntVar(&opts.ReplicationFactor, "replicationFactor", defaultReplicationFactor, "sets the replication factor for actor distribution on vnodes")

flag.StringVar(&credentials.RootCertFilename, "issuer-ca-filename", credentials.RootCertFilename, "Certificate Authority certificate filename")
flag.StringVar(&credentials.IssuerCertFilename, "issuer-certificate-filename", credentials.IssuerCertFilename, "Issuer certificate filename")
flag.StringVar(&credentials.IssuerKeyFilename, "issuer-key-filename", credentials.IssuerKeyFilename, "Issuer private key filename")
flag.StringVar(&opts.TrustDomain, "trust-domain", "cluster.local", "Trust domain for the Dapr control plane")
flag.StringVar(&opts.TrustAnchorsFile, "trust-anchors-file", "/var/run/secrets/dapr.io/tls/ca.crt", "Filepath to the trust anchors for the Dapr control plane")
flag.StringVar(&opts.SentryAddress, "sentry-address", fmt.Sprintf("dapr-sentry.%s.svc:80", security.CurrentNamespace()), "Filepath to the trust anchors for the Dapr control plane")

depCC := flag.String("certchain", "", "DEPRECATED")
depRCF := flag.String("issuer-ca-filename", "", "DEPRECATED")
depICF := flag.String("issuer-certificate-filename", "", "DEPRECATED")
depIKF := flag.String("issuer-key-filename", "", "DEPRECATED")

opts.Logger = logger.DefaultOptions()
opts.Logger.AttachCmdFlags(flag.StringVar, flag.BoolVar)
Expand All @@ -87,6 +97,10 @@ func New() *Options {

flag.Parse()

if len(*depRCF) > 0 || len(*depICF) > 0 || len(*depIKF) > 0 || len(*depCC) > 0 {
log.Warn("--certchain, --issuer-ca-filename, --issuer-certificate-filename and --issuer-key-filename are deprecated and will be removed in v1.14.")
}

opts.RaftPeers = parsePeersFromFlag(opts.RaftPeerString)
if opts.RaftLogStorePath != "" {
opts.RaftInMemEnabled = false
Expand Down
28 changes: 9 additions & 19 deletions pkg/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"google.golang.org/grpc/status"
"k8s.io/utils/clock"

daprCredentials "github.com/dapr/dapr/pkg/credentials"
"github.com/dapr/dapr/pkg/placement/monitoring"
"github.com/dapr/dapr/pkg/placement/raft"
placementv1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/logger"
)

Expand Down Expand Up @@ -78,9 +78,6 @@ type hostMemberChange struct {

// Service updates the Dapr runtimes with distributed hash tables for stateful entities.
type Service struct {
// grpcServer is the gRPC server for placement service.
grpcServer *grpc.Server

// streamConnPool has the stream connections established between placement gRPC server and Dapr runtime.
streamConnPool []placementGRPCStream

Expand Down Expand Up @@ -122,32 +119,23 @@ type Service struct {
}

// NewPlacementService returns a new placement service.
func NewPlacementService(raftNode *raft.Server, certChain *daprCredentials.CertChain) (*Service, error) {
func NewPlacementService(raftNode *raft.Server) *Service {
fhdd := &atomic.Int64{}
fhdd.Store(int64(faultyHostDetectInitialDuration))

opts, err := daprCredentials.GetServerOptions(certChain)
if err != nil {
return nil, fmt.Errorf("error creating gRPC options: %w", err)
}

p := &Service{
return &Service{
streamConnPool: []placementGRPCStream{},
membershipCh: make(chan hostMemberChange, membershipChangeChSize),
faultyHostDetectDuration: fhdd,
raftNode: raftNode,
grpcServer: grpc.NewServer(opts...),
clock: &clock.RealClock{},
closedCh: make(chan struct{}),
}

placementv1pb.RegisterPlacementServer(p.grpcServer, p)
return p, nil
}

// Run starts the placement service gRPC server.
// Blocks until the service is closed and all connections are drained.
func (p *Service) Run(ctx context.Context, port string) error {
func (p *Service) Run(ctx context.Context, port string, sec security.Handler) error {
if p.closed.Load() {
return errors.New("placement service is closed")
}
Expand All @@ -156,17 +144,19 @@ func (p *Service) Run(ctx context.Context, port string) error {
return errors.New("placement service is already running")
}

var err error
serverListener, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
return fmt.Errorf("failed to listen: %w", err)
}
grpcServer := grpc.NewServer(sec.GRPCServerOption())

placementv1pb.RegisterPlacementServer(grpcServer, p)

log.Infof("starting placement service started on port %d", serverListener.Addr().(*net.TCPAddr).Port)

errCh := make(chan error)
go func() {
errCh <- p.grpcServer.Serve(serverListener)
errCh <- grpcServer.Serve(serverListener)
log.Info("placement service stopped")
}()

Expand All @@ -176,7 +166,7 @@ func (p *Service) Run(ctx context.Context, port string) error {
close(p.closedCh)
}

p.grpcServer.GracefulStop()
grpcServer.GracefulStop()
p.wg.Wait()

return <-errCh
Expand Down
6 changes: 3 additions & 3 deletions pkg/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (

"github.com/dapr/dapr/pkg/placement/raft"
v1pb "github.com/dapr/dapr/pkg/proto/placement/v1"
securityfake "github.com/dapr/dapr/pkg/security/fake"
)

const testStreamSendLatency = time.Second

func newTestPlacementServer(t *testing.T, raftServer *raft.Server) (string, *Service, *clocktesting.FakeClock, context.CancelFunc) {
t.Helper()

testServer, err := NewPlacementService(raftServer, nil)
require.NoError(t, err)
testServer := NewPlacementService(raftServer)
clock := clocktesting.NewFakeClock(time.Now())
testServer.clock = clock

Expand All @@ -50,7 +50,7 @@ func newTestPlacementServer(t *testing.T, raftServer *raft.Server) (string, *Ser
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer close(serverStopped)
require.NoError(t, testServer.Run(ctx, strconv.Itoa(port)))
require.NoError(t, testServer.Run(ctx, strconv.Itoa(port), securityfake.New()))
}()

assert.Eventually(t, func() bool {
Expand Down
16 changes: 15 additions & 1 deletion pkg/security/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ package fake

import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Fake struct {
grpcServerOptionFn func() grpc.ServerOption
grpcServerOptionNoClientAuthFn func() grpc.ServerOption
}

func New() *Fake {
return &Fake{
grpcServerOptionFn: func() grpc.ServerOption {
return grpc.Creds(insecure.NewCredentials())
},
grpcServerOptionNoClientAuthFn: func() grpc.ServerOption {
return grpc.Creds(nil)
return grpc.Creds(insecure.NewCredentials())
},
}
}
Expand All @@ -37,6 +42,15 @@ func (f *Fake) WithGRPCServerOptionNoClientAuthFn(fn func() grpc.ServerOption) *
return f
}

func (f *Fake) WithGRPCServerOptionFn(fn func() grpc.ServerOption) *Fake {
f.grpcServerOptionFn = fn
return f
}

func (f *Fake) GRPCServerOptionNoClientAuth() grpc.ServerOption {
return f.grpcServerOptionNoClientAuthFn()
}

func (f *Fake) GRPCServerOption() grpc.ServerOption {
return f.grpcServerOptionFn()
}
Loading

0 comments on commit 15283ed

Please sign in to comment.