Skip to content

Commit

Permalink
Placement: disseminate min Actor API version + reject nodes with vers…
Browse files Browse the repository at this point in the history
…ion lower than minium (dapr#7134)

* WIP: Placement service disseminates min version

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added max-api-level flag

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Placement disseminates min API level

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Some work on tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Updated tests and fixes

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Completed tests and fixed some bugs

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Some test fixes

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Added option to Helm chart

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Persist API level and make sure it's never decreased

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Updated some tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed updating on node deletion

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Updated tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Lint

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Updated per review feedback

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Update tests/integration/suite/placement/apilevel/no_max.go

Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>

* Review feedback pt 1

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Handle maxApiLevel using a pointer

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Use util.HTTPClient

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Typos

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

---------

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
  • Loading branch information
4 people authored Nov 13, 2023
1 parent 5f189af commit d3ded6a
Show file tree
Hide file tree
Showing 21 changed files with 851 additions and 98 deletions.
2 changes: 2 additions & 0 deletions charts/dapr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ The Helm chart has the follow configuration options that can be supplied:
| `dapr_placement.cluster.logStoreWinPath` | Mount path for persistent volume for log store in windows when HA is true | `C:\\raft-log` |
| `dapr_placement.volumeclaims.storageSize` | Attached volume size | `1Gi` |
| `dapr_placement.volumeclaims.storageClassName` | storage class name | |
| `dapr_placement.maxActorApiLevel` | Sets the `max-api-level` flag which prevents the Actor API level from going above this value. The Placement service reports to all connected hosts the Actor API level as the minimum value observed in all actor hosts in the cluster. Actor hosts with a lower API level than the current API level in the cluster will not be able to connect to Placement. Setting a cap helps making sure that older versions of Dapr can connect to Placement as actor hosts, but may limit the capabilities of the actor subsystem. The default value of -1 means no cap. | `-1` |
| `dapr_placement.minActorApiLevel` | Sets the `min-api-level` flag, which enforces a minimum value for the Actor API level in the cluster. | `0` |
| `dapr_placement.runAsNonRoot` | Boolean value for `securityContext.runAsNonRoot`. Does not apply unless `forceInMemoryLog` is set to `true`. You may have to set this to `false` when running in Minikube | `false` |
| `dapr_placement.resources` | Value of `resources` attribute. Can be used to set memory/cpu resources/limits. See the section "Resource configuration" above. Defaults to empty | `{}` |
| `dapr_placement.debug.enabled` | Boolean value for enabling debug mode | `{}` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ spec:
- "--enable-metrics"
- "--replicationFactor"
- "{{ .Values.replicationFactor }}"
- "--max-api-level"
- "{{ .Values.maxActorApiLevel }}"
- "--min-api-level"
- "{{ .Values.minActorApiLevel }}"
- "--metrics-port"
- "{{ .Values.global.prometheus.port }}"
{{- else }}
Expand Down
3 changes: 3 additions & 0 deletions charts/dapr/charts/dapr_placement/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ ports:

ha: false

maxActorApiLevel: -1
minActorApiLevel: 0

cluster:
forceInMemoryLog: false
logStorePath: /var/run/dapr/raft-log
Expand Down
15 changes: 14 additions & 1 deletion cmd/placement/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"context"
"fmt"
"math"
"strconv"

"github.com/dapr/dapr/cmd/placement/options"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
"github.com/dapr/kit/signals"
)

Expand Down Expand Up @@ -79,7 +81,18 @@ func main() {
}

hashing.SetReplicationFactor(opts.ReplicationFactor)
apiServer := placement.NewPlacementService(raftServer, secProvider)

placementOpts := placement.PlacementServiceOpts{
RaftNode: raftServer,
SecProvider: secProvider,
}
if opts.MinAPILevel >= 0 && opts.MinAPILevel < math.MaxUint32 {
placementOpts.MinAPILevel = uint32(opts.MinAPILevel)
}
if opts.MaxAPILevel >= 0 && opts.MaxAPILevel < math.MaxUint32 {
placementOpts.MaxAPILevel = ptr.Of(uint32(opts.MaxAPILevel))
}
apiServer := placement.NewPlacementService(placementOpts)

