Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Common module #801

Merged
merged 10 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add blacklist functionality and test
  • Loading branch information
Terence committed Jun 18, 2020
commit b8718da6bf7fbfb138ea6c343ac5b94d688fff6e
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import feast.proto.core.StoreProto;
import feast.proto.core.StoreProto.Store.Subscription;
import feast.proto.types.FeatureRowProto;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.PAssert;
Expand All @@ -41,6 +44,10 @@ private StoreProto.Store newStore(String s) {
.build();
}

private StoreProto.Store newStore(List<StoreProto.Store.Subscription> subscriptionList) {
return StoreProto.Store.newBuilder().addAllSubscriptions(subscriptionList).build();
}

@Test
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {
StoreProto.Store bqOnlyStore = newStore("bq*");
Expand Down Expand Up @@ -96,4 +103,55 @@ public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscription() {

p.run();
}

@Test
public void featureRowShouldBeAllocatedToStoreTagsAccordingToSubscriptionBlacklist() {
Subscription subscription1 = Subscription.newBuilder().setProject("*").setName("*").build();
Subscription subscription2 =
Subscription.newBuilder().setProject("project1").setName("fs_2").build();
Subscription subscription3 =
Subscription.newBuilder().setProject("project1").setName("fs_1").setExclude(true).build();
Subscription subscription4 =
Subscription.newBuilder().setProject("project2").setName("*").setExclude(true).build();

List<Subscription> testStoreSubscriptions1 =
Arrays.asList(subscription1, subscription2, subscription3);
StoreProto.Store testStore1 = newStore(testStoreSubscriptions1);

List<Subscription> testStoreSubscriptions2 = Arrays.asList(subscription1, subscription4);
StoreProto.Store testStore2 = newStore(testStoreSubscriptions2);

Map<StoreProto.Store, TupleTag<FeatureRowProto.FeatureRow>> storeTags =
ImmutableMap.of(
testStore1, new TupleTag<>(),
testStore2, new TupleTag<>());

PCollectionTuple allocatedRows =
p.apply(
Create.of(
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project1/fs_1").build(),
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_1").build(),
FeatureRowProto.FeatureRow.newBuilder().setFeatureSet("project2/fs_2").build()))
.apply(
FeatureRowToStoreAllocator.newBuilder()
.setStoreTags(storeTags)
.setStores(ImmutableList.of(testStore1, testStore2))
.build());

PAssert.that(
allocatedRows
.get(storeTags.get(testStore1))
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
.apply("CountStore1", Count.globally()))
.containsInAnyOrder(2L);

PAssert.that(
allocatedRows
.get(storeTags.get(testStore2))
.setCoder(ProtoCoder.of(FeatureRowProto.FeatureRow.class))
.apply("CountStore2", Count.globally()))
.containsInAnyOrder(1L);

p.run();
}
}
3 changes: 3 additions & 0 deletions protos/feast/core/Store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ message Store {
// - my-feature-set-6 can be used to select a single feature set
string name = 1;

// Boolean to filter retrieval of subscriptions
bool exclude = 4;

// Feature set version was removed in v0.5.0.
reserved 2;
}
Expand Down