Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions historyserver/cmd/historyserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ func main() {
kubeconfigs := ""
runtimeClassConfigPath := "/var/collector-config/data"
dashboardDir := ""
useKubernetesProxy := false
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.StringVar(&kubeconfigs, "kubeconfigs", "", "")
flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "")
flag.Parse()

cliMgr := historyserver.NewClientManager(kubeconfigs)
cliMgr := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy)

jsonData := make(map[string]interface{})
if runtimeClassConfigPath != "" {
Expand Down Expand Up @@ -75,7 +77,7 @@ func main() {
logrus.Info("EventHandler shutdown complete")
}()

handler := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler)
handler := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy)

sigChan := make(chan os.Signal, 1)
stop := make(chan struct{}, 1)
Expand Down
70 changes: 59 additions & 11 deletions historyserver/docs/set_up_historyserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,31 @@ kind create cluster --image=kindest/node:v1.27.0

### 2. Build and Run Ray Operator

Build and deploy the KubeRay operator (binary or deployment).
Build and deploy the KubeRay operator (binary or deployment). For details, please refer to the
[ray-operator development guide](https://github.com/ray-project/kuberay/blob/master/ray-operator/DEVELOPMENT.md#run-the-operator-inside-the-cluster).

### 3. Deploy MinIO
### 3. Deploy & Access MinIO

```bash
kubectl apply -f historyserver/config/minio.yaml
```

Use the following command to port-forward the console and API ports. The API port is required only when running the
history server outside the kind cluster.

```bash
kubectl --namespace minio-dev port-forward svc/minio-service 9001:9001 9000:9000
```

> [!NOTE]
> Get the correct session directory from MinIO console.
> Login: `minioadmin` / `minioadmin`
> See: [MinIO Setup Guide](./set_up_collector.md#deploy-minio-for-log-and-event-storage)

### 4. Build and Load Collector & History Server Images

If you'd like to run the history server outside the Kind cluster, you don't need to build the history server image.

```bash
make -C historyserver localimage-build
kind load docker-image historyserver:v0.1.0
Expand Down Expand Up @@ -57,21 +72,54 @@ kubectl delete -f historyserver/config/raycluster.yaml
kubectl apply -f historyserver/config/service_account.yaml
```

### 9. Deploy History Server

```bash
kubectl apply -f historyserver/config/historyserver.yaml
```
### 9. Run and Access History Server

### 10. Access History Server
#### Deploy In-Cluster History Server

```bash
kubectl apply -f config/historyserver.yaml

# Port-forward to access the history server.
kubectl port-forward svc/historyserver 8080:30080
```

> **Note**: Get the correct session directory from MinIO console.
> Login: `minioadmin` / `minioadmin`
> See: [MinIO Setup Guide](./set_up_collector.md#deploy-minio-for-log-and-event-storage)
#### Run History Server Outside the Kind Cluster

You can also run the history server outside the Kind cluster to accelerate the development iteration and enable
debugging in your own IDE. For example, you can set up `.vscode/launch.json` as follows:

```json
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug (historyserver)",
"type": "go",
"request": "launch",
"program": "${workspaceFolder}/historyserver/cmd/historyserver/main.go",
"cwd": "${workspaceFolder}",
"args": [
"--runtime-class-name=s3",
"--ray-root-dir=log"
],
"env": {
"S3_REGION": "test",
// Use localhost rather than the Kubernetes service name.
"S3_ENDPOINT": "localhost:9000",
"S3_BUCKET": "ray-historyserver",
"AWS_S3ID": "minioadmin",
"AWS_S3SECRET": "minioadmin",
"AWS_S3TOKEN": "",
"S3FORCE_PATH_STYLE": "true",
"S3DISABLE_SSL": "true"
}
}
]
}
```

For setting up the `args` and `env` fields, please refer to `spec.template.spec.containers.command` and
`spec.template.spec.containers.env` in `historyserver/config/historyserver.yaml`.

---

Expand Down
28 changes: 20 additions & 8 deletions historyserver/pkg/historyserver/clientmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *ClientManager) ListRayClusters(ctx context.Context) ([]*rayv1.RayCluste
return list, nil
}

func NewClientManager(kubeconfigs string) *ClientManager {
func NewClientManager(kubeconfigs string, useKubernetesProxy bool) *ClientManager {
kubeconfigList := []*rest.Config{}
if len(kubeconfigs) > 0 {
stringList := strings.Split(kubeconfigs, ",")
Expand All @@ -60,15 +60,27 @@ func NewClientManager(kubeconfigs string) *ClientManager {
}
}
} else {
c, err := rest.InClusterConfig()
if err != nil {
logrus.Errorf("Failed to build config from kubeconfig: %v", err)
var c *rest.Config
var err error
if useKubernetesProxy {
// Load Kubernetes REST config from default kubeconfig locations (KUBECONFIG environment variable or ~/.kube/config)
// without interactive prompts.
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides)
c, err = clientConfig.ClientConfig()
if err != nil {
logrus.Errorf("Failed to load default kubeconfig in Kubernetes proxy mode: %v", err)
}
} else {
c.QPS = 50
c.Burst = 100
kubeconfigList = append(kubeconfigList, c)
logrus.Infof("add config from in cluster config")
c, err = rest.InClusterConfig()
if err != nil {
logrus.Errorf("Failed to build config from in-cluster kubeconfig: %v", err)
}
}
c.QPS = 50
c.Burst = 100
kubeconfigList = append(kubeconfigList, c)
}
scheme := runtime.NewScheme()
utilruntime.Must(rayv1.AddToScheme(scheme))
Expand Down
70 changes: 57 additions & 13 deletions historyserver/pkg/historyserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -26,6 +27,12 @@ const (
ATTRIBUTE_SERVICE_NAME = "cluster_service_name"
)

type ServiceInfo struct {
ServiceName string
Namespace string
Port int
}

func RequestLogFilter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
logrus.Infof("Received request: %s %s", req.Request.Method, req.Request.URL.String())
chain.ProcessFilter(req, resp)
Expand Down Expand Up @@ -263,26 +270,63 @@ func (s *ServerHandler) RegisterRouter() {
}

func (s *ServerHandler) redirectRequest(req *restful.Request, resp *restful.Response) {
svcName := req.Attribute(ATTRIBUTE_SERVICE_NAME).(string)
remoteResp, err := s.httpClient.Get("http://" + svcName + req.Request.URL.String())
svcInfo := req.Attribute(ATTRIBUTE_SERVICE_NAME).(ServiceInfo)

var targetURL string
if s.useKubernetesProxy {
// Use Kubernetes API server proxy to access the in-cluster RayDashboard services.
targetURL = fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy%s",
s.clientManager.configs[0].Host,
svcInfo.Namespace,
svcInfo.ServiceName,
req.Request.URL.String())
logrus.Infof("Using Kubernetes API server proxy to access service %s/%s: %s",
svcInfo.Namespace, svcInfo.ServiceName, req.Request.URL.String())
} else {
// Connect through in-cluster service discovery.
targetURL = fmt.Sprintf("http://%s:%d%s", svcInfo.ServiceName, svcInfo.Port, req.Request.URL.String())
logrus.Infof("Using in-cluster service discovery to access service %s/%s: %s",
svcInfo.Namespace, svcInfo.ServiceName, req.Request.URL.String())
}

// Create a new request to the target URL.
proxyReq, err := http.NewRequest(req.Request.Method, targetURL, req.Request.Body)
if err != nil {
logrus.Errorf("Error: %v", err)
logrus.Errorf("Failed to create proxy request: %v", err)
resp.WriteError(http.StatusInternalServerError, err)
return
}

// Copy headers from original request to proxy request.
for key, values := range req.Request.Header {
if strings.ToLower(key) != "host" {
for _, value := range values {
// Use Add() to preserve multiple values for the same header key.
proxyReq.Header.Add(key, value)
}
}
}

// Send the proxy request to the target URL.
remoteResp, err := s.httpClient.Do(proxyReq)
if err != nil {
logrus.Errorf("Failed to proxy request to %s: %v", targetURL, err)
resp.WriteError(http.StatusBadGateway, err)
return
}
defer remoteResp.Body.Close()

// Copy headers from remote response
// Copy headers from remote response.
for key, values := range remoteResp.Header {
for _, value := range values {
resp.Header().Add(key, value)
}
}

// Set status code
// Set status code.
resp.WriteHeader(remoteResp.StatusCode)

// Copy response body
// Copy response body.
_, err = io.Copy(resp, remoteResp.Body)
if err != nil {
logrus.Errorf("Failed to copy response body: %v", err)
Expand Down Expand Up @@ -792,12 +836,12 @@ func (s *ServerHandler) CookieHandle(req *restful.Request, resp *restful.Respons
// Always query K8s to get the service name to prevent SSRF attacks.
// Do not trust user-provided cookies for service name.
// TODO: here might be a bottleneck if there are many requests in the future.
svcName, err := getClusterSvcName(s.clientManager.clients, clusterName.Value, clusterNamespace.Value)
svcInfo, err := getClusterSvcInfo(s.clientManager.clients, clusterName.Value, clusterNamespace.Value)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusBadRequest, err.Error())
return
}
req.SetAttribute(ATTRIBUTE_SERVICE_NAME, svcName)
req.SetAttribute(ATTRIBUTE_SERVICE_NAME, svcInfo)
}
req.SetAttribute(COOKIE_CLUSTER_NAME_KEY, clusterName.Value)
req.SetAttribute(COOKIE_SESSION_NAME_KEY, sessionName.Value)
Expand All @@ -806,19 +850,19 @@ func (s *ServerHandler) CookieHandle(req *restful.Request, resp *restful.Respons
chain.ProcessFilter(req, resp)
}

func getClusterSvcName(clis []client.Client, name, namespace string) (string, error) {
func getClusterSvcInfo(clis []client.Client, name, namespace string) (ServiceInfo, error) {
if len(clis) == 0 {
return "", errors.New("No available kubernetes config found")
return ServiceInfo{}, errors.New("No available kubernetes config found")
}
cli := clis[0]
rc := rayv1.RayCluster{}
err := cli.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: name}, &rc)
if err != nil {
return "", errors.New("RayCluster not found")
return ServiceInfo{}, errors.New("RayCluster not found")
}
svcName := rc.Status.Head.ServiceName
if svcName == "" {
return "", errors.New("RayCluster head service not ready")
return ServiceInfo{}, errors.New("RayCluster head service not ready")
}
return svcName + ":8265", nil
return ServiceInfo{ServiceName: svcName, Namespace: namespace, Port: 8265}, nil
}
39 changes: 34 additions & 5 deletions historyserver/pkg/historyserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ray-project/kuberay/historyserver/pkg/eventserver"
"github.com/ray-project/kuberay/historyserver/pkg/storage"
"github.com/sirupsen/logrus"
"k8s.io/client-go/transport"
)

