-
Notifications
You must be signed in to change notification settings - Fork 18
Add CLI functionality for fetching connect logs #3123
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
7612c22
9fc35b2
846b088
dbd44bb
f77101e
bb17855
38c6bf0
3823ed4
2dc1b6d
178936c
11ae6df
cee0057
69198c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,316 @@ | ||
package connect | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"net/url" | ||
"os" | ||
"regexp" | ||
"strings" | ||
|
||
"github.com/spf13/cobra" | ||
|
||
"github.com/confluentinc/cli/v4/pkg/ccloudv2" | ||
pcmd "github.com/confluentinc/cli/v4/pkg/cmd" | ||
"github.com/confluentinc/cli/v4/pkg/config" | ||
"github.com/confluentinc/cli/v4/pkg/examples" | ||
"github.com/confluentinc/cli/v4/pkg/kafka" | ||
"github.com/confluentinc/cli/v4/pkg/output" | ||
) | ||
|
||
type logsCommand struct { | ||
*pcmd.AuthenticatedCLICommand | ||
} | ||
|
||
type logEntryOut struct { | ||
Timestamp string `human:"Timestamp" serialized:"timestamp"` | ||
Level string `human:"Level" serialized:"level"` | ||
TaskId string `human:"Task ID" serialized:"task_id"` | ||
Message string `human:"Message" serialized:"message"` | ||
} | ||
|
||
func newLogsCommand(prerunner pcmd.PreRunner) *cobra.Command { | ||
cmd := &cobra.Command{ | ||
Use: "logs <id>", | ||
Short: "Manage logs for connectors.", | ||
Args: cobra.ExactArgs(1), | ||
Example: examples.BuildExampleString( | ||
examples.Example{ | ||
Text: "Query connector logs with log level ERROR between the provided time window:", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add an example where someone can search for ERROR & WARN? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we expect user to provide the start and end times as flags in this format? i am not sure if this is great UX to ask such timestamps from the user in the CLI . @sgagniere your views on this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a bit unfortunate, but it looks like it's necessary for the API call. But I would include a link to the standard explaining the time format, either in an example or the flag descriptions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's in RFC3339 format with UTC timezone however we don't allow miliseconds. I've updated the flag description to "Start time for log query in RFC3339 format with UTC timezone i.e. YYYY-MM-DDTHH:MM:SSZ (e.g., 2025-02-01T00:00:00Z)" which should make things clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding the datetime format was giving a lint error so removed it however I've mentioned it in the timstamp validation error return |
||
}, | ||
examples.Example{ | ||
Text: "Query connector logs with log level ERROR and WARN between the provided time window:", | ||
Code: `confluent connect logs lcc-123456 --level "ERROR|WARN" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`, | ||
}, | ||
examples.Example{ | ||
Text: "Query next page of connector logs for the same query by running the command repeatedly until \"No more logs for the current query\" is printed to the console:", | ||
Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --next`, | ||
}, | ||
examples.Example{ | ||
Text: "Query all connector logs between the provided time window:", | ||
Code: `confluent connect logs lcc-123456 --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`, | ||
}, | ||
examples.Example{ | ||
Text: "Query connector logs with log level ERROR and containing \"example error\" in logs between the provided time window, and store in file:", | ||
Code: `confluent connect logs lcc-123456 --level "ERROR" --search-text "example error" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --output-file errors.json`, | ||
}, | ||
), | ||
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, | ||
} | ||
|
||
c := &logsCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} | ||
cmd.RunE = c.queryLogs | ||
cmd.Flags().String("start-time", "", "Start time for log query in RFC3339 format with UTC timezone (e.g., 2025-02-01T00:00:00Z).") | ||
cmd.Flags().String("end-time", "", "End time for log query in RFC3339 format with UTC timezone (e.g., 2025-02-01T23:59:59Z).") | ||
cmd.Flags().String("level", "ERROR", "Log level filter (INFO, WARN, ERROR). Defaults to ERROR. Use '|' to specify multiple levels (e.g., ERROR|WARN).") | ||
cmd.Flags().String("search-text", "", "Search text within logs (optional).") | ||
cmd.Flags().String("output-file", "", "Output file path to append connector logs (optional).") | ||
cmd.Flags().Bool("next", false, "Whether to fetch next page of logs after the next execution of the command (optional).") | ||
|
||
pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) | ||
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) | ||
pcmd.AddOutputFlag(cmd) | ||
|
||
cobra.CheckErr(cmd.MarkFlagRequired("start-time")) | ||
cobra.CheckErr(cmd.MarkFlagRequired("end-time")) | ||
|
||
return cmd | ||
} | ||
|
||
func (c *logsCommand) queryLogs(cmd *cobra.Command, args []string) error { | ||
connectorId := args[0] | ||
if connectorId == "" { | ||
return fmt.Errorf("connector ID cannot be empty") | ||
} | ||
|
||
startTime, err := cmd.Flags().GetString("start-time") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
endTime, err := cmd.Flags().GetString("end-time") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
level, err := cmd.Flags().GetString("level") | ||
if err != nil { | ||
return err | ||
} | ||
levels := strings.Split(level, "|") | ||
|
||
searchText, err := cmd.Flags().GetString("search-text") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
outputFile, err := cmd.Flags().GetString("output-file") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
next, err := cmd.Flags().GetBool("next") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := validateTimeFormat(startTime); err != nil { | ||
return fmt.Errorf("invalid start-time format: %w", err) | ||
} | ||
|
||
if err := validateTimeFormat(endTime); err != nil { | ||
return fmt.Errorf("invalid end-time format: %w", err) | ||
} | ||
|
||
currentLogQuery := &config.ConnectLogsQueryState{ | ||
StartTime: startTime, | ||
EndTime: endTime, | ||
Level: level, | ||
SearchText: searchText, | ||
ConnectorId: connectorId, | ||
PageToken: "", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets add omitempty for this instead of providing an empty string here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This struct isn't for json parsing, we are storing the current query parameters and the pagetoken returned in the context state so that at the next query run we can identify if we need to fetch a subsequent page or start a fresh query (determined by whether pageToken is set to "" or a non empty string) |
||
} | ||
|
||
kafkaCluster, err := kafka.GetClusterForCommand(c.V2Client, c.Context) | ||
if err != nil { | ||
return fmt.Errorf("failed to get Kafka cluster information: %w\nPlease ensure you have set a cluster context with 'confluent kafka cluster use <cluster-id>' or specify --cluster flag", err) | ||
} | ||
|
||
kafkaClusterId := kafkaCluster.GetId() | ||
if kafkaClusterId == "" { | ||
return fmt.Errorf("failed to get Kafka cluster ID: %w\nPlease ensure you have set a cluster context with 'confluent kafka cluster use <cluster-id>' or specify --cluster flag", err) | ||
} | ||
|
||
environmentId, err := c.Context.EnvironmentId() | ||
if err != nil { | ||
return fmt.Errorf("failed to get environment ID: %w\nPlease ensure you have set an environment context with 'confluent environment use <env-id>' or specify --environment flag", err) | ||
} | ||
|
||
connector, err := c.V2Client.GetConnectorExpansionById(connectorId, environmentId, kafkaClusterId) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
connectorInfo := connector.GetInfo() | ||
connectorName := connectorInfo.GetName() | ||
if connectorName == "" { | ||
return fmt.Errorf("failed to get connector name") | ||
} | ||
|
||
lastQueryPageToken, err := c.getPageTokenFromStoredQuery(next, currentLogQuery) | ||
// if error not nil this means that there are no further pages for current query hence return | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
crn := fmt.Sprintf("crn://confluent.cloud/organization=%s/environment=%s/cloud-cluster=%s/connector=%s", | ||
c.Context.GetCurrentOrganization(), | ||
environmentId, | ||
kafkaClusterId, | ||
connectorName, | ||
) | ||
|
||
logs, err := c.V2Client.SearchConnectorLogs(crn, startTime, endTime, levels, searchText, lastQueryPageToken) | ||
if err != nil { | ||
return fmt.Errorf("failed to query connector logs: %w", err) | ||
} | ||
|
||
err = c.storeQueryInContext(logs, currentLogQuery) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if outputFile != "" { | ||
return writeLogsToFile(outputFile, logs) | ||
} | ||
|
||
if output.GetFormat(cmd).IsSerialized() { | ||
return output.SerializedOutput(cmd, logs.Data) | ||
} | ||
|
||
return printHumanLogs(cmd, logs, connectorId) | ||
} | ||
|
||
func (c *logsCommand) getPageTokenFromStoredQuery(next bool, currentLogQuery *config.ConnectLogsQueryState) (string, error) { | ||
lastLogQuery := c.Context.GetConnectLogsQueryState() | ||
var lastQueryPageToken string | ||
if next { | ||
if lastLogQuery != nil && (lastLogQuery.StartTime == currentLogQuery.StartTime && | ||
lastLogQuery.EndTime == currentLogQuery.EndTime && | ||
lastLogQuery.Level == currentLogQuery.Level && | ||
lastLogQuery.SearchText == currentLogQuery.SearchText && | ||
lastLogQuery.ConnectorId == currentLogQuery.ConnectorId) { | ||
lastQueryPageToken = lastLogQuery.PageToken | ||
// If page token for the last query is empty, it means there are no more logs for the current query | ||
if lastQueryPageToken == "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, is this the error case? Something seems off to me here, have we tested this scenario? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is what the output looks like when the current query has no more pages left. This condition triggers the code to immediately return since there's nothing to do. |
||
output.Printf(false, "No more logs for the current query\n") | ||
return "", fmt.Errorf("no more logs for the current query") | ||
} | ||
} else { | ||
// If the last query is not the same as the current query or there was no last query,, reset the page token | ||
lastQueryPageToken = "" | ||
} | ||
} else { | ||
// If the next flag is not set, reset the page token | ||
lastQueryPageToken = "" | ||
} | ||
return lastQueryPageToken, nil | ||
} | ||
|
||
func (c *logsCommand) storeQueryInContext(logs *ccloudv2.LoggingSearchResponse, currentLogQuery *config.ConnectLogsQueryState) error { | ||
if logs.Metadata != nil { | ||
pageToken, err := extractPageToken(logs.Metadata.Next) | ||
currentLogQuery.SetPageToken(pageToken) | ||
if err != nil { | ||
return fmt.Errorf("failed to extract page token: %w", err) | ||
} | ||
} else { | ||
currentLogQuery.SetPageToken("") | ||
} | ||
|
||
// Update the context with the current query state | ||
err := c.Context.SetConnectLogsQueryState(currentLogQuery) | ||
if err != nil { | ||
return fmt.Errorf("failed to set connect logs query state: %w", err) | ||
} | ||
if err := c.Config.Save(); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func writeLogsToFile(outputFile string, logs *ccloudv2.LoggingSearchResponse) error { | ||
file, err := os.OpenFile(outputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) | ||
if err != nil { | ||
return fmt.Errorf("failed to open file %s: %w", outputFile, err) | ||
} | ||
defer file.Close() | ||
|
||
for _, log := range logs.Data { | ||
logEntry := &logEntryOut{ | ||
Timestamp: log.Timestamp, | ||
Level: log.Level, | ||
TaskId: log.TaskId, | ||
Message: log.Message, | ||
} | ||
|
||
data, err := json.Marshal(logEntry) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal log entry to JSON: %w", err) | ||
} | ||
|
||
if _, err := file.Write(data); err != nil { | ||
return fmt.Errorf("failed to write log entry to file %s: %w", outputFile, err) | ||
} | ||
|
||
if _, err := file.WriteString("\n"); err != nil { | ||
return fmt.Errorf("failed to write newline to file %s: %w", outputFile, err) | ||
} | ||
} | ||
|
||
output.Printf(false, "Appended %d log entries to file: %s\n", len(logs.Data), outputFile) | ||
return nil | ||
} | ||
|
||
func validateTimeFormat(timeStr string) error { | ||
pattern := `^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$` | ||
match, err := regexp.MatchString(pattern, timeStr) | ||
if !match || err != nil { | ||
return fmt.Errorf("must be formatted as: YYYY-MM-DDTHH:MM:SSZ") | ||
} | ||
return nil | ||
} | ||
|
||
func extractPageToken(urlStr string) (string, error) { | ||
parsedURL, err := url.Parse(urlStr) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
queryParams := parsedURL.Query() | ||
pageToken := queryParams.Get("page_token") | ||
|
||
return pageToken, nil | ||
} | ||
|
||
func printHumanLogs(cmd *cobra.Command, logs *ccloudv2.LoggingSearchResponse, connectorId string) error { | ||
list := output.NewList(cmd) | ||
for _, log := range logs.Data { | ||
logOut := &logEntryOut{ | ||
Timestamp: log.Timestamp, | ||
Level: log.Level, | ||
TaskId: log.TaskId, | ||
Message: log.Message, | ||
} | ||
list.Add(logOut) | ||
} | ||
|
||
if len(logs.Data) == 0 { | ||
output.Println(false, "No more logs for the current query") | ||
return nil | ||
} | ||
|
||
output.Printf(false, "Found %d log entries for connector %s:\n\n", len(logs.Data), connectorId) | ||
return list.Print() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can break this into 3 functions: get parameters & validate it, then search and then the output part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on how other commands are, kept the getting parameters and search part in this function and moved out the rest of the logic into individual functions