Skip to content

Commit

Permalink
Tests for transformation service integration in java feature server (f…
Browse files Browse the repository at this point in the history
…east-dev#2236)

* IT for transformation service interop

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* remove unnecessary concat

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex authored Jan 26, 2022
1 parent 3dcec6d commit b3174c9
Show file tree
Hide file tree
Showing 19 changed files with 405 additions and 250 deletions.
10 changes: 10 additions & 0 deletions java/serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.feast</groupId>
Expand Down Expand Up @@ -141,6 +146,11 @@
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<!--compile "com.google.protobuf:protobuf-java-util:${protobuf.version}"-->
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,38 +147,46 @@ public TracingProperties getTracing() {
public LoggingProperties getLogging() {
return logging;
}
}

private FeastProperties feast;
private String gcpProject;

public void setFeast(FeastProperties feast) {
this.feast = feast;
}
public String getGcpProject() {
return gcpProject;
}

public FeastProperties getFeast() {
return feast;
}
public void setGcpProject(String gcpProject) {
this.gcpProject = gcpProject;
}

private String gcpProject;
public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}

public String getGcpProject() {
return gcpProject;
}
private String awsRegion;

public void setAwsRegion(String awsRegion) {
this.awsRegion = awsRegion;
}
public String getAwsRegion() {
return awsRegion;
}

private String transformationServiceEndpoint;

private String awsRegion;
public String getTransformationServiceEndpoint() {
return transformationServiceEndpoint;
}

public String getAwsRegion() {
return awsRegion;
public void setTransformationServiceEndpoint(String transformationServiceEndpoint) {
this.transformationServiceEndpoint = transformationServiceEndpoint;
}
}

private String transformationServiceEndpoint;
private FeastProperties feast;

public String getTransformationServiceEndpoint() {
return transformationServiceEndpoint;
public void setFeast(FeastProperties feast) {
this.feast = feast;
}

public FeastProperties getFeast() {
return feast;
}

/** Store configuration class for database that this Feast Serving uses. */
Expand Down Expand Up @@ -263,6 +271,10 @@ public static class Server {
public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}
}

public static class GrpcServer {
Expand All @@ -271,6 +283,10 @@ public static class GrpcServer {
public Server getServer() {
return server;
}

public void setServer(Server server) {
this.server = server;
}
}

public static class RestServer {
Expand All @@ -279,6 +295,10 @@ public static class RestServer {
public Server getServer() {
return server;
}

public void setServer(Server server) {
this.server = server;
}
}

private GrpcServer grpc;
Expand All @@ -288,10 +308,18 @@ public GrpcServer getGrpc() {
return grpc;
}

public void setGrpc(GrpcServer grpc) {
this.grpc = grpc;
}

public RestServer getRest() {
return rest;
}

public void setRest(RestServer rest) {
this.rest = rest;
}

public enum StoreType {
REDIS,
REDIS_CLUSTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public class RegistryConfig extends AbstractModule {
@Provides
Storage googleStorage(ApplicationProperties applicationProperties) {
return StorageOptions.newBuilder()
.setProjectId(applicationProperties.getGcpProject())
.setProjectId(applicationProperties.getFeast().getGcpProject())
.build()
.getService();
}

@Provides
public AmazonS3 awsStorage(ApplicationProperties applicationProperties) {
return AmazonS3ClientBuilder.standard()
.withRegion(applicationProperties.getAwsRegion())
.withRegion(applicationProperties.getFeast().getAwsRegion())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public ServingServiceV2 registryBasedServingServiceV2(

log.info("Working Directory = " + System.getProperty("user.dir"));

final String transformationServiceEndpoint =
applicationProperties.getTransformationServiceEndpoint();
final OnlineTransformationService onlineTransformationService =
new OnlineTransformationService(transformationServiceEndpoint, registryRepository);
new OnlineTransformationService(
applicationProperties.getFeast().getTransformationServiceEndpoint(),
registryRepository);

servingService =
new OnlineServingServiceV2(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import feast.proto.serving.ServingAPIProto;
import feast.proto.serving.ServingServiceGrpc;
import feast.serving.service.ServingServiceV2;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import javax.inject.Inject;
import org.slf4j.Logger;

public class OnlineServingGrpcServiceV2 extends ServingServiceGrpc.ServingServiceImplBase {
private final ServingServiceV2 servingServiceV2;
private static final Logger log =
org.slf4j.LoggerFactory.getLogger(OnlineServingGrpcServiceV2.class);

@Inject
OnlineServingGrpcServiceV2(ServingServiceV2 servingServiceV2) {
Expand All @@ -34,15 +38,27 @@ public class OnlineServingGrpcServiceV2 extends ServingServiceGrpc.ServingServic
public void getFeastServingInfo(
ServingAPIProto.GetFeastServingInfoRequest request,
StreamObserver<ServingAPIProto.GetFeastServingInfoResponse> responseObserver) {
responseObserver.onNext(this.servingServiceV2.getFeastServingInfo(request));
responseObserver.onCompleted();
try {
responseObserver.onNext(this.servingServiceV2.getFeastServingInfo(request));
responseObserver.onCompleted();
} catch (RuntimeException e) {
log.warn("Failed to get Serving Info", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void getOnlineFeatures(
ServingAPIProto.GetOnlineFeaturesRequest request,
StreamObserver<ServingAPIProto.GetOnlineFeaturesResponse> responseObserver) {
responseObserver.onNext(this.servingServiceV2.getOnlineFeatures(request));
responseObserver.onCompleted();
try {
responseObserver.onNext(this.servingServiceV2.getOnlineFeatures(request));
responseObserver.onCompleted();
} catch (RuntimeException e) {
log.warn("Failed to get Online Features", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
19 changes: 0 additions & 19 deletions java/serving/src/main/java/feast/serving/modules/ServerModule.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ public ServingAPIProto.GetOnlineFeaturesResponse getOnlineFeatures(
// Pair from extractRequestDataFeatureNamesAndOnDemandFeatureInputs.
// Currently, we can retrieve context variables directly from GetOnlineFeaturesRequest.
List<FeatureReferenceV2> onDemandFeatureInputs =
this.onlineTransformationService
.extractRequestDataFeatureNamesAndOnDemandFeatureInputs(onDemandFeatureReferences)
.getRight();
this.onlineTransformationService.extractOnDemandFeaturesDependencies(
onDemandFeatureReferences);

// Add on demand feature inputs to list of feature references to retrieve.
for (FeatureReferenceV2 onDemandFeatureInput : onDemandFeatureInputs) {
Expand Down Expand Up @@ -284,7 +283,12 @@ private void populateOnDemandFeatures(
valueList.add(features.get(rowIdx).get(featureIdx).getFeatureValue(valueType));
}

onDemandContext.add(Pair.of(Feature.getFeatureReference(featureReference), valueList));
onDemandContext.add(
Pair.of(
String.format(
"%s__%s",
featureReference.getFeatureViewName(), featureReference.getFeatureName()),
valueList));
}
// Serialize the augmented values.
ValueType transformationInput =
Expand Down
Loading

0 comments on commit b3174c9

Please sign in to comment.