Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 44 additions & 43 deletions sdks/go/pkg/beam/core/runtime/coderx/coderx.shims.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/coderx/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ package coderx

//go:generate go install github.com/apache/beam/sdks/go/cmd/starcgen
//go:generate starcgen --package=coderx --identifiers=encString,decString,encUint32,decUint32,encInt32,decInt32,encUint64,decUint64,encInt64,decInt64,encVarIntZ,decVarIntZ,encVarUintZ,decVarUintZ,encFloat,decFloat
//go:generate go fmt
//go:generate go fmt
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func ExampleSeq() {
a := textio.Read(s, "...some file path...") // PCollection<string>

beam.Seq(s, a,
strconv.Atoi, // string to int
strconv.Atoi, // string to int
func(i int) float64 { return float64(i) }, // int to float64
math.Signbit, // float64 to bool
math.Signbit, // float64 to bool
) // PCollection<bool>
}

Expand Down
36 changes: 35 additions & 1 deletion sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,44 @@
package dataflowlib

import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/golang/protobuf/proto"
)

// Fixup proto pipeline with Dataflow quirks.
func Fixup(p *pb.Pipeline) (*pb.Pipeline, error) {
return p, nil
upd := make(map[string]*pb.PTransform)

for id, t := range p.GetComponents().GetTransforms() {
if t.GetSpec().GetUrn() != graphx.URNParDo {
continue
}
var payload pb.ParDoPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &payload); err != nil {
continue // ignore: unexpected payload
}
if len(payload.SideInputs) == 0 {
continue
}

// ParDo w/ side input. Fixup URN.

fixedPayload := pipelinex.ShallowCloneParDoPayload(&payload)
for k, v := range payload.SideInputs {
fixedV := pipelinex.ShallowCloneSideInput(v)
fixedV.AccessPattern = pipelinex.ShallowCloneFunctionSpec(v.AccessPattern)
fixedV.AccessPattern.Urn = "urn:beam:sideinput:materialization:multimap:0.1"

fixedPayload.SideInputs[k] = fixedV
}
fixed := pipelinex.ShallowClonePTransform(t)
fixed.Spec = pipelinex.ShallowCloneFunctionSpec(t.Spec)
fixed.Spec.Payload = protox.MustEncode(fixedPayload)

upd[id] = fixed
}
return pipelinex.Update(p, &pb.Components{Transforms: upd})
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.beam.sdk.transforms;

import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardSideInputTypes;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
Expand All @@ -33,36 +31,12 @@
@Internal
public class Materializations {
/**
* The URN for a {@link Materialization} where the primitive view type is an iterable of fully
* specified windowed values.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static final String ITERABLE_MATERIALIZATION_URN =
StandardSideInputTypes.Enum.ITERABLE
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

/**
* The URN for a {@link Materialization} where the primitive view type is a multimap of fully
* The URN for a {@link Materialization} where the primitive view type is an multimap of fully
* specified windowed values.
*/
@Experimental(Kind.CORE_RUNNERS_ONLY)
public static final String MULTIMAP_MATERIALIZATION_URN =
StandardSideInputTypes.Enum.MULTIMAP
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

/**
* Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to use
* the {@link Materializations#ITERABLE_MATERIALIZATION_URN iterable materialization}.
*
* @param <V>
*/
public interface IterableView<V> {
Iterable<V> get();
}
"urn:beam:sideinput:materialization:multimap:0.1";

/**
* Represents the {@code PrimitiveViewT} supplied to the {@link ViewFn} when it declares to use
Expand All @@ -76,31 +50,18 @@ public interface MultimapView<K, V> {
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>A {@link Materialization} where the primitive view type is a multimap with fully specified
* windowed values.
* windowed keys.
*/
@Internal
public static <K, V> Materialization<MultimapView<K, V>> multimap() {
return new Materialization<MultimapView<K, V>>() {
@Override
public String getUrn() {
return MULTIMAP_MATERIALIZATION_URN;
}
};
return new MultimapMaterialization<>();
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>A {@link Materialization} where the primitive view type is an iterable with fully specifed
* windowed values.
*/
@Internal
public static <V> Materialization<IterableView<V>> iterable() {
return new Materialization<IterableView<V>>() {
@Override
public String getUrn() {
return ITERABLE_MATERIALIZATION_URN;
}
};
private static class MultimapMaterialization<K, V>
implements Materialization<MultimapView<K, V>> {
@Override
public String getUrn() {
return MULTIMAP_MATERIALIZATION_URN;
}
}
}
Loading