Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions historyserver/cmd/collector/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"context"
"encoding/json"
"flag"
"os"
"os/signal"
"path"
"sync"
"syscall"
"time"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -89,18 +91,39 @@ func main() {
panic("Failed to create writer for runtime class name: " + runtimeClassName + " for role: " + role + ".")
}

var wg sync.WaitGroup

sigChan := make(chan os.Signal, 1)
stop := make(chan struct{}, 1)
eventStop := make(chan struct{}, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

wg.Add(1)
// Create and initialize EventServer
eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
eventServer.InitServer(eventsPort)

collector := runtime.NewCollector(&globalConfig, writer)
_ = collector.Start(context.TODO().Done())

eventStop := eventServer.WaitForStop()
logStop := collector.WaitForStop()
<-eventStop
logrus.Info("Event server shutdown")
<-logStop
logrus.Info("Log server shutdown")
logrus.Info("All servers shutdown")
go func() {
defer wg.Done()
eventServer := eventserver.NewEventServer(writer, rayRootDir, sessionDir, rayNodeId, rayClusterName, rayClusterId, sessionName)
eventServer.InitServer(eventStop, eventsPort)
logrus.Info("Event server shutdown")
}()

wg.Add(1)
go func() {
defer wg.Done()
collector := runtime.NewCollector(&globalConfig, writer)
collector.Start(stop)
<-collector.WaitForStop()
logrus.Info("Log server shutdown")
}()

<-sigChan
logrus.Info("Received shutdown signal, initiating graceful shutdown...")

// Stop both the event server and the collector
stop <- struct{}{}
eventStop <- struct{}{}

// Wait for both goroutines to complete
wg.Wait()
logrus.Info("Graceful shutdown complete")
}
18 changes: 5 additions & 13 deletions historyserver/pkg/collector/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"io"
"net/http"
"os"
"os/signal"
"path"
"strings"
"sync"
"syscall"
"time"

"github.com/emicklei/go-restful/v3"
Expand Down Expand Up @@ -83,7 +81,7 @@ func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, c
return server
}

func (es *EventServer) InitServer(port int) {
func (es *EventServer) InitServer(stop chan struct{}, port int) {
ws := new(restful.WebService)
ws.Path("/v1")
ws.Consumes(restful.MIME_JSON)
Expand All @@ -101,16 +99,10 @@ func (es *EventServer) InitServer(port int) {
es.periodicFlush()
}()

// Handle SIGTERM signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-sigChan
logrus.Info("Received SIGTERM, flushing events to storage")
es.flushEvents()
close(es.stopped)
}()
<-stop
logrus.Info("Received SIGTERM, flushing events to storage")
es.flushEvents()
close(es.stopped)
}

// watchNodeIDFile watches /tmp/ray/raylet_node_id for content changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"io/fs"
"net/http"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -61,10 +59,6 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
}
defer watcher.Close()

// Setup signal handling for SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)

// WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup
// to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories.
// After scanning, it watches for new directories and files. This ensures incomplete
Expand All @@ -74,17 +68,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
}

select {
case <-sigChan:
logrus.Info("Received SIGTERM, processing all logs...")
r.processSessionLatestLogs()
close(r.ShutdownChan)
case <-stop:
logrus.Info("Received stop signal, processing all logs...")
r.processSessionLatestLogs()
close(r.ShutdownChan)
}
logrus.Warnf("Receive stop single, so stop ray collector ")
<-stop
logrus.Info("Received stop signal, processing all logs...")
r.processSessionLatestLogs()
close(r.ShutdownChan)

return nil
}

Expand Down
Loading