Skip to content

Commit

Permalink
Actor Reminders: Default JSON serialization. (dapr#7548)
Browse files Browse the repository at this point in the history
* Actor Reminders: Default JSON serialization.

To support downgrades to 1.12 from 1.13, this PR changes the reminder
serialization storage format back to JSON by default. This means a 1.12
actor reminder client can read reminders written by 1.13 actors.

1.13 will continue to understand both JSON and protobuf. Protobuf
serialization can be enabled with the `ActorReminderStorageProtobuf`
feature gate. The actor "API Level" has been changed back to 10.

Adds test to ensure the default serialization is JSON.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Remove ActorReminderStorageProtobuf feature gate in favour of using API
level

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix api level tests

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
  • Loading branch information
JoshVanL and yaron2 authored Feb 21, 2024
1 parent 05868fe commit 5ecc595
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 23 deletions.
2 changes: 1 addition & 1 deletion charts/dapr/charts/dapr_placement/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ports:
scaleZero: false
ha: false

maxActorApiLevel: -1
maxActorApiLevel: 10
minActorApiLevel: 0

cluster:
Expand Down
2 changes: 1 addition & 1 deletion cmd/placement/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func New(origArgs []string) *Options {
fs.IntVar(&opts.HealthzPort, "healthz-port", defaultHealthzPort, "sets the HTTP port for the healthz server")
fs.BoolVar(&opts.TLSEnabled, "tls-enabled", false, "Should TLS be enabled for the placement gRPC server")
fs.BoolVar(&opts.MetadataEnabled, "metadata-enabled", opts.MetadataEnabled, "Expose the placement tables on the healthz server")
fs.IntVar(&opts.MaxAPILevel, "max-api-level", -1, "If set to >= 0, causes the reported 'api-level' in the cluster to never exceed this value")
fs.IntVar(&opts.MaxAPILevel, "max-api-level", 10, "If set to >= 0, causes the reported 'api-level' in the cluster to never exceed this value")
fs.IntVar(&opts.MinAPILevel, "min-api-level", 0, "Enforces a minimum 'api-level' in the cluster")
fs.IntVar(&opts.ReplicationFactor, "replicationFactor", defaultReplicationFactor, "sets the replication factor for actor distribution on vnodes")

Expand Down
6 changes: 3 additions & 3 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/dapr/dapr/pkg/actors/internal"
"github.com/dapr/dapr/pkg/actors/timers"
"github.com/dapr/dapr/pkg/channel"
configuration "github.com/dapr/dapr/pkg/config"
"github.com/dapr/dapr/pkg/config"
diag "github.com/dapr/dapr/pkg/diagnostics"
diagUtils "github.com/dapr/dapr/pkg/diagnostics/utils"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
Expand Down Expand Up @@ -118,7 +118,7 @@ type actorsRuntime struct {
timers internal.TimersProvider
actorsReminders internal.RemindersProvider
actorsTable *sync.Map
tracingSpec configuration.TracingSpec
tracingSpec config.TracingSpec
resiliency resiliency.Provider
storeName string
compStore *compstore.ComponentStore
Expand All @@ -141,7 +141,7 @@ type ActorsOpts struct {
AppChannel channel.AppChannel
GRPCConnectionFn GRPCConnectionFn
Config Config
TracingSpec configuration.TracingSpec
TracingSpec config.TracingSpec
Resiliency resiliency.Provider
StateStoreName string
CompStore *compstore.ComponentStore
Expand Down
4 changes: 2 additions & 2 deletions pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (*mockInternalActor) InvokeTimer(ctx context.Context, timer InternalActorRe
func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalActorFactory) (*actorsRuntime, error) {
spec := config.TracingSpec{SamplingRate: "1"}
store := fakeStore()
config := NewConfig(ConfigOpts{
cfg := NewConfig(ConfigOpts{
AppID: TestAppID,
ActorsService: "placement:placement:5050",
HostAddress: "localhost",
Expand All @@ -98,7 +98,7 @@ func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalAc
compStore.AddStateStore("actorStore", store)
a, err := NewActors(ActorsOpts{
CompStore: compStore,
Config: config,
Config: cfg,
TracingSpec: spec,
Resiliency: resiliency.New(log),
StateStoreName: "actorStore",
Expand Down
3 changes: 1 addition & 2 deletions pkg/actors/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,12 +454,11 @@ func (p *actorPlacement) establishStreamConn(ctx context.Context) (established b
// onPlacementError closes the current placement stream and reestablish the connection again,
// uses a different placement server depending on the error code
func (p *actorPlacement) onPlacementError(err error) {
log.Debugf("Disconnected from placement: %v", err)
s, ok := status.FromError(err)
// If the current server is not leader, then it will try to the next server.
if ok && s.Code() == codes.FailedPrecondition {
p.serverIndex.Store((p.serverIndex.Load() + 1) % int32(len(p.serverAddr)))
} else {
log.Debugf("Disconnected from placement: %v", err)
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type Feature string
const (
// Enables support for setting TTL on Actor state keys.
ActorStateTTL Feature = "ActorStateTTL"
// Enables support for hot reloading of Daprd Components and HTTPEndpoints.

// Enables support for hot reloading of Daprd Components.
HotReload Feature = "HotReload"
)

Expand Down
8 changes: 4 additions & 4 deletions tests/integration/framework/process/placement/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type options struct {
tlsEnabled bool
sentryAddress *string
trustAnchorsFile *string
maxAPILevel int
minAPILevel int
maxAPILevel *int
minAPILevel *int
metadataEnabled bool
}

Expand Down Expand Up @@ -105,13 +105,13 @@ func WithInitialClusterPorts(ports ...int) Option {

func WithMaxAPILevel(val int) Option {
return func(o *options) {
o.maxAPILevel = val
o.maxAPILevel = &val
}
}

func WithMinAPILevel(val int) Option {
return func(o *options) {
o.minAPILevel = val
o.minAPILevel = &val
}
}

Expand Down
10 changes: 6 additions & 4 deletions tests/integration/framework/process/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func New(t *testing.T, fopts ...Option) *Placement {
metricsPort: fp.Port(t, 2),
initialCluster: uid.String() + "=127.0.0.1:" + strconv.Itoa(fp.Port(t, 3)),
initialClusterPorts: []int{fp.Port(t, 3)},
maxAPILevel: -1,
minAPILevel: 0,
metadataEnabled: false,
}

Expand All @@ -87,10 +85,14 @@ func New(t *testing.T, fopts ...Option) *Placement {
"--metrics-port=" + strconv.Itoa(opts.metricsPort),
"--initial-cluster=" + opts.initialCluster,
"--tls-enabled=" + strconv.FormatBool(opts.tlsEnabled),
"--max-api-level=" + strconv.Itoa(opts.maxAPILevel),
"--min-api-level=" + strconv.Itoa(opts.minAPILevel),
"--metadata-enabled=" + strconv.FormatBool(opts.metadataEnabled),
}
if opts.maxAPILevel != nil {
args = append(args, "--max-api-level="+strconv.Itoa(*opts.maxAPILevel))
}
if opts.minAPILevel != nil {
args = append(args, "--min-api-level="+strconv.Itoa(*opts.minAPILevel))
}
if opts.sentryAddress != nil {
args = append(args, "--sentry-address="+*opts.sentryAddress)
}
Expand Down
92 changes: 92 additions & 0 deletions tests/integration/suite/actors/reminders/serialization/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package serialization

import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
prochttp "github.com/dapr/dapr/tests/integration/framework/process/http"
"github.com/dapr/dapr/tests/integration/framework/process/placement"
"github.com/dapr/dapr/tests/integration/framework/process/sqlite"
"github.com/dapr/dapr/tests/integration/framework/util"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(defaultS))
}

// defaultS ensures that reminders are stored as JSON by default.
type defaultS struct {
daprd *daprd.Daprd
srv *prochttp.HTTP
handler *httpServer
place *placement.Placement
db *sqlite.SQLite
}

func (d *defaultS) Setup(t *testing.T) []framework.Option {
if runtime.GOOS == "windows" {
t.Skip("Skipping test on Windows due to SQLite limitations")
}

d.place = placement.New(t)

d.db = sqlite.New(t, sqlite.WithActorStateStore(true))

d.handler = new(httpServer)
d.srv = prochttp.New(t, prochttp.WithHandler(d.handler.NewHandler()))
d.daprd = daprd.New(t,
daprd.WithResourceFiles(d.db.GetComponent(t)),
daprd.WithPlacementAddresses("127.0.0.1:"+strconv.Itoa(d.place.Port())),
daprd.WithAppPort(d.srv.Port()),
)

return []framework.Option{
framework.WithProcesses(d.db, d.place, d.srv, d.daprd),
}
}

func (d *defaultS) Run(t *testing.T, ctx context.Context) {
d.place.WaitUntilRunning(t, ctx)
d.daprd.WaitUntilRunning(t, ctx)
require.NoError(t, d.handler.WaitForActorsReady(ctx))

client := util.HTTPClient(t)
baseURL := fmt.Sprintf("http://localhost:%d/v1.0/actors/myactortype/myactorid", d.daprd.HTTPPort())

invokeActor(t, ctx, baseURL, client)

storeReminder(t, ctx, baseURL, client)

// Check the data in the SQLite database
// The value must begin with `[{`, which indicates it was serialized as JSON
storedVal := loadRemindersFromDB(t, ctx, d.db.GetConnection(t))
assert.Truef(t, strings.HasPrefix(storedVal, "[{"), "Prefix not found in value: '%v'", storedVal)

assert.Eventually(t, func() bool {
return d.handler.remindersInvokeCount.Load() > 0
}, 5*time.Second, 10*time.Millisecond, "Reminder was not invoked at least once")
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ func (j *jsonFormat) Setup(t *testing.T) []framework.Option {
daprd.WithResourceFiles(j.db.GetComponent(t)),
daprd.WithPlacementAddresses("127.0.0.1:"+strconv.Itoa(j.place.Port())),
daprd.WithAppPort(j.srv.Port()),
// Daprd is super noisy in debug mode when connecting to placement.
daprd.WithLogLevel("info"),
)

return []framework.Option{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (p *protobufFormat) Setup(t *testing.T) []framework.Option {
}

// Init placement with minimum API level of 20
p.place = placement.New(t, placement.WithMinAPILevel(20))
p.place = placement.New(t, placement.WithMaxAPILevel(-1), placement.WithMinAPILevel(20))

// Create a SQLite database and ensure state tables exist
now := time.Now().UTC().Format(time.RFC3339)
Expand All @@ -77,8 +77,6 @@ INSERT INTO state VALUES
daprd.WithResourceFiles(p.db.GetComponent(t)),
daprd.WithPlacementAddresses("127.0.0.1:"+strconv.Itoa(p.place.Port())),
daprd.WithAppPort(p.srv.Port()),
// Daprd is super noisy in debug mode when connecting to placement.
daprd.WithLogLevel("info"),
)

return []framework.Option{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/suite/placement/apilevel/no_max.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type noMax struct {
func (n *noMax) Setup(t *testing.T) []framework.Option {
n.place = placement.New(t,
placement.WithMetadataEnabled(true),
placement.WithMaxAPILevel(-1),
)

return []framework.Option{
Expand Down
1 change: 1 addition & 0 deletions tests/integration/suite/placement/apilevel/with_min.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type withMin struct {

func (n *withMin) Setup(t *testing.T) []framework.Option {
n.place = placement.New(t,
placement.WithMaxAPILevel(-1),
placement.WithMinAPILevel(20),
placement.WithMetadataEnabled(true),
)
Expand Down

0 comments on commit 5ecc595

Please sign in to comment.