-
Notifications
You must be signed in to change notification settings - Fork 689
[Feature][history server] support endpoint /api/v0/logs/file
#4411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
3180714
7fc3c87
2a75aef
3f280d2
f06a62a
ac166a3
be433ac
a092446
9fc90c3
c986529
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,31 @@ | ||
| package historyserver | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "net/http" | ||
| "path" | ||
| "sort" | ||
| "strings" | ||
|
|
||
| "github.com/emicklei/go-restful/v3" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/utils" | ||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| const ( | ||
| // DEFAULT_LOG_LIMIT is the number of trailing lines returned when the lines parameter is not specified or is 0. | ||
| // This matches Ray Dashboard API default behavior. | ||
| DEFAULT_LOG_LIMIT = 1000 | ||
|
|
||
| // MAX_LOG_LIMIT is the maximum number of lines that can be requested. | ||
| // Requests exceeding this limit are logged with a warning and capped to this value. | ||
| // A negative lines value bypasses both limits and returns the whole file. | ||
| MAX_LOG_LIMIT = 10000 | ||
| ) | ||
|
|
||
| func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo { | ||
| // Initial continuation marker | ||
| logrus.Debugf("Prepare to get list clusters info ...") | ||
|
|
@@ -56,6 +70,67 @@ func (s *ServerHandler) _getNodeLogs(rayClusterNameID, sessionId, nodeId, dir st | |
| return json.Marshal(ret) | ||
| } | ||
|
|
||
| func (s *ServerHandler) _getNodeLogFile(rayClusterNameID, sessionID, nodeID, filename string, maxLines int) ([]byte, error) { | ||
| logPath := path.Join(sessionID, "logs", nodeID, filename) | ||
|
|
||
| reader := s.reader.GetContent(rayClusterNameID, logPath) | ||
|
|
||
| if reader == nil { | ||
| return nil, fmt.Errorf("log file not found: %s", logPath) | ||
| } | ||
|
|
||
| if maxLines < 0 { | ||
| // -1 means read all lines, match Ray Dashboard API behavior | ||
| return io.ReadAll(reader) | ||
| } | ||
|
|
||
| if maxLines == 0 { | ||
| maxLines = DEFAULT_LOG_LIMIT | ||
| } | ||
|
|
||
| if maxLines > MAX_LOG_LIMIT { | ||
| logrus.Warnf("Requested lines (%d) exceeds max limit (%d), capping to max", maxLines, MAX_LOG_LIMIT) | ||
| maxLines = MAX_LOG_LIMIT | ||
| } | ||
|
|
||
| scanner := bufio.NewScanner(reader) | ||
| buffer := make([]string, maxLines) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| index := 0 | ||
| totalLines := 0 | ||
|
|
||
| // Get the last N lines following Ray Dashboard API behavior with circular buffer | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ray dashboard |
||
| // Example with maxLines=3, file has 5 lines: | ||
| // Line 1: buffer[0], Line 2: buffer[1], Line 3: buffer[2] | ||
| // Line 4: buffer[0] (overwrites Line 1), Line 5: buffer[1] (overwrites Line 2) | ||
| // Final buffer: ["Line 4", "Line 5", "Line 3"] | ||
| for scanner.Scan() { | ||
| buffer[index%maxLines] = scanner.Text() | ||
| index++ | ||
| totalLines++ | ||
| } | ||
|
|
||
| if err := scanner.Err(); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Reconstruct lines in correct order | ||
| var lines []string | ||
| if totalLines <= maxLines { | ||
| // File has fewer lines than requested, return all | ||
| lines = buffer[:totalLines] | ||
| } else { | ||
| // Construct response with correct log line order from the circular buffer | ||
| // Example: buffer=["Line 4", "Line 5", "Line 3"], start=2 | ||
| // buffer[2:] = ["Line 3"] (oldest) | ||
| // buffer[:2] = ["Line 4", "Line 5"] (newest) | ||
| // Result: ["Line 3", "Line 4", "Line 5"] | ||
| start := index % maxLines | ||
| lines = append(buffer[start:], buffer[:start]...) | ||
| } | ||
|
|
||
| return []byte(strings.Join(lines, "\n")), nil | ||
| } | ||
|
|
||
| func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, error) { | ||
| logPath := path.Join(sessionId, "logs") | ||
| nodes := s.reader.ListFiles(rayClusterNameID, logPath) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,8 +4,10 @@ import ( | |||||||
| "context" | ||||||||
| "encoding/json" | ||||||||
| "errors" | ||||||||
| "fmt" | ||||||||
| "io" | ||||||||
| "net/http" | ||||||||
| "strconv" | ||||||||
| "strings" | ||||||||
|
|
||||||||
| "github.com/emicklei/go-restful/v3" | ||||||||
|
|
@@ -106,7 +108,6 @@ func routerAPI(s *ServerHandler) { | |||||||
| Doc("get logfile").Param(ws.QueryParameter("node_id", "node_id")). | ||||||||
| Param(ws.QueryParameter("filename", "filename")). | ||||||||
| Param(ws.QueryParameter("lines", "lines")). | ||||||||
| Param(ws.QueryParameter("format", "format")). | ||||||||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's no "format" query string in the Ray Dashboard API.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we support other queries (like actor_id, task_id, ...), or will this be handled in a follow-up? |
||||||||
| Writes("")) // Placeholder for specific return type | ||||||||
|
|
||||||||
| ws.Route(ws.GET("/v0/tasks").To(s.getTaskDetail).Filter(s.CookieHandle). | ||||||||
|
|
@@ -566,14 +567,55 @@ func (s *ServerHandler) getLogicalActor(req *restful.Request, resp *restful.Resp | |||||||
| } | ||||||||
|
|
||||||||
| func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Response) { | ||||||||
| clusterNameID := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string) | ||||||||
| clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string) | ||||||||
| sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string) | ||||||||
|
|
||||||||
| // Parse query parameters | ||||||||
| nodeID := req.QueryParameter("node_id") | ||||||||
| filename := req.QueryParameter("filename") | ||||||||
| lines := req.QueryParameter("lines") | ||||||||
|
Comment on lines
+574
to
+577
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can define an struct, similar to Ray’s GetLogOptions?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While there's only 3 parameters here, I think it's not needed for now. If in the future we want to add support for those options, we can add it. WDYT? |
||||||||
|
|
||||||||
| // Validate required parameters | ||||||||
| if nodeID == "" { | ||||||||
| resp.WriteErrorString(http.StatusBadRequest, "Missing required parameter: node_id") | ||||||||
| return | ||||||||
| } | ||||||||
| if filename == "" { | ||||||||
| resp.WriteErrorString(http.StatusBadRequest, "Missing required parameter: filename") | ||||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| // Prevent path traversal attacks (e.g., ../../etc/passwd) | ||||||||
| if strings.Contains(nodeID, "..") || strings.Contains(filename, "..") { | ||||||||
| resp.WriteErrorString(http.StatusBadRequest, fmt.Sprintf("invalid path: ../ not allowed in the path (node_id=%s, filename=%s)", nodeID, filename)) | ||||||||
| return | ||||||||
| } | ||||||||
|
|
||||||||
| if sessionName == "live" { | ||||||||
| s.redirectRequest(req, resp) | ||||||||
| return | ||||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
| } | ||||||||
|
|
||||||||
| // Not yet supported | ||||||||
| resp.WriteErrorString(http.StatusNotImplemented, "Node log file not yet supported") | ||||||||
| // Convert lines parameter to int | ||||||||
| maxLines := 0 | ||||||||
| if lines != "" { | ||||||||
| parsedLines, err := strconv.Atoi(lines) | ||||||||
| if err != nil { | ||||||||
| resp.WriteErrorString(http.StatusBadRequest, fmt.Sprintf("invalid lines parameter: %s", lines)) | ||||||||
| return | ||||||||
| } | ||||||||
| maxLines = parsedLines | ||||||||
| } | ||||||||
|
|
||||||||
| content, err := s._getNodeLogFile(clusterNameID+"_"+clusterNamespace, sessionName, nodeID, filename, maxLines) | ||||||||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
| if err != nil { | ||||||||
| logrus.Errorf("Error getting node log file: %v", err) | ||||||||
| resp.WriteError(http.StatusInternalServerError, err) | ||||||||
| return | ||||||||
| } | ||||||||
| resp.Header().Set("Content-Type", "text/plain") | ||||||||
| resp.Write(content) | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 9fc90c3 |
||||||||
| } | ||||||||
|
|
||||||||
| func (s *ServerHandler) getTaskSummarize(req *restful.Request, resp *restful.Response) { | ||||||||
|
|
||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to implement this in this PR. For StorageReader, I think we can introduce a
GetContentStream method that returns an io.ReadCloser instead of loading the entire file into memory. This would allow callers to stream content directly and manage resource cleanup explicitly, which is essential for handling large log files efficiently. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice suggestion! Will do it in a follow-up.