Skip to content

Commit

Permalink
Actors: Reminder migration state->scheduler
Browse files Browse the repository at this point in the history
Adds a backwards compatible migration mechanism to migrate Actor State
Reminders to the new Scheduler based reminder system. Is activated on
every Placement dissemination event, where all stored Actor reminders
for the locally implemented Actor types are migrated to Scheduler Actor
Reminer Jobs.

Migration of reminders are implemented on a per daprd basis, whereby
upon dissemination, each daprd will list the reminders in the state
store and Scheduler for actor types that it implements. Reminders for
actor IDs which are not hosted for that daprd are discarded preventing
multiple daprd's from migrating the same reminder. Daprd will determine
which reminders to migrate based on missing items or differing contents
from state store and Scheduler.

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Oct 21, 2024
1 parent 990aa6e commit 686e061
Show file tree
Hide file tree
Showing 43 changed files with 2,644 additions and 157 deletions.
5 changes: 4 additions & 1 deletion dapr/proto/scheduler/v1/scheduler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,11 @@ message NamedJob {
// name is the name of the job.
string name = 1;

// The metadata associated with the job.
JobMetadata metadata = 2;

// The job scheduled.
Job job = 2;
Job job = 3;
}

// ListJobsRequest is the message used by the daprd sidecar to list all jobs.
Expand Down
21 changes: 14 additions & 7 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
"github.com/dapr/dapr/pkg/healthz"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/modes"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
Expand Down Expand Up @@ -144,6 +145,7 @@ type actorsRuntime struct {
closed atomic.Bool
closeCh chan struct{}
apiLevel atomic.Uint32
htarget healthz.Target

lock sync.Mutex
internalReminderInProgress map[string]struct{}
Expand All @@ -165,6 +167,7 @@ type ActorsOpts struct {
Security security.Handler
SchedulerClients *clients.Clients
SchedulerReminders bool
Healthz healthz.Healthz

// TODO: @joshvanl Remove in Dapr 1.12 when ActorStateTTL is finalized.
StateTTLEnabled bool
Expand Down Expand Up @@ -194,6 +197,7 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
internalActors: haxmap.New[string, InternalActor](32),
compStore: opts.CompStore,
sec: opts.Security,
htarget: opts.Healthz.AddTarget(),

internalReminderInProgress: map[string]struct{}{},
schedulerReminderFeatureEnabled: opts.SchedulerReminders,
Expand Down Expand Up @@ -242,22 +246,24 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,
}
log.Debug("Using Scheduler service for reminders.")
a.actorsReminders = reminders.NewScheduler(reminders.SchedulerOptions{
Clients: opts.Config.SchedulerClients,
Namespace: opts.Config.Namespace,
AppID: opts.Config.AppID,
Clients: opts.Config.SchedulerClients,
Namespace: opts.Config.Namespace,
AppID: opts.Config.AppID,
ProviderOpts: providerOpts,
ListActorTypesFn: a.Entities,
})
} else {
factory, err := opts.Config.GetRemindersProvider(a.placement)
if err != nil {
return nil, fmt.Errorf("failed to initialize reminders provider: %w", err)
}
a.actorsReminders = factory(providerOpts)

a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetStateStoreProviderFn(a.stateStore)
a.actorsReminders.SetLookupActorFn(a.isActorLocallyHosted)
}

a.actorsReminders.SetExecuteReminderFn(a.executeReminder)
a.actorsReminders.SetStateStoreProviderFn(a.stateStore)
a.actorsReminders.SetLookupActorFn(a.isActorLocallyHosted)

a.idleActorProcessor = eventqueue.NewProcessor[string, *actor](a.idleProcessorExecuteFn).WithClock(clock)
return a, nil
}
Expand Down Expand Up @@ -317,6 +323,7 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
a.placement.SetOnTableUpdateFn(func() {
a.drainRebalancedActors()
a.actorsReminders.OnPlacementTablesUpdated(ctx)
a.htarget.Ready()
})

