Skip to content

Commit

Permalink
feat: make SerializeState cancellable
Browse files Browse the repository at this point in the history
SerializeState is called during post-processing, but also from the web
UI.  Under some circumstances, it can take a very long time to serialize
the data, so it's entirely possible for requests to time out before
they're returned.  Worse, clients may retry, which can result in a
pile-up.  When a request times out, we want to abort collection of that
data.

While we're at it, add more tasks and regions for tracing the process.

Note this includes breaking changes to the public Go API.
  • Loading branch information
adam-azarchs committed Aug 7, 2023
1 parent c9f1cb5 commit 541bc9f
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 43 deletions.
22 changes: 16 additions & 6 deletions cmd/mrp/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,20 @@ import (
"github.com/martian-lang/martian/martian/util"
)

func getFinalState(rt *core.Runtime, pipestance *core.Pipestance) []*core.NodeInfo {
func getFinalState(ctx context.Context, rt *core.Runtime, pipestance *core.Pipestance) []*core.NodeInfo {
var target []*core.NodeInfo
if err := rt.GetSerializationInto(pipestance.GetPath(), core.FinalState, &target); err == nil {
return target
}
return pipestance.SerializeState()
return pipestance.SerializeState(ctx)
}

func getPerf(rt *core.Runtime, pipestance *core.Pipestance) []*core.NodePerfInfo {
func getPerf(ctx context.Context, rt *core.Runtime, pipestance *core.Pipestance) []*core.NodePerfInfo {
var target []*core.NodePerfInfo
if err := rt.GetSerializationInto(pipestance.GetPath(), core.Perf, &target); err == nil {
return target
}
return pipestance.SerializePerf()
return pipestance.SerializePerf(ctx)
}

func runWebServer(
Expand Down Expand Up @@ -309,9 +309,14 @@ func (self *mrpWebServer) getState(w http.ResponseWriter, req *http.Request) {
}
pipestance := self.pipestanceBox.getPipestance()
state := api.PipestanceState{
Nodes: getFinalState(self.rt, pipestance),
Nodes: getFinalState(req.Context(), self.rt, pipestance),
Info: self.pipestanceBox.info,
}
if err := req.Context().Err(); err != nil {
// Don't send bytes if the request was canceled.
http.Error(w, err.Error(), http.StatusRequestTimeout)
return
}
self.mutex.Lock()
bytes, err := json.Marshal(&state)
self.mutex.Unlock()
Expand Down Expand Up @@ -342,7 +347,12 @@ func (self *mrpWebServer) getPerf(w http.ResponseWriter, req *http.Request) {
}
pipestance := self.pipestanceBox.getPipestance()
state := api.PerfInfo{
Nodes: getPerf(self.rt, pipestance),
Nodes: getPerf(req.Context(), self.rt, pipestance),
}
if err := req.Context().Err(); err != nil {
// Don't send bytes if the request was canceled.
http.Error(w, err.Error(), http.StatusRequestTimeout)
return
}
bytes, err := json.Marshal(&state)
if err != nil {
Expand Down
36 changes: 26 additions & 10 deletions martian/core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package core

import (
"context"
"fmt"
"math"
"os"
"path"
"regexp"
"runtime/trace"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -761,13 +763,14 @@ func (self *Node) kill(message string) {
}
}

func (self *Node) postProcess() {
func (self *Node) postProcess(ctx context.Context) {
defer trace.StartRegion(ctx, "Node_postProcess").End()
os.RemoveAll(self.top.journalPath)
os.RemoveAll(self.top.tmpPath)

var errs syntax.ErrorList
for _, fork := range self.forks {
if err := fork.postProcess(); err != nil {
if err := fork.postProcess(ctx); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -776,11 +779,11 @@ func (self *Node) postProcess() {
}
}

func (self *Node) cachePerf() {
func (self *Node) cachePerf(ctx context.Context) {
if _, ok := self.vdrKill(); ok {
// Cache all fork performance info if node can be VDR-ed.
for _, fork := range self.forks {
fork.cachePerf()
fork.cachePerf(ctx)
}
}
}
Expand Down Expand Up @@ -891,11 +894,13 @@ func (self *Node) step() bool {
}
self.addFrontierNode(self)
case Complete:
ctx, task := trace.NewTask(context.Background(), "step_Complete")
defer task.End()
if vdr := self.top.rt.Config.VdrMode; vdr == VdrRolling || vdr == VdrStrict {
for _, node := range self.prenodes {
node.getNode().cachePerf()
node.getNode().cachePerf(ctx)
}
self.cachePerf()
self.cachePerf(ctx)
}
fallthrough
case DisabledState:
Expand Down Expand Up @@ -986,13 +991,20 @@ func (self *Node) refreshState(readOnly bool) {
}

// Serialization.
func (self *Node) serializeState() *NodeInfo {
func (self *Node) serializeState(ctx context.Context) *NodeInfo {
defer trace.StartRegion(ctx, "Node_serializeState").End()
forks := make([]*ForkInfo, 0, len(self.forks))
for _, fork := range self.forks {
forks = append(forks, fork.serializeState())
if ctx.Err() != nil {
return nil
}
forks = append(forks, fork.serializeState(ctx))
}
edges := make([]EdgeInfo, 0, len(self.directPrenodes))
for _, prenode := range self.directPrenodes {
if ctx.Err() != nil {
return nil
}
edges = append(edges, EdgeInfo{
From: prenode.GetFQName(),
To: self.call.GetFqid(),
Expand Down Expand Up @@ -1035,11 +1047,15 @@ func (self *Node) serializeState() *NodeInfo {
return info
}

func (self *Node) serializePerf() (*NodePerfInfo, []*VdrEvent) {
func (self *Node) serializePerf(ctx context.Context) (*NodePerfInfo, []*VdrEvent) {
defer trace.StartRegion(ctx, "Node_serializePerf").End()
forks := make([]*ForkPerfInfo, 0, len(self.forks))
var storageEvents []*VdrEvent
for _, fork := range self.forks {
forkSer, vdrKill := fork.serializePerf()
if ctx.Err() != nil {
return nil, nil
}
forkSer, vdrKill := fork.serializePerf(ctx)
forks = append(forks, forkSer)
if vdrKill != nil && self.call.Kind() != syntax.KindPipeline {
storageEvents = append(storageEvents, vdrKill.Events...)
Expand Down
42 changes: 26 additions & 16 deletions martian/core/pipestance.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,47 +653,54 @@ func (self *Pipestance) Reset() error {
return nil
}

func (self *Pipestance) SerializeState() []*NodeInfo {
func (self *Pipestance) SerializeState(ctx context.Context) []*NodeInfo {
nodes := self.allNodes()
ser := make([]*NodeInfo, 0, len(nodes))
for _, node := range nodes {
ser = append(ser, node.serializeState())
if ctx.Err() != nil {
return nil
}
ser = append(ser, node.serializeState(ctx))
}
return ser
}

func (self *Pipestance) SerializePerf() []*NodePerfInfo {
func (self *Pipestance) SerializePerf(ctx context.Context) []*NodePerfInfo {
nodes := self.allNodes()
ser := make([]*NodePerfInfo, 0, len(nodes))
for _, node := range nodes {
perf, _ := node.serializePerf()
if ctx.Err() != nil {
return nil
}
perf, _ := node.serializePerf(ctx)
ser = append(ser, perf)
}
util.LogInfo("perform", "Serializing pipestance performance data.")
if len(ser) > 0 {
if len(ser) > 0 && ctx.Err() == nil {
overallPerf := ser[0]
self.ComputeDiskUsage(overallPerf)
self.ComputeDiskUsage(ctx, overallPerf)
overallPerf.HighMem = &self.node.top.rt.LocalJobManager.highMem
}
return ser
}

func (self *Pipestance) Serialize(name MetadataFileName) interface{} {
func (self *Pipestance) Serialize(ctx context.Context, name MetadataFileName) interface{} {
switch name {
case FinalState:
return self.SerializeState()
return self.SerializeState(ctx)
case Perf:
return self.SerializePerf()
return self.SerializePerf(ctx)
default:
panic(fmt.Sprintf("Unsupported serialization type: %v", name))
}
}

func (self *Pipestance) ComputeDiskUsage(nodePerf *NodePerfInfo) *NodePerfInfo {
func (self *Pipestance) ComputeDiskUsage(ctx context.Context, nodePerf *NodePerfInfo) *NodePerfInfo {
defer trace.StartRegion(ctx, "ComputeDiskUsage").End()
nodes := self.allNodes()
allStorageEvents := make(StorageEventByTimestamp, 0, len(nodes)*2)
for _, node := range nodes {
_, storageEvents := node.serializePerf()
_, storageEvents := node.serializePerf(ctx)
for _, ev := range storageEvents {
if ev.DeltaBytes != 0 {
allStorageEvents = append(allStorageEvents,
Expand Down Expand Up @@ -797,14 +804,16 @@ func (self *Pipestance) GetVersions() (string, string, error) {
}

func (self *Pipestance) PostProcess() {
self.node.postProcess()
ctx, task := trace.NewTask(context.Background(), "PostProcess")
defer task.End()
self.node.postProcess(ctx)
start, _ := self.metadata.readRawBytes(TimestampFile)
start = append(start, "\nend: "...)
if err := self.metadata.WriteRawBytes(TimestampFile, append(start, util.Timestamp()...)); err != nil {
util.LogError(err, "runtime",
"Error writing completion timestamp.")
}
if err := self.Immortalize(false); err != nil {
if err := self.Immortalize(ctx, false); err != nil {
util.LogError(err, "runtime",
"Error finalizing pipestance state.")
}
Expand All @@ -814,19 +823,20 @@ func (self *Pipestance) PostProcess() {
// for posterity.
//
// Unless force is true, this is only permitted for locked pipestances.
func (self *Pipestance) Immortalize(force bool) error {
func (self *Pipestance) Immortalize(ctx context.Context, force bool) error {
defer trace.StartRegion(ctx, "Immortalize").End()
if !force && self.readOnly() {
return &RuntimeError{"Pipestance is in read only mode."}
}
self.metadata.loadCache()
var errs syntax.ErrorList
if !self.metadata.exists(Perf) {
if err := self.metadata.Write(Perf, self.SerializePerf()); err != nil {
if err := self.metadata.Write(Perf, self.SerializePerf(ctx)); err != nil {
errs = append(errs, err)
}
}
if !self.metadata.exists(FinalState) {
if err := self.metadata.Write(FinalState, self.SerializeState()); err != nil {
if err := self.metadata.Write(FinalState, self.SerializeState(ctx)); err != nil {
errs = append(errs, err)
}
}
Expand Down
6 changes: 5 additions & 1 deletion martian/core/post_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ package core

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"runtime/trace"
"sort"
"strconv"
"strings"
Expand All @@ -21,7 +23,9 @@ import (
"github.com/martian-lang/martian/martian/util"
)

func (self *Fork) postProcess() error {
func (self *Fork) postProcess(ctx context.Context) error {
defer trace.StartRegion(ctx, "Fork_postProcess").End()

ro := self.node.call.ResolvedOutputs()
if ro == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions martian/core/post_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestPostProcess(t *testing.T) {
}
var buf strings.Builder
util.SetPrintLogger(&buf)
if err := fork.postProcess(); err != nil {
if err := fork.postProcess(context.Background()); err != nil {
t.Error(err)
}
util.SetPrintLogger(&devNull)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestPostProcessEmpties(t *testing.T) {
}
var buf strings.Builder
util.SetPrintLogger(&buf)
if err := fork.postProcess(); err != nil {
if err := fork.postProcess(context.Background()); err != nil {
t.Error(err)
}
util.SetPrintLogger(&devNull)
Expand Down
4 changes: 2 additions & 2 deletions martian/core/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func TestResolveSimplePipelineOutputs(t *testing.T) {
}
var buf strings.Builder
util.SetPrintLogger(&buf)
if err := fork.postProcess(); err != nil {
if err := fork.postProcess(context.Background()); err != nil {
t.Error(err)
}
util.SetPrintLogger(&devNull)
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestResolvePipelineOutputs(t *testing.T) {
}
var buf strings.Builder
util.SetPrintLogger(&buf)
if err := fork.postProcess(); err != nil {
if err := fork.postProcess(context.Background()); err != nil {
t.Error(err)
}
util.SetPrintLogger(&devNull)
Expand Down
2 changes: 1 addition & 1 deletion martian/core/runloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestPipestanceRun(t *testing.T) {
}
}
}
nodeInfos := pipestance.SerializeState()
nodeInfos := pipestance.SerializeState(context.Background())
if len(nodeInfos) != 22 {
t.Errorf("node count %d != 22", len(nodeInfos))
}
Expand Down
Loading

0 comments on commit 541bc9f

Please sign in to comment.