Skip to content
75 changes: 75 additions & 0 deletions historyserver/pkg/historyserver/reader.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
package historyserver

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"sort"
"strings"

"github.com/emicklei/go-restful/v3"
"github.com/ray-project/kuberay/historyserver/pkg/utils"
"github.com/sirupsen/logrus"
)

const (
// DEFAULT_LOG_LIMIT is the default number of lines to return when lines parameter is not specified or is 0.
// This matches Ray Dashboard API default behavior.
DEFAULT_LOG_LIMIT = 1000

// MAX_LOG_LIMIT is the maximum number of lines that can be requested.
// Requests exceeding this limit will be capped to this value.
MAX_LOG_LIMIT = 10000
)

func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
// Initial continuation marker
logrus.Debugf("Prepare to get list clusters info ...")
Expand Down Expand Up @@ -56,6 +70,67 @@ func (s *ServerHandler) _getNodeLogs(rayClusterNameID, sessionId, nodeId, dir st
return json.Marshal(ret)
}

func (s *ServerHandler) _getNodeLogFile(rayClusterNameID, sessionID, nodeID, filename string, maxLines int) ([]byte, error) {
logPath := path.Join(sessionID, "logs", nodeID, filename)

reader := s.reader.GetContent(rayClusterNameID, logPath)
Copy link
Member

@win5923 win5923 Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to implement in this PR. For StorageReader, but i think we can introduce a GetContentStream method that returns an io.ReadCloser instead of loading the entire file into memory. This would allow callers to stream content directly and manage resource cleanup explicitly, which is essential for handling large log files efficiently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice suggestions! Will do it in follow-up


if reader == nil {
return nil, fmt.Errorf("log file not found: %s", logPath)
}

if maxLines < 0 {
// -1 means read all lines, match Ray Dashboard API behavior
return io.ReadAll(reader)
}

if maxLines == 0 {
maxLines = DEFAULT_LOG_LIMIT
}

if maxLines > MAX_LOG_LIMIT {
logrus.Warnf("Requested lines (%d) exceeds max limit (%d), capping to max", maxLines, MAX_LOG_LIMIT)
maxLines = MAX_LOG_LIMIT
}

scanner := bufio.NewScanner(reader)
buffer := make([]string, maxLines)
index := 0
totalLines := 0

// Get the last N lines following Ray Dashboard API behavior with circular buffer
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Example with maxLines=3, file has 5 lines:
// Line 1: buffer[0], Line 2: buffer[1], Line 3: buffer[2]
// Line 4: buffer[0] (overwrites Line 1), Line 5: buffer[1] (overwrites Line 2)
// Final buffer: ["Line 4", "Line 5", "Line 3"]
for scanner.Scan() {
buffer[index%maxLines] = scanner.Text()
index++
totalLines++
}

if err := scanner.Err(); err != nil {
return nil, err
}

// Reconstruct lines in correct order
var lines []string
if totalLines <= maxLines {
// File has fewer lines than requested, return all
lines = buffer[:totalLines]
} else {
// Construct response with correct log line order from the circular buffer
// Example: buffer=["Line 4", "Line 5", "Line 3"], start=2
// buffer[2:] = ["Line 3"] (oldest)
// buffer[:2] = ["Line 4", "Line 5"] (newest)
// Result: ["Line 3", "Line 4", "Line 5"]
start := index % maxLines
lines = append(buffer[start:], buffer[:start]...)
}

return []byte(strings.Join(lines, "\n")), nil
}

func (s *ServerHandler) GetNodes(rayClusterNameID, sessionId string) ([]byte, error) {
logPath := path.Join(sessionId, "logs")
nodes := s.reader.ListFiles(rayClusterNameID, logPath)
Expand Down
48 changes: 45 additions & 3 deletions historyserver/pkg/historyserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/emicklei/go-restful/v3"
Expand Down Expand Up @@ -106,7 +108,6 @@ func routerAPI(s *ServerHandler) {
Doc("get logfile").Param(ws.QueryParameter("node_id", "node_id")).
Param(ws.QueryParameter("filename", "filename")).
Param(ws.QueryParameter("lines", "lines")).
Param(ws.QueryParameter("format", "format")).
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@win5923 win5923 Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support other queries (like actor_id, task_id..), or will this be handled in a follow-up?

Writes("")) // Placeholder for specific return type

ws.Route(ws.GET("/v0/tasks").To(s.getTaskDetail).Filter(s.CookieHandle).
Expand Down Expand Up @@ -566,14 +567,55 @@ func (s *ServerHandler) getLogicalActor(req *restful.Request, resp *restful.Resp
}

func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Response) {
clusterNameID := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string)
clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string)
sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)

// Parse query parameters
nodeID := req.QueryParameter("node_id")
filename := req.QueryParameter("filename")
lines := req.QueryParameter("lines")
Comment on lines +574 to +577
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can define an struct, similar to Ray’s GetLogOptions?

Copy link
Collaborator Author

@machichima machichima Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While there's only 3 parameters here, I think it's not needed for now. If in the future we want to add support for those options, we can add it. WDYT?


// Validate required parameters
if nodeID == "" {
resp.WriteErrorString(http.StatusBadRequest, "Missing required parameter: node_id")
return
}
if filename == "" {
resp.WriteErrorString(http.StatusBadRequest, "Missing required parameter: filename")
return
}

// Prevent path traversal attacks (e.g., ../../etc/passwd)
if strings.Contains(nodeID, "..") || strings.Contains(filename, "..") {
resp.WriteErrorString(http.StatusBadRequest, fmt.Sprintf("invalid path: ../ not allowed in the path (node_id=%s, filename=%s)", nodeID, filename))
return
}

if sessionName == "live" {
s.redirectRequest(req, resp)
return
}

// Not yet supported
resp.WriteErrorString(http.StatusNotImplemented, "Node log file not yet supported")
// Convert lines parameter to int
maxLines := 0
if lines != "" {
parsedLines, err := strconv.Atoi(lines)
if err != nil {
resp.WriteErrorString(http.StatusBadRequest, fmt.Sprintf("invalid lines parameter: %s", lines))
return
}
maxLines = parsedLines
}

content, err := s._getNodeLogFile(clusterNameID+"_"+clusterNamespace, sessionName, nodeID, filename, maxLines)
if err != nil {
logrus.Errorf("Error getting node log file: %v", err)
resp.WriteError(http.StatusInternalServerError, err)
return
}
resp.Header().Set("Content-Type", "text/plain")
resp.Write(content)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resp.Write(content)
resp.Header().Set("Content-Type", "text/plain")
resp.Write(content)

https://github.com/ray-project/ray/blob/22f7f7d85cdfe3b628b3a9e9aa37cf2ae3954820/python/ray/dashboard/modules/state/state_head.py#L264

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9fc90c3

}

func (s *ServerHandler) getTaskSummarize(req *restful.Request, resp *restful.Response) {
Expand Down
Loading