-
Notifications
You must be signed in to change notification settings - Fork 698
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
21 changed files
with
734 additions
and
2,229 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package options | ||
|
||
import ( | ||
"github.com/spf13/pflag" | ||
"time" | ||
) | ||
|
||
// ServerOption is the main context object for the controller manager. | ||
type ServerOption struct { | ||
ChaosLevel int | ||
ControllerConfigFile string | ||
PrintVersion bool | ||
GCInterval time.Duration | ||
} | ||
|
||
// NewServerOption creates a new CMServer with a default config. | ||
func NewServerOption() *ServerOption { | ||
s := ServerOption{} | ||
return &s | ||
} | ||
|
||
// AddFlags adds flags for a specific CMServer to the specified FlagSet | ||
func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { | ||
// chaos level will be removed once we have a formal tool to inject failures. | ||
fs.IntVar(&s.ChaosLevel, "chaos-level", -1, "DO NOT USE IN PRODUCTION - level of chaos injected into the TfJob created by the operator.") | ||
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") | ||
fs.DurationVar(&s.GCInterval, "gc-interval", 10*time.Minute, "GC interval") | ||
fs.StringVar(&s.ControllerConfigFile, "controller_config_file", "", "Path to file containing the controller config.") | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package app | ||
|
||
import ( | ||
"github.com/tensorflow/k8s/pkg/controller" | ||
|
||
"github.com/tensorflow/k8s/cmd/tf_operator/app/options" | ||
tfjobclient "github.com/tensorflow/k8s/pkg/client/clientset/versioned" | ||
informers "github.com/tensorflow/k8s/pkg/client/informers/externalversions" | ||
"github.com/tensorflow/k8s/pkg/util/k8sutil" | ||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" | ||
|
||
"fmt" | ||
"github.com/golang/glog" | ||
"github.com/tensorflow/k8s/pkg/apis/tensorflow/v1alpha1" | ||
"github.com/tensorflow/k8s/pkg/util" | ||
"github.com/tensorflow/k8s/version" | ||
"io/ioutil" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
election "k8s.io/client-go/tools/leaderelection" | ||
"k8s.io/client-go/tools/leaderelection/resourcelock" | ||
"k8s.io/client-go/tools/record" | ||
"os" | ||
"runtime" | ||
|
||
"github.com/ghodss/yaml" | ||
"github.com/tensorflow/k8s/pkg/client/clientset/versioned/scheme" | ||
"k8s.io/api/core/v1" | ||
clientset "k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/rest" | ||
"time" | ||
) | ||
|
||
var ( | ||
leaseDuration = 15 * time.Second | ||
renewDuration = 5 * time.Second | ||
retryPeriod = 3 * time.Second | ||
) | ||
|
||
func Run(opt *options.ServerOption) error { | ||
namespace := os.Getenv("MY_POD_NAMESPACE") | ||
if len(namespace) == 0 { | ||
glog.Fatalf("must set env MY_POD_NAMESPACE") | ||
} | ||
|
||
if opt.PrintVersion { | ||
fmt.Println("tf_operator Version:", version.Version) | ||
fmt.Println("Git SHA:", version.GitSHA) | ||
fmt.Println("Go Version:", runtime.Version()) | ||
fmt.Printf("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH) | ||
os.Exit(0) | ||
} | ||
|
||
glog.Infof("tf_operator Version: %v", version.Version) | ||
glog.Infof("Git SHA: %s", version.GitSHA) | ||
glog.Infof("Go Version: %s", runtime.Version()) | ||
glog.Infof("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH) | ||
|
||
config, err := k8sutil.GetClusterConfig() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
kubeClient, leaderElectionClient, tfJobClient, apiExtensionsclient, err := createClients(config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
controllerConfig := readControllerConfig(opt.ControllerConfigFile) | ||
|
||
neverStop := make(chan struct{}) | ||
defer close(neverStop) | ||
|
||
tfJobInformerFactory := informers.NewSharedInformerFactory(tfJobClient, time.Second*30) | ||
controller, err := controller.New(kubeClient, apiExtensionsclient, tfJobClient, *controllerConfig, tfJobInformerFactory) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
go tfJobInformerFactory.Start(neverStop) | ||
|
||
run := func(stopCh <-chan struct{}) { | ||
controller.Run(1, stopCh) | ||
} | ||
|
||
id, err := os.Hostname() | ||
if err != nil { | ||
return fmt.Errorf("failed to get hostname: %v", err) | ||
} | ||
|
||
// Prepare event clients. | ||
eventBroadcaster := record.NewBroadcaster() | ||
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "tf-operator"}) | ||
|
||
rl := &resourcelock.EndpointsLock{ | ||
EndpointsMeta: metav1.ObjectMeta{ | ||
Namespace: namespace, | ||
Name: "tf-operator", | ||
}, | ||
Client: leaderElectionClient.CoreV1(), | ||
LockConfig: resourcelock.ResourceLockConfig{ | ||
Identity: id, | ||
EventRecorder: recorder, | ||
}, | ||
} | ||
|
||
election.RunOrDie(election.LeaderElectionConfig{ | ||
Lock: rl, | ||
LeaseDuration: leaseDuration, | ||
RenewDeadline: renewDuration, | ||
RetryPeriod: retryPeriod, | ||
Callbacks: election.LeaderCallbacks{ | ||
OnStartedLeading: run, | ||
OnStoppedLeading: func() { | ||
glog.Fatalf("leader election lost") | ||
}, | ||
}, | ||
}) | ||
|
||
panic("unreachable") | ||
|
||
return nil | ||
} | ||
|
||
func readControllerConfig(controllerConfigFile string) *v1alpha1.ControllerConfig { | ||
controllerConfig := &v1alpha1.ControllerConfig{} | ||
if controllerConfigFile != "" { | ||
glog.Infof("Loading controller config from %v.", controllerConfigFile) | ||
data, err := ioutil.ReadFile(controllerConfigFile) | ||
if err != nil { | ||
glog.Fatalf("Could not read file: %v. Error: %v", controllerConfigFile, err) | ||
return controllerConfig | ||
} | ||
err = yaml.Unmarshal(data, controllerConfig) | ||
if err != nil { | ||
glog.Fatalf("Could not parse controller config; Error: %v\n", err) | ||
} | ||
glog.Infof("ControllerConfig: %v", util.Pformat(controllerConfig)) | ||
} else { | ||
glog.Info("No controller_config_file provided; using empty config.") | ||
} | ||
return controllerConfig | ||
} | ||
|
||
func createClients(config *rest.Config) (clientset.Interface, clientset.Interface, tfjobclient.Interface, apiextensionsclient.Interface, error) { | ||
kubeClient, err := clientset.NewForConfig(rest.AddUserAgent(config, "tfjob_operator")) | ||
if err != nil { | ||
return nil, nil, nil, nil, err | ||
} | ||
|
||
leaderElectionClient, err := clientset.NewForConfig(rest.AddUserAgent(config, "leader-election")) | ||
if err != nil { | ||
return nil, nil, nil, nil, err | ||
} | ||
|
||
tfJobClient, err := tfjobclient.NewForConfig(config) | ||
if err != nil { | ||
return nil, nil, nil, nil, err | ||
} | ||
|
||
apiExtensionsclient, err := apiextensionsclient.NewForConfig(config) | ||
if err != nil { | ||
return nil, nil, nil, nil, err | ||
} | ||
|
||
return kubeClient, leaderElectionClient, tfJobClient, apiExtensionsclient, nil | ||
} |
Oops, something went wrong.