Skip to content

Commit 5e5c1a7

Browse files
committed
refactor: change filter map to Filter type
1 parent cb30588 commit 5e5c1a7

File tree

10 files changed

+171
-38
lines changed

10 files changed

+171
-38
lines changed

core/mocks/resource_store.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/read.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,12 @@ func (s *Service) GetResource(ctx context.Context, urn string) (*resource.Resour
1919
return res, nil
2020
}
2121

22-
func (s *Service) ListResources(ctx context.Context, project string, kind string) ([]resource.Resource, error) {
23-
filter := map[string]string{}
24-
if kind != "" {
25-
filter["kind"] = kind
26-
}
27-
if project != "" {
28-
filter["project"] = project
29-
}
30-
22+
func (s *Service) ListResources(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) {
3123
resources, err := s.store.List(ctx, filter)
3224
if err != nil {
3325
return nil, errors.ErrInternal.WithCausef(err.Error())
3426
}
35-
return resources, nil
27+
return filter.Apply(resources), nil
3628
}
3729

3830
func (s *Service) GetLog(ctx context.Context, urn string, filter map[string]string) (<-chan module.LogChunk, error) {

core/read_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ func TestService_ListResources(t *testing.T) {
8080
tests := []struct {
8181
name string
8282
setup func(t *testing.T) *core.Service
83-
project string
84-
kind string
83+
filter resource.Filter
8584
want []resource.Resource
8685
wantErr error
8786
}{
@@ -135,7 +134,7 @@ func TestService_ListResources(t *testing.T) {
135134
t.Parallel()
136135
svc := tt.setup(t)
137136

138-
got, err := svc.ListResources(context.Background(), tt.project, tt.kind)
137+
got, err := svc.ListResources(context.Background(), tt.filter)
139138
if tt.wantErr != nil {
140139
assert.Error(t, err)
141140
assert.True(t, errors.Is(err, tt.wantErr))

core/resource/resource.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var namingPattern = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9_-]+$`)
1818

1919
type Store interface {
2020
GetByURN(ctx context.Context, urn string) (*Resource, error)
21-
List(ctx context.Context, filter map[string]string) ([]Resource, error)
21+
List(ctx context.Context, filter Filter) ([]Resource, error)
2222

2323
Create(ctx context.Context, r Resource, hooks ...MutationHook) error
2424
Update(ctx context.Context, r Resource, hooks ...MutationHook) error
@@ -50,6 +50,12 @@ type Spec struct {
5050
Dependencies map[string]string `json:"dependencies"`
5151
}
5252

53+
type Filter struct {
54+
Kind string `json:"kind"`
55+
Project string `json:"project"`
56+
Labels map[string]string `json:"labels"`
57+
}
58+
5359
func (res *Resource) Validate() error {
5460
res.Kind = strings.TrimSpace(res.Kind)
5561
res.Name = strings.TrimSpace(res.Name)
@@ -73,6 +79,32 @@ func (res *Resource) Validate() error {
7379
return nil
7480
}
7581

82+
func (f Filter) Apply(arr []Resource) []Resource {
83+
var res []Resource
84+
for _, r := range arr {
85+
if f.isMatch(r) {
86+
res = append(res, r)
87+
}
88+
}
89+
return res
90+
}
91+
92+
func (f Filter) isMatch(r Resource) bool {
93+
kindMatch := f.Kind == "" || f.Kind == r.Kind
94+
projectMatch := f.Project == "" || f.Project == r.Project
95+
if !kindMatch || !projectMatch {
96+
return false
97+
}
98+
99+
for k, v := range f.Labels {
100+
if r.Labels[k] != v {
101+
return false
102+
}
103+
}
104+
105+
return true
106+
}
107+
76108
func generateURN(res Resource) string {
77109
parts := []string{"urn", "odpf", "entropy", res.Kind, res.Project, res.Name}
78110
return strings.Join(parts, urnSeparator)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/google/go-cmp v0.5.8
88
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
99
github.com/lib/pq v1.10.4
10+
github.com/mattn/go-sqlite3 v1.14.6
1011
github.com/mcuadros/go-defaults v1.2.0
1112
github.com/newrelic/go-agent/v3 v3.15.2
1213
github.com/newrelic/go-agent/v3/integrations/nrgrpc v1.3.1

internal/server/v1/mocks/resource_service.go

Lines changed: 12 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/server/v1/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
type ResourceService interface {
1919
GetResource(ctx context.Context, urn string) (*resource.Resource, error)
20-
ListResources(ctx context.Context, project string, kind string) ([]resource.Resource, error)
20+
ListResources(ctx context.Context, filter resource.Filter) ([]resource.Resource, error)
2121
CreateResource(ctx context.Context, res resource.Resource) (*resource.Resource, error)
2222
UpdateResource(ctx context.Context, urn string, newSpec resource.Spec) (*resource.Resource, error)
2323
DeleteResource(ctx context.Context, urn string) error
@@ -97,7 +97,13 @@ func (server APIServer) GetResource(ctx context.Context, request *entropyv1beta1
9797
}
9898

9999
func (server APIServer) ListResources(ctx context.Context, request *entropyv1beta1.ListResourcesRequest) (*entropyv1beta1.ListResourcesResponse, error) {
100-
resources, err := server.resourceService.ListResources(ctx, request.GetProject(), request.GetKind())
100+
filter := resource.Filter{
101+
Kind: request.GetKind(),
102+
Project: request.GetProject(),
103+
Labels: nil,
104+
}
105+
106+
resources, err := server.resourceService.ListResources(ctx, filter)
101107
if err != nil {
102108
return nil, generateRPCErr(err)
103109
}

internal/server/v1/server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ func TestAPIServer_ListResources(t *testing.T) {
409409
t.Helper()
410410
resourceService := &mocks.ResourceService{}
411411
resourceService.EXPECT().
412-
ListResources(mock.Anything, mock.Anything, mock.Anything).
412+
ListResources(mock.Anything, mock.Anything).
413413
Return(nil, errors.New("failed")).Once()
414414

415415
return NewAPIServer(resourceService)
@@ -427,7 +427,7 @@ func TestAPIServer_ListResources(t *testing.T) {
427427
t.Helper()
428428
resourceService := &mocks.ResourceService{}
429429
resourceService.EXPECT().
430-
ListResources(mock.Anything, mock.Anything, mock.Anything).
430+
ListResources(mock.Anything, mock.Anything).
431431
Return([]resource.Resource{
432432
{
433433
URN: "p-testdata-gl-testname-log",

internal/store/mongodb/resources.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414

1515
const resourceRepoName = "resources"
1616

17+
var _ resource.Store = (*ResourceStore)(nil)
18+
1719
type ResourceStore struct{ coll *mongo.Collection }
1820

1921
func NewResourceStore(db *mongo.Database) *ResourceStore {
@@ -36,7 +38,17 @@ func (rc *ResourceStore) GetByURN(ctx context.Context, urn string) (*resource.Re
3638
return modelToResource(rm), nil
3739
}
3840

39-
func (rc *ResourceStore) List(ctx context.Context, filter map[string]string) ([]resource.Resource, error) {
41+
func (rc *ResourceStore) List(ctx context.Context, filter resource.Filter) ([]resource.Resource, error) {
42+
fq := map[string]string{}
43+
44+
if filter.Kind != "" {
45+
fq["kind"] = filter.Kind
46+
}
47+
48+
if filter.Project != "" {
49+
fq["project"] = filter.Project
50+
}
51+
4052
cur, err := rc.coll.Find(ctx, filter)
4153
if err != nil {
4254
return nil, err

pkg/worker/example/main.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"go.uber.org/zap"
11+
12+
"github.com/odpf/entropy/pkg/errors"
13+
"github.com/odpf/entropy/pkg/worker"
14+
"github.com/odpf/entropy/pkg/worker/pgq"
15+
)
16+
17+
var (
18+
jID = flag.String("id", "test", "Job ID")
19+
kind = flag.String("kind", "print", "Job kind")
20+
count = flag.Int("count", 1, "Number of jobs to create")
21+
after = flag.Duration("after", 0, "Enqueue a job after")
22+
payload = flag.String("payload", "", "Payload for the job")
23+
24+
runWorker = flag.Bool("worker", false, "Run in worker mode")
25+
queueName = flag.String("queue", "demo", "Queue name")
26+
pgConStr = flag.String("pg", "postgresql://postgres@localhost:5432/postgres?sslmode=disable", "PostgreSQL connection string")
27+
)
28+
29+
func main() {
30+
flag.Parse()
31+
32+
lg, err := zap.NewDevelopment()
33+
if err != nil {
34+
panic(err)
35+
}
36+
37+
q, err := pgq.Open(*pgConStr, *queueName)
38+
if err != nil {
39+
panic(err)
40+
}
41+
42+
opts := []worker.Option{
43+
worker.WithJobKind("test", testJobFn),
44+
worker.WithLogger(lg),
45+
}
46+
47+
w, err := worker.New(q, opts...)
48+
if err != nil {
49+
panic(err)
50+
}
51+
52+
if *runWorker {
53+
if err := w.Run(context.Background()); err != nil {
54+
panic(err)
55+
}
56+
} else {
57+
for i := 0; i < *count; i++ {
58+
log.Println(w.Enqueue(context.Background(), worker.Job{
59+
ID: fmt.Sprintf("%s_%d", *jID, i),
60+
Kind: *kind,
61+
Payload: []byte(*payload),
62+
RunAt: time.Now().Add(*after),
63+
}))
64+
}
65+
}
66+
}
67+
68+
func testJobFn(_ context.Context, job worker.Job) ([]byte, error) {
69+
const maxAttempts = 3
70+
const attemptBackoff = 5 * time.Second
71+
72+
switch string(job.Payload) {
73+
case "fail_after_3":
74+
if job.AttemptsDone < maxAttempts {
75+
return nil, &worker.RetryableError{
76+
Cause: errors.New("fake error [retryable]"),
77+
RetryAfter: attemptBackoff,
78+
}
79+
}
80+
return nil, errors.New("fake error [permanent]")
81+
82+
case "panic":
83+
panic("simulated panic")
84+
85+
case "fail":
86+
return nil, errors.New("fake error [permanent]")
87+
88+
default:
89+
log.Printf("Test Job Says Hello! (attempt=%d)\n", job.AttemptsDone+1)
90+
return []byte("job is done"), nil
91+
}
92+
}

0 commit comments

Comments
 (0)