Add CLI functionality for fetching connect logs #3123

Open

wants to merge 13 commits into base: main
1 change: 1 addition & 0 deletions internal/connect/command.go
@@ -25,6 +25,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(newClusterCommand(cfg, prerunner))
cmd.AddCommand(newCustomPluginCommand(cfg, prerunner))
cmd.AddCommand(newEventCommand(prerunner))
cmd.AddCommand(newLogsCommand(prerunner))
cmd.AddCommand(newOffsetCommand(prerunner))
cmd.AddCommand(newPluginCommand(cfg, prerunner))

316 changes: 316 additions & 0 deletions internal/connect/command_logs.go
@@ -0,0 +1,316 @@
package connect

import (
"encoding/json"
"fmt"
"net/url"
"os"
"regexp"
"strings"

"github.com/spf13/cobra"

"github.com/confluentinc/cli/v4/pkg/ccloudv2"
pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/kafka"
"github.com/confluentinc/cli/v4/pkg/output"
)

type logsCommand struct {
*pcmd.AuthenticatedCLICommand
}

type logEntryOut struct {
Timestamp string `human:"Timestamp" serialized:"timestamp"`
Level string `human:"Level" serialized:"level"`
TaskId string `human:"Task ID" serialized:"task_id"`
Message string `human:"Message" serialized:"message"`
}

func newLogsCommand(prerunner pcmd.PreRunner) *cobra.Command {

Maybe we can break this into three functions: get the parameters and validate them, then do the search, and then the output part?

Member Author

Based on how other commands are structured, I kept the parameter parsing and the search in this function and moved the rest of the logic into individual functions.

cmd := &cobra.Command{
Use: "logs <id>",
Short: "Manage logs for connectors.",
Args: cobra.ExactArgs(1),
Example: examples.BuildExampleString(
examples.Example{
Text: "Query connector logs with log level ERROR between the provided time window:",

Let's add an example where someone can search for ERROR & WARN?

Member Author

Done

Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`,
Contributor

Do we expect users to provide the start and end times as flags in this format? I'm not sure it's great UX to ask for such timestamps in the CLI. @sgagniere, your views on this?

Member

It is a bit unfortunate, but it looks like it's necessary for the API call.

But I would include a link to the standard explaining the time format, either in an example or the flag descriptions.

Member Author

It's in RFC3339 format with a UTC timezone, but we don't allow milliseconds. I've updated the flag description to "Start time for log query in RFC3339 format with UTC timezone, i.e. YYYY-MM-DDTHH:MM:SSZ (e.g., 2025-02-01T00:00:00Z)", which should make things clear.

Member Author

Adding the datetime format was giving a lint error, so I removed it; however, I've mentioned it in the timestamp validation error: return fmt.Errorf("must be formatted as: YYYY-MM-DDTHH:MM:SSZ")

},
examples.Example{
Text: "Query connector logs with log level ERROR and WARN between the provided time window:",
Code: `confluent connect logs lcc-123456 --level "ERROR|WARN" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`,
},
examples.Example{
Text: "Query next page of connector logs for the same query by running the command repeatedly until \"No more logs for the current query\" is printed to the console:",
Code: `confluent connect logs lcc-123456 --level ERROR --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --next`,
},
examples.Example{
Text: "Query all connector logs between the provided time window:",
Code: `confluent connect logs lcc-123456 --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z"`,
},
examples.Example{
Text: "Query connector logs with log level ERROR and containing \"example error\" in logs between the provided time window, and store in file:",
Code: `confluent connect logs lcc-123456 --level "ERROR" --search-text "example error" --start-time "2025-02-01T00:00:00Z" --end-time "2025-02-01T23:59:59Z" --output-file errors.json`,
},
),
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin},
}

c := &logsCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}
cmd.RunE = c.queryLogs
cmd.Flags().String("start-time", "", "Start time for log query in RFC3339 format with UTC timezone (e.g., 2025-02-01T00:00:00Z).")

Check failure (SonarQube-Confluent / cli Sonarqube Results, internal/connect/command_logs.go#L64): Define a constant instead of duplicating this literal "start-time" 3 times.
cmd.Flags().String("end-time", "", "End time for log query in RFC3339 format with UTC timezone (e.g., 2025-02-01T23:59:59Z).")

Check failure (SonarQube-Confluent / cli Sonarqube Results, internal/connect/command_logs.go#L65): Define a constant instead of duplicating this literal "end-time" 3 times.
cmd.Flags().String("level", "ERROR", "Log level filter (INFO, WARN, ERROR). Defaults to ERROR. Use '|' to specify multiple levels (e.g., ERROR|WARN).")
cmd.Flags().String("search-text", "", "Search text within logs (optional).")
cmd.Flags().String("output-file", "", "Output file path to append connector logs (optional).")
cmd.Flags().Bool("next", false, "Whether to fetch next page of logs after the next execution of the command (optional).")

pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("start-time"))
cobra.CheckErr(cmd.MarkFlagRequired("end-time"))

return cmd
}
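
One way to resolve the two SonarQube duplication findings above is to name the repeated flag literals once. A minimal sketch; the constant names are illustrative and not part of this PR:

// Hypothetical constants for the duplicated flag-name literals; each call
// site (String, MarkFlagRequired, GetString) would then reference the constant.
const (
	startTimeFlag = "start-time"
	endTimeFlag   = "end-time"
)

cmd.Flags().String(startTimeFlag, ...), cobra.CheckErr(cmd.MarkFlagRequired(startTimeFlag)), and cmd.Flags().GetString(startTimeFlag) would then all share the same identifier.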

func (c *logsCommand) queryLogs(cmd *cobra.Command, args []string) error {

Check failure (SonarQube-Confluent / cli Sonarqube Results, internal/connect/command_logs.go#L81): Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.
connectorId := args[0]
if connectorId == "" {
return fmt.Errorf("connector ID cannot be empty")
}

startTime, err := cmd.Flags().GetString("start-time")
if err != nil {
return err
}

endTime, err := cmd.Flags().GetString("end-time")
if err != nil {
return err
}

level, err := cmd.Flags().GetString("level")
if err != nil {
return err
}
levels := strings.Split(level, "|")

searchText, err := cmd.Flags().GetString("search-text")
if err != nil {
return err
}

outputFile, err := cmd.Flags().GetString("output-file")
if err != nil {
return err
}

next, err := cmd.Flags().GetBool("next")
if err != nil {
return err
}

if err := validateTimeFormat(startTime); err != nil {
return fmt.Errorf("invalid start-time format: %w", err)
}

if err := validateTimeFormat(endTime); err != nil {
return fmt.Errorf("invalid end-time format: %w", err)
}

currentLogQuery := &config.ConnectLogsQueryState{
StartTime: startTime,
EndTime: endTime,
Level: level,
SearchText: searchText,
ConnectorId: connectorId,
PageToken: "",
Contributor

Let's add omitempty for this instead of providing an empty string here?

Member Author

This struct isn't used for JSON parsing. We store the current query parameters and the returned page token in the context state so that, on the next run, we can tell whether to fetch a subsequent page or start a fresh query (determined by whether PageToken is "" or a non-empty string).

}

kafkaCluster, err := kafka.GetClusterForCommand(c.V2Client, c.Context)
if err != nil {
return fmt.Errorf("failed to get Kafka cluster information: %w\nPlease ensure you have set a cluster context with 'confluent kafka cluster use <cluster-id>' or specify --cluster flag", err)
}

kafkaClusterId := kafkaCluster.GetId()
if kafkaClusterId == "" {
return fmt.Errorf("failed to get Kafka cluster ID: %w\nPlease ensure you have set a cluster context with 'confluent kafka cluster use <cluster-id>' or specify --cluster flag", err)
}

environmentId, err := c.Context.EnvironmentId()
if err != nil {
return fmt.Errorf("failed to get environment ID: %w\nPlease ensure you have set an environment context with 'confluent environment use <env-id>' or specify --environment flag", err)
}

connector, err := c.V2Client.GetConnectorExpansionById(connectorId, environmentId, kafkaClusterId)
if err != nil {
return err
}

connectorInfo := connector.GetInfo()
connectorName := connectorInfo.GetName()
if connectorName == "" {
return fmt.Errorf("failed to get connector name")
}

lastQueryPageToken, err := c.getPageTokenFromStoredQuery(next, currentLogQuery)
// A non-nil error means there are no further pages for the current query, so return without doing anything
if err != nil {
return nil
}

crn := fmt.Sprintf("crn://confluent.cloud/organization=%s/environment=%s/cloud-cluster=%s/connector=%s",
c.Context.GetCurrentOrganization(),
environmentId,
kafkaClusterId,
connectorName,
)
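// Illustration only (placeholder identifiers): the resulting CRN looks like
// crn://confluent.cloud/organization=<org-id>/environment=env-123456/cloud-cluster=lkc-123456/connector=my-connector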

logs, err := c.V2Client.SearchConnectorLogs(crn, startTime, endTime, levels, searchText, lastQueryPageToken)
if err != nil {
return fmt.Errorf("failed to query connector logs: %w", err)
}

err = c.storeQueryInContext(logs, currentLogQuery)
if err != nil {
return err
}

if outputFile != "" {
return writeLogsToFile(outputFile, logs)
}

if output.GetFormat(cmd).IsSerialized() {
return output.SerializedOutput(cmd, logs.Data)
}

return printHumanLogs(cmd, logs, connectorId)
}
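
For the cognitive-complexity finding on queryLogs, one possible refactor (a sketch only; the helper name and struct below are hypothetical and not part of this PR) is to pull flag parsing and validation into a dedicated function:

// Hypothetical helper: collects and validates the query flags so that
// queryLogs stays focused on resolving the cluster and running the search.
type logsQueryFlags struct {
	startTime, endTime, level, searchText, outputFile string
	next                                              bool
}

func parseLogsQueryFlags(cmd *cobra.Command) (*logsQueryFlags, error) {
	f := &logsQueryFlags{}
	var err error
	if f.startTime, err = cmd.Flags().GetString("start-time"); err != nil {
		return nil, err
	}
	if f.endTime, err = cmd.Flags().GetString("end-time"); err != nil {
		return nil, err
	}
	if f.level, err = cmd.Flags().GetString("level"); err != nil {
		return nil, err
	}
	if f.searchText, err = cmd.Flags().GetString("search-text"); err != nil {
		return nil, err
	}
	if f.outputFile, err = cmd.Flags().GetString("output-file"); err != nil {
		return nil, err
	}
	if f.next, err = cmd.Flags().GetBool("next"); err != nil {
		return nil, err
	}
	if err := validateTimeFormat(f.startTime); err != nil {
		return nil, fmt.Errorf("invalid start-time format: %w", err)
	}
	if err := validateTimeFormat(f.endTime); err != nil {
		return nil, fmt.Errorf("invalid end-time format: %w", err)
	}
	return f, nil
}

queryLogs would then call parseLogsQueryFlags(cmd) once and work with the returned struct, which removes six flag-reading branches from the method body.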

func (c *logsCommand) getPageTokenFromStoredQuery(next bool, currentLogQuery *config.ConnectLogsQueryState) (string, error) {
lastLogQuery := c.Context.GetConnectLogsQueryState()
var lastQueryPageToken string
if next {
if lastLogQuery != nil && (lastLogQuery.StartTime == currentLogQuery.StartTime &&
lastLogQuery.EndTime == currentLogQuery.EndTime &&
lastLogQuery.Level == currentLogQuery.Level &&
lastLogQuery.SearchText == currentLogQuery.SearchText &&
lastLogQuery.ConnectorId == currentLogQuery.ConnectorId) {
lastQueryPageToken = lastLogQuery.PageToken
// If page token for the last query is empty, it means there are no more logs for the current query
if lastQueryPageToken == "" {
Contributor

Hmm, is this the error case? Something seems off to me here; have we tested this scenario?

Member Author (@mchoudhary-cflt, Jun 23, 2025)

mchoudhary@WH9KW4M76R cli % ./confluent connect logs lcc-123 --start-time "2025-06-22T05:35:00Z" --end-time "2025-06-22T05:45:00Z" --level "INFO" --next
No more logs for the current query

This is what the output looks like when the current query has no more pages left. This condition triggers the code to immediately return since there's nothing to do.

output.Printf(false, "No more logs for the current query\n")
return "", fmt.Errorf("no more logs for the current query")
}
} else {
// If the last query is not the same as the current query, or there was no last query, reset the page token
lastQueryPageToken = ""
}
} else {
// If the next flag is not set, reset the page token
lastQueryPageToken = ""
}
return lastQueryPageToken, nil
}
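
Relating to the review thread above: rather than signalling "no more pages" with an error that the caller then swallows via `if err != nil { return nil }`, the same decision could be returned as a boolean. A sketch under that assumption; the method name is hypothetical:

// Sketch of an alternative signature (not in this PR): report "no more pages"
// with a boolean instead of an error, so error returns stay reserved for failures.
func (c *logsCommand) pageTokenForQuery(next bool, q *config.ConnectLogsQueryState) (token string, hasMore bool) {
	if !next {
		return "", true // --next not set: always start a fresh query
	}
	last := c.Context.GetConnectLogsQueryState()
	sameQuery := last != nil &&
		last.StartTime == q.StartTime &&
		last.EndTime == q.EndTime &&
		last.Level == q.Level &&
		last.SearchText == q.SearchText &&
		last.ConnectorId == q.ConnectorId
	if !sameQuery {
		return "", true // different (or no) previous query: start from the first page
	}
	if last.PageToken == "" {
		return "", false // the previous run already returned the final page
	}
	return last.PageToken, true
}

The caller would print "No more logs for the current query" and return nil when hasMore is false.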

func (c *logsCommand) storeQueryInContext(logs *ccloudv2.LoggingSearchResponse, currentLogQuery *config.ConnectLogsQueryState) error {
if logs.Metadata != nil {
pageToken, err := extractPageToken(logs.Metadata.Next)
if err != nil {
return fmt.Errorf("failed to extract page token: %w", err)
}
currentLogQuery.SetPageToken(pageToken)
} else {
currentLogQuery.SetPageToken("")
}

// Update the context with the current query state
err := c.Context.SetConnectLogsQueryState(currentLogQuery)
if err != nil {
return fmt.Errorf("failed to set connect logs query state: %w", err)
}
if err := c.Config.Save(); err != nil {
return err
}
return nil
}

func writeLogsToFile(outputFile string, logs *ccloudv2.LoggingSearchResponse) error {
file, err := os.OpenFile(outputFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open file %s: %w", outputFile, err)
}
defer file.Close()

for _, log := range logs.Data {
logEntry := &logEntryOut{
Timestamp: log.Timestamp,
Level: log.Level,
TaskId: log.TaskId,
Message: log.Message,
}

data, err := json.Marshal(logEntry)
if err != nil {
return fmt.Errorf("failed to marshal log entry to JSON: %w", err)
}

if _, err := file.Write(data); err != nil {
return fmt.Errorf("failed to write log entry to file %s: %w", outputFile, err)
}

if _, err := file.WriteString("\n"); err != nil {
return fmt.Errorf("failed to write newline to file %s: %w", outputFile, err)
}
}

output.Printf(false, "Appended %d log entries to file: %s\n", len(logs.Data), outputFile)
return nil
}
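
A note on the write loop above: encoding/json's Encoder writes a newline after each value, so the marshal and newline writes could be folded into one Encode call. A sketch of just the loop body, assuming it sits inside writeLogsToFile:

// Equivalent NDJSON write using json.Encoder, which appends a trailing
// newline after every Encode call.
enc := json.NewEncoder(file)
for _, log := range logs.Data {
	entry := &logEntryOut{
		Timestamp: log.Timestamp,
		Level:     log.Level,
		TaskId:    log.TaskId,
		Message:   log.Message,
	}
	if err := enc.Encode(entry); err != nil {
		return fmt.Errorf("failed to write log entry to file %s: %w", outputFile, err)
	}
}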

func validateTimeFormat(timeStr string) error {
pattern := `^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`
match, err := regexp.MatchString(pattern, timeStr)
if !match || err != nil {
return fmt.Errorf("must be formatted as: YYYY-MM-DDTHH:MM:SSZ")
}
return nil
}
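
If stricter validation is ever wanted, an alternative to the regexp is to parse with the standard time package, which also rejects impossible dates (e.g. month 13) that the pattern alone accepts. A sketch, assuming a "time" import is added; the function name is hypothetical:

// Alternative sketch: parse as RFC3339, then additionally reject fractional
// seconds and non-UTC offsets to match the documented YYYY-MM-DDTHH:MM:SSZ form.
func validateTimeFormatWithParse(timeStr string) error {
	t, err := time.Parse(time.RFC3339, timeStr)
	if err != nil || t.Nanosecond() != 0 || !strings.HasSuffix(timeStr, "Z") {
		return fmt.Errorf("must be formatted as: YYYY-MM-DDTHH:MM:SSZ")
	}
	return nil
}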

func extractPageToken(urlStr string) (string, error) {
parsedURL, err := url.Parse(urlStr)
if err != nil {
return "", err
}

queryParams := parsedURL.Query()
pageToken := queryParams.Get("page_token")

return pageToken, nil
}

func printHumanLogs(cmd *cobra.Command, logs *ccloudv2.LoggingSearchResponse, connectorId string) error {
list := output.NewList(cmd)
for _, log := range logs.Data {
logOut := &logEntryOut{
Timestamp: log.Timestamp,
Level: log.Level,
TaskId: log.TaskId,
Message: log.Message,
}
list.Add(logOut)
}

if len(logs.Data) == 0 {
output.Println(false, "No more logs for the current query")
return nil
}

output.Printf(false, "Found %d log entries for connector %s:\n\n", len(logs.Data), connectorId)
return list.Print()
}