Implement Upgrade Suggestions for Kafka Cluster Creation in the Confluent CLI #3075

Open · wants to merge 4 commits into base: main
46 changes: 44 additions & 2 deletions internal/kafka/command_cluster_create.go
@@ -13,6 +13,7 @@ import (
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/featureflags"
"github.com/confluentinc/cli/v4/pkg/kafka"
"github.com/confluentinc/cli/v4/pkg/output"
"github.com/confluentinc/cli/v4/pkg/utils"
@@ -75,6 +76,14 @@ func (c *clusterCommand) newCreateCommand() *cobra.Command {
    return cmd
}

func (c *clusterCommand) isBasicToStandardUpgradeSuggestionEnabled() bool {
    if c.Config.IsTest {
        return true
    }
    ldClient := featureflags.GetCcloudLaunchDarklyClient(c.Context.PlatformName)
    return featureflags.Manager.BoolVariation("cli.basic_to_standard_cluster_upgrade_suggestion.enable", c.Context, ldClient, true, false)
}

func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {
    cloud, err := cmd.Flags().GetString("cloud")
    if err != nil {
@@ -91,6 +100,11 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {
        return err
    }

    // If no type specified, default to basic
    if clusterType == "" {
        clusterType = skuBasic
    }

    sku, err := stringToSku(clusterType)
    if err != nil {
        return err
Expand Down Expand Up @@ -160,7 +174,7 @@ func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {
createCluster.Spec.Network = &cmkv2.EnvScopedObjectReference{Id: network}
}

kafkaCluster, httpResp, err := c.V2Client.CreateKafkaCluster(createCluster)
cluster, httpResp, err := c.V2Client.CreateKafkaCluster(createCluster)
if err != nil {
return catchClusterConfigurationNotValidError(err, httpResp, cloud, region)
}
@@ -169,7 +183,35 @@
        output.ErrPrintln(c.Config.EnableColor, getKafkaProvisionEstimate(sku))
    }

    return c.outputKafkaClusterDescription(cmd, &kafkaCluster, false)
    if err := c.outputKafkaClusterDescription(cmd, &cluster, false); err != nil {
        return err
    }

    if strings.EqualFold(clusterType, skuBasic) {
        orgId := c.Context.GetCurrentOrganization()
        if orgId == "" {
            return nil
        }

        if !c.isBasicToStandardUpgradeSuggestionEnabled() {
            return nil
        }

        copyManager, err := NewCopyManager()
Copilot AI commented on Apr 8, 2025:
Consider logging or handling the error from NewCopyManager instead of silently returning nil, so that issues with loading upgrade suggestions can be diagnosed.
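A minimal sketch of that suggestion, assuming the CLI's pkg/log package exposes a package-level CliLogger with a Debugf method (the logger name is an assumption, not part of this diff):

        if err != nil {
            // Hypothetical: record the failure at debug level instead of swallowing it.
            // The suggestion is best-effort, so cluster creation still succeeds.
            log.CliLogger.Debugf("failed to load upgrade suggestions: %v", err)
            return nil
        }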

        if err != nil {
            return nil
        }

        content, cta, err := copyManager.GetCopy("cluster_upgrade_basic_to_standard")
Copilot AI commented on Apr 8, 2025:
Consider logging or handling the error from GetCopy rather than returning nil, which will help in troubleshooting if the upgrade suggestion copy fails to load.
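The same pattern would apply here, under the same assumption about the logger:

        content, cta, err := copyManager.GetCopy("cluster_upgrade_basic_to_standard")
        if err != nil {
            // Hypothetical: record why the suggestion copy could not be loaded.
            log.CliLogger.Debugf("failed to look up upgrade suggestion copy: %v", err)
            return nil
        }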

        if err != nil {
            return nil
        }

        formattedCopy := copyManager.FormatCopy(content, cta, cluster.GetId())
        output.Println(c.Config.EnableColor, "\n"+formattedCopy)
    }

    return nil
}

func stringToAvailability(s string, sku ccstructs.Sku) (string, error) {
71 changes: 71 additions & 0 deletions internal/kafka/copy_manager.go
@@ -0,0 +1,71 @@
package kafka

import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
    "runtime"
    "strings"
)

type CopyVariation struct {
    Content string `json:"content"`
    CTA     string `json:"cta"`
}

type CopyScenario struct {
    Variations []CopyVariation `json:"variations"`
}

type CopyData struct {
    Scenarios map[string]CopyScenario `json:"scenarios"`
}

type CopyManager struct {
    data CopyData
}

func NewCopyManager() (*CopyManager, error) {
    // Get the directory of the current source file
    _, filename, _, ok := runtime.Caller(0)
    if !ok {
        return nil, fmt.Errorf("failed to get current file path")
    }
    dir := filepath.Dir(filename)

    // Read the JSON file
    filePath := filepath.Join(dir, "upgrade_suggestions.json")

    file, err := os.ReadFile(filePath)
    if err != nil {
        return nil, fmt.Errorf("failed to read copy data: %w", err)
    }

    var data CopyData
    if err := json.Unmarshal(file, &data); err != nil {
        return nil, fmt.Errorf("failed to parse copy data: %w", err)
    }

    return &CopyManager{data: data}, nil
}

func (cm *CopyManager) GetCopy(scenario string) (string, string, error) {
    scenarioData, exists := cm.data.Scenarios[scenario]
    if !exists {
        return "", "", fmt.Errorf("unknown scenario: %s", scenario)
    }

    variations := scenarioData.Variations
    if len(variations) == 0 {
        return "", "", fmt.Errorf("no variations found for scenario: %s", scenario)
    }

    // For now, simply use the first variation.
    // In the future, we could add more sophisticated selection logic if needed.
    return variations[0].Content, variations[0].CTA, nil
}

func (cm *CopyManager) FormatCopy(content, cta, id string) string {
    return fmt.Sprintf("%s\n\n%s", content, strings.ReplaceAll(cta, "{{id}}", id))
}
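
As a usage reference, the load, lookup, and format steps compose as follows. This is a hypothetical test sketch, not part of the PR; the test name and cluster ID are illustrative:

package kafka

import (
    "strings"
    "testing"
)

// TestCopyManagerFormatsUpgradeSuggestion exercises NewCopyManager -> GetCopy -> FormatCopy.
func TestCopyManagerFormatsUpgradeSuggestion(t *testing.T) {
    cm, err := NewCopyManager()
    if err != nil {
        t.Fatalf("NewCopyManager: %v", err)
    }

    content, cta, err := cm.GetCopy("cluster_upgrade_basic_to_standard")
    if err != nil {
        t.Fatalf("GetCopy: %v", err)
    }

    // FormatCopy joins the content and CTA, substituting the cluster ID
    // into the CTA's {{id}} placeholder.
    got := cm.FormatCopy(content, cta, "lkc-123456")
    if !strings.Contains(got, "confluent kafka cluster update lkc-123456") {
        t.Errorf("expected the CTA to reference the cluster ID, got:\n%s", got)
    }
}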
20 changes: 20 additions & 0 deletions internal/kafka/upgrade_suggestions.json
@@ -0,0 +1,20 @@
{
  "scenarios": {
    "cluster_upgrade_basic_to_standard": {
      "variations": [
        {
          "content": "NOTICE: Basic tier selected. Limitations include:\n- No high availability guarantees\n- Limited performance during peak loads\n- Basic support SLA (24hr response)\n\nFor production workloads, 'Standard' tier provides HA, performance scaling, and enterprise SLA.",
          "cta": "To upgrade to the Standard Tier execute the following command:\nconfluent kafka cluster update {{id}} --type standard"
        }
      ]
    },
    "stream_governance_upgrade_essentials_to_advanced": {
      "variations": [
        {
          "content": "NOTICE: Stream Governance Essentials selected. Limitations include:\n- Limited schema compatibility checks\n- Basic schema management\n- No schema validation\n\nFor production workloads, 'Advanced' tier provides full schema compatibility, advanced validation, and enterprise-grade governance.",
          "cta": "To upgrade to Stream Governance Advanced execute the following command:\nconfluent environment update {{id}} --governance-package advanced"
        }
      ]
    }
  }
}
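
For reference, running the first scenario above through FormatCopy with a hypothetical cluster ID of lkc-123456 prints:

NOTICE: Basic tier selected. Limitations include:
- No high availability guarantees
- Limited performance during peak loads
- Basic support SLA (24hr response)

For production workloads, 'Standard' tier provides HA, performance scaling, and enterprise SLA.

To upgrade to the Standard Tier execute the following command:
confluent kafka cluster update lkc-123456 --type standard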