err = concurrency.NewRunnerManager(
func(ctx context.Context) error {
Expand Down
4 changes: 4 additions & 0 deletions cmd/placement/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Options struct {
PlacementPort int
HealthzPort int
MetadataEnabled bool
MaxAPILevel int
MinAPILevel int

TLSEnabled bool
TrustDomain string
Expand All @@ -75,6 +77,8 @@ func New() *Options {
flag.IntVar(&opts.HealthzPort, "healthz-port", defaultHealthzPort, "sets the HTTP port for the healthz server")
flag.BoolVar(&opts.TLSEnabled, "tls-enabled", false, "Should TLS be enabled for the placement gRPC server")
flag.BoolVar(&opts.MetadataEnabled, "metadata-enabled", opts.MetadataEnabled, "Expose the placement tables on the healthz server")
flag.IntVar(&opts.MaxAPILevel, "max-api-level", -1, "If set to >= 0, causes the reported 'api-level' in the cluster to never exceed this value")
flag.IntVar(&opts.MinAPILevel, "min-api-level", 0, "Enforces a minimum 'api-level' in the cluster")
flag.IntVar(&opts.ReplicationFactor, "replicationFactor", defaultReplicationFactor, "sets the replication factor for actor distribution on vnodes")

flag.StringVar(&opts.TrustDomain, "trust-domain", "localhost", "Trust domain for the Dapr control plane")
Expand Down
2 changes: 2 additions & 0 deletions dapr/proto/placement/v1/placement.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ message PlacementOrder {
message PlacementTables {
map<string, PlacementTable> entries = 1;
string version = 2;
// Minimum observed version of the Actor APIs supported by connected runtimes
uint32 api_level = 3;
}

message PlacementTable {
Expand Down
10 changes: 10 additions & 0 deletions pkg/placement/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ func (p *Service) performTablesUpdate(ctx context.Context, hosts []placementGRPC
// Otherwise, each Dapr runtime will have inconsistent hashing table.
startedAt := p.clock.Now()

// Enforce maximum API level
if newTable != nil {
if newTable.ApiLevel < p.minAPILevel {
newTable.ApiLevel = p.minAPILevel
}
if p.maxAPILevel != nil && newTable.ApiLevel > *p.maxAPILevel {
newTable.ApiLevel = *p.maxAPILevel
}
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

Expand Down
6 changes: 3 additions & 3 deletions pkg/placement/membership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestMembershipChangeWorker(t *testing.T) {
// arrange
testServer.faultyHostDetectDuration.Store(int64(faultyHostDetectInitialDuration))

conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)

done := make(chan bool)
go func() {
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestPerformTableUpdate(t *testing.T) {
clientUpToDateCh := make(chan struct{}, testClients)

for i := 0; i < testClients; i++ {
conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)
clientConns = append(clientConns, conn)
clientStreams = append(clientStreams, stream)
clientRecvData = append(clientRecvData, map[string]int64{})
Expand Down Expand Up @@ -327,7 +327,7 @@ func PerformTableUpdateCostTime(t *testing.T) (wastedTime int64) {
startFlag.Store(false)
wg.Add(testClients)
for i := 0; i < testClients; i++ {
conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)
clientConns = append(clientConns, conn)
clientStreams = append(clientStreams, stream)
go func(clientID int, clientStream v1pb.Placement_ReportDaprStatusClient) {
Expand Down
48 changes: 41 additions & 7 deletions pkg/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ type Service struct {
// consistent hashing table. Only actor runtime's heartbeat will increase this.
memberUpdateCount atomic.Uint32

// Maximum API level to return.
// If nil, there's no limit.
maxAPILevel *uint32
// Minimum API level to return
minAPILevel uint32

// faultyHostDetectDuration
faultyHostDetectDuration *atomic.Int64

Expand All @@ -121,19 +127,29 @@ type Service struct {
wg sync.WaitGroup
}

// PlacementServiceOpts contains options for the NewPlacementService method.
type PlacementServiceOpts struct {
RaftNode *raft.Server
MaxAPILevel *uint32
MinAPILevel uint32
SecProvider security.Provider
}

// NewPlacementService returns a new placement service.
func NewPlacementService(raftNode *raft.Server, sec security.Provider) *Service {
func NewPlacementService(opts PlacementServiceOpts) *Service {
fhdd := &atomic.Int64{}
fhdd.Store(int64(faultyHostDetectInitialDuration))

return &Service{
streamConnPool: []placementGRPCStream{},
membershipCh: make(chan hostMemberChange, membershipChangeChSize),
faultyHostDetectDuration: fhdd,
raftNode: raftNode,
raftNode: opts.RaftNode,
maxAPILevel: opts.MaxAPILevel,
minAPILevel: opts.MinAPILevel,
clock: &clock.RealClock{},
closedCh: make(chan struct{}),
sec: sec,
sec: opts.SecProvider,
}
}

Expand Down Expand Up @@ -215,7 +231,22 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat
return status.Errorf(codes.PermissionDenied, "client ID %s is not allowed", req.Id)
}

state := p.raftNode.FSM().State()

if registeredMemberID == "" {
// New connection
// Ensure that the reported API level is at least equal to the current one in the cluster
clusterAPILevel := state.APILevel()
if clusterAPILevel < p.minAPILevel {
clusterAPILevel = p.minAPILevel
}
if p.maxAPILevel != nil && clusterAPILevel > *p.maxAPILevel {
clusterAPILevel = *p.maxAPILevel
}
if req.ApiLevel < clusterAPILevel {
return status.Errorf(codes.FailedPrecondition, "The cluster's Actor API level is %d, which is higher than the reported API level %d", clusterAPILevel, req.ApiLevel)
}

registeredMemberID = req.Name
p.addStreamConn(stream)
// TODO: If each sidecar can report table version, then placement
Expand All @@ -234,16 +265,18 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat
continue
}

now := p.clock.Now()

for _, entity := range req.Entities {
monitoring.RecordActorHeartbeat(req.Id, entity, req.Name, req.Pod, p.clock.Now())
monitoring.RecordActorHeartbeat(req.Id, entity, req.Name, req.Pod, now)
}

// Record the heartbeat timestamp. This timestamp will be used to check if the member
// state maintained by raft is valid or not. If the member is outdated based the timestamp
// the member will be marked as faulty node and removed.
p.lastHeartBeat.Store(req.Name, p.clock.Now().UnixNano())
p.lastHeartBeat.Store(req.Name, now.UnixNano())

members := p.raftNode.FSM().State().Members()
members := state.Members()

// Upsert incoming member only if it is an actor service (not actor client) and
// the existing member info is unmatched with the incoming member info.
Expand All @@ -261,7 +294,8 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat
Name: req.Name,
AppID: req.Id,
Entities: req.Entities,
UpdatedAt: p.clock.Now().UnixNano(),
UpdatedAt: now.UnixNano(),
APILevel: req.ApiLevel,
},
}
log.Debugf("Member changed upserting appid %s with entities %v", req.Id, req.Entities)
Expand Down
42 changes: 27 additions & 15 deletions pkg/placement/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ const testStreamSendLatency = time.Second
func newTestPlacementServer(t *testing.T, raftServer *raft.Server) (string, *Service, *clocktesting.FakeClock, context.CancelFunc) {
t.Helper()

testServer := NewPlacementService(raftServer, securityfake.New())
testServer := NewPlacementService(PlacementServiceOpts{
RaftNode: raftServer,
SecProvider: securityfake.New(),
})
clock := clocktesting.NewFakeClock(time.Now())
testServer.clock = clock

Expand Down Expand Up @@ -74,17 +77,25 @@ func newTestPlacementServer(t *testing.T, raftServer *raft.Server) (string, *Ser
return serverAddress, testServer, clock, cleanUpFn
}

func newTestClient(t *testing.T, serverAddress string) (*grpc.ClientConn, v1pb.Placement_ReportDaprStatusClient) { //nolint:nosnakecase
func newTestClient(t *testing.T, serverAddress string) (*grpc.ClientConn, *net.TCPConn, v1pb.Placement_ReportDaprStatusClient) { //nolint:nosnakecase
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
conn, err := grpc.DialContext(ctx, serverAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
tcpConn, err := net.Dial("tcp", serverAddress)
require.NoError(t, err)
conn, err := grpc.DialContext(ctx, "",
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return tcpConn, nil
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
require.NoError(t, err)

client := v1pb.NewPlacementClient(conn)
stream, err := client.ReportDaprStatus(context.Background())
require.NoError(t, err)

return conn, stream
return conn, tcpConn.(*net.TCPConn), stream
}

func TestMemberRegistration_NoLeadership(t *testing.T) {
Expand All @@ -94,7 +105,7 @@ func TestMemberRegistration_NoLeadership(t *testing.T) {
testServer.hasLeadership.Store(false)

// arrange
conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)

host := &v1pb.Host{
Name: "127.0.0.1:50102",
Expand Down Expand Up @@ -125,7 +136,7 @@ func TestMemberRegistration_Leadership(t *testing.T) {

t.Run("Connect server and disconnect it gracefully", func(t *testing.T) {
// arrange
conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)

host := &v1pb.Host{
Name: "127.0.0.1:50102",
Expand Down Expand Up @@ -166,15 +177,15 @@ func TestMemberRegistration_Leadership(t *testing.T) {
assert.Equal(t, host.Name, memberChange.host.Name)

case <-time.After(testStreamSendLatency):
require.True(t, false, "no membership change")
require.Fail(t, "no membership change")
}

conn.Close()
})

t.Run("Connect server and disconnect it forcefully", func(t *testing.T) {
// arrange
conn, stream := newTestClient(t, serverAddress)
_, tcpConn, stream := newTestClient(t, serverAddress)

// act
host := &v1pb.Host{
Expand All @@ -187,7 +198,7 @@ func TestMemberRegistration_Leadership(t *testing.T) {
stream.Send(host)

// assert
assert.Eventually(t, func() bool {
assert.EventuallyWithT(t, func(t *assert.CollectT) {
clock.Step(disseminateTimerInterval)
select {
case memberChange := <-testServer.membershipCh:
Expand All @@ -199,21 +210,22 @@ func TestMemberRegistration_Leadership(t *testing.T) {
l := len(testServer.streamConnPool)
testServer.streamConnPoolLock.Unlock()
assert.Equal(t, 1, l)
return true
default:
return false
assert.Fail(t, "No member change")
}
}, testStreamSendLatency+3*time.Second, time.Millisecond, "no membership change")

// act
// Close tcp connection before closing stream, which simulates the scenario
// where dapr runtime disconnects the connection from placement service unexpectedly.
conn.Close()
// Use SetLinger to forcefully close the TCP connection.
tcpConn.SetLinger(0)
tcpConn.Close()

// assert
select {
case <-testServer.membershipCh:
require.True(t, false, "should not have any member change message because faulty host detector time will clean up")
require.Fail(t, "should not have any member change message because faulty host detector time will clean up")

case <-time.After(testStreamSendLatency):
testServer.streamConnPoolLock.RLock()
Expand All @@ -225,7 +237,7 @@ func TestMemberRegistration_Leadership(t *testing.T) {

t.Run("non actor host", func(t *testing.T) {
// arrange
conn, stream := newTestClient(t, serverAddress)
conn, _, stream := newTestClient(t, serverAddress)

// act
host := &v1pb.Host{
Expand All @@ -243,7 +255,7 @@ func TestMemberRegistration_Leadership(t *testing.T) {
require.Fail(t, "should not have any membership change")

case <-time.After(testStreamSendLatency):
require.True(t, true)
// All good
}

// act
Expand Down
15 changes: 9 additions & 6 deletions pkg/placement/raft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ func (c *FSM) PlacementState() *v1pb.PlacementTables {
defer c.stateLock.RUnlock()

newTable := &v1pb.PlacementTables{
Version: strconv.FormatUint(c.state.TableGeneration(), 10),
Entries: make(map[string]*v1pb.PlacementTable),
Version: strconv.FormatUint(c.state.TableGeneration(), 10),
Entries: make(map[string]*v1pb.PlacementTable),
ApiLevel: c.state.APILevel(),
}

totalHostSize := 0
totalSortedSet := 0
totalLoadMap := 0
var (
totalHostSize int
totalSortedSet int
totalLoadMap int
)

entries := c.state.hashingTableMap()
for k, v := range entries {
Expand Down Expand Up @@ -112,7 +115,7 @@ func (c *FSM) PlacementState() *v1pb.PlacementTables {
totalLoadMap += len(table.LoadMap)
}

logging.Debugf("PlacementTable Size, Hosts: %d, SortedSet: %d, LoadMap: %d", totalHostSize, totalSortedSet, totalLoadMap)
logging.Debugf("PlacementTable HostsCount=%d SortedSetCount=%d LoadMapCount=%d ApiLevel=%d", totalHostSize, totalSortedSet, totalLoadMap, newTable.ApiLevel)

return newTable
}
Expand Down
Loading

0 comments on commit d3ded6a

Please sign in to comment.