Skip to content

Commit 927c901

Browse files
committed
Exposes FailurePolicy to the Jobs api
Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 16374cd commit 927c901

File tree

11 files changed

+260
-173
lines changed

11 files changed

+260
-173
lines changed

client/client_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"google.golang.org/grpc/credentials/insecure"
3333
"google.golang.org/grpc/test/bufconn"
3434
"google.golang.org/protobuf/types/known/anypb"
35+
"google.golang.org/protobuf/types/known/durationpb"
3536
"google.golang.org/protobuf/types/known/emptypb"
3637

3738
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
@@ -563,10 +564,11 @@ func (s *testDaprServer) ScheduleJobAlpha1(ctx context.Context, in *pb.ScheduleJ
563564

564565
func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest) (*pb.GetJobResponse, error) {
565566
var (
566-
schedule = "@every 10s"
567-
dueTime = "10s"
568-
repeats uint32 = 4
569-
ttl = "10s"
567+
schedule = "@every 10s"
568+
dueTime = "10s"
569+
repeats uint32 = 4
570+
ttl = "10s"
571+
maxRetries = uint32(4)
570572
)
571573
return &pb.GetJobResponse{
572574
Job: &pb.Job{
@@ -576,6 +578,14 @@ func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest)
576578
DueTime: &dueTime,
577579
Ttl: &ttl,
578580
Data: nil,
581+
FailurePolicy: &commonv1pb.JobFailurePolicy{
582+
Policy: &commonv1pb.JobFailurePolicy_Constant{
583+
Constant: &commonv1pb.JobFailurePolicyConstant{
584+
MaxRetries: &maxRetries,
585+
Interval: &durationpb.Duration{Seconds: 10},
586+
},
587+
},
588+
},
579589
},
580590
}, nil
581591
}

client/jobs.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"log"
19+
"time"
20+
21+
"google.golang.org/protobuf/types/known/anypb"
22+
"google.golang.org/protobuf/types/known/durationpb"
23+
24+
commonpb "github.com/dapr/dapr/pkg/proto/common/v1"
25+
runtimepb "github.com/dapr/dapr/pkg/proto/runtime/v1"
26+
)
27+
28+
type FailurePolicy interface {
29+
GetPBFailurePolicy() *commonpb.JobFailurePolicy
30+
}
31+
32+
type JobFailurePolicyConstant struct {
33+
maxRetries *uint32
34+
interval *time.Duration
35+
}
36+
37+
func (f *JobFailurePolicyConstant) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
38+
policy := &commonpb.JobFailurePolicy{
39+
Policy: &commonpb.JobFailurePolicy_Constant{
40+
Constant: &commonpb.JobFailurePolicyConstant{},
41+
},
42+
}
43+
if f.maxRetries != nil {
44+
policy.Policy.(*commonpb.JobFailurePolicy_Constant).Constant.MaxRetries = f.maxRetries
45+
}
46+
if f.interval != nil {
47+
policy.Policy.(*commonpb.JobFailurePolicy_Constant).Constant.Interval = &durationpb.Duration{Seconds: int64(f.interval.Seconds())}
48+
}
49+
return policy
50+
}
51+
52+
type JobFailurePolicyDrop struct {
53+
}
54+
55+
func (f *JobFailurePolicyDrop) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
56+
return &commonpb.JobFailurePolicy{
57+
Policy: &commonpb.JobFailurePolicy_Drop{
58+
Drop: &commonpb.JobFailurePolicyDrop{},
59+
},
60+
}
61+
}
62+
63+
func NewFailurePolicyConstant(maxRetries *uint32, interval *time.Duration) FailurePolicy {
64+
return &JobFailurePolicyConstant{
65+
maxRetries: maxRetries,
66+
interval: interval,
67+
}
68+
}
69+
70+
func NewFailurePolicyDrop() FailurePolicy {
71+
return &JobFailurePolicyDrop{}
72+
}
73+
74+
type Job struct {
75+
Name string
76+
Schedule string // Optional
77+
Repeats uint32 // Optional
78+
DueTime string // Optional
79+
TTL string // Optional
80+
Data *anypb.Any
81+
FailurePolicy FailurePolicy
82+
}
83+
84+
// ScheduleJobAlpha1 raises and schedules a job.
85+
func (c *GRPCClient) ScheduleJobAlpha1(ctx context.Context, job *Job) error {
86+
// TODO: Assert job fields are defined: Name, Data
87+
jobRequest := &runtimepb.Job{
88+
Name: job.Name,
89+
Data: job.Data,
90+
}
91+
92+
if job.Schedule != "" {
93+
jobRequest.Schedule = &job.Schedule
94+
}
95+
96+
if job.Repeats != 0 {
97+
jobRequest.Repeats = &job.Repeats
98+
}
99+
100+
if job.DueTime != "" {
101+
jobRequest.DueTime = &job.DueTime
102+
}
103+
104+
if job.TTL != "" {
105+
jobRequest.Ttl = &job.TTL
106+
}
107+
108+
if job.FailurePolicy != nil {
109+
jobRequest.FailurePolicy = job.FailurePolicy.GetPBFailurePolicy()
110+
}
111+
_, err := c.protoClient.ScheduleJobAlpha1(ctx, &runtimepb.ScheduleJobRequest{
112+
Job: jobRequest,
113+
})
114+
return err
115+
}
116+
117+
// GetJobAlpha1 retrieves a scheduled job.
118+
func (c *GRPCClient) GetJobAlpha1(ctx context.Context, name string) (*Job, error) {
119+
// TODO: Name validation
120+
resp, err := c.protoClient.GetJobAlpha1(ctx, &runtimepb.GetJobRequest{
121+
Name: name,
122+
})
123+
log.Println(resp)
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
var failurePolicy FailurePolicy
129+
switch policy := resp.GetJob().GetFailurePolicy().Policy.(type) {
130+
case *commonpb.JobFailurePolicy_Constant:
131+
interval := time.Duration(policy.Constant.Interval.GetSeconds()) * time.Second
132+
failurePolicy = &JobFailurePolicyConstant{
133+
maxRetries: policy.Constant.MaxRetries,
134+
interval: &interval,
135+
}
136+
case *commonpb.JobFailurePolicy_Drop:
137+
failurePolicy = &JobFailurePolicyDrop{}
138+
}
139+
140+
return &Job{
141+
Name: resp.GetJob().GetName(),
142+
Schedule: resp.GetJob().GetSchedule(),
143+
Repeats: resp.GetJob().GetRepeats(),
144+
DueTime: resp.GetJob().GetDueTime(),
145+
TTL: resp.GetJob().GetTtl(),
146+
Data: resp.GetJob().GetData(),
147+
FailurePolicy: failurePolicy,
148+
}, nil
149+
}
150+
151+
// DeleteJobAlpha1 deletes a scheduled job.
152+
func (c *GRPCClient) DeleteJobAlpha1(ctx context.Context, name string) error {
153+
// TODO: Name validation
154+
_, err := c.protoClient.DeleteJobAlpha1(ctx, &runtimepb.DeleteJobRequest{
155+
Name: name,
156+
})
157+
return err
158+
}

