Skip to content

[APIE-24] Add support for CP Flink SQL Shell (WIP) #3110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 44 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f7dcc03
Initial support for CP Flink
channingdong May 1, 2025
82de796
Minor code refactoring
channingdong May 7, 2025
2254523
Add the default compute-pool and statement support for CMF environment
channingdong May 8, 2025
d334ae0
Refactor the CLI code to fit OpenAPI 3.0 SDK
channingdong May 14, 2025
3c53b9f
Slight update on CMF client for the configuration
channingdong May 15, 2025
0ccee9a
Code cleaning up and address the cosmetic comments from PR review
channingdong May 15, 2025
86d799a
Address most of the comments from Fabian's bug bash
channingdong May 20, 2025
bee4723
Address the bug bash comments from Fabian with engineering binary v1.0
channingdong May 21, 2025
03cdee1
Fixed the potential memory leakage issue caused by nil pointer
channingdong May 21, 2025
62019be
Initial support and code refactoring for CP Flink shell command
channingdong May 26, 2025
5ff1f33
Merge branch 'main' into CLI-3606
channingdong May 31, 2025
583f2aa
Remove the reference to local CMF SDK
channingdong May 31, 2025
fd74bb5
Fix the compilation issue before handover
channingdong May 31, 2025
80cf7b6
Update user interface: add catalog and database flags and update the …
sgagniere Jun 12, 2025
5151be3
Remove lsp from StartAppOnPrem
sgagniere Jun 12, 2025
bc6b6b1
Separate cloud and on-prem histories for flink shell
sgagniere Jun 12, 2025
a2395ad
Remove reportUsage from on-prem, since we collect no usage data for o…
sgagniere Jun 12, 2025
8ed93a1
Remove TODO comment addressed by flags added earlier
sgagniere Jun 12, 2025
63e7807
Update ConvertToInternalResultsOnPrem
sgagniere Jun 13, 2025
3fb4309
Add pagetoken support
sgagniere Jun 13, 2025
2752668
Add support for authenticated and non-authenticated onprem shell
sgagniere Jun 14, 2025
b102d70
Merge branch 'main' of https://github.com/confluentinc/cli into CLI-3606
sgagniere Jun 14, 2025
a8e1a8b
fix existing tests
sgagniere Jun 14, 2025
10cbfb1
Don't read ld flag for onprem version
sgagniere Jun 16, 2025
b6a76d8
fix too long statement names
sgagniere Jun 17, 2025
93c25c4
on-prem does not have retryable pending statements
sgagniere Jun 17, 2025
aa3f675
don't call results api for non SELECT statements
sgagniere Jun 18, 2025
8a9f8e7
add flink-configuration flag to shell
sgagniere Jun 19, 2025
408864d
fix bug in previous commit
sgagniere Jun 19, 2025
15d6c93
Combine cloud and onprem ProcessedStatement structs
sgagniere Jun 21, 2025
7a7fefa
Remove onprem store interface (duplicate interface)
sgagniere Jun 21, 2025
98e327a
Remove onprem resultfetcher interface (duplicate interface)
sgagniere Jun 21, 2025
3885f69
Remove onprem applicationstatementcontroller interface (duplicate int…
sgagniere Jun 21, 2025
157c3db
Remove onprem interactiveoutputcontroller (duplicate struct)
sgagniere Jun 21, 2025
9091a97
Remove onprem baseoutputcontroller (duplicate struct)
sgagniere Jun 21, 2025
673d3b7
Update wording in existing tests
sgagniere Jun 21, 2025
f8aee56
Use normal text color for details when status isn't FAILED
sgagniere Jun 21, 2025
4f974ff
Remove duplicate NewResultFieldType function
sgagniere Jun 21, 2025
21f04a1
Remove onprem application struct (duplicate struct)
sgagniere Jun 21, 2025
7a4482e
Remove duplicate initPath function
sgagniere Jun 21, 2025
5b3fe17
shell doesn't need output flag
sgagniere Jun 21, 2025
ded3ea8
Remove duplicate store utils onprem
sgagniere Jun 21, 2025
27baa2b
Merge branch 'main' of https://github.com/confluentinc/cli into CLI-3606
sgagniere Jun 23, 2025
699a96e
don't suggest ALTER for on-prem
sgagniere Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/flink/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {

// On-Prem and Cloud Commands
cmd.AddCommand(c.newComputePoolCommand(cfg))
cmd.AddCommand(c.newShellCommand(prerunner))
cmd.AddCommand(c.newShellCommand(prerunner, cfg))
cmd.AddCommand(c.newStatementCommand(cfg))

// Cloud Specific Commands
Expand Down
167 changes: 148 additions & 19 deletions internal/flink/command_shell.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package flink

import (
"encoding/json"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"

"github.com/confluentinc/cli/v4/pkg/auth"
"github.com/confluentinc/cli/v4/pkg/ccloudv2"
Expand All @@ -20,30 +25,52 @@
ppanic "github.com/confluentinc/cli/v4/pkg/panic-recovery"
)

func (c *command) newShellCommand(prerunner pcmd.PreRunner) *cobra.Command {
func (c *command) newShellCommand(prerunner pcmd.PreRunner, cfg *config.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "shell",
Short: "Start Flink interactive SQL client.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
Example: examples.BuildExampleString(
Use: "shell",
Short: "Start Flink interactive SQL client.",
}

// CCloud implementation for the shell command
if cfg.IsCloudLogin() {
cmd.Annotations = map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}
cmd.Example = examples.BuildExampleString(
examples.Example{
Text: "For a Quick Start with examples in context, see https://docs.confluent.io/cloud/current/flink/get-started/quick-start-shell.html.",
},
),
RunE: func(cmd *cobra.Command, args []string) error {
)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
return c.startFlinkSqlClient(prerunner, cmd)
},
}

c.addComputePoolFlag(cmd)
pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand)
c.addDatabaseFlag(cmd)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)

