Implement JobService API calls & connect it to SDK #1129

Merged

merged 13 commits into master from job-service-server on Nov 4, 2020
Merge branch 'master' into job-service-server
Signed-off-by: Tsotne Tabidze <tsotnet@gmail.com>
tsotnet committed Oct 29, 2020
commit 029646236c98e3f56ba5b901905b06aa74fa40a8
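
This merge keeps the job-service branch current with master while completing the wiring between the SDK and the Feast Job Service. The recurring pattern in the diff below: each job-launching client method checks whether a job service is configured and either forwards the request over gRPC or starts the Spark job in-process. A minimal usage sketch of that split; the job_service_url option name and port are assumptions for illustration, not shown in this diff:

from feast import Client

# Without a job service configured, get_historical_features() and the
# ingestion methods launch Spark jobs directly from the SDK process.
local_client = Client(core_url="core:6565")

# With a job service configured, the same methods forward their requests
# over gRPC and the jobs run server-side.
remote_client = Client(
    core_url="core:6565",
    job_service_url="jobservice:6568",  # assumed option name and port
)
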
59 changes: 34 additions & 25 deletions sdk/python/feast/client.py
@@ -877,7 +877,7 @@ def get_historical_features(
         feature_refs: List[str],
         entity_source: Union[pd.DataFrame, FileSource, BigQuerySource],
         project: Optional[str] = None,
-        destination_path: Optional[str] = None,
+        output_location: Optional[str] = None,
     ) -> RetrievalJob:
         """
         Launch a historical feature retrieval job.
@@ -916,6 +916,17 @@ def get_historical_features(
         >>> output_file_uri = feature_retrieval_job.get_output_file_uri()
             "gs://some-bucket/output/
         """
+        feature_tables = self._get_feature_tables_from_feature_refs(
+            feature_refs, project
+        )
+
+        if output_location is None:
+            output_location = os.path.join(
+                self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION),
+                str(uuid.uuid4()),
+            )
+        output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
+
         if isinstance(entity_source, pd.DataFrame):
             staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION)
             entity_staging_uri = urlparse(
@@ -936,32 +947,30 @@
                 "event_timestamp", ParquetFormat(), entity_staging_uri.geturl(),
             )
 
-        if destination_path is None:
-            destination_path = self._config.get(
-                CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION
-            )
-            destination_path = os.path.join(destination_path, str(uuid.uuid4()))
-
-        if not self._job_service:
-            feature_tables = self._get_feature_tables_from_feature_refs(
-                feature_refs, project
+        if self._use_job_service:
+            response = self._job_service.GetHistoricalFeatures(
+                GetHistoricalFeaturesRequest(
+                    feature_refs=feature_refs,
+                    entity_source=entity_source.to_proto(),
+                    project=project,
+                    output_location=output_location,
+                ),
+                **self._extra_grpc_params(),
             )
-            output_format = self._config.get(
-                CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT
-            )
-
-            return start_historical_feature_retrieval_job(
-                self, entity_source, feature_tables, output_format, destination_path
+            return RemoteRetrievalJob(
+                self._job_service,
+                self._extra_grpc_params,
+                response.id,
+                output_file_uri=response.output_file_uri,
             )
         else:
-            request = GetHistoricalFeaturesRequest(
-                feature_refs=feature_refs,
-                entities_source=entity_source.to_proto(),
-                project=project,
-                destination_path=destination_path,
+            return start_historical_feature_retrieval_job(
+                self,
+                entity_source,
+                feature_tables,
+                output_format,
+                os.path.join(output_location, str(uuid.uuid4())),
             )
-            response = self._job_service.GetHistoricalFeatures(request)
-            return response.id
 
     def get_historical_features_df(
         self,
@@ -1037,7 +1046,7 @@ def start_offline_to_online_ingestion(
         :param end: upper datetime boundary
         :return: Spark Job Proxy object
         """
-        if not self._job_service:
+        if not self._use_job_service:
             return start_offline_to_online_ingestion(feature_table, start, end, self)
         else:
             request = StartOfflineToOnlineIngestionJobRequest(
@@ -1051,7 +1060,7 @@
     def start_stream_to_online_ingestion(
         self, feature_table: FeatureTable, extra_jars: Optional[List[str]] = None,
     ) -> SparkJob:
-        if not self._job_service:
+        if not self._use_job_service:
             return start_stream_to_online_ingestion(
                 feature_table, extra_jars or [], self
             )
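
Taken together, the client.py changes rename destination_path to output_location, default it from CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION plus a UUID, and return a RemoteRetrievalJob proxy rather than a bare job id when the job service handles the call. A hedged sketch of the resulting API, reusing the client from the sketch above; the feature reference and paths are invented for illustration:

# output_location is optional; when omitted it falls back to
# CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION joined with a UUID.
job = client.get_historical_features(
    feature_refs=["driver:trips_today"],        # illustrative feature ref
    entity_source=entity_df,                    # pd.DataFrame, FileSource, or BigQuerySource
    output_location="gs://some-bucket/output",  # illustrative path
)

# Blocks until the output file is available; the servicer below instead
# passes block=False so the RPC can return immediately.
output_file_uri = job.get_output_file_uri()
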
7 changes: 2 additions & 5 deletions sdk/python/feast/constants.py
@@ -123,9 +123,10 @@ class AuthProvider(Enum):
     # Path to certificate(s) to secure connection to Feast Job Service
     CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
     # Enable or disable Feast Job Service
+    # TODO: is this necessary?
     CONFIG_JOB_SERVICE_ENABLED: "False",
     # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds)
-    CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "3",
+    CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10",
     # Default gRPC connection timeout when sending an ApplyFeatureSet command to
     # Feast Core (in seconds)
     CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600",
@@ -142,8 +143,4 @@ class AuthProvider(Enum):
     CONFIG_REDIS_SSL: "False",
     CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet",
     CONFIG_SPARK_EXTRA_OPTIONS: "",
-    # Enable or disable TLS/SSL to Feast Service
-    CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False",
-    # Path to certificate(s) to secure connection to Feast Job Service
-    CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "",
 }
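
Two things change in constants.py: the default gRPC connection timeout rises from 3 to 10 seconds, likely because job-service RPCs that launch Spark jobs answer more slowly than Core and Serving metadata calls, and the job-service SSL keys duplicated at the bottom of the dict are dropped in favor of the copies that already appear earlier in it. A sketch of overriding such a default at client construction, assuming Feast's usual kwargs-to-config plumbing and an assumed spelling of the option name:

# Config values are strings; constructor kwargs override the dict defaults.
client = Client(
    core_url="core:6565",
    grpc_connection_timeout_default="30",  # assumed option-name spelling
)
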
45 changes: 26 additions & 19 deletions sdk/python/feast/job_service.py
@@ -10,12 +10,21 @@
     GetHistoricalFeaturesResponse,
     StartOfflineToOnlineIngestionJobResponse,
     StartStreamToOnlineIngestionJobResponse,
+    CancelJobResponse,
+    GetHistoricalFeaturesResponse,
+    GetJobResponse,
+    JobStatus,
+    JobType,
+    ListJobsResponse,
+    Job as JobProto,
 )
 from feast.data_source import DataSource
-from feast.pyspark.launcher import (
-    start_historical_feature_retrieval_job,
-    start_offline_to_online_ingestion,
-    start_stream_to_online_ingestion,
+from feast.pyspark.abc import (
+    BatchIngestionJob,
+    RetrievalJob,
+    SparkJob,
+    SparkJobStatus,
+    StreamIngestionJob,
 )
 from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
 from feast.third_party.grpc.health.v1.HealthService_pb2 import (
@@ -60,38 +69,36 @@ def StartOfflineToOnlineIngestionJob(self, request, context):
         feature_table = self.client.get_feature_table(
             request.table_name, request.project
         )
-        job = start_offline_to_online_ingestion(
+        job = self.client.start_offline_to_online_ingestion(
             feature_table,
             request.start_date.ToDatetime(),
             request.end_date.ToDatetime(),
-            self.client,
         )
         return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())
 
     def GetHistoricalFeatures(self, request, context):
         """Produce a training dataset, return a job id that will provide a file reference"""
-        feature_tables = self.client._get_feature_tables_from_feature_refs(
-            request.feature_refs, request.project
-        )
-        output_format = self.client._config.get(
-            CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT
+        job = self.client.get_historical_features(
+            request.feature_refs,
+            entity_source=DataSource.from_proto(request.entity_source),
+            project=request.project,
+            output_location=request.output_location,
         )
 
-        job = start_historical_feature_retrieval_job(
-            self.client,
-            DataSource.from_proto(request.entities_source),
-            feature_tables,
-            output_format,
-            request.destination_path,
+        output_file_uri = job.get_output_file_uri(block=False)
+
+        return GetHistoricalFeaturesResponse(
+            id=job.get_id(), output_file_uri=output_file_uri
         )
-        return GetHistoricalFeaturesResponse(id=job.get_id())
 
     def StartStreamToOnlineIngestionJob(self, request, context):
         """Start job to ingest data from stream into online store"""
+
         feature_table = self.client.get_feature_table(
             request.table_name, request.project
         )
-        job = start_stream_to_online_ingestion(feature_table, [], self.client)
+        # TODO: add extra_jars to request
+        job = self.client.start_stream_to_online_ingestion(feature_table, [])
         return StartStreamToOnlineIngestionJobResponse(id=job.get_id())
 
     def ListJobs(self, request, context):
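
With this refactor the servicer delegates to the public SDK client methods instead of calling the pyspark launcher directly, so local and server-side execution share one code path, and GetHistoricalFeatures now returns the output file URI alongside the job id. A minimal sketch of calling the service through a generated stub; the module paths follow Feast's generated-code layout, while the address, port, and feature reference are assumptions:

import grpc

from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest
from feast.core.JobService_pb2_grpc import JobServiceStub

channel = grpc.insecure_channel("localhost:6568")  # assumed address
stub = JobServiceStub(channel)

# entity_source (a DataSource proto) is omitted for brevity; a real call
# must supply it, since the servicer feeds it to DataSource.from_proto().
response = stub.GetHistoricalFeatures(
    GetHistoricalFeaturesRequest(
        feature_refs=["driver:trips_today"],        # illustrative ref
        project="default",
        output_location="gs://some-bucket/output",  # illustrative path
    )
)
print(response.id, response.output_file_uri)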