Skip to content

Add instance and pricing information to cluster info output #1053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 12, 2020
Merged
195 changes: 165 additions & 30 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// cx cluster up/down

package cmd

import (
Expand All @@ -34,10 +32,11 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/exit"
"github.com/cortexlabs/cortex/pkg/lib/files"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/print"
"github.com/cortexlabs/cortex/pkg/lib/prompt"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/table"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/operator/schema"
"github.com/cortexlabs/cortex/pkg/types"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/clusterstate"
Expand Down Expand Up @@ -378,22 +377,7 @@ func cmdInfo(awsCreds AWSCredentials, accessConfig *clusterconfig.AccessConfig,
exit.Error(err)
}

clusterState, err := clusterstate.GetClusterState(awsClient, accessConfig)
if err != nil {
if errors.GetKind(err) == clusterstate.ErrUnexpectedCloudFormationStatus {
fmt.Println(fmt.Sprintf("cluster named \"%s\" in %s is in an unexpected state, please run `cortex cluster down` to delete the cluster or delete the cloudformation stacks manually in your AWS console %s", *accessConfig.ClusterName, *accessConfig.Region, getCloudFormationURLWithAccessConfig(accessConfig)))
}
exit.Error(err)
}

fmt.Println(clusterState.TableString())
if clusterState.Status == clusterstate.StatusCreateFailed || clusterState.Status == clusterstate.StatusDeleteFailed {
fmt.Println(fmt.Sprintf("More information can be found in your AWS console %s", getCloudFormationURLWithAccessConfig(accessConfig)))
fmt.Println()
}

err = assertClusterStatus(accessConfig, clusterState.Status, clusterstate.StatusCreateComplete)
if err != nil {
if err := printInfoClusterState(awsClient, accessConfig); err != nil {
exit.Error(err)
}

Expand All @@ -418,6 +402,41 @@ func cmdInfo(awsCreds AWSCredentials, accessConfig *clusterconfig.AccessConfig,
}
}

if err := printInfoOperatorResponse(clusterConfig, operatorEndpoint, awsCreds); err != nil {
exit.Error(err)
}

if err := updateInfoEnvironment(operatorEndpoint, awsCreds, disallowPrompt); err != nil {
exit.Error(err)
}
}

func printInfoClusterState(awsClient *aws.Client, accessConfig *clusterconfig.AccessConfig) error {
clusterState, err := clusterstate.GetClusterState(awsClient, accessConfig)
if err != nil {
if errors.GetKind(err) == clusterstate.ErrUnexpectedCloudFormationStatus {
fmt.Println(fmt.Sprintf("cluster named \"%s\" in %s is in an unexpected state, please run `cortex cluster down` to delete the cluster or delete the cloudformation stacks manually in your AWS console %s", *accessConfig.ClusterName, *accessConfig.Region, getCloudFormationURLWithAccessConfig(accessConfig)))
}
return err
}

fmt.Println(clusterState.TableString())
if clusterState.Status == clusterstate.StatusCreateFailed || clusterState.Status == clusterstate.StatusDeleteFailed {
fmt.Println(fmt.Sprintf("more information can be found in your AWS console %s", getCloudFormationURLWithAccessConfig(accessConfig)))
fmt.Println()
}

err = assertClusterStatus(accessConfig, clusterState.Status, clusterstate.StatusCreateComplete)
if err != nil {
return err
}

return nil
}