if featureflags.Manager.BoolVariation("cli.flink.internal", c.Context, config.CliLaunchDarklyClient, true, false) {
cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.")
cmd.Flags().StringSlice("config-value", []string{}, "App option values for local mode.")
}
c.addComputePoolFlag(cmd)
pcmd.AddServiceAccountFlag(cmd, c.AuthenticatedCLICommand)
c.addDatabaseFlag(cmd)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)

if featureflags.Manager.BoolVariation("cli.flink.internal", c.Context, config.CliLaunchDarklyClient, true, false) {
cmd.Flags().StringSlice("config-key", []string{}, "App option keys for local mode.")
cmd.Flags().StringSlice("config-value", []string{}, "App option values for local mode.")
}
} else { // On-Prem implementation for the shell command
cmd.Annotations = map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}
cmd.Example = examples.BuildExampleString(
examples.Example{
Text: "For a Quick Start with examples in context, see https://docs.confluent.io/cli/current/flink/get-started/quick-start-shell.html.",
},
)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
return c.startFlinkSqlClientOnPrem(prerunner, cmd)
}
cmd.Flags().String("compute-pool", "", "The compute pool name to execute the Flink SQL statement.")

Check failure on line 65 in internal/flink/command_shell.go

View check run for this annotation

SonarQube-Confluent / cli Sonarqube Results

internal/flink/command_shell.go#L65

Define a constant instead of duplicating this literal "compute-pool" 3 times.
cmd.Flags().String("environment", "", "Name of the Flink environment.")
cmd.Flags().String("catalog", "", "The name of the default catalog.")
cmd.Flags().String("database", "", "The name of the default database.")
cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration.")
addCmfFlagSet(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("environment"))
cobra.CheckErr(cmd.MarkFlagRequired("compute-pool"))
}

return cmd
Expand Down Expand Up @@ -79,6 +106,32 @@
}
}

func (c *command) authenticatedOnPrem(authenticated func(*cobra.Command, []string) error, cmd *cobra.Command) func() error {
return func() error {
if !c.Config.IsOnPremLogin() { // don't refresh tokens when running in unauthenticated mode
return nil
}

authToken := c.Context.GetAuthToken()
authRefreshToken := c.Context.GetAuthRefreshToken()
if err := c.Context.UpdateAuthTokens(authToken, authRefreshToken); err != nil {
return err
}

if err := authenticated(cmd, nil); err != nil {
return err
}

cmfClient, err := c.GetCmfClient(cmd)
if err != nil {
return err
}
cmfClient.AuthToken = c.Context.GetAuthToken()

return nil
}
}

