Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics retrieval and storage #58

Merged
merged 26 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
aaf7dbf
ENH: Add metrics client
zbrookle Feb 28, 2021
ff6cb5e
ENH: Add additional testing for kube metrics client
zbrookle Feb 28, 2021
d44bb7b
ENH: Integrate metrics client
zbrookle Feb 28, 2021
18b12cf
ENH: Start creating metrics listener
zbrookle Mar 1, 2021
d058e0c
ENH: Add testing default namespace
zbrookle Mar 1, 2021
ff44dc7
ENH: Add listing metrics to client
zbrookle Mar 1, 2021
c231eff
ENH: Create reactive fake metrics client
zbrookle Mar 1, 2021
b7636d9
ENH: Change metrics client to pull from status
zbrookle Mar 2, 2021
7047547
ENH: Metrics client can now exec directly on containers
zbrookle Mar 3, 2021
1d07fca
CLN: Remove unneeded deps
zbrookle Mar 3, 2021
412baad
ENH: Metrics client properly returning metrics for containers
zbrookle Mar 3, 2021
470004d
META: Change to go.sum
zbrookle Mar 3, 2021
eeeec10
ENH: Share restconfig through all of dags metric client
zbrookle Mar 3, 2021
72a03e8
ENH: Switch to new metric client structure
zbrookle Mar 3, 2021
f9ada51
TST: Finish metrics tests
zbrookle Mar 3, 2021
5b9cc97
BUG: Need to use 64 bits for store TB or higher amounts of mem
zbrookle Mar 3, 2021
d322da1
ENH: Add int64 to db types
zbrookle Mar 3, 2021
79e4265
BUG: Was returning entire struct
zbrookle Mar 3, 2021
758da0b
ENH: Add metrics database code
zbrookle Mar 3, 2021
e402c1b
BUG: Cap should always be unlimited for metrics
zbrookle Mar 3, 2021
6f57ddf
ENH: Metrics should return ASC
zbrookle Mar 3, 2021
0adf472
CLN: Don't need to print metrics
zbrookle Mar 3, 2021
488386c
DOC: Typo
zbrookle Mar 3, 2021
38169c2
ENH: Remove dead code
zbrookle Mar 3, 2021
0090f78
BUG: Don't try to clean up env in test mode
zbrookle Mar 3, 2021
6ffd6da
ENH: Fix tables when fetching unlimited
zbrookle Mar 3, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ module goflow
go 1.15

require (
github.com/google/go-cmp v0.4.0
github.com/google/go-cmp v0.5.2
github.com/gorilla/mux v1.8.0
github.com/kennygrant/sanitize v1.2.4
github.com/mattn/go-sqlite3 v1.14.5
github.com/robfig/cron v1.2.0
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6 // indirect
k8s.io/api v0.19.0
k8s.io/apimachinery v0.19.0
k8s.io/client-go v0.19.0
github.com/sirupsen/logrus v1.8.0 // indirect
k8s.io/api v0.20.4
k8s.io/apimachinery v0.20.4
k8s.io/cli-runtime v0.20.4
k8s.io/client-go v0.20.4
k8s.io/kubectl v0.20.4
k8s.io/metrics v0.20.4
)
424 changes: 382 additions & 42 deletions go.sum

Large diffs are not rendered by default.

