Skip to content

Commit

Permalink
Jobs: Return Job execution SUCCESS/FAILURE (dapr#8240)
Browse files Browse the repository at this point in the history
* Jobs: Return Job execution SUCCESS/FAILURE

Returns the job execution result to scheduler (SUCCESS/FAILURE) to
correctly trigger failure policy if required. Adds daprd jobs/actor
reminder failure policy integration tests.

Adds test to job execution to ensure NotFound HTTP status is treated as
SUCCESS as it is "not retry-able".

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update diagridio/go-etcd-cron to master HEAD

Signed-off-by: joshvanl <me@joshvanl.dev>

* go mod tidy

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix race condition in scheduler watch job tests

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Nov 4, 2024
1 parent d697612 commit 07ed8d4
Show file tree
Hide file tree
Showing 32 changed files with 1,112 additions and 112 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/dapr/components-contrib v1.14.1-0.20241016043026-4ca04dbb61c5
github.com/dapr/kit v0.13.1-0.20241015130326-866002abe68a
github.com/diagridio/go-etcd-cron v0.3.1-0.20241021143028-8ee6207c5ea9
github.com/diagridio/go-etcd-cron v0.3.1-0.20241030204150-468a6e23bf53
github.com/evanphx/json-patch/v5 v5.9.0
github.com/go-chi/chi/v5 v5.0.11
github.com/go-chi/cors v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/diagridio/go-etcd-cron v0.3.1-0.20241021143028-8ee6207c5ea9 h1:7+j7UbNs3RekWc+d+jeeUiyABlj1cjTO3s8FCqwqHQ0=
github.com/diagridio/go-etcd-cron v0.3.1-0.20241021143028-8ee6207c5ea9/go.mod h1:GiH3yYGvU8neLSTYWxQ8ceqU8MeBuDZgp4dij+cNazg=
github.com/diagridio/go-etcd-cron v0.3.1-0.20241030204150-468a6e23bf53 h1:vjK/MuB/k5DLt56LEw+W33kJmE4s3xf6KmtLnW3hygQ=
github.com/diagridio/go-etcd-cron v0.3.1-0.20241030204150-468a6e23bf53/go.mod h1:GiH3yYGvU8neLSTYWxQ8ceqU8MeBuDZgp4dij+cNazg=
github.com/didip/tollbooth/v7 v7.0.1 h1:TkT4sBKoQoHQFPf7blQ54iHrZiTDnr8TceU+MulVAog=
github.com/didip/tollbooth/v7 v7.0.1/go.mod h1:VZhDSGl5bDSPj4wPsih3PFa4Uh9Ghv8hgacaTm5PRT4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
Expand Down
25 changes: 17 additions & 8 deletions pkg/runtime/scheduler/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func (s *streamer) receive(ctx context.Context) error {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleJob(ctx, resp)
result := s.handleJob(ctx, resp)
select {
case <-ctx.Done():
case <-s.stream.Context().Done():
case s.resultCh <- &schedulerv1pb.WatchJobsRequest{
WatchJobRequestType: &schedulerv1pb.WatchJobsRequest_Result{
Result: &schedulerv1pb.WatchJobsRequestResult{
Id: resp.GetId(),
Status: schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS,
Status: result,
},
},
}:
Expand Down Expand Up @@ -100,14 +100,16 @@ func (s *streamer) outgoing(ctx context.Context) error {
}

// handleJob invokes the appropriate app or actor reminder based on the job metadata.
func (s *streamer) handleJob(ctx context.Context, job *schedulerv1pb.WatchJobsResponse) {
func (s *streamer) handleJob(ctx context.Context, job *schedulerv1pb.WatchJobsResponse) schedulerv1pb.WatchJobsRequestResultStatus {
meta := job.GetMetadata()

switch t := meta.GetTarget(); t.GetType().(type) {
case *schedulerv1pb.JobTargetMetadata_Job:
if err := s.invokeApp(ctx, job); err != nil {
log.Errorf("failed to invoke schedule app job: %s", err)
return schedulerv1pb.WatchJobsRequestResultStatus_FAILED
}
return schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS

case *schedulerv1pb.JobTargetMetadata_Actor:
if err := s.invokeActorReminder(ctx, job); err != nil {
Expand All @@ -117,22 +119,26 @@ func (s *streamer) handleJob(ctx context.Context, job *schedulerv1pb.WatchJobsRe
// issues. This will be updated in the future releases, but for now we will see this err. To not
// spam users only log if the error is not reminder canceled because this is expected for now.
if errors.Is(err, actors.ErrReminderCanceled) {
return
return schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS
}

// If the actor was hosted on another instance, the error will be a gRPC status error,
// so we need to unwrap it and match on the error message
if st, ok := status.FromError(err); ok {
if st.Message() == actors.ErrReminderCanceled.Error() {
return
return schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS
}
}

log.Errorf("failed to invoke scheduled actor reminder named: %s due to: %s", job.GetName(), err)
return schedulerv1pb.WatchJobsRequestResultStatus_FAILED
}

return schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS

default:
log.Errorf("Unknown job metadata type: %+v", t)
return schedulerv1pb.WatchJobsRequestResultStatus_FAILED
}
}

Expand Down Expand Up @@ -160,13 +166,16 @@ func (s *streamer) invokeApp(ctx context.Context, job *schedulerv1pb.WatchJobsRe
switch codes.Code(statusCode) {
case codes.OK:
log.Debugf("Sent job %s to app", job.GetName())
return nil
case codes.NotFound:
log.Errorf("non-retriable error returned from app while processing triggered job %s. status code returned: %v", job.GetName(), statusCode)
// return nil to signal SUCCESS
return nil
default:
log.Errorf("unexpected status code returned from app while processing triggered job %s. status code returned: %v", job.GetName(), statusCode)
err := fmt.Errorf("unexpected status code returned from app while processing triggered job %s. status code returned: %v", job.GetName(), statusCode)
log.Error(err.Error())
return err
}

return nil
}

// invokeActorReminder calls the actor ID with the given reminder data.
Expand Down
24 changes: 15 additions & 9 deletions tests/integration/framework/process/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package scheduler

import (
"context"
"errors"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -50,7 +51,6 @@ import (
"github.com/dapr/dapr/tests/integration/framework/process/exec"
"github.com/dapr/dapr/tests/integration/framework/process/ports"
"github.com/dapr/dapr/tests/integration/framework/process/sentry"
"github.com/dapr/kit/concurrency/slice"
"github.com/dapr/kit/ptr"
)

Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *Scheduler) EtcdJobs(t *testing.T, ctx context.Context) []*mvccpb.KeyVal
return resp.Kvs
}

func (s *Scheduler) WatchJobs(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial, respStatus *atomic.Value) slice.Slice[string] {
func (s *Scheduler) WatchJobs(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial, respStatus *atomic.Value) <-chan string {
t.Helper()

watchErr := make(chan error)
Expand All @@ -393,7 +393,7 @@ func (s *Scheduler) WatchJobs(t *testing.T, ctx context.Context, initial *schedu
WatchJobRequestType: &schedulerv1pb.WatchJobsRequest_Initial{Initial: initial},
}))

triggered := slice.String()
ch := make(chan string)

go func() {
defer func() {
Expand All @@ -407,30 +407,36 @@ func (s *Scheduler) WatchJobs(t *testing.T, ctx context.Context, initial *schedu
if !assert.NoError(t, err) {
return
}
assert.NoError(t, watch.Send(&schedulerv1pb.WatchJobsRequest{
err = watch.Send(&schedulerv1pb.WatchJobsRequest{
WatchJobRequestType: &schedulerv1pb.WatchJobsRequest_Result{
Result: &schedulerv1pb.WatchJobsRequestResult{
Id: resp.GetId(),
Status: respStatus.Load().(schedulerv1pb.WatchJobsRequestResultStatus),
},
},
}))
triggered.Append(resp.GetName())
})
if !errors.Is(err, io.EOF) {
assert.NoError(t, err)
}
select {
case ch <- resp.GetName():
case <-ctx.Done():
}
}
}()

return triggered
return ch
}

func (s *Scheduler) WatchJobsSuccess(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial) slice.Slice[string] {
func (s *Scheduler) WatchJobsSuccess(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial) <-chan string {
t.Helper()

var status atomic.Value
status.Store(schedulerv1pb.WatchJobsRequestResultStatus_SUCCESS)
return s.WatchJobs(t, ctx, initial, &status)
}

func (s *Scheduler) WatchJobsFailed(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial) slice.Slice[string] {
func (s *Scheduler) WatchJobsFailed(t *testing.T, ctx context.Context, initial *schedulerv1pb.WatchJobsRequestInitial) <-chan string {
t.Helper()

var status atomic.Value
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package failurepolicy

import (
_ "github.com/dapr/dapr/tests/integration/suite/actors/reminders/failurepolicy/noset"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noset

import (
"context"
"net/http"
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd/actors"
"github.com/dapr/dapr/tests/integration/framework/process/scheduler"
"github.com/dapr/dapr/tests/integration/suite"
"github.com/dapr/kit/concurrency/slice"
)

// init adds the allfail case to the integration test suite.
func init() {
	suite.Register(&allfail{})
}

// allfail is a suite case whose "helloworld" actor handler answers every
// request with HTTP 500 and records each request it receives.
type allfail struct {
// actors is the actors process created in Setup.
actors *actors.Actors
// triggered collects the last URL path element of every request the
// handler receives.
triggered slice.Slice[string]
}

// Setup creates a scheduler and an actors process hosting the "helloworld"
// actor type, and returns both as framework processes. The actor handler
// records the last path element of each request URL and always replies with
// HTTP 500.
func (a *allfail) Setup(t *testing.T) []framework.Option {
	a.triggered = slice.String()

	sched := scheduler.New(t)

	// Record the request first, then fail it unconditionally.
	alwaysFail := func(w http.ResponseWriter, req *http.Request) {
		a.triggered.Append(path.Base(req.URL.Path))
		w.WriteHeader(http.StatusInternalServerError)
	}

	a.actors = actors.New(t,
		actors.WithScheduler(sched),
		actors.WithFeatureSchedulerReminders(true),
		actors.WithActorTypes("helloworld"),
		actors.WithActorTypeHandler("helloworld", alwaysFail),
	)

	return []framework.Option{
		framework.WithProcesses(sched, a.actors),
	}
}

// Run registers a reminder named "test" that is due immediately and checks
// that the always-failing handler ends up recording exactly four requests for
// it, and that no further request arrives in the following two seconds.
func (a *allfail) Run(t *testing.T, ctx context.Context) {
	a.actors.WaitUntilRunning(t, ctx)

	client := a.actors.GRPCClient(t, ctx)
	_, err := client.RegisterActorReminder(ctx, &rtv1.RegisterActorReminderRequest{
		ActorType: "helloworld",
		ActorId:   "1234",
		Name:      "test",
		DueTime:   "0s",
	})
	require.NoError(t, err)

	// NOTE(review): four is presumably the first trigger plus the retries of
	// the default failure policy — confirm against the scheduler defaults.
	want := []string{"test", "test", "test", "test"}

	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.ElementsMatch(c, want, a.triggered.Slice())
	}, 10*time.Second, 10*time.Millisecond)

	// Give any unexpected extra trigger time to show up before the final check.
	time.Sleep(2 * time.Second)
	assert.ElementsMatch(t, want, a.triggered.Slice())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package noset

import (
"context"
"net/http"
"path"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd/actors"
"github.com/dapr/dapr/tests/integration/framework/process/scheduler"
"github.com/dapr/dapr/tests/integration/suite"
"github.com/dapr/kit/concurrency/slice"
)

// init adds the failfirst case to the integration test suite.
func init() {
	suite.Register(&failfirst{})
}

// failfirst is a suite case whose "helloworld" actor handler replies with
// HTTP 500 while respErr is true and records each request it receives.
type failfirst struct {
// actors is the actors process created in Setup.
actors *actors.Actors
// triggered collects the last URL path element of every request the
// handler receives.
triggered slice.Slice[string]
// respErr selects whether the handler writes an HTTP 500 status.
respErr atomic.Bool
}

// Setup creates a scheduler and an actors process hosting the "helloworld"
// actor type, and returns both as framework processes. The handler starts in
// failing mode: while respErr is true it replies with HTTP 500, otherwise it
// leaves the response untouched. Every request is recorded once the handler
// returns.
func (f *failfirst) Setup(t *testing.T) []framework.Option {
	f.triggered = slice.String()
	f.respErr.Store(true)

	sched := scheduler.New(t)

	handler := func(w http.ResponseWriter, req *http.Request) {
		// Deferred so the request is recorded only after the status has been
		// written.
		defer f.triggered.Append(path.Base(req.URL.Path))

		if !f.respErr.Load() {
			return
		}
		w.WriteHeader(http.StatusInternalServerError)
	}

	f.actors = actors.New(t,
		actors.WithScheduler(sched),
		actors.WithFeatureSchedulerReminders(true),
		actors.WithActorTypes("helloworld"),
		actors.WithActorTypeHandler("helloworld", handler),
	)

	return []framework.Option{
		framework.WithProcesses(sched, f.actors),
	}
}

// Run registers a reminder named "test" that is due immediately, waits for
// the first (failing) request to be recorded, switches the handler to stop
// failing, and then checks that exactly one more request is recorded and that
// none follows in the next two seconds.
func (f *failfirst) Run(t *testing.T, ctx context.Context) {
	f.actors.WaitUntilRunning(t, ctx)

	client := f.actors.GRPCClient(t, ctx)
	_, err := client.RegisterActorReminder(ctx, &rtv1.RegisterActorReminderRequest{
		ActorType: "helloworld",
		ActorId:   "1234",
		Name:      "test",
		DueTime:   "0s",
	})
	require.NoError(t, err)

	const (
		timeout = 10 * time.Second
		tick    = 10 * time.Millisecond
	)

	// First request: the handler is still replying with HTTP 500.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.ElementsMatch(c, []string{"test"}, f.triggered.Slice())
	}, timeout, tick)

	// From here on the handler no longer writes an error status.
	f.respErr.Store(false)

	afterRetry := []string{"test", "test"}
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		assert.ElementsMatch(c, afterRetry, f.triggered.Slice())
	}, timeout, tick)

	// Give any unexpected extra trigger time to show up before the final check.
	time.Sleep(2 * time.Second)
	assert.ElementsMatch(t, afterRetry, f.triggered.Slice())
}
Loading

0 comments on commit 07ed8d4

Please sign in to comment.