Skip to content

Commit

Permalink
Add Ingestion Job management API for Feast Core (#548)
Browse files Browse the repository at this point in the history
* Added protobuf definitions for a Job Management API

* Added query methods to JobRepository to query jobs by store and featureset

* Added hashCode() & equals() to Job model compare and hash jobs

This would allow jobs to be used as elements in HashSets and keys in
HashMaps

* Added toIngestionProto() to Job object to convert Job model to ingestion job proto

* Added query methods to FeatureSetRepository to query by exact (name, project) or (name,version)

* Added listJobs() to JobService to handle request list ingestion jobs requests

* Added missing filter field in ListIngestionJobsRequest protobuf

* Added code to setup mockups for testing JobService

* Revert "Added hashCode() & equals() to Job model compare and hash jobs"

This reverts commit ba995bb712cbd38f0f4bef47efbe433d5ec07521 as
it caused core tests to fail

* Added JobServiceTest unit tests for JobService's listJobs()

* Added stopJobs() to JobService to handle requests to stop jobs

* Added getTransitionalStates() to JobStatus to return collection of transitional states

* Changed stopJobs() to throw unsupported error on transitional job statuses

* Moved conversion of JobStatus to IngestionJobStatus proto to JobStatus.

* Make findFeatureSet() match only one featureset.

Limit findFeatureSets() as feature set references as composite keys
should match one and only one featureset.

* Added matchFeatureSets() to SpecService to match Feature Sets from References

This commit is adds temporary support for FeatureSetReference to
SpecService via listFeatureSets(). In the future, this should merged
together with listFeatureSets() as their functionality is almost the
same.

* Refactor JobService listJobs() to use SpecService to provide featureset matching

* Revert findBy methods added to FeatureSetRepository as no longer used.

JobService no longer depends on FeatureSetRepository directly, instead
via SpecService

* Added restartJob() to JobManagers to restart ingestion/import jobs

* Added restartJob() to JobService to restart ingestion jobs

* Update job model (due to new extId) when restartJob() in JobService

* Use assertThat() & equalTo() instead of assertEquals() in JobService

* Revert "Use assertThat() & equalTo() instead of assertEquals() in JobService" due to failed tests

This reverts commit bb3cf0ae1e7cfbde7a2f997cacc661d040c0a35f.

* Use hamcrest’s assertThat() and equalTo() instead of Junit's assertEquals

* Hook up JobService list, stop, restart job methods to CoreServiceImpl.

* Throw InvalidArgumentException instead when calling listJobs() with invalid FeatureSetReference

Throw InvalidArgumentException instead of UnsupportedOperationException
for better semantics: UnsupportedOperationException should be reserved
for operations that fail due to failed preconditions

* Fixed typos in javadocs: InvalidArgumentException should be IllegalArgumentException

* Fixed in findByFeatureSetsIn() query in JobRepository

* Renamed toIngestionProto() methods to toProto() to follow code convention

* Make JobService's listJobs() a transaction to prevent DB data race conditions

* Moved matchFeatureSets() in SpecService to private method in JobService

* Make the map that maps between JobStatus and IngestionJobStatus static

* Make JobService's listJobs() to return all ingestion jobs on empty filter

* Fixed issue where the jobManager map that JobService built used wrong keys

* Fix issue where actual Job Status is not synced with database.

Issue occurs when the job is aborted/restarted, but the JobStatus
has not yet been updated by JobUpdateTask. Hence another call
to abortJob() & restartJob() that should be rejected due invalid status
is allowed through

* Log stopJob() & restartJob() operations to make debugging easier

* Use Runner.name() instead of runner.toString() to build JobManager map

* Move documentation on JobService operations to CoreService protobuf definition

* Added IngestJob to python sdk as native representation of IngestionJob proto

* Make empty filter on JobService's listJobs() select all ingestion jobs

* Added bindings for Job management API to python sdk client.

* Fixed __connect_core() to connect to Feast CoreService when calling on Job API calls

* Auto reload IngestJob.status and IngestJob.external_id on get property.

* Added IngestJob.wait() to wait for job status to transtion

* Added basic job api e2e test to exercise job api

* Reorder the operations e2etest to make sure that jobs are running after test

* Added e2e test for all types exercising job api

* Fixed typo in function arguments

* Added unit tests for Ingestion Job API additions in python sdk

* Rename "ingestion" to "ingest" for more consistent naming

* Disable support for restarting Job in a terminal state due to possible race conditions

* Added __str__ and __repr__ to IngestJob to render ingestjob in human readable string

* Added FeatureSetRef to represent references to featursets

* Admend client's list_ingest_jobs() to accept feature references directly

* Fixed typo in IngestJob.store property

* Fixed issue with FeatureSetRef.from_str not converting version to int

* Make the grpc error message more apparent on stop_ingest_job() and restart_ingest_job()

* Added feast ingest-job list, describe, stop, restart to CLI

* Rename Job to RetrievalJob to prevent confusion with IngestJob

* Updated e2e tests to use FeatureSetRef in list_ingest_jobs()

* Fixed due e2e tests to cater to new limitations on stop_ingest_job()

* Increase timeout on test_all_types_ingest_jobs() e2e test.

* Configure IngestJob.wait() to backoff with a exponentially larger wait duration

* Added print statements to debug e2e test failure.

* Revert "Added print statements to debug e2e test failure."

This reverts commit 146fb2b.

* Fixed issue of test waiting for aborted job to become running causing timeout.

Co-authored-by: Zhu Zhanyan <zhu.zhanyan@gojek.com>
  • Loading branch information
mrzzy and Zhu Zhanyan authored Apr 7, 2020
1 parent 9917b63 commit 9139fe3
Show file tree
Hide file tree
Showing 19 changed files with 1,684 additions and 30 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.dao;

import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.Collection;
Expand All @@ -29,4 +30,10 @@ public interface JobRepository extends JpaRepository<Job, String> {
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);

List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);