func (c *command) startFlinkSqlClient(prerunner pcmd.PreRunner, cmd *cobra.Command) error {
if featureflags.Manager.BoolVariation("cli.flink.internal", c.Context, config.CliLaunchDarklyClient, true, false) {
// get config keys and values from flags
Expand Down Expand Up @@ -185,6 +238,82 @@
return client.StartApp(flinkGatewayClient, c.authenticated(prerunner.Authenticated(c.AuthenticatedCLICommand), cmd, jwtValidator), opts, reportUsage(cmd, c.Config, unsafeTrace))
}

func (c *command) startFlinkSqlClientOnPrem(prerunner pcmd.PreRunner, cmd *cobra.Command) error {
environment, err := cmd.Flags().GetString("environment")
if err != nil {
return err
}

computePool, err := cmd.Flags().GetString("compute-pool")
if err != nil {
return err
}

catalog, err := cmd.Flags().GetString("catalog")
if err != nil {
return err
}

database, err := cmd.Flags().GetString("database")
if err != nil {
return err
}

unsafeTrace, err := c.Command.Flags().GetBool("unsafe-trace")
if err != nil {
return err
}

configFilePath, err := cmd.Flags().GetString("flink-configuration")
if err != nil {
return err
}

flinkConfiguration := map[string]string{}
if configFilePath != "" {
var data []byte
// Read configuration file contents
data, err = os.ReadFile(configFilePath)
if err != nil {
return fmt.Errorf("failed to read Flink configuration file: %v", err)
}
ext := filepath.Ext(configFilePath)
switch ext {
case ".json":
err = json.Unmarshal(data, &flinkConfiguration)
case ".yaml", ".yml":
err = yaml.Unmarshal(data, &flinkConfiguration)
default:
return errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.")
}
if err != nil {
return err
}
}

flinkCmfClient, err := c.GetCmfClient(cmd)
if err != nil {
return err
}
flinkCmfClient.AuthToken = c.Context.GetAuthToken()

verbose, _ := cmd.Flags().GetCount("verbose")

opts := types.ApplicationOptions{
Context: c.Context,
UnsafeTrace: unsafeTrace,
UserAgent: c.Version.UserAgent,
EnvironmentName: catalog,
EnvironmentId: environment,
Database: database,
ComputePoolId: computePool,
FlinkConfiguration: flinkConfiguration,
Verbose: verbose > 0,
}

return client.StartAppOnPrem(flinkCmfClient, c.authenticatedOnPrem(prerunner.AuthenticatedWithMDS(c.AuthenticatedCLICommand), cmd), opts)
}

func (c *command) startWithLocalMode(configKeys, configValues []string) error {
// parse app options from given flags
appOptions, err := types.ParseApplicationOptionsFromSlices(configKeys, configValues)
Expand Down
53 changes: 52 additions & 1 deletion pkg/flink/app/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
"github.com/confluentinc/cli/v4/pkg/errors"
"github.com/confluentinc/cli/v4/pkg/flink"
"github.com/confluentinc/cli/v4/pkg/flink/components"
"github.com/confluentinc/cli/v4/pkg/flink/config"
"github.com/confluentinc/cli/v4/pkg/flink/internal/controller"
Expand Down Expand Up @@ -104,7 +105,7 @@
}
})

inputController := controller.NewInputController(historyStore, lspCompleter, handlerCh)
inputController := controller.NewInputController(historyStore, lspCompleter, handlerCh, true)
statementController := controller.NewStatementController(appController, dataStore, consoleParser)
interactiveOutputController := controller.NewInteractiveOutputController(components.NewTableView(), resultFetcher, userProperties, appOptions.GetVerbose())
baseOutputController := controller.NewBaseOutputController(resultFetcher, inputController.GetWindowWidth, userProperties)
Expand All @@ -127,6 +128,56 @@
return app.readEvalPrintLoop()
}

