Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud 87 #211

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion hopsworksai/internal/api/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func GetClusters(ctx context.Context, apiClient APIHandler, cloud CloudProvider)
return response.Payload.Clusters, nil
}

// Creating a new cluster
func NewCluster(ctx context.Context, apiClient APIHandler, createRequest interface{}) (string, error) {
var cloudProvider CloudProvider
switch createRequest.(type) {
Expand Down Expand Up @@ -291,8 +292,24 @@ func RollbackUpgradeCluster(ctx context.Context, apiClient APIHandler, clusterId
return nil
}

// ReconfigureRonDB asks the API to reconfigure the RonDB deployment of the
// cluster identified by clusterId.
//
// ronDBConfiguration is serialized to JSON and sent as the body of a PUT
// request to /api/clusters/<clusterId>/rondb/reconfigure. The function returns
// as soon as doRequest returns; it does not poll for the reconfiguration to
// finish, so callers that need the final cluster state must wait for it
// themselves.
//
// It returns an error if the configuration cannot be marshaled or if the
// request itself fails.
func ReconfigureRonDB(ctx context.Context, apiClient APIHandler, clusterId string, ronDBConfiguration RonDBConfiguration) error {
	payload, err := json.Marshal(ronDBConfiguration)
	if err != nil {
		// %w renders the same message text as %s while keeping the underlying
		// error reachable through errors.Is / errors.As.
		return fmt.Errorf("failed to marshal request: %w", err)
	}

	// response is handed to doRequest but never read afterwards; only the
	// returned error decides the outcome of this call.
	var response BaseResponse
	if err := apiClient.doRequest(ctx, http.MethodPut, "/api/clusters/"+clusterId+"/rondb/reconfigure", bytes.NewBuffer(payload), &response); err != nil {
		return err
	}
	return nil
}

// IsNodeTypeModifyable reports whether the instance type of the given node
// type may be modified. It returns false for WorkerNode and
// RonDBManagementNode, and true for every other node type.
func IsNodeTypeModifyable(nodeType NodeType) bool {
	switch nodeType {
	case WorkerNode, RonDBManagementNode:
		return false
	default:
		return true
	}
}

func ModifyInstanceType(ctx context.Context, apiClient APIHandler, clusterId string, nodeType NodeType, instanceType string) error {
if nodeType == WorkerNode || nodeType == RonDBManagementNode {
if !IsNodeTypeModifyable(nodeType) {
return fmt.Errorf("modifying instance type for %s is not supported", nodeType.String())
}
req := ModifyInstanceTypeRequest{
Expand Down
209 changes: 109 additions & 100 deletions hopsworksai/resource_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hopsworksai

import (
"context"
"errors"
"fmt"
"regexp"
"time"
Expand Down Expand Up @@ -337,7 +338,6 @@ func clusterSchema() map[string]*schema.Schema {
Description: "Setup a cluster with managed RonDB.",
Type: schema.TypeList,
Required: true,
ForceNew: true,
MaxItems: 1,
Elem: ronDBSchema(),
},
Expand Down Expand Up @@ -471,16 +471,22 @@ func ronDBSchema() *schema.Resource {
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"rondbVersion": {
Description: "Version of RonDB. It is intended for development purposes only. To change between major RonDB versions, change the Hopsworks version instead.",
Type: schema.TypeString,
Optional: true,
Computed: true,
MaxItems: 1,
},
"benchmark": {
Description: "The configurations required to benchmark RonDB.",
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: true,
ForceNew: true, // should not be necessary; works fine if benchmarking==false --> benchmarking==true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then it can be removed
also you would need to remove the ForceNew on almost all RonDB attributes to allow reconfiguration

MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
Expand Down Expand Up @@ -1006,6 +1012,7 @@ func azureAttributesSchema() *schema.Resource {
}
}

// root of resources
func clusterResource() *schema.Resource {
return &schema.Resource{
Description: "Use this resource to create, read, update, and delete clusters on Hopsworks.ai.",
Expand Down Expand Up @@ -1043,9 +1050,7 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, meta int
if len(awsAttributes) > 0 {
createRequest = createAWSCluster(d, baseRequest)
}
}

if azure, ok := d.GetOk("azure_attributes"); ok {
} else if azure, ok := d.GetOk("azure_attributes"); ok {
azureAttributes := azure.([]interface{})
if len(azureAttributes) > 0 {
createRequest = createAzureCluster(d, baseRequest)
Expand Down Expand Up @@ -1236,72 +1241,12 @@ func createClusterBaseRequest(d *schema.ResourceData) (*api.CreateCluster, error
createCluster.ClusterConfiguration.Workers = workersConfig
}

if _, ok := d.GetOk("rondb"); ok {
defaultRonDB := defaultRonDBConfiguration()
if singleRonDB, ok := d.GetOk("rondb.0.single_node"); ok {
createCluster.RonDB = &api.RonDBConfiguration{
Configuration: api.RonDBBaseConfiguration{
NdbdDefault: api.RonDBNdbdDefaultConfiguration{
ReplicationFactor: 1,
},
},
ManagementNodes: api.RonDBNodeConfiguration{
NodeConfiguration: api.NodeConfiguration{
DiskSize: defaultRonDB.ManagementNodes.DiskSize,
},
Count: 1,
},
MYSQLNodes: api.RonDBNodeConfiguration{
NodeConfiguration: api.NodeConfiguration{
DiskSize: defaultRonDB.MYSQLNodes.DiskSize,
},
Count: 1,
},
DataNodes: api.RonDBNodeConfiguration{
NodeConfiguration: structure.ExpandNode(singleRonDB.([]interface{})[0].(map[string]interface{})),
Count: 1,
},
}
} else {
var replicationFactor = defaultRonDB.Configuration.NdbdDefault.ReplicationFactor
if v, ok := d.GetOk("rondb.0.configuration.0.ndbd_default.0.replication_factor"); ok {
replicationFactor = v.(int)
}

var grantUserPrivileges = defaultRonDB.Configuration.General.Benchmark.GrantUserPrivileges
if v, ok := d.GetOk("rondb.0.configuration.0.general.0.benchmark.0.grant_user_privileges"); ok {
grantUserPrivileges = v.(bool)
}

createCluster.RonDB = &api.RonDBConfiguration{
Configuration: api.RonDBBaseConfiguration{
NdbdDefault: api.RonDBNdbdDefaultConfiguration{
ReplicationFactor: replicationFactor,
},
General: api.RonDBGeneralConfiguration{
Benchmark: api.RonDBBenchmarkConfiguration{
GrantUserPrivileges: grantUserPrivileges,
},
},
},
ManagementNodes: structure.ExpandRonDBNodeConfiguration(d.Get("rondb.0.management_nodes").([]interface{})[0].(map[string]interface{})),
DataNodes: structure.ExpandRonDBNodeConfiguration(d.Get("rondb.0.data_nodes").([]interface{})[0].(map[string]interface{})),
MYSQLNodes: structure.ExpandRonDBNodeConfiguration(d.Get("rondb.0.mysql_nodes").([]interface{})[0].(map[string]interface{})),
}

if n, ok := d.GetOk("rondb.0.api_nodes"); ok {
createCluster.RonDB.APINodes = structure.ExpandRonDBNodeConfiguration(n.([]interface{})[0].(map[string]interface{}))
}

if createCluster.RonDB.DataNodes.Count%createCluster.RonDB.Configuration.NdbdDefault.ReplicationFactor != 0 {
return nil, fmt.Errorf("number of RonDB data nodes must be multiples of RonDB replication factor")
}

if createCluster.RonDB.IsSingleNodeSetup() {
return nil, fmt.Errorf("your configuration creates a single rondb node, you should use singe_node configuration block instead or modifiy the configuration to run RonDB in a cluster mode")
}
}
// "d" gives access to the keys/values configured for this resource
ronDBConfig, err := createAndValidateRonDB(d)
if err != nil {
return nil, err
}
createCluster.RonDB = ronDBConfig

if v, ok := d.GetOk("autoscale"); ok {
createCluster.Autoscale = structure.ExpandAutoscaleConfiguration(v.([]interface{}))
Expand All @@ -1322,6 +1267,77 @@ func createClusterBaseRequest(d *schema.ResourceData) (*api.CreateCluster, error
return createCluster, nil
}

// createAndValidateRonDB builds an api.RonDBConfiguration from the "rondb"
// block of the resource data and validates it.
//
// An error is returned when "rondb" is not set.
//
// When "rondb.0.single_node" is set, the result uses a replication factor of 1
// and exactly one management, MySQL and data node, all three sharing the node
// configuration expanded from the single_node block. No further validation is
// applied in that case.
//
// Otherwise the replication factor and the benchmark grant_user_privileges
// flag are read from the resource data when GetOk reports them as set, and
// fall back to defaultRonDBConfiguration() when it does not. Management, data
// and MySQL nodes are expanded from the first element of their lists;
// api_nodes are added only when present. An error is returned when the data
// node count is not a multiple of the replication factor, or when
// IsSingleNodeSetup() reports true for the resulting configuration.
//
// NOTE(review): the node lists are indexed at [0] without a length check, so
// an empty list panics rather than producing an error.
func createAndValidateRonDB(d *schema.ResourceData) (*api.RonDBConfiguration, error) {
	if _, defined := d.GetOk("rondb"); !defined {
		return nil, errors.New("rondb has to be defined")
	}

	defaults := defaultRonDBConfiguration()

	// Single-node mode: every role gets one node with the same configuration.
	if raw, isSingle := d.GetOk("rondb.0.single_node"); isSingle {
		node := structure.ExpandNode(raw.([]interface{})[0].(map[string]interface{}))
		single := api.RonDBNodeConfiguration{
			NodeConfiguration: node,
			Count:             1,
		}
		return &api.RonDBConfiguration{
			Configuration: api.RonDBBaseConfiguration{
				NdbdDefault: api.RonDBNdbdDefaultConfiguration{
					ReplicationFactor: 1,
				},
			},
			ManagementNodes: single,
			MYSQLNodes:      single,
			DataNodes:       single,
		}, nil
	}

	// Cluster mode: start from the defaults and override what is configured.
	replication := defaults.Configuration.NdbdDefault.ReplicationFactor
	if v, set := d.GetOk("rondb.0.configuration.0.ndbd_default.0.replication_factor"); set {
		replication = v.(int)
	}

	grantPrivileges := defaults.Configuration.General.Benchmark.GrantUserPrivileges
	if v, set := d.GetOk("rondb.0.configuration.0.general.0.benchmark.0.grant_user_privileges"); set {
		grantPrivileges = v.(bool)
	}

	// firstNode expands the first element of a single-item node list.
	firstNode := func(list interface{}) api.RonDBNodeConfiguration {
		return structure.ExpandRonDBNodeConfiguration(list.([]interface{})[0].(map[string]interface{}))
	}

	cfg := &api.RonDBConfiguration{
		Configuration: api.RonDBBaseConfiguration{
			NdbdDefault: api.RonDBNdbdDefaultConfiguration{
				ReplicationFactor: replication,
			},
			General: api.RonDBGeneralConfiguration{
				Benchmark: api.RonDBBenchmarkConfiguration{
					GrantUserPrivileges: grantPrivileges,
				},
			},
		},
		ManagementNodes: firstNode(d.Get("rondb.0.management_nodes")),
		DataNodes:       firstNode(d.Get("rondb.0.data_nodes")),
		MYSQLNodes:      firstNode(d.Get("rondb.0.mysql_nodes")),
	}

	if apiNodes, set := d.GetOk("rondb.0.api_nodes"); set {
		cfg.APINodes = firstNode(apiNodes)
	}

	if cfg.DataNodes.Count%cfg.Configuration.NdbdDefault.ReplicationFactor != 0 {
		return nil, errors.New("number of RonDB data nodes must be multiples of RonDB replication factor")
	}

	if cfg.IsSingleNodeSetup() {
		return nil, errors.New("your configuration creates a single rondb node, you should use singe_node configuration block instead or modifiy the configuration to run RonDB in a cluster mode")
	}

	return cfg, nil
}

func resourceClusterRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
client := meta.(*api.HopsworksAIClient)
id := d.Id()
Expand Down Expand Up @@ -1360,6 +1376,15 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int

clusterId := d.Id()

/*
Consider running this together with RonDB changes, since it will affect the RonDB version as well.
Otherwise there is a risk that we might exchange all VMs for a new RonDB version (which means
backup RonDB, and restore it, which are both lengthy processes) and then exchange all VMs again
because we have new desired VM types. We could instead just have requested new VM types
with the new cluster / RonDB version.

This would also have to be changed in the UI though, so it is not an easy task.
*/
Comment on lines +1379 to +1387
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't put it this way, to be honest: a cluster upgrade is an involved process, and we shouldn't add yet another invariant to the procedure. Also, changing the RonDB version is going to be an experimental feature for the time being, and that shouldn't happen on the fly.

if d.HasChange("version") {
o, n := d.GetChange("version")
fromVersion := o.(string)
Expand All @@ -1385,10 +1410,13 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int
}
}

// TODO: How will the user know that perhaps not all of their changes have been applied?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never return before the upgrade/rollback procedures are complete; we wait for them using the resource-waiting procedures (resourceClusterWaitForRunningAfterUpgrade or resourceClusterWaitForStopping).

return resourceClusterRead(ctx, d, meta)
}

if d.HasChange("head.0.instance_type") {
// TODO: What about changes in disk_size & ha_enabled?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These attributes are immutable and shouldn't change after cluster creation, at least for the time being.


_, n := d.GetChange("head.0.instance_type")
toInstanceType := n.(string)

Expand All @@ -1397,38 +1425,19 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int
}
}

if d.HasChange("rondb.0.data_nodes.0.instance_type") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still allow offline upscaling, at least for the time being, since reconfiguration is still experimental. If it does not conflict with reconfiguration, we should keep it rather than remove it, since we might have this use case.

_, n := d.GetChange("rondb.0.data_nodes.0.instance_type")
toInstanceType := n.(string)

if err := api.ModifyInstanceType(ctx, client, clusterId, api.RonDBDataNode, toInstanceType); err != nil {
return diag.FromErr(err)
}
}

if d.HasChange("rondb.0.mysql_nodes.0.instance_type") {
_, n := d.GetChange("rondb.0.mysql_nodes.0.instance_type")
toInstanceType := n.(string)

if err := api.ModifyInstanceType(ctx, client, clusterId, api.RonDBMySQLNode, toInstanceType); err != nil {
return diag.FromErr(err)
}
}

if d.HasChange("rondb.0.api_nodes.0.instance_type") {
_, n := d.GetChange("rondb.0.api_nodes.0.instance_type")
toInstanceType := n.(string)

if err := api.ModifyInstanceType(ctx, client, clusterId, api.RonDBAPINode, toInstanceType); err != nil {
if d.HasChange("rondb") {
/*
using d.GetChange() can be helpful to access single fields;
might be easier to just let the backend handle the details
here though
*/
ronDBConfig, err := createAndValidateRonDB(d)
Comment on lines +1429 to +1434
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would need more tests to make sure we don't miss anything here.

if err != nil {
return diag.FromErr(err)
}
}

if d.HasChange("rondb.0.single_node.0.instance_type") {
_, n := d.GetChange("rondb.0.single_node.0.instance_type")
toInstanceType := n.(string)

if err := api.ModifyInstanceType(ctx, client, clusterId, api.RonDBAllInOneNode, toInstanceType); err != nil {
err = api.ReconfigureRonDB(ctx, client, clusterId, *ronDBConfig)
if err != nil {
Comment on lines +1439 to +1440
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you would need to do the same as what we do for upgrade to wait until the reconfiguration process is complete (resourceClusterWaitForRunningAfterUpgrade)

return diag.FromErr(err)
}
}
Expand Down