21 changes: 17 additions & 4 deletions goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"goflow/internal/config"
"goflow/internal/dag/metrics"
"goflow/internal/dag/orchestrator"
"goflow/internal/k8s/client"
"goflow/internal/k8s/pod/utils"
Expand All @@ -13,14 +14,16 @@ import (
"io/ioutil"
"time"

core "k8s.io/api/core/v1"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

var host string
var port int

func main() {
defer utils.CleanUpEnvironment(client.CreateKubeClient())
configPath := flag.String(
"path",
paths.GetGoDefaultHomePath(),
Expand All @@ -38,11 +41,21 @@ func main() {

var orch *orchestrator.Orchestrator
if *testMode {
orch = orchestrator.NewOrchestratorFromClientAndConfig(
fake.NewSimpleClientset(),
config.CreateConfig(*configPath),
kubeClient := fake.NewSimpleClientset()
kubeClient.Tracker().Add(&core.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: "default",
},
})
config := config.CreateConfig(*configPath)
config.DAGsOn = true
orch = orchestrator.NewOrchestratorFromClientsAndConfig(
kubeClient,
config,
metrics.NewDAGMetricsClient(kubeClient, true),
)
} else {
defer utils.CleanUpEnvironment(client.CreateKubeClient())
orch = orchestrator.NewOrchestrator(*configPath)
}
orch.Start(1 * time.Second)
Expand Down
15 changes: 10 additions & 5 deletions internal/dag/dagtype/dag_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
goflowconfig "goflow/internal/config"
"goflow/internal/dag/activeruns"
dagconfig "goflow/internal/dag/config"
"goflow/internal/dag/metrics"
dagrun "goflow/internal/dag/run"
"goflow/internal/jsonpanic"
"goflow/internal/k8s/pod/event/holder"
Expand Down Expand Up @@ -48,13 +49,9 @@ type DAG struct {
ID int
IsOn bool
LastUpdated time.Time
metricsClient *metrics.DAGMetricsClient
}

// type DAGMetrics struct {
// Successes int;
// Failures int;
// }

func readDAGFile(dagFilePath string) ([]byte, error) {
dat, err := ioutil.ReadFile(dagFilePath)
if err != nil {
Expand All @@ -76,6 +73,7 @@ func CreateDAG(
config *dagconfig.DAGConfig,
code string,
client kubernetes.Interface,
metricsClient *metrics.DAGMetricsClient,
schedules ScheduleCache,
tableClient *dagtable.TableClient,
filePath string,
Expand All @@ -101,6 +99,7 @@ func CreateDAG(
filePath: filePath,
dagRunTableClient: dagRunTableClient,
IsOn: defaultIsOn,
metricsClient: metricsClient,
}
dag.StartDateTime = getDateFromString(dag.Config.StartDateTime)
if dag.Config.EndDateTime != "" {
Expand Down Expand Up @@ -131,6 +130,7 @@ func newDagRow(dag *DAG) dagtable.Row {
func createDAGFromJSONBytes(
dagBytes []byte,
client kubernetes.Interface,
metricsClient *metrics.DAGMetricsClient,
goflowConfig goflowconfig.GoFlowConfig,
scheduleCache ScheduleCache,
tableClient *dagtable.TableClient,
Expand Down Expand Up @@ -160,6 +160,7 @@ func createDAGFromJSONBytes(
&dagConfigStruct,
string(dagBytes),
client,
metricsClient,
scheduleCache,
tableClient,
filePath,
Expand All @@ -173,6 +174,7 @@ func createDAGFromJSONBytes(
func getDAGFromJSON(
dagFilePath string,
client kubernetes.Interface,
metricsClient *metrics.DAGMetricsClient,
goflowConfig goflowconfig.GoFlowConfig,
scheduleCache ScheduleCache,
tableClient *dagtable.TableClient,
Expand All @@ -185,6 +187,7 @@ func getDAGFromJSON(
dagJSON, err := createDAGFromJSONBytes(
dagBytes,
client,
metricsClient,
goflowConfig,
scheduleCache,
tableClient,
Expand Down Expand Up @@ -230,6 +233,7 @@ func getDirSliceRecur(directory string) []string {
func GetDAGSFromFolder(
folder string,
client kubernetes.Interface,
metricsClient *metrics.DAGMetricsClient,
goflowConfig goflowconfig.GoFlowConfig,
schedules ScheduleCache,
tableClient *dagtable.TableClient,
Expand All @@ -242,6 +246,7 @@ func GetDAGSFromFolder(
dag, err := getDAGFromJSON(
file,
client,
metricsClient,
goflowConfig,
schedules,
tableClient,
Expand Down
8 changes: 6 additions & 2 deletions internal/dag/dagtype/dag_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
goflowconfig "goflow/internal/config"
"goflow/internal/dag/activeruns"
dagconfig "goflow/internal/dag/config"
"goflow/internal/dag/metrics"
dagrun "goflow/internal/dag/run"
"goflow/internal/database"
k8sclient "goflow/internal/k8s/client"
Expand All @@ -31,6 +32,7 @@ var DAGPATH string
var TABLECLIENT *dagtable.TableClient
var SQLCLIENT *database.SQLClient
var RUNTABLECLIENT *dagruntable.TableClient
var METRICSCLIENT metrics.DAGMetricsClient

func setUpNamespaces(client kubernetes.Interface) {
namespaceClient := client.CoreV1().Namespaces()
Expand Down Expand Up @@ -119,9 +121,11 @@ func TestDAGFromJSONBytes(t *testing.T) {
timeLock: &sync.Mutex{},
}
expectedJSONString := string(expectedDAG.Marshal())
kubeClient := fake.NewSimpleClientset()
dag, err := createDAGFromJSONBytes(
[]byte(formattedJSONString),
fake.NewSimpleClientset(),
kubeClient,
metrics.NewDAGMetricsClient(kubeClient, true),
goflowconfig.GoFlowConfig{},
make(ScheduleCache),
TABLECLIENT,
Expand Down Expand Up @@ -174,7 +178,7 @@ func getTestDAG(client kubernetes.Interface) *DAG {
MaxActiveRuns: 1,
StartDateTime: "2019-01-01",
EndDateTime: "",
}, "", client, make(ScheduleCache), TABLECLIENT, "path", RUNTABLECLIENT, false)
}, "", client, metrics.NewDAGMetricsClient(client, true), make(ScheduleCache), TABLECLIENT, "path", RUNTABLECLIENT, false)
return &dag
}

Expand Down
72 changes: 72 additions & 0 deletions internal/dag/metrics/kube_exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package metrics

import (
"io"

core "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
)

type writeWrapper struct {
data []byte
}

func (w *writeWrapper) Write(data []byte) (int, error) {
w.data = append(w.data, data...)
return len(data), nil
}

func newWriteWrapper() writeWrapper {
return writeWrapper{make([]byte, 0)}
}

func execCmd(
client kubernetes.Interface,
config *restclient.Config,
podName string,
command string,
stdin io.Reader,
stdout io.Writer,
stderr io.Writer,
containerName string,
) error {
cmd := []string{
"sh",
"-c",
command,
}
req := client.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
Namespace("default").SubResource("exec")
option := &core.PodExecOptions{
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
Container: containerName,
}
if stdin == nil {
option.Stdin = false
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return err
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err != nil {
return err
}

return nil
}
Loading