func StartAppOnPrem(flinkCmfClient *flink.CmfRestClient, tokenRefreshFunc func() error, appOptions types.ApplicationOptions) error {
synchronizedTokenRefreshFunc := synchronizedTokenRefresh(tokenRefreshFunc)

// Load history of previous commands from cache file
historyStore := history.LoadHistoryOnPrem()

// Instantiate Application Controller - this is the top level controller that will be passed down to all other controllers
// and should be used for functions that are not specific to a component
appController := controller.NewApplicationController(historyStore)

// Store used to process statements and store local properties
userProperties := store.NewUserProperties(&appOptions)
dataStore := store.NewStoreOnPrem(flinkCmfClient, appController.ExitApplication, userProperties, &appOptions, synchronizedTokenRefreshFunc)
resultFetcher := results.NewResultFetcher(dataStore)

stdinBefore := utils.GetStdin()
consoleParser, err := utils.GetConsoleParser()
if err != nil {
utils.OutputErr("Error: failed to initialize console parser")
return errors.NewErrorWithSuggestions("failed to initialize console parser", "Restart your shell session or try another terminal.")
}
appController.AddCleanupFunction(func() {
utils.TearDownConsoleParser(consoleParser)
utils.RestoreStdin(stdinBefore)
})

// Instantiate Component Controllers
inputController := controller.NewInputController(historyStore, nil, nil, false)
statementController := controller.NewStatementController(appController, dataStore, consoleParser)
interactiveOutputController := controller.NewInteractiveOutputController(components.NewTableView(), resultFetcher, userProperties, appOptions.GetVerbose())
baseOutputController := controller.NewBaseOutputController(resultFetcher, inputController.GetWindowWidth, userProperties)

app := Application{
history: historyStore,
userProperties: userProperties,
store: dataStore,
resultFetcher: resultFetcher,
appController: appController,
inputController: inputController,
statementController: statementController,
interactiveOutputController: interactiveOutputController,
baseOutputController: baseOutputController,
refreshToken: synchronizedTokenRefreshFunc,
reportUsage: func() {}, // on-prem does not support usage reporting

Check failure on line 174 in pkg/flink/app/application.go

View check run for this annotation

SonarQube-Confluent / cli Sonarqube Results

pkg/flink/app/application.go#L174

Add a nested comment explaining why this function is empty or complete the implementation.
appOptions: appOptions,
}
components.PrintWelcomeHeaderOnPrem(appOptions)
return app.readEvalPrintLoop()
}

