Only lookup storage specs that we actually need #52

Merged (4 commits) on Jan 10, 2019
Changes from 1 commit
ensure that rows that have no write transform still get returned in the split main output, so they can be written downstream (eg to the warehouse)
tims committed Jan 10, 2019
commit 586d45423479830864ed4fbda046dc6be3c7f796
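
In short: WriteTags now unions the rows tagged with the main tag (rows whose features matched no write transform) back into its returned output, instead of dropping them. A minimal plain-Java model of that routing contract — illustrative names only, not feast types — is:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of the WriteTags routing contract: tags are strings,
// "writing" is just recording rows into a map.
public class WriteTagsContractSketch {
  public static void main(String[] args) {
    Map<String, List<String>> tagged = new HashMap<>();
    tagged.put("serving", Arrays.asList("row1")); // tag with a write transform
    tagged.put("orphan", Arrays.asList("row3"));  // no transform, not main: discarded
    tagged.put("main", Arrays.asList("row4"));    // main tag: passed through unwritten

    Map<String, List<String>> stores = new HashMap<>();
    stores.put("serving", new ArrayList<>()); // stands in for a store Write

    List<String> output = new ArrayList<>();
    for (String tag : stores.keySet()) {
      List<String> rows = tagged.get(tag);
      stores.get(tag).addAll(rows); // "write" the rows to their store
      output.addAll(rows);          // and also emit them in the main output
    }
    // The fix in this commit: main-tagged rows are emitted even though nothing
    // writes them here, so a downstream split (eg the warehouse) still sees them.
    output.addAll(tagged.get("main"));
    // "orphan" rows never reach the output: no transform and not the main tag.
    System.out.println("stores=" + stores + " output=" + output);
  }
}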
ingestion/src/main/java/feast/ingestion/transform/SplitOutputByStore.java
@@ -19,17 +19,13 @@

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import feast.ingestion.exceptions.ErrorsHandler;
 import feast.ingestion.model.Specs;
 import feast.ingestion.transform.FeatureIO.Write;
 import feast.ingestion.transform.SplitFeatures.MultiOutputSplit;
 import feast.ingestion.values.PFeatureRows;
 import feast.specs.FeatureSpecProto.FeatureSpec;
 import feast.specs.StorageSpecProto.StorageSpec;
 import feast.storage.FeatureStore;
-import feast.storage.noop.NoOpIO;
-import feast.types.FeatureRowExtendedProto.Attempt;
-import feast.types.FeatureRowExtendedProto.Error;
 import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 import java.util.Collection;
 import java.util.HashMap;
@@ -38,7 +34,6 @@
 import java.util.Set;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -71,12 +66,11 @@ public PFeatureRows expand(PFeatureRows input) {
       taggedTransforms.put(tag, transforms.get(key));
     }

-    PFeatureRows output = splits.apply(new WriteTags(taggedTransforms, MultiOutputSplit.MAIN_TAG));
+    PCollection<FeatureRowExtended> written = splits
+        .apply(new WriteTags(taggedTransforms, MultiOutputSplit.MAIN_TAG));
     return new PFeatureRows(
-        output.getMain(),
-        PCollectionList.of(input.getErrors())
-            .and(output.getErrors())
-            .apply("Flatten errors", Flatten.pCollections()));
+        written,
+        input.getErrors());
   }

   private Map<String, FeatureStore> getStoresMap() {
@@ -101,69 +95,39 @@ private Map<String, Write> getFeatureStoreTransforms() {
     return transforms;
   }

+  /**
+   * Writes each PCollection in the tuple to a correspondingly tagged write transform and returns
+   * a union of the written rows.
+   *
+   * The main tag is not written to a transform, but is returned as-is. It represents the default
+   * for rows which have no associated store to write to here but may still need to be written
+   * downstream (eg, a row has no serving store, but does have a warehouse store).
+   *
+   * Tags in the tuple that are not the main tag and have no transform will be discarded
+   * completely.
+   */
   @AllArgsConstructor
-  public static class WriteTags extends PTransform<PCollectionTuple, PFeatureRows> {
+  public static class WriteTags extends
+      PTransform<PCollectionTuple, PCollection<FeatureRowExtended>> {

     private Map<TupleTag<FeatureRowExtended>, Write> transforms;
     private TupleTag<FeatureRowExtended> mainTag;

     @Override
-    public PFeatureRows expand(PCollectionTuple input) {
-
-      List<PCollection<FeatureRowExtended>> mainList = Lists.newArrayList();
+    public PCollection<FeatureRowExtended> expand(PCollectionTuple tuple) {
+      List<PCollection<FeatureRowExtended>> outputList = Lists.newArrayList();
       for (TupleTag<FeatureRowExtended> tag : transforms.keySet()) {
         Write write = transforms.get(tag);
         Preconditions.checkNotNull(write, String.format("Null transform for tag=%s", tag.getId()));
-        PCollection<FeatureRowExtended> main = input.get(tag);
-        if (!(write instanceof NoOpIO.Write)) {
-          main.apply(String.format("Write to %s", tag.getId()), write);
-        }
-        mainList.add(main);
+        PCollection<FeatureRowExtended> input = tuple.get(tag);
+        input.apply(String.format("Write to %s", tag.getId()), write);
+        outputList.add(input);
       }

-      /*
-       * FeatureRows with no matching write transform `input.get(mainTag)` are considered
-       * discardible, if they didn't have a matching store, they should have been discarded before
-       * reaching here. So these will be feature with no output store at all.
-       */
-      return PFeatureRows.of(
-          PCollectionList.of(mainList).apply("Flatten main", Flatten.pCollections()));
-    }
-  }
-
-  /**
-   * Sets the last attempt error for all rows with a given exception
-   */
-  public static class WithErrors extends DoFn<FeatureRowExtended, FeatureRowExtended> {
-
-    private Error error;
-
-    public WithErrors(Error error) {
-      this.error = error;
-    }
-
-    public WithErrors(String transformName, String message) {
-      this(Error.newBuilder().setTransform(transformName).setMessage(message).build());
-    }
-
-    @ProcessElement
-    public void processElement(
-        @Element FeatureRowExtended rowExtended, OutputReceiver<FeatureRowExtended> out) {
-      Attempt lastAttempt = rowExtended.getLastAttempt();
-
-      Error lastError = lastAttempt.getError();
-      Error thisError = error;
-
-      int numAttempts =
-          ErrorsHandler.checkAttemptCount(lastAttempt.getAttempts(), lastError, thisError);
-
-      Attempt thisAttempt =
-          Attempt.newBuilder().setAttempts(numAttempts + 1).setError(thisError).build();
-      out.output(
-          FeatureRowExtended.newBuilder()
-              .mergeFrom(rowExtended)
-              .setLastAttempt(thisAttempt)
-              .build());
+      // FeatureRows with no matching write transform end up in `tuple.get(mainTag)` and are
+      // considered discardable; we return them in the main output so they count as written,
+      // but don't actually write them to any store.
+      outputList.add(tuple.get(mainTag));
+      return PCollectionList.of(outputList).apply("Flatten main", Flatten.pCollections());
     }
   }
 }
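
The union in the new expand is built with Beam's Flatten. A self-contained sketch of that union-with-pass-through step on plain strings (the pipeline and names here are illustrative, not feast code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenUnionSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Stand-ins: rows that went through a store Write, and main-tagged rows
    // that were written nowhere but must still appear downstream.
    PCollection<String> written = pipeline.apply("written", Create.of("row1", "row2"));
    PCollection<String> mainOnly = pipeline.apply("mainOnly", Create.of("row4"));

    // Union the two, mirroring `PCollectionList.of(outputList).apply("Flatten main", ...)`.
    PCollection<String> output = PCollectionList.of(written).and(mainOnly)
        .apply("Flatten main", Flatten.pCollections());

    pipeline.run().waitUntilFinish();
  }
}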
ingestion/src/test/java/feast/ingestion/transform/SplitOutputByStoreTest.java
@@ -2,12 +2,15 @@

 import static junit.framework.TestCase.assertNull;

+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.protobuf.Timestamp;
 import feast.ingestion.model.Features;
 import feast.ingestion.model.Specs;
 import feast.ingestion.model.Values;
 import feast.ingestion.service.MockSpecService;
+import feast.ingestion.transform.FeatureIO.Write;
+import feast.ingestion.transform.SplitOutputByStore.WriteTags;
 import feast.ingestion.values.PFeatureRows;
 import feast.specs.EntitySpecProto.EntitySpec;
 import feast.specs.FeatureSpecProto.DataStore;
@@ -23,12 +26,15 @@
 import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 import feast.types.FeatureRowProto.FeatureRow;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
@@ -144,7 +150,8 @@ public void testSplit() {

   @Test
   public void testSplitWhereFeature2HasNoStoreId() {
-    // Note we are stores on the group, instead of warehouse or serving store id.
+    // Feature2 should get thrown away harmlessly

     SerializableFunction<FeatureSpec, String> selector = (fs) -> fs.getDataStores().getServing()
         .getId();
     MockSpecService specService = new MockSpecService();
@@ -202,15 +209,14 @@ public void testSplitWhereFeature2HasNoStoreId() {
             MapElements.into(TypeDescriptor.of(FeatureRow.class))
                 .via(FeatureRowExtended::getRow)))
         .containsInAnyOrder(
-            Lists.newArrayList(
-                FeatureRow.newBuilder()
-                    .addFeatures(Features.of("f1", Values.ofInt32(1)))
-                    .setEventTimestamp(Timestamp.getDefaultInstance())
-                    .build(),
-                FeatureRow.newBuilder()
-                    .addFeatures(Features.of("f2", Values.ofInt32(2)))
-                    .setEventTimestamp(Timestamp.getDefaultInstance())
-                    .build()));
+            FeatureRow.newBuilder()
+                .addFeatures(Features.of("f1", Values.ofInt32(1)))
+                .setEventTimestamp(Timestamp.getDefaultInstance())
+                .build(),
+            FeatureRow.newBuilder()
+                .addFeatures(Features.of("f2", Values.ofInt32(2)))
+                .setEventTimestamp(Timestamp.getDefaultInstance())
+                .build());

     MockTransforms.Write mockSpecService1 = ((MockFeatureStore) stores.get(0)).getWrite();

@@ -233,4 +239,52 @@
     pipeline.run();
   }

+
+  @Test
+  public void testWriteTags() {
+    TupleTag<FeatureRowExtended> tag1 = new TupleTag<>("TAG1");
+    TupleTag<FeatureRowExtended> tag2 = new TupleTag<>("TAG2");
+    TupleTag<FeatureRowExtended> tag3 = new TupleTag<>("TAG3");
+    TupleTag<FeatureRowExtended> mainTag = new TupleTag<>("TAG4");
+
+    Map<TupleTag<FeatureRowExtended>, Write> transforms = ImmutableMap.<TupleTag<FeatureRowExtended>, Write>builder()
+        .put(tag1, new MockTransforms.Write())
+        .put(tag2, new MockTransforms.Write())
+        // tag3 and mainTag do not have write transforms.
+        .build();
+
+    FeatureRowExtended rowex1 = FeatureRowExtended.newBuilder()
+        .setRow(FeatureRow.newBuilder().setEntityKey("1")).build();
+    FeatureRowExtended rowex2 = FeatureRowExtended.newBuilder()
+        .setRow(FeatureRow.newBuilder().setEntityKey("2")).build();
+    FeatureRowExtended rowex3 = FeatureRowExtended.newBuilder()
+        .setRow(FeatureRow.newBuilder().setEntityKey("3")).build();
+    FeatureRowExtended rowex4 = FeatureRowExtended.newBuilder()
+        .setRow(FeatureRow.newBuilder().setEntityKey("4")).build();
+
+    PCollection<FeatureRowExtended> pcollection1 = pipeline.apply("input1", Create.of(rowex1));
+    PCollection<FeatureRowExtended> pcollection2 = pipeline.apply("input2", Create.of(rowex2));
+    PCollection<FeatureRowExtended> pcollection3 = pipeline.apply("input3", Create.of(rowex3));
+    PCollection<FeatureRowExtended> pcollection4 = pipeline.apply("input4", Create.of(rowex4));
+
+    PCollectionTuple tuple = PCollectionTuple.of(tag1, pcollection1).and(tag2, pcollection2)
+        .and(tag3, pcollection3)
+        .and(mainTag, pcollection4); // input 4 is included in the output via the main tag
+
+    PCollection<FeatureRowExtended> output = tuple.apply(new WriteTags(transforms, mainTag));
+
+    // All input except input 3 should be in the main output:
+    // input 1 is returned in the output because we wrote it to a write transform.
+    // input 2 is returned in the output because we wrote it to a write transform.
+    // input 3 is NOT returned in the output because it has no write transform.
+    // input 4 is returned in the output because it is the main tag.
+    PAssert.that(output).containsInAnyOrder(rowex1, rowex2, rowex4);
+
+    // Each non-main tagged input should be written to its corresponding Write transform.
+    PAssert.that(((MockTransforms.Write) transforms.get(tag1)).getInputs().get(0))
+        .containsInAnyOrder(rowex1);
+    PAssert.that(((MockTransforms.Write) transforms.get(tag2)).getInputs().get(0))
+        .containsInAnyOrder(rowex2);
+    pipeline.run();
+  }
 }
14 changes: 9 additions & 5 deletions ingestion/src/test/java/feast/storage/MockTransforms.java
@@ -18,24 +18,28 @@
 package feast.storage;

 import com.google.common.collect.Lists;
+import feast.ingestion.transform.FeatureIO;
+import feast.ingestion.transform.fn.Identity;
+import feast.specs.StorageSpecProto.StorageSpec;
+import feast.types.FeatureRowExtendedProto.FeatureRowExtended;
 import java.util.List;
 import lombok.Getter;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import feast.ingestion.transform.FeatureIO;
-import feast.ingestion.transform.fn.Identity;
-import feast.specs.StorageSpecProto.StorageSpec;
-import feast.types.FeatureRowExtendedProto.FeatureRowExtended;

 public class MockTransforms {

   @Getter
   public static class Write extends FeatureIO.Write {

+    List<PCollection<FeatureRowExtended>> inputs = Lists.newArrayList();
     private StorageSpec spec;

-    Write(StorageSpec spec) {
+    public Write() {
+    }
+
+    public Write(StorageSpec spec) {
       this.spec = spec;
     }
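
For context on how testWriteTags can assert on getInputs(): the mock's expand body lies outside this hunk. A plausible sketch, assuming FeatureIO.Write is a PTransform from PCollection<FeatureRowExtended> to PDone and Identity is a pass-through DoFn (both assumptions, not shown in this diff):

// Hypothetical sketch: the real expand body is not part of this hunk.
@Override
public PDone expand(PCollection<FeatureRowExtended> input) {
  inputs.add(input); // capture the applied PCollection so tests can PAssert on it
  input.apply(ParDo.of(new Identity<>())); // pass-through; no real storage I/O
  return PDone.in(input.getPipeline());
}

The no-arg constructor added in this commit lets testWriteTags build its tag-to-transform map with `new MockTransforms.Write()` without fabricating a StorageSpec.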