a.checker, err = a.getAppHealthChecker()
Expand Down
5 changes: 5 additions & 0 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/dapr/dapr/pkg/apis/resiliency/v1alpha1"
"github.com/dapr/dapr/pkg/channel"
"github.com/dapr/dapr/pkg/config"
"github.com/dapr/dapr/pkg/healthz"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/modes"
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
Expand Down Expand Up @@ -207,6 +208,7 @@ func (b *runtimeBuilder) buildActorRuntime(t *testing.T) *actorsRuntime {
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.FromConfigurations(log, testResiliency),
StateStoreName: storeName,
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand Down Expand Up @@ -235,6 +237,7 @@ func newTestActorsRuntimeWithMock(t *testing.T, appChannel channel.AppChannel) *
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
MockPlacement: NewMockPlacement(TestAppID),
}, clock)
require.NoError(t, err)
Expand All @@ -258,6 +261,7 @@ func newTestActorsRuntimeWithMockWithoutPlacement(t *testing.T, appChannel chann
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand All @@ -280,6 +284,7 @@ func newTestActorsRuntimeWithMockAndNoStore(t *testing.T, appChannel channel.App
TracingSpec: config.TracingSpec{SamplingRate: "1"},
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Healthz: healthz.New(),
}, clock)
require.NoError(t, err)

Expand Down
152 changes: 152 additions & 0 deletions pkg/actors/internal/fake/reminders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fake

import (
"context"

"github.com/dapr/dapr/pkg/actors/internal"
)

type FakeRemindersProvider struct {
initFn func(ctx context.Context) error
getReminderFn func(ctx context.Context, req *internal.GetReminderRequest) (*internal.Reminder, error)
createReminderFn func(ctx context.Context, req *internal.CreateReminderRequest) error
deleteReminderFn func(ctx context.Context, req internal.DeleteReminderRequest) error
listRemindersFn func(ctx context.Context, req internal.ListRemindersRequest) ([]*internal.Reminder, error)
drainRebalancedRemindersFn func(actorType string, actorID string)
onPlacementTablesUpdatedFn func(ctx context.Context)
setExecuteReminderFn func(fn internal.ExecuteReminderFn)
setStateStoreProviderFn func(fn internal.StateStoreProviderFn)
setLookupActorFn func(fn internal.LookupActorFn)
}

func NewRemindersProvider() *FakeRemindersProvider {
return &FakeRemindersProvider{
initFn: func(ctx context.Context) error {
return nil
},
getReminderFn: func(ctx context.Context, req *internal.GetReminderRequest) (*internal.Reminder, error) {
return nil, nil
},
createReminderFn: func(ctx context.Context, req *internal.CreateReminderRequest) error {
return nil
},
deleteReminderFn: func(ctx context.Context, req internal.DeleteReminderRequest) error {
return nil
},
listRemindersFn: func(ctx context.Context, req internal.ListRemindersRequest) ([]*internal.Reminder, error) {
return nil, nil
},
drainRebalancedRemindersFn: func(actorType string, actorID string) {},
onPlacementTablesUpdatedFn: func(ctx context.Context) {},
setExecuteReminderFn: func(fn internal.ExecuteReminderFn) {},
setStateStoreProviderFn: func(fn internal.StateStoreProviderFn) {},
setLookupActorFn: func(fn internal.LookupActorFn) {},
}
}

func (f *FakeRemindersProvider) WithInit(fn func(ctx context.Context) error) *FakeRemindersProvider {
f.initFn = fn
return f
}

func (f *FakeRemindersProvider) WithGetReminder(fn func(ctx context.Context, req *internal.GetReminderRequest) (*internal.Reminder, error)) *FakeRemindersProvider {
f.getReminderFn = fn
return f
}

func (f *FakeRemindersProvider) WithCreateReminder(fn func(ctx context.Context, req *internal.CreateReminderRequest) error) *FakeRemindersProvider {
f.createReminderFn = fn
return f
}

func (f *FakeRemindersProvider) WithDeleteReminder(fn func(ctx context.Context, req internal.DeleteReminderRequest) error) *FakeRemindersProvider {
f.deleteReminderFn = fn
return f
}

func (f *FakeRemindersProvider) WithListReminders(fn func(ctx context.Context, req internal.ListRemindersRequest) ([]*internal.Reminder, error)) *FakeRemindersProvider {
f.listRemindersFn = fn
return f
}

func (f *FakeRemindersProvider) WithDrainRebalancedReminders(fn func(actorType string, actorID string)) *FakeRemindersProvider {
f.drainRebalancedRemindersFn = fn
return f
}

func (f *FakeRemindersProvider) WithOnPlacementTablesUpdated(fn func(ctx context.Context)) *FakeRemindersProvider {
f.onPlacementTablesUpdatedFn = fn
return f
}

