Skip to content

Commit

Permalink
[prism] side input coders & progress hang (apache#27572)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
2 people authored and bullet03 committed Aug 11, 2023
1 parent ade733b commit 9f74f55
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 38 deletions.
11 changes: 9 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,15 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
}
ready := true
for _, side := range ss.sides {
pID := em.pcolParents[side]
parent := em.stages[pID]
pID, ok := em.pcolParents[side]
// These panics indicate pre-process/stage construction problems.
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
parent, ok := em.stages[pID]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent for side input %v, with parent ID %v", ss.ID, side, pID))
}
ow := parent.OutputWatermark()
if upstreamW > ow {
ready = false
Expand Down
78 changes: 57 additions & 21 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ progress:
progTick.Stop()
break progress // exit progress loop on close.
case <-progTick.C:
resp := b.Progress(wk)
resp, err := b.Progress(wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
break progress
}
index, unknownIDs := j.ContributeTentativeMetrics(resp)
if len(unknownIDs) > 0 {
md := wk.MonitoringMetadata(unknownIDs)
Expand All @@ -125,7 +129,11 @@ progress:
slog.Debug("progress report", "bundle", rb, "index", index)
// Progress for the bundle hasn't advanced. Try splitting.
if previousIndex == index && !splitsDone {
sr := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if sr.GetChannelSplits() == nil {
slog.Warn("split failed", "bundle", rb)
splitsDone = true
Expand Down Expand Up @@ -164,8 +172,8 @@ progress:
// Bundle has failed, fail the job.
// TODO add retries & clean up this logic. Channels are closed by the "runner" transforms.
if !ok && b.Error != "" {
slog.Error("job failed", "error", b.Error, "bundle", rb, "job", j)
j.Failed(fmt.Errorf("bundle failed: %v", b.Error))
slog.Error("job failed", "bundle", rb, "job", j)
j.Failed(fmt.Errorf("%v", b.Error))
return
}

Expand Down Expand Up @@ -245,31 +253,54 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon
}
var inputInfo engine.PColInfo
var sides []string
localIdReplacements := map[string]string{}
globalIDReplacements := map[string]string{}
for local, global := range t.GetInputs() {
if _, ok := sis[local]; ok {
col := comps.GetPcollections()[global]
oCID := col.GetCoderId()
nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())

sides = append(sides, global)
if oCID != nCID {
// Add a synthetic PCollection set with the new coder.
newGlobal := global + "_prismside"
comps.GetPcollections()[newGlobal] = &pipepb.PCollection{
DisplayData: col.GetDisplayData(),
UniqueName: col.GetUniqueName(),
CoderId: nCID,
IsBounded: col.GetIsBounded(),
WindowingStrategyId: col.WindowingStrategyId,
}
localIdReplacements[local] = newGlobal
globalIDReplacements[newGlobal] = global
}
continue
}
// This id is directly used for the source, but this also copies
// coders used by side inputs to the coders map for the bundle, so
// needs to be run for every ID.
wInCid := makeWindowedValueCoder(global, comps, coders)
_, ok := sis[local]
if ok {
sides = append(sides, global)
} else {
// this is the main input
transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global)
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}

// this is the main input
transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global)
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}
// We need to process all inputs to ensure we have all input coders, so we must continue.
}
// Update side inputs to point to new PCollection with any replaced coders.
for l, g := range localIdReplacements {
t.GetInputs()[l] = g
}

prepareSides, err := handleSideInputs(t, comps, coders, wk)
prepareSides, err := handleSideInputs(t, comps, coders, wk, globalIDReplacements)
if err != nil {
slog.Error("buildStage: handleSideInputs", err, slog.String("transformID", tid))
panic(err)
Expand Down Expand Up @@ -322,7 +353,7 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon
}

// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data.
func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, tid string, watermark mtime.Time), error) {
func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) {
sis, err := getSideInputs(t)
if err != nil {
return nil, err
Expand All @@ -335,6 +366,11 @@ func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map
if !ok {
continue // This is the main input.
}
// Use the old global ID as the identifier for the data storage
// This matches what we do in the rest of the stage layer.
if oldGlobal, ok := replacements[global]; ok {
global = oldGlobal
}

// this is a side input
switch si.GetAccessPattern().GetUrn() {
Expand Down
21 changes: 15 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package worker

import (
"fmt"
"sync/atomic"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
Expand Down Expand Up @@ -144,18 +145,22 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

func (b *B) Progress(wk *W) *fnpb.ProcessBundleProgressResponse {
return wk.sendInstruction(&fnpb.InstructionRequest{
func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
ProcessBundleProgress: &fnpb.ProcessBundleProgressRequest{
InstructionId: b.InstID,
},
},
}).GetProcessBundleProgress()
})
if resp.GetError() != "" {
return nil, fmt.Errorf("progress[%v] error from SDK: %v", b.InstID, resp.GetError())
}
return resp.GetProcessBundleProgress(), nil
}

func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessBundleSplitResponse {
return wk.sendInstruction(&fnpb.InstructionRequest{
func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{
InstructionId: b.InstID,
Expand All @@ -168,5 +173,9 @@ func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessB
},
},
},
}).GetProcessBundleSplit()
})
if resp.GetError() != "" {
return nil, fmt.Errorf("split[%v] error from SDK: %v", b.InstID, resp.GetError())
}
return resp.GetProcessBundleSplit(), nil
}
22 changes: 13 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,7 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
// TODO: Do more than assume these are ProcessBundleResponses.
wk.mu.Lock()
if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok {
// TODO. Better pipeline error handling.
if resp.Error != "" {
slog.LogAttrs(ctrl.Context(), slog.LevelError, "ctrl.Recv pipeline error",
slog.String("error", resp.GetError()))
}
// Error is handled in the resonse handler.
b.Respond(resp)
} else {
slog.Debug("ctrl.Recv: %v", resp)
Expand Down Expand Up @@ -327,12 +323,20 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
}
}()

for req := range wk.DataReqs {
if err := data.Send(req); err != nil {
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
for {
select {
case req, ok := <-wk.DataReqs:
if !ok {
return nil
}
if err := data.Send(req); err != nil {
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
}
case <-data.Context().Done():
slog.Debug("Data context canceled")
return data.Context().Err()
}
}
return nil
}

// State relays elements and timer bytes to SDKs and back again, coordinated via
Expand Down

0 comments on commit 9f74f55

Please sign in to comment.