Skip to content

Commit ad05c34

Browse files
committed
Make user side coders more convenient.
1 parent 869f233 commit ad05c34

File tree

7 files changed

+123
-78
lines changed

7 files changed

+123
-78
lines changed

sdks/go/pkg/beam/coder.go

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package beam
1818
import (
1919
"encoding/json"
2020
"fmt"
21+
"io"
2122
"reflect"
2223

2324
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
2425
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
26+
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
2527
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
2628
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
2729
"github.com/golang/protobuf/proto"
@@ -68,20 +70,56 @@ func (c Coder) String() string {
6870
return c.coder.String()
6971
}
7072

71-
// TODO(herohde) 4/4/2017: for convenience, we use the magic json coding
72-
// everywhere. To be replaced by Coder registry, sharing, etc.
73+
// NewElementEncoder returns an ElementEncoder for the given type, using the coder inferred for it; it panics if no coder can be inferred.
74+
func NewElementEncoder(t reflect.Type) ElementEncoder {
75+
c, err := inferCoder(typex.New(t))
76+
if err != nil {
77+
panic(err)
78+
}
79+
return &execEncoder{enc: exec.MakeElementEncoder(c), coder: c} // keep the coder so String() does not dereference a nil coder
80+
}
7381

74-
// TODO: select optimal coder based on type, notably handling int, string, etc.
82+
// execEncoder wraps an exec.ElementEncoder to implement the ElementEncoder interface
83+
// in this package.
84+
type execEncoder struct {
85+
enc exec.ElementEncoder
86+
coder *coder.Coder
87+
}
7588

76-
// TODO(herohde) 7/11/2017: figure out best way to let transformation use
77-
// coders (like passert). For now, we just allow them to grab in the internal
78-
// coder. Maybe it's cleaner to pull Encode/Decode into beam instead, if
79-
// adequate. The issue is that we would need non-windowed coding. Maybe focus on
80-
// coder registry and construction: then type -> coder might be adequate.
89+
func (e *execEncoder) Encode(element interface{}, w io.Writer) error {
90+
return e.enc.Encode(exec.FullValue{Elm: element}, w)
91+
}
8192

82-
// UnwrapCoder returns the internal coder.
83-
func UnwrapCoder(c Coder) *coder.Coder {
84-
return c.coder
93+
func (e *execEncoder) String() string {
94+
return e.coder.String()
95+
}
96+
97+
// NewElementDecoder returns an ElementDecoder for the given type, using the coder inferred for it; it panics if no coder can be inferred.
98+
func NewElementDecoder(t reflect.Type) ElementDecoder {
99+
c, err := inferCoder(typex.New(t))
100+
if err != nil {
101+
panic(err)
102+
}
103+
return &execDecoder{dec: exec.MakeElementDecoder(c), coder: c} // keep the coder so String() does not dereference a nil coder
104+
}
105+
106+
// execDecoder wraps an exec.ElementDecoder to implement the ElementDecoder interface
107+
// in this package.
108+
type execDecoder struct {
109+
dec exec.ElementDecoder
110+
coder *coder.Coder
111+
}
112+
113+
func (d *execDecoder) Decode(r io.Reader) (interface{}, error) {
114+
fv, err := d.dec.Decode(r)
115+
if err != nil {
116+
return nil, err
117+
}
118+
return fv.Elm, nil
119+
}
120+
121+
func (d *execDecoder) String() string {
122+
return d.coder.String()
85123
}
86124

87125
// NewCoder infers a Coder for any bound full type.
@@ -179,11 +217,6 @@ func inferCoders(list []FullType) ([]*coder.Coder, error) {
179217
return ret, nil
180218
}
181219

182-
// TODO(herohde) 4/5/2017: decide whether we want an Encoded form. For now,
183-
// we'll use exploded form coders only using typex.T. We might also need a
184-
// form that doesn't require LengthPrefix'ing to cut up the bytestream from
185-
// the FnHarness.
186-
187220
// protoEnc marshals the supplied proto.Message.
188221
func protoEnc(in T) ([]byte, error) {
189222
return proto.Marshal(in.(proto.Message))

sdks/go/pkg/beam/core/graph/coder/coder.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package coder
1919

2020
import (
2121
"fmt"
22+
"io"
2223
"reflect"
2324
"strings"
2425

@@ -92,6 +93,16 @@ var (
9293
OptReturn: []reflect.Type{reflectx.Error}}
9394
)
9495

96+
// ElementEncoder encapsulates being able to encode an element into a writer.
97+
type ElementEncoder interface {
98+
Encode(element interface{}, w io.Writer) error
99+
}
100+
101+
// ElementDecoder encapsulates being able to decode an element from a reader.
102+
type ElementDecoder interface {
103+
Decode(r io.Reader) (interface{}, error)
104+
}
105+
95106
func validateEncoder(t reflect.Type, encode interface{}) error {
96107
// Check if it uses the real type in question.
97108
if err := funcx.Satisfy(encode, funcx.Replace(encodeSig, typex.TType, t)); err != nil {

sdks/go/pkg/beam/create.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,8 @@ import (
1919
"bytes"
2020
"fmt"
2121
"reflect"
22-
23-
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
24-
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
2522
)
2623

27-
// TODO(herohde) 7/11/2017: add variants that use coder encoding.
28-
2924
// Create inserts a fixed set of values into the pipeline. The values must
3025
// be of the same type 'A' and the returned PCollection is of type A.
3126
//
@@ -58,15 +53,15 @@ func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
5853
}
5954

6055
t := reflect.ValueOf(values[0]).Type()
61-
coder := NewCoder(typex.New(t))
62-
fn := &createFn{Coder: EncodedCoder{Coder: coder}}
63-
en := exec.MakeElementEncoder(UnwrapCoder(coder))
56+
fn := &createFn{Type: EncodedType{T: t}}
57+
enc := NewElementEncoder(t)
58+
6459
for i, value := range values {
6560
if other := reflect.ValueOf(value).Type(); other != t {
6661
return PCollection{}, fmt.Errorf("value %v at index %v has type %v, want %v", value, i, other, t)
6762
}
6863
var buf bytes.Buffer
69-
if err := en.Encode(exec.FullValue{Elm: value}, &buf); err != nil {
64+
if err := enc.Encode(value, &buf); err != nil {
7065
return PCollection{}, fmt.Errorf("marshalling of %v failed: %v", value, err)
7166
}
7267
fn.Values = append(fn.Values, buf.Bytes())
@@ -84,18 +79,18 @@ func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
8479
// TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421.
8580

8681
type createFn struct {
87-
Values [][]byte `json:"values"`
88-
Coder EncodedCoder `json:"coder"`
82+
Values [][]byte `json:"values"`
83+
Type EncodedType `json:"type"`
8984
}
9085

9186
func (c *createFn) ProcessElement(_ []byte, emit func(T)) error {
92-
dec := exec.MakeElementDecoder(UnwrapCoder(c.Coder.Coder))
87+
dec := NewElementDecoder(c.Type.T)
9388
for _, val := range c.Values {
94-
fv, err := dec.Decode(bytes.NewBuffer(val))
89+
element, err := dec.Decode(bytes.NewBuffer(val))
9590
if err != nil {
9691
return err
9792
}
98-
emit(fv.Elm)
93+
emit(element)
9994
}
10095
return nil
10196
}

sdks/go/pkg/beam/forward.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,19 @@ func RegisterInit(hook func()) {
9292
// func(reflect.Type, []byte) (T, error)
9393
//
9494
// where T is the matching user type.
95-
//
9695
func RegisterCoder(t reflect.Type, encoder, decoder interface{}) {
9796
runtime.RegisterType(t)
9897
runtime.RegisterFunction(encoder)
9998
runtime.RegisterFunction(decoder)
10099
coder.RegisterCoder(t, encoder, decoder)
101100
}
102101

102+
// ElementEncoder encapsulates being able to encode an element into a writer.
103+
type ElementEncoder = coder.ElementEncoder
104+
105+
// ElementDecoder encapsulates being able to decode an element from a reader.
106+
type ElementDecoder = coder.ElementDecoder
107+
103108
// Init is the hook that all user code must call after flags processing and
104109
// other static initialization, for now.
105110
func Init() {

sdks/go/pkg/beam/testing/passert/passert.go

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"fmt"
2424

2525
"github.com/apache/beam/sdks/go/pkg/beam"
26-
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
27-
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
2826
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
2927
"github.com/apache/beam/sdks/go/pkg/beam/transforms/filter"
3028
)
@@ -61,26 +59,25 @@ func equals(s beam.Scope, actual, expected beam.PCollection) beam.PCollection {
6159
// because all values are held in memory at the same time.
6260
func Diff(s beam.Scope, a, b beam.PCollection) (left, both, right beam.PCollection) {
6361
imp := beam.Impulse(s)
64-
return beam.ParDo3(s, &diffFn{Coder: beam.EncodedCoder{Coder: a.Coder()}}, imp, beam.SideInput{Input: a}, beam.SideInput{Input: b})
65-
}
6662

67-
// TODO(herohde) 7/11/2017: should there be a first-class way to obtain the coder,
68-
// such a a specially-typed parameter?
63+
t := beam.ValidateNonCompositeType(a)
64+
beam.ValidateNonCompositeType(b)
65+
return beam.ParDo3(s, &diffFn{Type: beam.EncodedType{T: t.Type()}}, imp, beam.SideInput{Input: a}, beam.SideInput{Input: b})
66+
}
6967

7068
// diffFn computes the symmetrical multi-set difference of 2 collections, under
7169
// coder equality. The Go values returned may be any of the coder-equal ones.
7270
type diffFn struct {
73-
Coder beam.EncodedCoder `json:"coder"`
71+
Type beam.EncodedType `json:"type"`
7472
}
7573

7674
func (f *diffFn) ProcessElement(_ []byte, ls, rs func(*beam.T) bool, left, both, right func(t beam.T)) error {
77-
c := beam.UnwrapCoder(f.Coder.Coder)
78-
79-
indexL, err := index(c, ls)
75+
enc := beam.NewElementEncoder(f.Type.T)
76+
indexL, err := index(enc, ls)
8077
if err != nil {
8178
return err
8279
}
83-
indexR, err := index(c, rs)
80+
indexR, err := index(enc, rs)
8481
if err != nil {
8582
return err
8683
}
@@ -130,15 +127,14 @@ type indexEntry struct {
130127
value beam.T
131128
}
132129

133-
func index(c *coder.Coder, iter func(*beam.T) bool) (map[string]indexEntry, error) {
130+
func index(enc beam.ElementEncoder, iter func(*beam.T) bool) (map[string]indexEntry, error) {
134131
ret := make(map[string]indexEntry)
135-
enc := exec.MakeElementEncoder(c)
136132

137133
var val beam.T
138134
for iter(&val) {
139135
var buf bytes.Buffer
140-
if err := enc.Encode(exec.FullValue{Elm: val}, &buf); err != nil {
141-
return nil, fmt.Errorf("value %v not encodable by %v", val, c)
136+
if err := enc.Encode(val, &buf); err != nil {
137+
return nil, fmt.Errorf("value %v not encodable with %v", val, enc)
142138
}
143139
encoded := buf.String()
144140

@@ -148,9 +144,6 @@ func index(c *coder.Coder, iter func(*beam.T) bool) (map[string]indexEntry, erro
148144
return ret, nil
149145
}
150146

151-
// TODO(herohde) 7/11/2017: perhaps extract the coder helpers as more
152-
// general and polished utilities for working with coders in user code.
153-
154147
// True asserts that all elements satisfy the given predicate.
155148
func True(s beam.Scope, col beam.PCollection, fn interface{}) beam.PCollection {
156149
fail(s, filter.Exclude(s, col, fn), "predicate(%v) = false, want true")

0 commit comments

Comments
 (0)