Skip to content

Commit 11596a9

Browse files
authored
refactor(firehose): configs to remove additional default configurations (#62)
* refactor(firehose): configs to remove additional default configurations * fix(firehose): linting errors * refactor(firehose): remove redundat config prefix from firehose configs key * refactor(firehose): remove unused go modules * fix(firehose): update firehose jsonschema
1 parent 0aa714a commit 11596a9

File tree

9 files changed

+401
-386
lines changed

9 files changed

+401
-386
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ require (
66
github.com/Masterminds/squirrel v1.5.2
77
github.com/google/go-cmp v0.5.8
88
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
9-
github.com/k0kubun/pp/v3 v3.1.0
109
github.com/lib/pq v1.10.4
1110
github.com/mcuadros/go-defaults v1.2.0
1211
github.com/newrelic/go-agent/v3 v3.15.2

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,6 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
470470
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
471471
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
472472
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
473-
github.com/k0kubun/pp/v3 v3.1.0 h1:ifxtqJkRZhw3h554/z/8zm6AAbyO4LLKDlA5eV+9O8Q=
474-
github.com/k0kubun/pp/v3 v3.1.0/go.mod h1:vIrP5CF0n78pKHm2Ku6GVerpZBJvscg48WepUYEk2gw=
475473
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
476474
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
477475
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=

modules/firehose/config.go

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ const (
1414
defaultChartString = "firehose"
1515
defaultVersionString = "0.1.1"
1616
defaultRepositoryString = "https://odpf.github.io/charts/"
17+
defaultImagePullPolicy = "IfNotPresent"
18+
defaultImageRepository = "odpf/firehose"
19+
defaultImageTag = "latest"
20+
defaultReplicaCount = 1
1721
)
1822

1923
var (
@@ -25,20 +29,52 @@ var (
2529
)
2630

2731
type moduleConfig struct {
28-
State string `json:"state"`
29-
ReleaseConfigs helm.ReleaseConfig `json:"release_configs"`
32+
State string `json:"state"`
33+
ChartVersion string `json:"chart_version"`
34+
Firehose struct {
35+
Replicas int `json:"replicas"`
36+
KafkaBrokerAddress string `json:"kafka_broker_address"`
37+
KafkaTopic string `json:"kafka_topic"`
38+
KafkaConsumerID string `json:"kafka_consumer_id"`
39+
EnvVariables map[string]string `json:"env_variables"`
40+
} `json:"firehose"`
3041
}
3142

32-
func (mc *moduleConfig) sanitiseAndValidate(r resource.Resource) {
33-
rc := mc.ReleaseConfigs
43+
func (mc *moduleConfig) sanitiseAndValidate() {
44+
if mc.ChartVersion == "" {
45+
mc.ChartVersion = defaultVersionString
46+
}
47+
}
48+
49+
func (mc moduleConfig) GetHelmReleaseConfig(r resource.Resource) *helm.ReleaseConfig {
50+
rc := helm.DefaultReleaseConfig()
3451
rc.Name = fmt.Sprintf("%s-%s-firehose", r.Project, r.Name)
3552
rc.Repository = defaultRepositoryString
3653
rc.Chart = defaultChartString
37-
rc.Version = defaultVersionString
3854
rc.Namespace = defaultNamespace
3955
rc.ForceUpdate = true
56+
rc.Version = mc.ChartVersion
57+
58+
fc := mc.Firehose
59+
fc.EnvVariables["SOURCE_KAFKA_BROKERS"] = fc.KafkaBrokerAddress
60+
fc.EnvVariables["SOURCE_KAFKA_TOPIC"] = fc.KafkaTopic
61+
fc.EnvVariables["SOURCE_KAFKA_CONSUMER_GROUP_ID"] = fc.KafkaConsumerID
62+
63+
hv := map[string]interface{}{
64+
"replicaCount": defaultReplicaCount,
65+
"firehose": map[string]interface{}{
66+
"image": map[string]interface{}{
67+
"repository": defaultImageRepository,
68+
"pullPolicy": defaultImagePullPolicy,
69+
"tag": defaultImageTag,
70+
},
71+
},
72+
"config": fc.EnvVariables,
73+
}
74+
75+
rc.Values = hv
4076

41-
mc.ReleaseConfigs = rc
77+
return rc
4278
}
4379

4480
func (mc moduleConfig) JSON() []byte {

modules/firehose/log.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package firehose
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/odpf/entropy/core/module"
8+
"github.com/odpf/entropy/modules/kubernetes"
9+
"github.com/odpf/entropy/pkg/errors"
10+
"github.com/odpf/entropy/pkg/kube"
11+
)
12+
13+
func (*firehoseModule) Log(ctx context.Context, spec module.Spec, filter map[string]string) (<-chan module.LogChunk, error) {
14+
r := spec.Resource
15+
16+
var conf moduleConfig
17+
if err := json.Unmarshal(r.Spec.Configs, &conf); err != nil {
18+
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
19+
}
20+
21+
var kubeOut kubernetes.Output
22+
if err := json.Unmarshal(spec.Dependencies[keyKubeDependency].Output, &kubeOut); err != nil {
23+
return nil, err
24+
}
25+
26+
if filter == nil {
27+
filter = make(map[string]string)
28+
}
29+
filter["app"] = conf.GetHelmReleaseConfig(r).Name
30+
31+
kubeCl := kube.NewClient(kubeOut.Configs)
32+
logs, err := kubeCl.StreamLogs(ctx, defaultNamespace, filter)
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
mappedLogs := make(chan module.LogChunk)
38+
go func() {
39+
defer close(mappedLogs)
40+
for {
41+
select {
42+
case log, ok := <-logs:
43+
if !ok {
44+
return
45+
}
46+
mappedLogs <- module.LogChunk{Data: log.Data, Labels: log.Labels}
47+
case <-ctx.Done():
48+
return
49+
}
50+
}
51+
}()
52+
53+
return mappedLogs, err
54+
}

modules/firehose/module.go

Lines changed: 4 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
package firehose
22

33
import (
4-
"context"
54
_ "embed"
6-
"encoding/json"
75

86
"github.com/odpf/entropy/core/module"
9-
"github.com/odpf/entropy/core/resource"
107
"github.com/odpf/entropy/modules/kubernetes"
11-
"github.com/odpf/entropy/pkg/errors"
12-
"github.com/odpf/entropy/pkg/helm"
13-
"github.com/odpf/entropy/pkg/kube"
148
)
159

1610
const (
@@ -20,15 +14,16 @@ const (
2014
)
2115

2216
const (
23-
helmCreate = "helm_create"
24-
helmUpdate = "helm_update"
17+
releaseCreate = "release_create"
18+
releaseUpdate = "release_update"
19+
)
2520

21+
const (
2622
stateRunning = "RUNNING"
2723
stateStopped = "STOPPED"
2824
)
2925

3026
const (
31-
keyReplicaCount = "replicaCount"
3227
keyKubeDependency = "kube_cluster"
3328
)
3429

@@ -66,176 +61,3 @@ var Module = module.Descriptor{
6661
}
6762

6863
type firehoseModule struct{}
69-
70-
func (m *firehoseModule) Plan(_ context.Context, spec module.Spec, act module.ActionRequest) (*resource.Resource, error) {
71-
if act.Name == module.CreateAction {
72-
return m.planCreate(spec, act)
73-
}
74-
return m.planChange(spec, act)
75-
}
76-
77-
func (m *firehoseModule) Sync(_ context.Context, spec module.Spec) (*resource.State, error) {
78-
r := spec.Resource
79-
80-
var data moduleData
81-
if err := json.Unmarshal(r.State.ModuleData, &data); err != nil {
82-
return nil, err
83-
}
84-
85-
if len(data.PendingSteps) == 0 {
86-
return &resource.State{
87-
Status: resource.StatusCompleted,
88-
Output: r.State.Output,
89-
ModuleData: r.State.ModuleData,
90-
}, nil
91-
}
92-
93-
pendingStep := data.PendingSteps[0]
94-
data.PendingSteps = data.PendingSteps[1:]
95-
96-
var conf moduleConfig
97-
if err := json.Unmarshal(r.Spec.Configs, &conf); err != nil {
98-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
99-
}
100-
101-
var kubeOut kubernetes.Output
102-
if err := json.Unmarshal(spec.Dependencies[keyKubeDependency].Output, &kubeOut); err != nil {
103-
return nil, err
104-
}
105-
106-
if err := m.helmSync(pendingStep == helmCreate, conf, kubeOut); err != nil {
107-
return nil, err
108-
}
109-
110-
return &resource.State{
111-
Status: resource.StatusCompleted,
112-
Output: Output{
113-
Namespace: conf.ReleaseConfigs.Namespace,
114-
ReleaseName: conf.ReleaseConfigs.Name,
115-
}.JSON(),
116-
ModuleData: data.JSON(),
117-
}, nil
118-
}
119-
120-
func (*firehoseModule) Log(ctx context.Context, spec module.Spec, filter map[string]string) (<-chan module.LogChunk, error) {
121-
r := spec.Resource
122-
123-
var conf moduleConfig
124-
if err := json.Unmarshal(r.Spec.Configs, &conf); err != nil {
125-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
126-
}
127-
128-
var kubeOut kubernetes.Output
129-
if err := json.Unmarshal(spec.Dependencies[keyKubeDependency].Output, &kubeOut); err != nil {
130-
return nil, err
131-
}
132-
133-
if filter == nil {
134-
filter = make(map[string]string)
135-
}
136-
filter["app"] = conf.ReleaseConfigs.Name
137-
138-
kubeCl := kube.NewClient(kubeOut.Configs)
139-
logs, err := kubeCl.StreamLogs(ctx, defaultNamespace, filter)
140-
if err != nil {
141-
return nil, err
142-
}
143-
144-
mappedLogs := make(chan module.LogChunk)
145-
go func() {
146-
defer close(mappedLogs)
147-
for {
148-
select {
149-
case log, ok := <-logs:
150-
if !ok {
151-
return
152-
}
153-
mappedLogs <- module.LogChunk{Data: log.Data, Labels: log.Labels}
154-
case <-ctx.Done():
155-
return
156-
}
157-
}
158-
}()
159-
160-
return mappedLogs, err
161-
}
162-
163-
func (*firehoseModule) planCreate(spec module.Spec, act module.ActionRequest) (*resource.Resource, error) {
164-
r := spec.Resource
165-
166-
var reqConf moduleConfig
167-
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
168-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
169-
} else {
170-
reqConf.sanitiseAndValidate(r)
171-
}
172-
173-
r.Spec.Configs = reqConf.JSON()
174-
r.State = resource.State{
175-
Status: resource.StatusPending,
176-
ModuleData: moduleData{
177-
PendingSteps: []string{helmCreate},
178-
}.JSON(),
179-
}
180-
return &r, nil
181-
}
182-
183-
func (*firehoseModule) planChange(spec module.Spec, act module.ActionRequest) (*resource.Resource, error) {
184-
r := spec.Resource
185-
186-
var conf moduleConfig
187-
if err := json.Unmarshal(r.Spec.Configs, &conf); err != nil {
188-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
189-
}
190-
191-
switch act.Name {
192-
case module.UpdateAction:
193-
var reqConf moduleConfig
194-
if err := json.Unmarshal(act.Params, &reqConf); err != nil {
195-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
196-
}
197-
conf = reqConf
198-
199-
case ScaleAction:
200-
var scaleParams struct {
201-
Replicas int `json:"replicas"`
202-
}
203-
if err := json.Unmarshal(act.Params, &scaleParams); err != nil {
204-
return nil, errors.ErrInvalid.WithMsgf("invalid config json: %v", err)
205-
}
206-
conf.ReleaseConfigs.Values[keyReplicaCount] = scaleParams.Replicas
207-
208-
case StartAction:
209-
conf.State = stateRunning
210-
211-
case StopAction:
212-
conf.State = stateStopped
213-
}
214-
215-
conf.sanitiseAndValidate(r)
216-
r.Spec.Configs = conf.JSON()
217-
r.State = resource.State{
218-
Status: resource.StatusPending,
219-
ModuleData: moduleData{
220-
PendingSteps: []string{helmUpdate},
221-
}.JSON(),
222-
}
223-
return &r, nil
224-
}
225-
226-
func (*firehoseModule) helmSync(isCreate bool, conf moduleConfig, kube kubernetes.Output) error {
227-
helmCl := helm.NewClient(&helm.Config{Kubernetes: kube.Configs})
228-
229-
if conf.State == stateStopped {
230-
conf.ReleaseConfigs.Values[keyReplicaCount] = 0
231-
}
232-
233-
var helmErr error
234-
if isCreate {
235-
_, helmErr = helmCl.Create(&conf.ReleaseConfigs)
236-
} else {
237-
_, helmErr = helmCl.Update(&conf.ReleaseConfigs)
238-
}
239-
240-
return helmErr
241-
}

0 commit comments

Comments
 (0)