func (a *Application) readEvalPrintLoop() error {
run := utils.NewPanicRecoveryWithLimit(3, 3*time.Second)
for a.isAuthenticated() {
Expand Down
6 changes: 4 additions & 2 deletions pkg/flink/app/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ func (s *ApplicationTestSuite) TestShouldUseTView() {
{
name: "select statement should always use TView",
statement: types.ProcessedStatement{
Traits: flinkgatewayv1.SqlV1StatementTraits{
SqlKind: flinkgatewayv1.PtrString("SELECT"),
Traits: types.StatementTraits{
FlinkGatewayv1StatementTraits: &flinkgatewayv1.SqlV1StatementTraits{
SqlKind: flinkgatewayv1.PtrString("SELECT"),
},
},
StatementResults: &types.StatementResults{}},
isBasicOutput: false,
Expand Down
29 changes: 29 additions & 0 deletions pkg/flink/cmf_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,18 @@ type OnPremCMFRestFlagValues struct {
clientKeyPath string
}

type CmfClientInterface interface {
GetStatement(ctx context.Context, environment, name string) (cmfsdk.Statement, error)
ListStatements(ctx context.Context, environment, computePool, status string) ([]cmfsdk.Statement, error)
CreateStatement(ctx context.Context, environment string, statement cmfsdk.Statement) (cmfsdk.Statement, error)
ListStatementExceptions(ctx context.Context, environment, statementName string) (cmfsdk.StatementExceptionList, error)
DeleteStatement(ctx context.Context, environment, statement string) error
UpdateStatement(ctx context.Context, environment, statementName string, statement cmfsdk.Statement) error
}

type CmfRestClient struct {
*cmfsdk.APIClient
AuthToken string
}

func NewCmfRestHttpClient(restFlags *OnPremCMFRestFlagValues) (*http.Client, error) {
Expand Down Expand Up @@ -136,6 +146,13 @@ func ResolveOnPremCmfRestFlags(cmd *cobra.Command) (*OnPremCMFRestFlagValues, er
return values, nil
}

func (cmfClient *CmfRestClient) CmfApiContext() context.Context {
if cmfClient.AuthToken == "" {
return context.Background()
}
return context.WithValue(context.Background(), cmfsdk.ContextAccessToken, cmfClient.AuthToken)
}

// CreateApplication Create a Flink application in the specified environment.
// Internally, since the call for Create and Update is the same, we check if the environment doesn't contain said application before creation.
func (cmfClient *CmfRestClient) CreateApplication(ctx context.Context, environment string, application cmfsdk.FlinkApplication) (cmfsdk.FlinkApplication, error) {
Expand Down Expand Up @@ -391,6 +408,18 @@ func (cmfClient *CmfRestClient) ListStatementExceptions(ctx context.Context, env
return exceptionList, nil
}

func (cmfClient *CmfRestClient) GetStatementResults(ctx context.Context, environment, statementName, pageToken string) (cmfsdk.StatementResult, error) {
req := cmfClient.DefaultApi.GetStatementResult(ctx, environment, statementName)
if pageToken != "" {
req = req.PageToken(pageToken)
}
resp, httpResponse, err := req.Execute()
if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil {
return cmfsdk.StatementResult{}, fmt.Errorf(`failed to get result for statement "%s" in the environment "%s": %s`, statementName, environment, parsedErr)
}
return resp, nil
}

func (cmfClient *CmfRestClient) CreateCatalog(ctx context.Context, kafkaCatalog cmfsdk.KafkaCatalog) (cmfsdk.KafkaCatalog, error) {
catalogName := kafkaCatalog.Metadata.Name
outputCatalog, httpResponse, err := cmfClient.DefaultApi.CreateKafkaCatalog(ctx).KafkaCatalog(kafkaCatalog).Execute()
Expand Down
11 changes: 11 additions & 0 deletions pkg/flink/components/interactive_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,19 @@

ldClient := featureflags.GetCcloudLaunchDarklyClient(appOtions.Context.PlatformName)
if featureflags.Manager.BoolVariation("flink.language_service.enable_diagnostics", appOtions.Context, ldClient, true, false) {
output.Printf(false, "[Ctrl-Q] %s [Ctrl-S] %s [Ctrl-G] %s \n", c.Sprint("Quit"), c.Sprint("Toggle Completions"), c.Sprint("Toggle Diagnostics"))

Check failure on line 62 in pkg/flink/components/interactive_input.go

View check run for this annotation

SonarQube-Confluent / cli Sonarqube Results

pkg/flink/components/interactive_input.go#L62

Define a constant instead of duplicating this literal "Toggle Completions" 3 times.
} else {
output.Printf(false, "[Ctrl-Q] %s [Ctrl-S] %s \n", c.Sprint("Quit"), c.Sprint("Toggle Completions"))
}
}

func PrintWelcomeHeaderOnPrem(appOtions types.ApplicationOptions) {
// Print welcome message
output.Print(false, "Welcome! \n")
output.Print(false, "To exit, press Ctrl-Q or type \"exit\". \n\n")

// Print shortcuts
c := fColor.New(color.AccentColor)

output.Printf(false, "[Ctrl-Q] %s [Ctrl-S] %s \n", c.Sprint("Quit"), c.Sprint("Toggle Completions"))
}
13 changes: 11 additions & 2 deletions pkg/flink/internal/autocomplete/examples_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"github.com/confluentinc/go-prompt"
)

// Common examples for cloud and on-prem
func ExamplesCompleter(in prompt.Document) []prompt.Suggest {
s := []prompt.Suggest{
{Text: "CREATE TABLE ", Description: "Register a table/view/function into current or specified Catalog"},
{Text: "ALTER TABLE ", Description: "Modify a registered table/view/function definition in the Catalog"},
{Text: "DESCRIBE ", Description: "Describe the schema of a table or a view"},
{Text: "INSERT INTO ", Description: "Add rows to a table"},
{Text: "USE ", Description: "Used to set the current database or catalog"},
Expand All @@ -18,3 +17,13 @@ func ExamplesCompleter(in prompt.Document) []prompt.Suggest {

return SuggestFromPrefix(s, in.TextBeforeCursor())
}

// Additional examples for cloud
func ExamplesCompleterCloud(in prompt.Document) []prompt.Suggest {
s := []prompt.Suggest{
{Text: "CREATE TABLE ", Description: "Register a table/view/function into current or specified Catalog"},
{Text: "ALTER TABLE ", Description: "Modify a registered table/view/function definition in the Catalog"},
}

return SuggestFromPrefix(s, in.TextBeforeCursor())
}
Loading