Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project Namespacing #393

Merged
merged 4 commits into from
Dec 27, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply spotless
  • Loading branch information
zhilingc committed Dec 27, 2019
commit f6c7ca036d3ee3e1af34e054a763a3a795cfa466
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Source getDefaultSource(FeastProperties feastProperties) {
String topicName = streamProperties.getOptions().get("topic");
Map<String, Object> map = new HashMap<>();
map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
map.put(
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_KAFKA_REQUEST_TIMEOUT_MS_CONFIG);
AdminClient client = AdminClient.create(map);

NewTopic newTopic =
Expand Down
20 changes: 9 additions & 11 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;

/**
* JPA repository supplying FeatureSet objects keyed by id.
*/
/** JPA repository supplying FeatureSet objects keyed by id. */
public interface FeatureSetRepository extends JpaRepository<FeatureSet, String> {

long count();

// Find single feature set by project, name, and version
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(String name, String project,
Integer version);
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(
String name, String project, Integer version);

// Find single latest version of a feature set by project and name (LIKE)
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String name,
String project);
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(
String name, String project);

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
Expand All @@ -42,10 +40,10 @@ FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(String
List<FeatureSet> findAllByProject_NameOrderByNameAscVersionAsc(String project_name);

// find all versions of feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(String name,
String project_name);
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(
String name, String project_name);

// find all versions of feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(String name,
String project_name);
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(
String name, String project_name);
}
1 change: 0 additions & 1 deletion core/src/main/java/feast/core/dao/ProjectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feast.core.dao;

import feast.core.model.Project;
Expand Down
66 changes: 23 additions & 43 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;

