Skip to content

Commit

Permalink
[Java feature server] Converge ServingService API to make Python and …
Browse files Browse the repository at this point in the history
…Java feature servers consistent (feast-dev#2166)

* hgetall

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

optimized version of Serving proto

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

temp

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* refactored online service

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* java tests pass

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* remove project from request & entities from response

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* go sdk updated to use new protos

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* benchmark serving in ITs

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fix api docs build

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* fixes after rebase

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* rename FeatureReferenceV2.name -> FeatureReferenceV2.feature_name

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* change proto property name in go sdk

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* refactoring FeastClient

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add some comments

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* populate metrics

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* format after rebase

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* comment

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* todo added

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Jan 5, 2022
1 parent d447db2 commit b1efc80
Show file tree
Hide file tree
Showing 62 changed files with 2,036 additions and 1,629 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;

public class FeatureV2 {
public class Feature {

/**
* Accepts FeatureReferenceV2 object and returns its reference in String
Expand All @@ -27,10 +27,10 @@ public class FeatureV2 {
* @param featureReference {@link FeatureReferenceV2}
* @return String format of FeatureReferenceV2
*/
public static String getFeatureStringRef(FeatureReferenceV2 featureReference) {
String ref = featureReference.getName();
if (!featureReference.getFeatureTable().isEmpty()) {
ref = featureReference.getFeatureTable() + ":" + ref;
public static String getFeatureReference(FeatureReferenceV2 featureReference) {
String ref = featureReference.getFeatureName();
if (!featureReference.getFeatureViewName().isEmpty()) {
ref = featureReference.getFeatureViewName() + ":" + ref;
}
return ref;
}
Expand All @@ -47,4 +47,12 @@ public static String getFeatureName(String featureReference) {
String[] tokens = featureReference.split(":", 2);
return tokens[tokens.length - 1];
}

public static FeatureReferenceV2 parseFeatureReference(String featureReference) {
String[] tokens = featureReference.split(":", 2);
return FeatureReferenceV2.newBuilder()
.setFeatureViewName(tokens[0])
.setFeatureName(tokens[1])
.build();
}
}
48 changes: 0 additions & 48 deletions java/common/src/main/java/feast/common/models/FeatureTable.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public List<AuditLogEntry> getTestAuditLogs() {
.addAllFeatures(
Arrays.asList(
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature1")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature1")
.build(),
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature2")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature2")
.build()))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class FeaturesTest {
public void setUp() {
featureReference =
FeatureReferenceV2.newBuilder()
.setFeatureTable("featuretable_1")
.setName("feature1")
.setFeatureViewName("featuretable_1")
.setFeatureName("feature1")
.build();
}

@Test
public void shouldReturnFeatureStringRef() {
String actualFeatureStringRef = FeatureV2.getFeatureStringRef(featureReference);
String actualFeatureStringRef = Feature.getFeatureReference(featureReference);
String expectedFeatureStringRef = "featuretable_1:feature1";

assertThat(actualFeatureStringRef, equalTo(expectedFeatureStringRef));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
import com.google.common.collect.Lists;
import feast.proto.serving.ServingAPIProto;
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoResponse;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequest;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponseV2;
import feast.proto.serving.ServingServiceGrpc;
import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
import feast.proto.types.ValueProto;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -32,9 +33,8 @@
import io.opentracing.contrib.grpc.TracingClientInterceptor;
import io.opentracing.util.GlobalTracer;
import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -118,11 +118,60 @@ public GetFeastServingInfoResponse getFeastServingInfo() {
* @param featureRefs list of string feature references to retrieve in the following format
* featureTable:feature, where 'featureTable' and 'feature' refer to the FeatureTable and
* Feature names respectively. Only the Feature name is required.
* @param rows list of {@link Row} to select the entities to retrieve the features for.
* @param entities list of {@link Row} to select the entities to retrieve the features for.
* @return list of {@link Row} containing retrieved data fields.
*/
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows) {
return getOnlineFeatures(featureRefs, rows, "");
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities) {
GetOnlineFeaturesRequest.Builder requestBuilder = GetOnlineFeaturesRequest.newBuilder();

requestBuilder.setFeatures(
ServingAPIProto.FeatureList.newBuilder().addAllVal(featureRefs).build());

requestBuilder.putAllEntities(getEntityValuesMap(entities));

GetOnlineFeaturesResponseV2 response = stub.getOnlineFeatures(requestBuilder.build());

List<Row> results = Lists.newArrayList();
if (response.getResultsCount() == 0) {
return results;
}

for (int rowIdx = 0; rowIdx < response.getResults(0).getValuesCount(); rowIdx++) {
Row row = Row.create();
for (int featureIdx = 0; featureIdx < response.getResultsCount(); featureIdx++) {
row.set(
response.getMetadata().getFeatureNames().getVal(featureIdx),
response.getResults(featureIdx).getValues(rowIdx),
response.getResults(featureIdx).getStatuses(rowIdx));

row.setEntityTimestamp(
Instant.ofEpochSecond(
response.getResults(featureIdx).getEventTimestamps(rowIdx).getSeconds()));
}
for (Map.Entry<String, ValueProto.Value> entry :
entities.get(rowIdx).getFields().entrySet()) {
row.set(entry.getKey(), entry.getValue());
}

results.add(row);
}
return results;
}

private Map<String, ValueProto.RepeatedValue> getEntityValuesMap(List<Row> entities) {
Map<String, ValueProto.RepeatedValue.Builder> columnarEntities = new HashMap<>();
for (Row row : entities) {
for (Map.Entry<String, ValueProto.Value> field : row.getFields().entrySet()) {
if (!columnarEntities.containsKey(field.getKey())) {
columnarEntities.put(field.getKey(), ValueProto.RepeatedValue.newBuilder());
}
columnarEntities.get(field.getKey()).addVal(field.getValue());
}
}

return columnarEntities.entrySet().stream()
.map((e) -> Map.entry(e.getKey(), e.getValue().build()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
Expand All @@ -149,42 +198,7 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows) {
* @return list of {@link Row} containing retrieved data fields.
*/
public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, String project) {
List<FeatureReferenceV2> features = RequestUtil.createFeatureRefs(featureRefs);
// build entity rows and collect entity references
HashSet<String> entityRefs = new HashSet<>();
List<EntityRow> entityRows =
rows.stream()
.map(
row -> {
entityRefs.addAll(row.getFields().keySet());
return EntityRow.newBuilder()
.setTimestamp(row.getEntityTimestamp())
.putAllFields(row.getFields())
.build();
})
.collect(Collectors.toList());

GetOnlineFeaturesResponse response =
stub.getOnlineFeaturesV2(
GetOnlineFeaturesRequestV2.newBuilder()
.addAllFeatures(features)
.addAllEntityRows(entityRows)
.setProject(project)
.build());

return response.getFieldValuesList().stream()
.map(
fieldValues -> {
Row row = Row.create();
for (String fieldName : fieldValues.getFieldsMap().keySet()) {
row.set(
fieldName,
fieldValues.getFieldsMap().get(fieldName),
fieldValues.getStatusesMap().get(fieldName));
}
return row;
})
.collect(Collectors.toList());
return getOnlineFeatures(featureRefs, rows);
}

protected FeastClient(ManagedChannel channel, Optional<CallCredentials> credentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
import java.util.List;
Expand Down Expand Up @@ -71,8 +71,8 @@ public static FeatureReferenceV2 parseFeatureRef(String featureRefString) {
String[] featureReferenceParts = featureRefString.split(":");
FeatureReferenceV2 featureRef =
FeatureReferenceV2.newBuilder()
.setFeatureTable(featureReferenceParts[0])
.setName(featureReferenceParts[1])
.setFeatureViewName(featureReferenceParts[0])
.setFeatureName(featureReferenceParts[1])
.build();

return featureRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldStatus;
import feast.proto.serving.ServingAPIProto.FieldStatus;
import feast.proto.types.ValueProto.Value;
import feast.proto.types.ValueProto.Value.ValCase;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.gojek.feast;
package dev.feast;

import com.google.auto.value.AutoValue;
import io.grpc.CallCredentials;
Expand Down
Loading

0 comments on commit b1efc80

Please sign in to comment.