type ServerHandler struct {
Expand All @@ -21,10 +22,12 @@ type ServerHandler struct {
clientManager *ClientManager
eventHandler *eventserver.EventHandler
httpClient *http.Client

useKubernetesProxy bool
}

func NewServerHandler(c *types.RayHistoryServerConfig, dashboardDir string, reader storage.StorageReader, clientManager *ClientManager, eventHandler *eventserver.EventHandler) *ServerHandler {
return &ServerHandler{
func NewServerHandler(c *types.RayHistoryServerConfig, dashboardDir string, reader storage.StorageReader, clientManager *ClientManager, eventHandler *eventserver.EventHandler, useKubernetesProxy bool) *ServerHandler {
handler := &ServerHandler{
reader: reader,
clientManager: clientManager,
eventHandler: eventHandler,
Expand All @@ -33,10 +36,36 @@ func NewServerHandler(c *types.RayHistoryServerConfig, dashboardDir string, read
dashboardDir: dashboardDir,
// TODO: make this configurable
maxClusters: 100,
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
}

if len(clientManager.configs) > 0 {
k8sRestConfig := clientManager.configs[0]
if useKubernetesProxy {
transportConfig, err := k8sRestConfig.TransportConfig()
if err != nil {
logrus.Errorf("Failed to get transport config: %v", err)
} else {
// Create a Kubernetes-aware round tripper that can handle authentication and transport security.
rt, err := transport.New(transportConfig)
if err != nil {
logrus.Errorf("Failed to create Kubernetes-aware round tripper: %v", err)
} else {
handler.httpClient = &http.Client{
Timeout: 30 * time.Second,
Transport: rt,
}
handler.useKubernetesProxy = true
}
}
} else {
// Create a simple HTTP client that doesn't use Kubernetes API server proxy.
handler.httpClient = &http.Client{
Timeout: 30 * time.Second,
}
handler.useKubernetesProxy = false
}
}
return handler
}

func (s *ServerHandler) Run(stop chan struct{}) error {
Expand Down
Loading