/**
* Implementation of the feast core GRPC service.
*/
/** Implementation of the feast core GRPC service. */
@Slf4j
@GRpcService(interceptors = {MonitoringInterceptor.class})
public class CoreServiceImpl extends CoreServiceImplBase {
Expand Down Expand Up @@ -81,10 +79,8 @@ public void getFeatureSet(
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -97,10 +93,8 @@ public void listFeatureSets(
responseObserver.onCompleted();
} catch (RetrievalException | IllegalArgumentException e) {
log.error("Exception has occurred in ListFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -113,10 +107,8 @@ public void listStores(
responseObserver.onCompleted();
} catch (RetrievalException e) {
log.error("Exception has occurred in ListStores method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -132,16 +124,12 @@ public void applyFeatureSet(
"Unable to persist this feature set due to a constraint violation. Please ensure that"
+ " field names are unique within the project namespace: ",
e);
responseObserver.onError(Status.ALREADY_EXISTS
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.ALREADY_EXISTS.withDescription(e.getMessage()).withCause(e).asRuntimeException());
} catch (Exception e) {
log.error("Exception has occurred in ApplyFeatureSet method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -154,10 +142,8 @@ public void updateStore(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in UpdateStore method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -170,10 +156,8 @@ public void createProject(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the createProject method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -186,10 +170,8 @@ public void archiveProject(
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the createProject method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

Expand All @@ -198,17 +180,15 @@ public void listProjects(
ListProjectsRequest request, StreamObserver<ListProjectsResponse> responseObserver) {
try {
List<Project> projects = accessManagementService.listProjects();
responseObserver.onNext(ListProjectsResponse.newBuilder()
.addAllProjects(projects.stream().map(Project::getName).collect(
Collectors.toList())).build());
responseObserver.onNext(
ListProjectsResponse.newBuilder()
.addAllProjects(projects.stream().map(Project::getName).collect(Collectors.toList()))
.build());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception has occurred in the listProjects method: ", e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException());
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

}
16 changes: 11 additions & 5 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ public Job call() {
}
} else {
String jobId = createJobId(source.getId(), store.getName());
submittedJob =
executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store));
submittedJob = executorService.submit(() -> startJob(jobId, featureSets, sourceSpec, store));
}

Job job = null;
Expand All @@ -136,7 +135,10 @@ private Job startJob(
.map(
fsp ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder().setSpec(fsp.getSpec()).setMeta(fsp.getMeta()).build()))
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fsp.getSpec())
.setMeta(fsp.getMeta())
.build()))
.collect(Collectors.toList());
Job job =
new Job(
Expand Down Expand Up @@ -184,13 +186,17 @@ private Job startJob(
}

/** Update the given job */
private Job updateJob(Job job, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store store) {
private Job updateJob(
Job job, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store store) {
job.setFeatureSets(
featureSets.stream()
.map(
fs ->
FeatureSet.fromProto(
FeatureSetProto.FeatureSet.newBuilder().setSpec(fs.getSpec()).setMeta(fs.getMeta()).build()))
FeatureSetProto.FeatureSet.newBuilder()
.setSpec(fs.getSpec())
.setMeta(fs.getMeta())
.build()))
.collect(Collectors.toList()));
job.setStore(feast.core.model.Store.fromProto(store));
AuditLogger.log(
Expand Down
26 changes: 12 additions & 14 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ public Runner getRunnerType() {
@Override
public Job startJob(Job job) {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
try {
return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), false);
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
false);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
}
Expand All @@ -100,9 +102,7 @@ public Job startJob(Job job) {
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);
Expand Down Expand Up @@ -156,12 +156,7 @@ public JobStatus getJobStatus(Job job) {

try {
com.google.api.services.dataflow.model.Job dataflowJob =
dataflow
.projects()
.locations()
.jobs()
.get(projectId, location, job.getExtId())
.execute();
dataflow.projects().locations().jobs().get(projectId, location, job.getExtId()).execute();
return DataflowJobStateMapper.map(dataflowJob.getCurrentState());
} catch (Exception e) {
log.error(
Expand Down Expand Up @@ -208,7 +203,10 @@ private Job submitDataflowJob(
}

private ImportOptions getPipelineOptions(
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink, boolean update)
String jobName,
List<FeatureSetProto.FeatureSet> featureSets,
StoreProto.Store sink,
boolean update)
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,9 @@ public Runner getRunnerType() {
public Job startJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream()
.map(FeatureSet::toProto)
.collect(Collectors.toList());
ImportOptions pipelineOptions = getPipelineOptions(featureSetProtos, job.getStore().toProto());
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
jobs.add(directJob);
Expand Down
10 changes: 2 additions & 8 deletions core/src/main/java/feast/core/model/FeatureSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@Column(name = "max_age")
private long maxAgeSeconds;


// Entity fields inside this feature set
@ElementCollection(fetch = FetchType.EAGER)
@CollectionTable(
name = "entities",
joinColumns = @JoinColumn(name = "feature_set_id")
)
@CollectionTable(name = "entities", joinColumns = @JoinColumn(name = "feature_set_id"))
@Fetch(FetchMode.SUBSELECT)
private Set<Field> entities;

Expand All @@ -91,9 +87,7 @@ public class FeatureSet extends AbstractTimestampEntity implements Comparable<Fe
@CollectionTable(
name = "features",
joinColumns = @JoinColumn(name = "feature_set_id"),
uniqueConstraints =
@UniqueConstraint(columnNames = {"name", "project", "version"})
)
uniqueConstraints = @UniqueConstraint(columnNames = {"name", "project", "version"}))
@Fetch(FetchMode.SUBSELECT)
private Set<Field> features;

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/feast/core/model/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ public class Field {
@Column(name = "project")
private String project;

public Field() {
}
public Field() {}

public Field(String name, ValueType.Enum type) {
this.name = name;
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class Job extends AbstractTimestampEntity {
private Store store;

// FeatureSets populated by the job
@ManyToMany
private List<FeatureSet> featureSets;
@ManyToMany private List<FeatureSet> featureSets;

// Job Metrics
@OneToMany(mappedBy = "job", cascade = CascadeType.ALL)
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/feast/core/model/Project.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public class Project {
@Column(name = "archived", nullable = false)
private boolean archived;

@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER, orphanRemoval = true, mappedBy = "project")
@OneToMany(
cascade = CascadeType.ALL,
fetch = FetchType.EAGER,
orphanRemoval = true,
mappedBy = "project")
private Set<FeatureSet> featureSets;

public Project() {
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/java/feast/core/model/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ public List<Subscription> getSubscriptions() {
}

private static String convertSubscriptionToString(Subscription sub) {
if(sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()){
throw new IllegalArgumentException(String.format("Missing arguments in subscription string: %s", sub.toString()));
if (sub.getVersion().isEmpty() || sub.getName().isEmpty() || sub.getProject().isEmpty()) {
throw new IllegalArgumentException(
String.format("Missing arguments in subscription string: %s", sub.toString()));
}
return String.format("%s:%s:%s", sub.getProject(), sub.getName(), sub.getVersion());
}
Expand All @@ -129,6 +130,10 @@ private Subscription convertStringToSubscription(String sub) {
return Subscription.newBuilder().build();
}
String[] split = sub.split(":", 3);
return Subscription.newBuilder().setProject(split[0]).setName(split[1]).setVersion(split[2]).build();
return Subscription.newBuilder()
.setProject(split[0])
.setName(split[1])
.setVersion(split[2])
.build();
}
}
}
Loading