Skip to content

Commit

Permalink
v1alpha2 metrics collector - controller (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardsliu authored and k8s-ci-robot committed May 10, 2019
1 parent 7cadd62 commit 9ebbc79
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
23 changes: 23 additions & 0 deletions pkg/controller/v1alpha2/experiment/experiment_util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package experiment

import (
"bytes"
"context"
"fmt"

Expand All @@ -12,9 +13,12 @@ import (
trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2"
apiv1alpha2 "github.com/kubeflow/katib/pkg/api/v1alpha2"
"github.com/kubeflow/katib/pkg/controller/v1alpha2/experiment/util"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
)

func (r *ReconcileExperiment) createTrialInstance(expInstance *experimentsv1alpha2.Experiment, trialInstance *apiv1alpha2.Trial) error {
BUFSIZE := 1024
logger := log.WithValues("Experiment", types.NamespacedName{Name: expInstance.GetName(), Namespace: expInstance.GetNamespace()})

trial := &trialsv1alpha2.Trial{}
Expand Down Expand Up @@ -46,6 +50,25 @@ func (r *ReconcileExperiment) createTrialInstance(expInstance *experimentsv1alph

trial.Spec.RunSpec = runSpec

buf := bytes.NewBufferString(runSpec)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(buf, BUFSIZE).Decode(job); err != nil {
return fmt.Errorf("Invalid spec.trialTemplate: %v.", err)
}

var metricNames []string
metricNames = append(metricNames, expInstance.Spec.Objective.ObjectiveMetricName)
for _, mn := range expInstance.Spec.Objective.AdditionalMetricNames {
metricNames = append(metricNames, mn)
}

mcSpec, err := util.GetMetricsCollectorManifest(expInstance.GetName(), trial.Name, job.GetKind(), trial.Namespace, metricNames, expInstance.Spec.MetricsCollectorSpec)
if err != nil {
logger.Error(err, "Error getting metrics collector manifest")
return err
}
trial.Spec.MetricsCollectorSpec = mcSpec.String()

if err := r.Create(context.TODO(), trial); err != nil {
logger.Error(err, "Trial create error", "Trial name", trial.Name)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/v1alpha2/experiment/util/manifest_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func getMetricsCollectorManifest(experimentName string, trialName string, jobKind string, namespace string, metricNames []string, mcs *katibv1alpha2.MetricsCollectorSpec) (*bytes.Buffer, error) {
func GetMetricsCollectorManifest(experimentName string, trialName string, jobKind string, namespace string, metricNames []string, mcs *katibv1alpha2.MetricsCollectorSpec) (*bytes.Buffer, error) {
var mtp *template.Template = nil
var err error
tmpValues := map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/v1alpha2/experiment/util/webhook_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func validateMetricsCollector(inst *ep_v1alpha2.Experiment) error {
}

var mcjob batchv1beta.CronJob
mcm, err := getMetricsCollectorManifest(experimentName, trialName, job.GetKind(), namespace, metricNames, inst.Spec.MetricsCollectorSpec)
mcm, err := GetMetricsCollectorManifest(experimentName, trialName, job.GetKind(), namespace, metricNames, inst.Spec.MetricsCollectorSpec)
if err != nil {
logger.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down
29 changes: 28 additions & 1 deletion pkg/controller/v1alpha2/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"

batchv1 "k8s.io/api/batch/v1"
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -223,7 +224,11 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob
}
}

//TODO create Metric colletor
err = r.spawnMetricsCollector(instance)
if err != nil {
logger.Error(err, "Metrics collector spawning error")
return nil, err
}

msg := "Trial is running"
instance.MarkTrialStatusRunning(util.TrialRunningReason, msg)
Expand All @@ -248,3 +253,25 @@ func (r *ReconcileTrial) getDesiredJobSpec(instance *trialsv1alpha2.Trial) (*uns

return desiredJobSpec, nil
}

func (r *ReconcileTrial) spawnMetricsCollector(instance *trialsv1alpha2.Trial) error {
var mcjob batchv1beta.CronJob
bufSize := 1024
logger := log.WithValues("Metrics collector for Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
buf := bytes.NewBufferString(instance.Spec.MetricsCollectorSpec)

if err := k8syaml.NewYAMLOrJSONDecoder(buf, bufSize).Decode(mcjob); err != nil {
logger.Error(err, "Yaml decode error")
return err
}
if err := controllerutil.SetControllerReference(instance, &mcjob, r.scheme); err != nil {
logger.Error(err, "Set controller reference error")
return err
}
if err := r.Create(context.TODO(), &mcjob); err != nil {
logger.Error(err, "MetricsCollector Job Create error")
return err
}

return nil
}

0 comments on commit 9ebbc79

Please sign in to comment.