func printInfoOperatorResponse(clusterConfig clusterconfig.Config, operatorEndpoint string, awsCreds AWSCredentials) error {
fmt.Print("fetching cluster status ...\n\n")

operatorConfig := cluster.OperatorConfig{
Telemetry: isTelemetryEnabled(),
EnvName: _flagClusterEnv,
Expand All @@ -429,20 +448,135 @@ func cmdInfo(awsCreds AWSCredentials, accessConfig *clusterconfig.AccessConfig,

infoResponse, err := cluster.Info(operatorConfig)
if err != nil {
exit.Error(err)
return err
}

infoResponse.ClusterConfig.Config = clusterConfig

printInfoClusterConfig(infoResponse)
printInfoPricing(infoResponse, clusterConfig)
printInfoNodes(infoResponse)

return nil
}

func printInfoClusterConfig(infoResponse *schema.InfoResponse) {
var items table.KeyValuePairs
items.Add("aws access key id", infoResponse.MaskedAWSAccessKeyID)
items.AddAll(infoResponse.ClusterConfig.UserTable())

items.Print()
}

func printInfoPricing(infoResponse *schema.InfoResponse, clusterConfig clusterconfig.Config) {
numAPIInstances := len(infoResponse.NodeInfos)

var totalAPIInstancePrice float64
for _, nodeInfo := range infoResponse.NodeInfos {
totalAPIInstancePrice += nodeInfo.Price
}

eksPrice := aws.EKSPrices[*clusterConfig.Region]
operatorInstancePrice := aws.InstanceMetadatas[*clusterConfig.Region]["t3.medium"].Price
operatorEBSPrice := aws.EBSMetadatas[*clusterConfig.Region]["gp2"].PriceGB * 20 / 30 / 24
nlbPrice := aws.NLBMetadatas[*clusterConfig.Region].Price
natUnitPrice := aws.NATMetadatas[*clusterConfig.Region].Price
apiEBSPrice := aws.EBSMetadatas[*clusterConfig.Region][clusterConfig.InstanceVolumeType.String()].PriceGB * float64(clusterConfig.InstanceVolumeSize) / 30 / 24
if clusterConfig.InstanceVolumeType.String() == "io1" && clusterConfig.InstanceVolumeIOPS != nil {
apiEBSPrice += aws.EBSMetadatas[*clusterConfig.Region][clusterConfig.InstanceVolumeType.String()].PriceIOPS * float64(*clusterConfig.InstanceVolumeIOPS) / 30 / 24
}

var natTotalPrice float64
if clusterConfig.NATGateway == clusterconfig.SingleNATGateway {
natTotalPrice = natUnitPrice
} else if clusterConfig.NATGateway == clusterconfig.HighlyAvailableNATGateway {
natTotalPrice = natUnitPrice * float64(len(clusterConfig.AvailabilityZones))
}

totalPrice := eksPrice + totalAPIInstancePrice + apiEBSPrice*float64(numAPIInstances) + operatorInstancePrice + operatorEBSPrice + nlbPrice*2 + natTotalPrice
fmt.Printf(console.Bold("\nyour cluster currently costs %s per hour\n\n"), s.DollarsAndCents(totalPrice))

headers := []table.Header{
{Title: "aws resource"},
{Title: "cost per hour"},
}

rows := [][]interface{}{}
rows = append(rows, []interface{}{"1 eks cluster", s.DollarsMaxPrecision(eksPrice)})
rows = append(rows, []interface{}{fmt.Sprintf("%d %s for your apis", numAPIInstances, s.PluralS("instance", numAPIInstances)), s.DollarsAndTenthsOfCents(totalAPIInstancePrice) + " total"})
rows = append(rows, []interface{}{fmt.Sprintf("%d %dgb ebs %s for your apis", numAPIInstances, clusterConfig.InstanceVolumeSize, s.PluralS("volume", numAPIInstances)), s.DollarsAndTenthsOfCents(apiEBSPrice*float64(numAPIInstances)) + " total"})
rows = append(rows, []interface{}{"1 t3.medium instance for the operator", s.DollarsMaxPrecision(operatorInstancePrice)})
rows = append(rows, []interface{}{"1 20gb ebs volume for the operator", s.DollarsAndTenthsOfCents(operatorEBSPrice)})
rows = append(rows, []interface{}{"2 network load balancers", s.DollarsMaxPrecision(nlbPrice*2) + " total"})

if clusterConfig.NATGateway == clusterconfig.SingleNATGateway {
rows = append(rows, []interface{}{"1 nat gateway", s.DollarsMaxPrecision(natUnitPrice)})
} else if clusterConfig.NATGateway == clusterconfig.HighlyAvailableNATGateway {
numNATs := len(clusterConfig.AvailabilityZones)
rows = append(rows, []interface{}{fmt.Sprintf("%d nat gateways", numNATs), s.DollarsMaxPrecision(natUnitPrice*float64(numNATs)) + " total"})
}

t := table.Table{
Headers: headers,
Rows: rows,
}
t.MustPrint(&table.Opts{Sort: pointer.Bool(false)})
}

func printInfoNodes(infoResponse *schema.InfoResponse) {
numAPIInstances := len(infoResponse.NodeInfos)

var totalReplicas int
var doesClusterHaveGPUs bool
for _, nodeInfo := range infoResponse.NodeInfos {
totalReplicas += nodeInfo.NumReplicas
if nodeInfo.ComputeCapacity.GPU > 0 {
doesClusterHaveGPUs = true
}
}

var pendingReplicasStr string
if infoResponse.NumPendingReplicas > 0 {
pendingReplicasStr = fmt.Sprintf(", and %d unscheduled %s", infoResponse.NumPendingReplicas, s.PluralS("replica", infoResponse.NumPendingReplicas))
}

fmt.Printf(console.Bold("\nyour cluster has %d API %s running across %d %s%s\n"), totalReplicas, s.PluralS("replica", totalReplicas), numAPIInstances, s.PluralS("instance", numAPIInstances), pendingReplicasStr)

if len(infoResponse.NodeInfos) == 0 {
return
}

headers := []table.Header{
{Title: "instance type"},
{Title: "lifecycle"},
{Title: "replicas"},
{Title: "CPU (free / total)"},
{Title: "memory (free / total)"},
{Title: "GPU (free / total)", Hidden: !doesClusterHaveGPUs},
}

rows := [][]interface{}{}
for _, nodeInfo := range infoResponse.NodeInfos {
lifecycle := "on-demand"
if nodeInfo.IsSpot {
lifecycle = "spot"
}
cpuStr := nodeInfo.ComputeAvailable.CPU.String() + " / " + nodeInfo.ComputeCapacity.CPU.String()
memStr := nodeInfo.ComputeAvailable.Mem.String() + " / " + nodeInfo.ComputeCapacity.Mem.String()
gpuStr := s.Int64(nodeInfo.ComputeAvailable.GPU) + " / " + s.Int64(nodeInfo.ComputeCapacity.GPU)
rows = append(rows, []interface{}{nodeInfo.InstanceType, lifecycle, nodeInfo.NumReplicas, cpuStr, memStr, gpuStr})
}

t := table.Table{
Headers: headers,
Rows: rows,
}
fmt.Println()
t.MustPrint(&table.Opts{Sort: pointer.Bool(false)})
}

func updateInfoEnvironment(operatorEndpoint string, awsCreds AWSCredentials, disallowPrompt bool) error {
prevEnv, err := readEnv(_flagClusterEnv)
if err != nil {
exit.Error(err)
return err
}

newEnvironment := cliconfig.Environment{
Expand All @@ -456,25 +590,26 @@ func cmdInfo(awsCreds AWSCredentials, accessConfig *clusterconfig.AccessConfig,
shouldWriteEnv := false
if prevEnv == nil {
shouldWriteEnv = true
} else if *prevEnv.OperatorEndpoint != operatorConfig.OperatorEndpoint || *prevEnv.AWSAccessKeyID != operatorConfig.AWSAccessKeyID || *prevEnv.AWSSecretAccessKey != operatorConfig.AWSSecretAccessKey {
fmt.Println()
fmt.Println(newEnvironment.String(false))
} else if *prevEnv.OperatorEndpoint != operatorEndpoint || *prevEnv.AWSAccessKeyID != awsCreds.AWSAccessKeyID || *prevEnv.AWSSecretAccessKey != awsCreds.AWSSecretAccessKey {
if disallowPrompt {
fmt.Print(fmt.Sprintf("found an existing environment named \"%s\"; overwriting it with the configuration above\n\n", _flagClusterEnv))
fmt.Print(fmt.Sprintf("\nfound an existing environment named \"%s\"; overwriting it to connect to this cluster\n", _flagClusterEnv))
shouldWriteEnv = true
} else {
shouldWriteEnv = prompt.YesOrNo(fmt.Sprintf("found an existing environment named \"%s\"; would you like to overwrite it with the configuration above?", _flagClusterEnv), "", "")
shouldWriteEnv = prompt.YesOrNo(fmt.Sprintf("\nfound an existing environment named \"%s\"; would you like to overwrite it to connect to this cluster?", _flagClusterEnv), "", "")
}
}

if shouldWriteEnv {
err := addEnvToCLIConfig(newEnvironment)
if err != nil {
exit.Error(err)
return err
}

print.BoldFirstLine(fmt.Sprintf("configured %s environment", _flagClusterEnv))
fmt.Printf(console.Bold("configured %s environment\n"), _flagClusterEnv)
}

return nil
}

func cmdDebug(awsCreds AWSCredentials, accessConfig *clusterconfig.AccessConfig) {
Expand Down Expand Up @@ -511,7 +646,7 @@ func refreshCachedClusterConfig(awsCreds AWSCredentials, disallowPrompt bool) cl

mountedConfigPath := mountedClusterConfigPath(*accessConfig.ClusterName, *accessConfig.Region)

fmt.Println("fetching cluster configuration ..." + "\n")
fmt.Println("syncing cluster configuration ..." + "\n")
out, exitCode, err := runManagerAccessCommand("/root/refresh.sh "+mountedConfigPath, *accessConfig, awsCreds, _flagClusterEnv)
if err != nil {
exit.Error(err)
Expand Down
3 changes: 1 addition & 2 deletions cli/local/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ func ValidateLocalAPIs(apis []userconfig.API, projectFiles ProjectFiles, awsClie
}

if api.Compute.CPU != nil && (api.Compute.CPU.MilliValue() > int64(dockerClient.Info.NCPU)*1000) {
qty := k8s.NewQuantity(int64(dockerClient.Info.NCPU))
api.Compute.CPU = &qty
api.Compute.CPU = k8s.NewQuantity(int64(dockerClient.Info.NCPU))
}

if api.Compute.GPU > 0 {
Expand Down
26 changes: 26 additions & 0 deletions pkg/lib/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
kmeta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _nodeTypeMeta = kmeta.TypeMeta{
APIVersion: "v1",
Kind: "Node",
}

func (c *Client) ListNodes(opts *kmeta.ListOptions) ([]kcore.Node, error) {
if opts == nil {
opts = &kmeta.ListOptions{}
Expand All @@ -30,5 +35,26 @@ func (c *Client) ListNodes(opts *kmeta.ListOptions) ([]kcore.Node, error) {
if err != nil {
return nil, errors.WithStack(err)
}
for i := range nodeList.Items {
nodeList.Items[i].TypeMeta = _nodeTypeMeta
}
return nodeList.Items, nil
}

func (c *Client) ListNodesByLabels(labels map[string]string) ([]kcore.Node, error) {
opts := &kmeta.ListOptions{
LabelSelector: LabelSelector(labels),
}
return c.ListNodes(opts)
}

func (c *Client) ListNodesByLabel(labelKey string, labelValue string) ([]kcore.Node, error) {
return c.ListNodesByLabels(map[string]string{labelKey: labelValue})
}

func (c *Client) ListNodesWithLabelKeys(labelKeys ...string) ([]kcore.Node, error) {
opts := &kmeta.ListOptions{
LabelSelector: LabelExistsSelector(labelKeys...),
}
return c.ListNodes(opts)
}
5 changes: 1 addition & 4 deletions pkg/lib/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,7 @@ func TotalPodCompute(podSpec *kcore.PodSpec) (Quantity, Quantity, int64) {
totalCPU.Add(requests[kcore.ResourceCPU])
totalMem.Add(requests[kcore.ResourceMemory])
if gpu, ok := requests["nvidia.com/gpu"]; ok {
gpuVal, ok := gpu.AsInt64()
if ok {
totalGPU += gpuVal
}
totalGPU += gpu.Value()
}
}

Expand Down
37 changes: 31 additions & 6 deletions pkg/lib/k8s/quantity.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,24 @@ func QuantityParser(v *QuantityValidation) func(string) (interface{}, error) {
}
}

func NewQuantity(value int64) Quantity {
func WrapQuantity(k8sQuantity kresource.Quantity) *Quantity {
return &Quantity{
Quantity: k8sQuantity,
}
}

func NewQuantity(value int64) *Quantity {
k8sQuantity := kresource.NewQuantity(value, kresource.DecimalSI)

return Quantity{
Quantity: *k8sQuantity,
UserString: s.Int64(value),
return &Quantity{
Quantity: *k8sQuantity,
}
}

func NewMilliQuantity(milliValue int64) Quantity {
func NewMilliQuantity(milliValue int64) *Quantity {
k8sQuantity := kresource.NewMilliQuantity(milliValue, kresource.DecimalSI)

return Quantity{
return &Quantity{
Quantity: *k8sQuantity,
UserString: s.Int64(milliValue) + "m",
}
Expand Down Expand Up @@ -113,6 +118,26 @@ func SplitInTwo(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Q
return q1, q2
}

func (quantity *Quantity) Sub(q2 kresource.Quantity) {
quantity.Quantity.Sub(q2)
quantity.UserString = ""
}

func (quantity *Quantity) SubQty(q2 Quantity) {
quantity.Quantity.Sub(q2.Quantity)
quantity.UserString = ""
}

func (quantity *Quantity) Add(q2 kresource.Quantity) {
quantity.Quantity.Add(q2)
quantity.UserString = ""
}

func (quantity *Quantity) AddQty(q2 Quantity) {
quantity.Quantity.Add(q2.Quantity)
quantity.UserString = ""
}

func (quantity *Quantity) String() string {
if quantity.UserString != "" {
return quantity.UserString
Expand Down
Loading