func (f *FakeRemindersProvider) WithSetExecuteReminder(fn func(fn internal.ExecuteReminderFn)) *FakeRemindersProvider {
f.setExecuteReminderFn = fn
return f
}

func (f *FakeRemindersProvider) WithSetStateStoreProvider(fn func(fn internal.StateStoreProviderFn)) *FakeRemindersProvider {
f.setStateStoreProviderFn = fn
return f
}

func (f *FakeRemindersProvider) WithSetLookupActor(fn func(fn internal.LookupActorFn)) *FakeRemindersProvider {
f.setLookupActorFn = fn
return f
}

func (f *FakeRemindersProvider) Init(ctx context.Context) error {
return f.initFn(ctx)
}

func (f *FakeRemindersProvider) GetReminder(ctx context.Context, req *internal.GetReminderRequest) (*internal.Reminder, error) {
return f.getReminderFn(ctx, req)
}

func (f *FakeRemindersProvider) CreateReminder(ctx context.Context, req *internal.CreateReminderRequest) error {
return f.createReminderFn(ctx, req)
}

func (f *FakeRemindersProvider) DeleteReminder(ctx context.Context, req internal.DeleteReminderRequest) error {
return f.deleteReminderFn(ctx, req)
}

func (f *FakeRemindersProvider) ListReminders(ctx context.Context, req internal.ListRemindersRequest) ([]*internal.Reminder, error) {
return f.listRemindersFn(ctx, req)
}

func (f *FakeRemindersProvider) DrainRebalancedReminders(actorType string, actorID string) {
f.drainRebalancedRemindersFn(actorType, actorID)
}

func (f *FakeRemindersProvider) OnPlacementTablesUpdated(ctx context.Context) {
f.onPlacementTablesUpdatedFn(ctx)
}

func (f *FakeRemindersProvider) SetExecuteReminderFn(fn internal.ExecuteReminderFn) {
f.setExecuteReminderFn(fn)
}

func (f *FakeRemindersProvider) SetStateStoreProviderFn(fn internal.StateStoreProviderFn) {
f.setStateStoreProviderFn(fn)
}

func (f *FakeRemindersProvider) SetLookupActorFn(fn internal.LookupActorFn) {
f.setLookupActorFn(fn)
}

func (f *FakeRemindersProvider) Close() error {
return nil
}
24 changes: 24 additions & 0 deletions pkg/actors/internal/fake/reminders_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fake

import (
"testing"

"github.com/dapr/dapr/pkg/actors/internal"
)

func Test_FakeRemindersProvider(t *testing.T) {
var _ internal.RemindersProvider = NewRemindersProvider()
}
3 changes: 3 additions & 0 deletions pkg/actors/internal/reminders.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ type LookupActorFn func(ctx context.Context, actorType string, actorID string) (
type StateStoreProviderFn func() (string, TransactionalStateStore, error)

// RemindersProvider is the interface for the object that provides reminders services.
//
//nolint:interfacebloat
type RemindersProvider interface {
io.Closer

Init(ctx context.Context) error
GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error)
CreateReminder(ctx context.Context, req *CreateReminderRequest) error
DeleteReminder(ctx context.Context, req DeleteReminderRequest) error
ListReminders(ctx context.Context, req ListRemindersRequest) ([]*Reminder, error)
DrainRebalancedReminders(actorType string, actorID string)
OnPlacementTablesUpdated(ctx context.Context)

Expand Down
4 changes: 4 additions & 0 deletions pkg/actors/internal/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (req DeleteReminderRequest) Key() string {
return req.ActorType + daprSeparator + req.ActorID + daprSeparator + req.Name
}

type ListRemindersRequest struct {
ActorType string
}

// DeleteTimerRequest is a request object for deleting a timer.
type DeleteTimerRequest struct {
Name string
Expand Down
2 changes: 2 additions & 0 deletions pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/dapr/dapr/pkg/actors/internal"
"github.com/dapr/dapr/pkg/config"
"github.com/dapr/dapr/pkg/healthz"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/proto/internals/v1"
"github.com/dapr/dapr/pkg/resiliency"
Expand Down Expand Up @@ -108,6 +109,7 @@ func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalAc
StateStoreName: "actorStore",
Security: fake.New(),
MockPlacement: NewMockPlacement(TestAppID),
Healthz: healthz.New(),
})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 686e061

Please sign in to comment.