Skip to content

Commit

Permalink
remove decommissioned function
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Jul 28, 2023
1 parent 5b28d6d commit 680cf56
Showing 1 changed file with 0 additions and 30 deletions.
30 changes: 0 additions & 30 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,36 +374,6 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
return nil
}

// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data.
func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) {
sis, err := getSideInputs(t)
if err != nil {
return nil, err
}
var prepSides []func(b *worker.B, watermark mtime.Time)

// Get WindowedValue Coders for the transform's input and output PCollections.
for local, global := range t.GetInputs() {
_, ok := sis[local]
if !ok {
continue // This is the main input.
}
if oldGlobal, ok := replacements[global]; ok {
global = oldGlobal
}
prepSide, err := handleSideInput(tid, local, global, comps, coders, wk)
if err != nil {
return nil, err
}
prepSides = append(prepSides, prepSide)
}
return func(b *worker.B, tid string, watermark mtime.Time) {
for _, prep := range prepSides {
prep(b, watermark)
}
}, nil
}

// handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) {
t := comps.GetTransforms()[tid]
Expand Down

0 comments on commit 680cf56

Please sign in to comment.