client/scheduling_test.go renamed to client/jobs_test.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package client
1515

1616
import (
1717
"testing"
18+
"time"
1819

1920
"github.com/stretchr/testify/assert"
2021
"github.com/stretchr/testify/require"
@@ -26,22 +27,27 @@ func TestSchedulingAlpha1(t *testing.T) {
2627

2728
t.Run("schedule job - valid", func(t *testing.T) {
2829
err := testClient.ScheduleJobAlpha1(ctx, &Job{
29-
Name: "test",
30-
Schedule: "test",
31-
Data: &anypb.Any{},
30+
Name: "test",
31+
Schedule: "test",
32+
Data: &anypb.Any{},
33+
FailurePolicy: NewFailurePolicyConstant(nil, nil),
3234
})
3335

3436
require.NoError(t, err)
3537
})
3638

3739
t.Run("get job - valid", func(t *testing.T) {
40+
maxRetries := uint32(4)
41+
interval := time.Second * 10
42+
3843
expected := &Job{
39-
Name: "name",
40-
Schedule: "@every 10s",
41-
Repeats: 4,
42-
DueTime: "10s",
43-
TTL: "10s",
44-
Data: nil,
44+
Name: "name",
45+
Schedule: "@every 10s",
46+
Repeats: 4,
47+
DueTime: "10s",
48+
TTL: "10s",
49+
Data: nil,
50+
FailurePolicy: NewFailurePolicyConstant(&maxRetries, &interval),
4551
}
4652

4753
resp, err := testClient.GetJobAlpha1(ctx, "name")

client/scheduling.go

Lines changed: 0 additions & 90 deletions
This file was deleted.

examples/go.mod

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/dapr/go-sdk/examples
22

3-
go 1.24.2
3+
go 1.24.4
44

55
replace github.com/dapr/go-sdk => ../
66

@@ -9,7 +9,7 @@ require (
99
github.com/dapr/go-sdk v0.0.0-00010101000000-000000000000
1010
github.com/go-redis/redis/v8 v8.11.5
1111
github.com/google/uuid v1.6.0
12-
google.golang.org/grpc v1.72.0
12+
google.golang.org/grpc v1.73.0
1313
google.golang.org/grpc/examples v0.0.0-20240516203910-e22436abb809
1414
google.golang.org/protobuf v1.36.6
1515
)
@@ -19,22 +19,22 @@ require (
1919
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2020
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2121
github.com/dapr/dapr v1.15.5 // indirect
22-
github.com/dapr/durabletask-go v0.6.5 // indirect
23-
github.com/dapr/kit v0.15.2 // indirect
22+
github.com/dapr/durabletask-go v0.7.2 // indirect
23+
github.com/dapr/kit v0.15.3-0.20250522135818-baea6263991d // indirect
2424
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2525
github.com/go-chi/chi/v5 v5.1.0 // indirect
2626
github.com/go-logr/logr v1.4.2 // indirect
2727
github.com/go-logr/stdr v1.2.2 // indirect
2828
github.com/sirupsen/logrus v1.9.3 // indirect
2929
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3030
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
31-
go.opentelemetry.io/otel v1.35.0 // indirect
32-
go.opentelemetry.io/otel/metric v1.35.0 // indirect
33-
go.opentelemetry.io/otel/trace v1.35.0 // indirect
34-
golang.org/x/net v0.40.0 // indirect
31+
go.opentelemetry.io/otel v1.36.0 // indirect
32+
go.opentelemetry.io/otel/metric v1.36.0 // indirect
33+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
34+
golang.org/x/net v0.41.0 // indirect
3535
golang.org/x/sys v0.33.0 // indirect
36-
golang.org/x/text v0.25.0 // indirect
37-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
36+
golang.org/x/text v0.26.0 // indirect
37+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
3838
gopkg.in/yaml.v3 v3.0.1 // indirect
39-
k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e // indirect
39+
k8s.io/utils v0.0.0-20250502105355-0f33e8f1c979 // indirect
4040
)

0 commit comments

Comments
 (0)