// find jobs by feast store name
List<Job> findByStoreName(String storeName);

// find jobs by featureset
List<Job> findByFeatureSetsIn(List<FeatureSet> featureSets);
}
81 changes: 80 additions & 1 deletion core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.grpc;

import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
Expand All @@ -30,21 +31,29 @@
import feast.core.CoreServiceProto.GetFeatureSetResponse;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListIngestionJobsRequest;
import feast.core.CoreServiceProto.ListIngestionJobsResponse;
import feast.core.CoreServiceProto.ListProjectsRequest;
import feast.core.CoreServiceProto.ListProjectsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.RestartIngestionJobRequest;
import feast.core.CoreServiceProto.RestartIngestionJobResponse;
import feast.core.CoreServiceProto.StopIngestionJobRequest;
import feast.core.CoreServiceProto.StopIngestionJobResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.model.Project;
import feast.core.service.AccessManagementService;
import feast.core.service.JobService;
import feast.core.service.SpecService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
Expand All @@ -57,11 +66,16 @@ public class CoreServiceImpl extends CoreServiceImplBase {

private SpecService specService;
private AccessManagementService accessManagementService;
private JobService jobService;

@Autowired
public CoreServiceImpl(SpecService specService, AccessManagementService accessManagementService) {
public CoreServiceImpl(
SpecService specService,
AccessManagementService accessManagementService,
JobService jobService) {
this.specService = specService;
this.accessManagementService = accessManagementService;
this.jobService = jobService;
}

@Override
Expand Down Expand Up @@ -192,4 +206,69 @@ public void listProjects(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void listIngestionJobs(
ListIngestionJobsRequest request,
StreamObserver<ListIngestionJobsResponse> responseObserver) {
try {
ListIngestionJobsResponse response = this.jobService.listJobs(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (InvalidArgumentException e) {
log.error("Recieved an invalid request on calling listIngestionJobs method:", e);
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling listIngestionJobs method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void restartIngestionJob(
RestartIngestionJobRequest request,
StreamObserver<RestartIngestionJobResponse> responseObserver) {
try {
RestartIngestionJobResponse response = this.jobService.restartJob(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (NoSuchElementException e) {
log.error(
"Attempted to restart an nonexistent job on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (UnsupportedOperationException e) {
log.error("Recieved an unsupported request on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling restartIngestionJob method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

@Override
public void stopIngestionJob(
StopIngestionJobRequest request, StreamObserver<StopIngestionJobResponse> responseObserver) {
try {
StopIngestionJobResponse response = this.jobService.stopJob(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (NoSuchElementException e) {
log.error("Attempted to stop an nonexistent job on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asException());
} catch (UnsupportedOperationException e) {
log.error("Recieved an unsupported request on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.FAILED_PRECONDITION.withDescription(e.getMessage()).withCause(e).asException());
} catch (Exception e) {
log.error("Unexpected exception on calling stopIngestionJob method:", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public interface JobManager {
*/
void abortJob(String extId);

/**
* Restart an job. If job is an terminated state, will simply start the job. Might cause data to
* be lost during when restarting running jobs in some implementations. Refer to on docs the
* specific implementation.
*
* @param job job to restart
* @return the restarted job
*/
Job restartJob(Job job);

/**
* Get status of a job given runner-specific job ID.
*
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public void abortJob(String dataflowJobId) {
}
}

/**
* Restart a restart dataflow job. Dataflow should ensure continuity between during the restart,
* so no data should be lost during the restart operation.
*
* @param job job to restart
* @return the restarted job
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
// job yet not running: just start job
return this.startJob(job);
} else {
// job is running - updating the job without changing the job has
// the effect of restarting the job
return this.updateJob(job);
}
}

/**
* Get status of a dataflow job with given id and try to map it into Feast's JobStatus.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,26 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept
return ImportJob.runPipeline(pipelineOptions);
}

/**
* Restart a direct runner job. Note that some data will be temporarily lost during when
* restarting running direct runner jobs. See {#link {@link #updateJob(Job)} for more info.
*
* @param job job to restart
* @return the restarted job
*/
@Override
public Job restartJob(Job job) {
JobStatus status = job.getStatus();
if (JobStatus.getTerminalState().contains(status)) {
// job yet not running: just start job
return this.startJob(job);
} else {
// job is running - updating the job without changing the job has
// the effect of restarting the job.
return this.updateJob(job);
}
}

/**
* Gets the state of the direct runner job. Direct runner jobs only have 2 states: RUNNING and
* ABORTED.
Expand Down
45 changes: 44 additions & 1 deletion core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
*/
package feast.core.model;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto;
import feast.core.IngestionJobProto;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.*;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -102,4 +118,31 @@ public void updateMetrics(List<Metrics> newMetrics) {
public String getSinkName() {
return store.getName();
}

/**
* Convert a job model to ingestion job proto
*
* @return Ingestion Job proto derieved from the given job
*/
public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException {

// convert featuresets of job to protos
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : this.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}

// build ingestion job proto with job data
IngestionJobProto.IngestionJob ingestJob =
IngestionJobProto.IngestionJob.newBuilder()
.setId(this.getId())
.setExternalId(this.getExtId())
.setStatus(this.getStatus().toProto())
.addAllFeatureSets(featureSetProtos)
.setSource(this.getSource().toProto())
.setStore(this.getStore().toProto())
.build();

return ingestJob;
}
}
37 changes: 37 additions & 0 deletions core/src/main/java/feast/core/model/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package feast.core.model;

import feast.core.IngestionJobProto.IngestionJobStatus;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public enum JobStatus {
/** Job status is not known. */
Expand Down Expand Up @@ -64,4 +66,39 @@ public enum JobStatus {
public static Collection<JobStatus> getTerminalState() {
return TERMINAL_STATE;
}

private static final Collection<JobStatus> TRANSITIONAL_STATES =
Collections.unmodifiableList(Arrays.asList(PENDING, ABORTING, SUSPENDING));

/**
* Get Transitional Job Status states. Transitionals states are assigned to jobs that
* transitioning to a more stable state (ie SUSPENDED, ABORTED etc.)
*
* @return Collection of transitional Job Status states.
*/
public static final Collection<JobStatus> getTransitionalStates() {
return TRANSITIONAL_STATES;
}

private static final Map<JobStatus, IngestionJobStatus> INGESTION_JOB_STATUS_MAP =
Map.of(
JobStatus.UNKNOWN, IngestionJobStatus.UNKNOWN,
JobStatus.PENDING, IngestionJobStatus.PENDING,
JobStatus.RUNNING, IngestionJobStatus.RUNNING,
JobStatus.COMPLETED, IngestionJobStatus.COMPLETED,
JobStatus.ABORTING, IngestionJobStatus.ABORTING,
JobStatus.ABORTED, IngestionJobStatus.ABORTED,
JobStatus.ERROR, IngestionJobStatus.ERROR,
JobStatus.SUSPENDING, IngestionJobStatus.SUSPENDING,
JobStatus.SUSPENDED, IngestionJobStatus.SUSPENDED);

/**
* Convert a Job Status to Ingestion Job Status proto
*
* @return IngestionJobStatus proto derieved from this job status
*/
public IngestionJobStatus toProto() {
// maps job models job status to ingestion job status
return INGESTION_JOB_STATUS_MAP.get(this);
}
}
Loading

0 comments on commit 9139fe3

Please sign in to comment.