From a950ac90a41d67b7c10bbcafc089b153b33bbe22 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 20 Jul 2023 06:59:09 -0700 Subject: [PATCH] Clarify coders for side inputs on err output. (#27568) * Clarify coders for side inputs on err output. * error tracking in harness. --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- sdks/go/pkg/beam/core/runtime/exec/pardo.go | 2 +- sdks/go/pkg/beam/core/runtime/exec/sideinput.go | 2 +- sdks/go/pkg/beam/core/runtime/harness/harness.go | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 212ff53b6dd8..b93835264507 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -552,5 +552,5 @@ func (n *ParDo) fail(err error) error { } func (n *ParDo) String() string { - return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type()) + return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v, SideInputs: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type(), n.Side) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go index 1af4e71689b1..c3ceeee5d8b8 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go @@ -140,7 +140,7 @@ func (s *sideInputAdapter) NewKeyedIterable(ctx context.Context, reader StateRea } func (s *sideInputAdapter) String() string { - return fmt.Sprintf("SideInputAdapter[%v, %v]", s.sid, s.sideInputID) + return fmt.Sprintf("SideInputAdapter[%v, %v] - Coder %v", s.sid, s.sideInputID, s.c) } // proxyReStream is a simple wrapper of an open function. diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 3f0e82c8265f..d97b6b7db079 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -393,7 +393,8 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe c.mu.Unlock() if err != nil { - return fail(ctx, instID, "Failed: %v", err) + c.failed[instID] = err + return fail(ctx, instID, "ProcessBundle failed: %v", err) } tokens := msg.GetCacheTokens() @@ -427,6 +428,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe // If there was an error on the data channel reads, fail this bundle // since we may have had a short read. c.failed[instID] = dataError + err = dataError } else { // Non failure plans should either be moved to the finalized state // or to plans so they can be re-used.