[prism] Fusion base, reshuffle, cogbk. (apache#27737)
* [prism] Fusion base, reshuffle, cogbk.

* silence logging, better message

* precise reshuffle strategy filtering

* Remove leftover comments

* remove decommissioned function

* fix typos.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
2 people authored and bullet03 committed Aug 11, 2023
1 parent 71e1985 commit 82f11c3
Showing 15 changed files with 760 additions and 231 deletions.
11 changes: 9 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -193,7 +193,11 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {

input := unmarshalKeyedValues(transform.GetInputs())
for i, from := range input {
- 		succ[from] = append(succ[from], linkID{id, i})
+ 		// We don't need to multiplex successors for pardo side inputs,
+ 		// so we only do so for SDK side Flattens.
+ 		if i == 0 || transform.GetSpec().GetUrn() == graphx.URNFlatten {
+ 			succ[from] = append(succ[from], linkID{id, i})
+ 		}
}
output := unmarshalKeyedValues(transform.GetOutputs())
for _, to := range output {
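A minimal, runnable sketch of the successor rule above, using stand-in types: linkID and the Flatten URN mirror the diff, while addSuccessors, the toy transform IDs, and the map shape are illustrative assumptions, not prism's actual builder.

```go
package main

import "fmt"

// linkID mirrors the diff: a transform ID plus the input index on that transform.
type linkID struct {
	id    string
	input int
}

const urnFlatten = "beam:transform:flatten:v1"

// addSuccessors records successor links for one transform's inputs.
// Only a Flatten multiplexes all of its input edges; for anything else
// (e.g. a ParDo with side inputs) only the main input (index 0) links.
func addSuccessors(succ map[string][]linkID, id, urn string, inputs []string) {
	for i, from := range inputs {
		if i == 0 || urn == urnFlatten {
			succ[from] = append(succ[from], linkID{id, i})
		}
	}
}

func main() {
	succ := map[string][]linkID{}
	// A Flatten consuming the same PCollection twice keeps both edges.
	addSuccessors(succ, "flatten1", urnFlatten, []string{"pcolA", "pcolA"})
	// A ParDo's side input (index 1) does not become a successor edge.
	addSuccessors(succ, "pardo1", "beam:transform:pardo:v1", []string{"pcolA", "pcolSide"})
	fmt.Println(succ["pcolA"])    // [{flatten1 0} {flatten1 1} {pardo1 0}]
	fmt.Println(succ["pcolSide"]) // []
}
```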
@@ -731,7 +735,10 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
}
// Strip PCollections from Expand nodes, as CoGBK metrics are handled by
// the DataSource that precedes them.
- 	trueOut := out[0].(*PCollection).Out
+ 	trueOut := out[0]
+ 	if pcol, ok := trueOut.(*PCollection); ok {
+ 		trueOut = pcol.Out
+ 	}
b.units = b.units[:len(b.units)-1]
u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: trueOut}

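The comma-ok guard above exists because, with fusion, the node feeding an Expand may no longer be a *PCollection wrapper. A toy, runnable sketch of the pattern — Node, PCollection, and DataSource here are stand-ins, not the exec package's types:

```go
package main

import "fmt"

// Node and PCollection are stand-ins for the exec package's types.
type Node interface{ Name() string }

type PCollection struct{ Out Node }

func (*PCollection) Name() string { return "PCollection" }

type DataSource struct{}

func (*DataSource) Name() string { return "DataSource" }

// unwrap strips a PCollection wrapper when present, and otherwise
// returns the node unchanged instead of panicking on a bad assertion.
func unwrap(n Node) Node {
	if pcol, ok := n.(*PCollection); ok {
		return pcol.Out
	}
	return n
}

func main() {
	src := &DataSource{}
	fmt.Println(unwrap(&PCollection{Out: src}).Name()) // DataSource
	fmt.Println(unwrap(src).Name())                    // DataSource (no panic)
}
```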
@@ -570,7 +570,7 @@ func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string)

var toProcess, notYet []element
for _, e := range ss.pending {
- 		if !ss.aggregate || ss.aggregate && ss.strat.EarliestCompletion(e.window) <= watermark {
+ 		if !ss.aggregate || ss.aggregate && ss.strat.EarliestCompletion(e.window) < watermark {
toProcess = append(toProcess, e)
} else {
notYet = append(notYet, e)
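The one-character change above (`<=` to `<`) matters at the window boundary: a watermark equal to a window's earliest completion time does not yet guarantee that elements stamped exactly at that time have arrived. A simplified, runnable sketch of the predicate, with plain int64 millis standing in for mtime.Time:

```go
package main

import "fmt"

// readyToFire reports whether an aggregation for a window may fire.
// The watermark must strictly pass the window's earliest completion
// time; firing on equality (the old <=) could miss elements stamped
// exactly at that boundary.
func readyToFire(earliestCompletion, watermark int64) bool {
	return earliestCompletion < watermark
}

func main() {
	const windowEnd = int64(1000)
	fmt.Println(readyToFire(windowEnd, 1000)) // false: keep holding elements
	fmt.Println(readyToFire(windowEnd, 1001)) // true: safe to aggregate
}
```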
27 changes: 15 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -60,7 +60,11 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()

- 	executePipeline(j.RootCtx, wk, j)
+ 	err := executePipeline(j.RootCtx, wk, j)
+ 	if err != nil {
+ 		j.Failed(err)
+ 		return
+ 	}
j.SendMsg("pipeline completed " + j.String())

// Stop the worker.
@@ -126,14 +130,14 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
type transformExecuter interface {
ExecuteUrns() []string
ExecuteWith(t *pipepb.PTransform) string
- 	ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
+ 	ExecuteTransform(stageID, tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
}

type processor struct {
transformExecuters map[string]transformExecuter
}

- func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
+ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) error {
pipeline := j.Pipeline
comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)

@@ -145,7 +149,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
Combine(CombineCharacteristic{EnableLifting: true}),
ParDo(ParDoCharacteristic{DisableSDF: true}),
Runner(RunnerCharacteristic{
- 			SDKFlatten: false,
+ 			SDKFlatten:   false,
+ 			SDKReshuffle: false,
}),
}

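The handler characteristics above are plain config structs, so strategy changes (like the reshuffle filtering this commit adds) amount to flipping fields. A stand-alone sketch of that pattern with local stand-in types — the real constructor is prism's Runner(config any), shown later in this diff:

```go
package main

import "fmt"

// RunnerCharacteristic mirrors the fields in the diff; the handler
// type and constructor below are local stand-ins, not prism's.
type RunnerCharacteristic struct {
	SDKFlatten   bool
	SDKGBK       bool
	SDKReshuffle bool
}

type runnerHandler struct{ config RunnerCharacteristic }

func Runner(config RunnerCharacteristic) *runnerHandler {
	return &runnerHandler{config: config}
}

func main() {
	// Default: the runner itself handles flattens, GBKs, and reshuffles.
	base := Runner(RunnerCharacteristic{})
	// A variant run forcing SDK-side flattens needs only a field flip.
	sdkFlatten := Runner(RunnerCharacteristic{SDKFlatten: true})
	fmt.Printf("%+v\n%+v\n", base.config, sdkFlatten.config)
}
```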
@@ -175,10 +180,7 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
// TODO move this loop and code into the preprocessor instead.
stages := map[string]*stage{}
var impulses []string
- 	for i, stage := range topo {
- 		if len(stage.transforms) != 1 {
- 			panic(fmt.Sprintf("unsupported stage[%d]: contains multiple transforms: %v; TODO: implement fusion", i, stage.transforms))
- 		}
+ 	for _, stage := range topo {
tid := stage.transforms[0]
t := ts[tid]
urn := t.GetSpec().GetUrn()
@@ -255,16 +257,16 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
wk.Descriptors[stage.ID] = stage.desc
case wk.ID:
// Great! this is for this environment. // Broken abstraction.
- 			buildStage(stage, tid, t, comps, wk)
+ 			buildDescriptor(stage, comps, wk)
stages[stage.ID] = stage
slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
outputs := maps.Keys(stage.OutputsToCoders)
sort.Strings(outputs)
- 			em.AddStage(stage.ID, []string{stage.mainInputPCol}, stage.sides, outputs)
+ 			em.AddStage(stage.ID, []string{stage.primaryInput}, stage.sides, outputs)
default:
err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
slog.Error("Execute", err)
- 			panic(err)
+ 			return err
}
}

@@ -285,6 +287,7 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
}(rb)
}
slog.Info("pipeline done!", slog.String("job", j.String()))
return nil
}

func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, comps *pipepb.Components) func(io.Reader) []byte {
@@ -300,7 +303,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod

func getOnlyValue[K comparable, V any](in map[K]V) V {
if len(in) != 1 {
- 		panic(fmt.Sprintf("expected single value map, had %v", len(in)))
+ 		panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
}
for _, v := range in {
return v
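The getOnlyValue hunk is truncated above; for reference, a self-contained version of the helper with a typical call. The trailing panic after the loop is an assumption added to satisfy the compiler, and the example map is hypothetical.

```go
package main

import "fmt"

// getOnlyValue returns the sole value of a single-entry map, and
// panics with the map's contents otherwise (matching the new message).
func getOnlyValue[K comparable, V any](in map[K]V) V {
	if len(in) != 1 {
		panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
	}
	for _, v := range in {
		return v
	}
	panic("unreachable")
}

func main() {
	// Typical use: a PTransform with exactly one input or output.
	inputs := map[string]string{"i0": "pcol-in"}
	fmt.Println(getOnlyValue(inputs)) // pcol-in
}
```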
151 changes: 149 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
@@ -319,6 +320,61 @@ func TestRunner_Pipelines(t *testing.T) {
Want: []int{16, 17, 18},
}, sum)
},
}, {
name: "sideinput_sameAsMainInput",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col0 := beam.ParDo(s, dofn1, imp)
sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0})
beam.ParDo(s, &int64Check{
Name: "sum sideinput check",
Want: []int{13, 14, 15},
}, sum)
},
}, {
name: "sideinput_sameAsMainInput+Derived",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col0 := beam.ParDo(s, dofn1, imp)
col1 := beam.ParDo(s, dofn2, col0)
// Doesn't matter which of col0 or col1 is used.
sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col1})
beam.ParDo(s, &int64Check{
Name: "sum sideinput check",
Want: []int{16, 17, 18},
}, sum)
},
}, {
// Main input is getting duplicated data, since it's being executed twice,
// but that doesn't make any sense.
name: "sideinput_2iterable1Data2",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col0 := beam.ParDo(s, dofn1, imp)
col1 := beam.ParDo(s, dofn2, col0)
col2 := beam.ParDo(s, dofn2, col0)
// Doesn't matter which of col1 or col2 is used.
sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col2}, beam.SideInput{Input: col1})
beam.ParDo(s, &int64Check{
Name: "iter sideinput check",
Want: []int{19, 20, 21},
}, sum)
},
}, {
// Re-use the same side inputs sequentially (the two consumers should be in the same stage).
name: "sideinput_two_2iterable1Data",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col0 := beam.ParDo(s, dofn1, imp)
sideIn1 := beam.ParDo(s, dofn1, imp)
sideIn2 := beam.ParDo(s, dofn1, imp)
col1 := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: sideIn1}, beam.SideInput{Input: sideIn2})
sum := beam.ParDo(s, dofn3x1, col1, beam.SideInput{Input: sideIn1}, beam.SideInput{Input: sideIn2})
beam.ParDo(s, &int64Check{
Name: "check_sideinput_re-use",
Want: []int{25, 26, 27},
}, sum)
},
}, {
name: "combine_perkey",
pipeline: func(s beam.Scope) {
@@ -380,6 +436,30 @@
}, flat)
passert.NonEmpty(s, flat)
},
}, {
name: "gbk_into_gbk",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofnKV, imp)
gbk1 := beam.GroupByKey(s, col1)
col2 := beam.ParDo(s, dofnGBKKV, gbk1)
gbk2 := beam.GroupByKey(s, col2)
out := beam.ParDo(s, dofnGBK, gbk2)
passert.Equals(s, out, int64(9), int64(12))
},
}, {
name: "lperror_gbk_into_cogbk_shared_input",
pipeline: func(s beam.Scope) {
want := beam.CreateList(s, []int{0})
fruits := beam.CreateList(s, []int64{42, 42, 42})
fruitsKV := beam.AddFixedKey(s, fruits)

fruitsGBK := beam.GroupByKey(s, fruitsKV)
fooKV := beam.ParDo(s, toFoo, fruitsGBK)
fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
got := beam.ParDo(s, toID, fruitsFooCoGBK)
passert.Equals(s, got, want)
},
},
}
// TODO: Explicit DoFn Failure case.
@@ -429,8 +509,75 @@ func TestFailure(t *testing.T) {
if err == nil {
t.Fatalf("expected pipeline failure, but got a success")
}
- 	// Job failure state reason isn't communicated with the state change over the API
- 	// so we can't check for a reason here.
+ 	if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) {
+ 		t.Fatalf("expected pipeline failure with %q, but was %v", want, err)
+ 	}
}

func TestRunner_Passert(t *testing.T) {
initRunner(t)
tests := []struct {
name string
pipeline func(s beam.Scope)
metrics func(t *testing.T, pr beam.PipelineResult)
}{
{
name: "Empty",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofnEmpty, imp)
passert.Empty(s, col1)
},
}, {
name: "Equals-TwoEmpty",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofnEmpty, imp)
col2 := beam.ParDo(s, dofnEmpty, imp)
passert.Equals(s, col1, col2)
},
}, {
name: "Equals",
pipeline: func(s beam.Scope) {
imp := beam.Impulse(s)
col1 := beam.ParDo(s, dofn1, imp)
col2 := beam.ParDo(s, dofn1, imp)
passert.Equals(s, col1, col2)
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
test.pipeline(s)
pr, err := executeWithT(context.Background(), t, p)
if err != nil {
t.Fatal(err)
}
if test.metrics != nil {
test.metrics(t, pr)
}
})
}
}

func toFoo(et beam.EventTime, id int, _ func(*int64) bool) (int, string) {
return id, "ooo"
}

func toID(et beam.EventTime, id int, fruitIter func(*int64) bool, fooIter func(*string) bool) int {
var fruit int64
for fruitIter(&fruit) {
}
var foo string
for fooIter(&foo) {
}
return id
}

func init() {
register.Function3x2(toFoo)
register.Function4x1(toID)
}

// TODO: PCollection metrics tests, in particular for element counts, in multi transform pipelines
70 changes: 65 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -41,8 +41,9 @@ import (
// RunnerCharacteristic holds the configuration for Runner based transforms,
// such as GBKs, Flattens.
type RunnerCharacteristic struct {
- 	SDKFlatten bool // Sets whether we should force an SDK side flatten.
- 	SDKGBK     bool // Sets whether the GBK should be handled by the SDK, if possible.
+ 	SDKFlatten   bool // Sets whether we should force an SDK side flatten.
+ 	SDKGBK       bool // Sets whether the GBK should be handled by the SDK, if possible.
+ 	SDKReshuffle bool // Sets whether reshuffles should be handled by the SDK (not yet supported).
}

func Runner(config any) *runner {
Expand All @@ -63,13 +64,72 @@ func (*runner) ConfigCharacteristic() reflect.Type {
return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
}

var _ transformPreparer = (*runner)(nil)

func (*runner) PrepareUrns() []string {
return []string{urns.TransformReshuffle}
}

// PrepareTransform handles special processing with respect to runner transforms, like reshuffle.
func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
// TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.
// TODO: Implement a fusion break for reshuffles.

if h.config.SDKReshuffle {
panic("SDK side reshuffle not yet supported")
}

	// A Reshuffle, in principle, is a no-op on the pipeline structure with
	// respect to correctness. It can, however, affect performance, so it
	// exists to tell the runner that this point in the pipeline needs a
	// fusion break, so the pipeline can change its degree of parallelism.
	//
	// The change in parallelism goes both ways: it could allow for larger
	// batch sizes, or enable smaller batch sizes downstream, if the work is
	// in fact parallelizable.
	//
	// But for a runner that executes a single transform per stage, we can
	// elide the reshuffle entirely, since the input and output collection
	// types match.

	// Get the input and output PCollections; there should be exactly one of each.
if len(t.GetInputs()) != 1 {
panic("Expected single input PCollection in reshuffle: " + prototext.Format(t))
}
if len(t.GetOutputs()) != 1 {
panic("Expected single output PCollection in reshuffle: " + prototext.Format(t))
}

inColID := getOnlyValue(t.GetInputs())
outColID := getOnlyValue(t.GetOutputs())

// We need to find all Transforms that consume the output collection and
// replace them so they consume the input PCollection directly.

	// Track the transforms to remove from the pipeline once consumers are rewired.
	toRemove := []string{}

for _, t := range comps.GetTransforms() {
for li, gi := range t.GetInputs() {
if gi == outColID {
				// Rewire this consumer to read the reshuffle's input directly.
t.GetInputs()[li] = inColID
}
}
}

// And all the sub transforms.
toRemove = append(toRemove, t.GetSubtransforms()...)

	// Return no replacement components; just the transforms to remove.
return nil, toRemove
}

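Concretely, the elision above is pure edge rewiring: every consumer of the reshuffle's output PCollection is pointed at its input PCollection, after which the reshuffle and its subtransforms are dead code. A runnable sketch over plain maps, which stand in for the pipeline protos:

```go
package main

import "fmt"

// ptransform is a stand-in for pipepb.PTransform: local input
// name -> global PCollection ID.
type ptransform struct {
	inputs map[string]string
}

// elideReshuffle rewires every consumer of outCol to read inCol,
// mirroring the loop in PrepareTransform above.
func elideReshuffle(transforms map[string]*ptransform, inCol, outCol string) {
	for _, t := range transforms {
		for li, gi := range t.inputs {
			if gi == outCol {
				t.inputs[li] = inCol
			}
		}
	}
}

func main() {
	transforms := map[string]*ptransform{
		"consumerA": {inputs: map[string]string{"i0": "pcolOut"}},
		"consumerB": {inputs: map[string]string{"i0": "pcolOut", "side0": "pcolOther"}},
	}
	elideReshuffle(transforms, "pcolIn", "pcolOut")
	fmt.Println(transforms["consumerA"].inputs["i0"])    // pcolIn
	fmt.Println(transforms["consumerB"].inputs["i0"])    // pcolIn
	fmt.Println(transforms["consumerB"].inputs["side0"]) // pcolOther (untouched)
}
```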
var _ transformExecuter = (*runner)(nil)

func (*runner) ExecuteUrns() []string {
- 	return []string{urns.TransformFlatten, urns.TransformGBK}
+ 	return []string{urns.TransformFlatten, urns.TransformGBK, urns.TransformReshuffle}
}

- // ExecuteWith returns what environment the
+ // ExecuteWith returns what environment the transform should execute in.
func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
urn := t.GetSpec().GetUrn()
if urn == urns.TransformFlatten && !h.config.SDKFlatten {
Expand All @@ -82,7 +142,7 @@ func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
}

// ExecuteTransform handles special processing with respect to runner-specific transforms
- func (h *runner) ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
+ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
urn := t.GetSpec().GetUrn()
var data [][]byte
var onlyOut string