Optional IngestionJob parameters passed by Spark Launcher #1130

Merged · 22 commits · Nov 4, 2020
Changes from 1 commit
pass project to job
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Nov 4, 2020
commit 6ef85a7d3547aa4e870ff4e770e34f2d5665d789
infra/docker-compose/docker-compose.yml (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@ services:
      - /opt/feast/feast-core.jar
      - --spring.config.location=classpath:/application.yml,file:/etc/feast/application.yml

-  job-service:
+  jobservice:
    image: gcr.io/kf-feast/feast-jobservice:${FEAST_VERSION}
    depends_on:
      - core
infra/scripts/test-docker-compose.sh (2 changes: 1 addition & 1 deletion)

@@ -58,7 +58,7 @@ ${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_ONLINE_SERVING_CONTAINE

# Get Feast Job Service container IP address
-export FEAST_JOB_SERVICE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_job-service_1)
+export FEAST_JOB_SERVICE_CONTAINER_IP_ADDRESS=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' feast_jobservice_1)

# Wait for Feast Job Service to be ready
${PROJECT_ROOT_DIR}/infra/scripts/wait-for-it.sh ${FEAST_JOB_SERVICE_CONTAINER_IP_ADDRESS}:6568 --timeout=120
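The rename ripples into anything that addresses the container by its compose-generated name. As a cross-check, here is a hedged Python equivalent of the docker inspect call above, using the docker-py SDK (assumes the compose stack is up and docker-py is installed):

import docker

# Compose names containers <project>_<service>_<index>, hence feast_jobservice_1
# after this commit (it was feast_job-service_1 before the rename).
container = docker.from_env().containers.get("feast_jobservice_1")
networks = container.attrs["NetworkSettings"]["Networks"]
ip_address = next(iter(networks.values()))["IPAddress"]  # first attached network
print(ip_address)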
sdk/python/feast/client.py (32 changes: 26 additions & 6 deletions)

@@ -949,6 +949,7 @@ def get_historical_features(
        >>> output_file_uri = feature_retrieval_job.get_output_file_uri()
            "gs://some-bucket/output/
        """
+        project = project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY]
        feature_tables = self._get_feature_tables_from_feature_refs(
            feature_refs, project
        )
@@ -1001,7 +1002,12 @@
            )
        else:
            return start_historical_feature_retrieval_job(
-                self, entity_source, feature_tables, output_format, output_location,
+                client=self,
+                project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+                entity_source=entity_source,
+                feature_tables=feature_tables,
+                output_format=output_format,
+                output_path=output_location,
            )

    def get_historical_features_df(
@@ -1043,7 +1049,10 @@ def get_historical_features_df(
            feature_refs, project
        )
        return start_historical_feature_retrieval_spark_session(
-            self, entity_source, feature_tables
+            client=self,
+            project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+            entity_source=entity_source,
+            feature_tables=feature_tables,
        )

    def _get_feature_tables_from_feature_refs(
@@ -1079,10 +1088,17 @@ def start_offline_to_online_ingestion(
        :return: Spark Job Proxy object
        """
        if not self._use_job_service:
-            return start_offline_to_online_ingestion(feature_table, start, end, self)
+            return start_offline_to_online_ingestion(
+                client=self,
+                project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+                feature_table=feature_table,
+                start=start,
+                end=end,
+            )
        else:
            request = StartOfflineToOnlineIngestionJobRequest(
-                project=self.project, table_name=feature_table.name,
+                project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+                table_name=feature_table.name,
            )
            request.start_date.FromDatetime(start)
            request.end_date.FromDatetime(end)
@@ -1096,11 +1112,15 @@ def start_stream_to_online_ingestion(
    ) -> SparkJob:
        if not self._use_job_service:
            return start_stream_to_online_ingestion(
-                feature_table, extra_jars or [], self
+                client=self,
+                project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+                feature_table=feature_table,
+                extra_jars=extra_jars or [],
            )
        else:
            request = StartStreamToOnlineIngestionJobRequest(
-                project=self.project, table_name=feature_table.name,
+                project=self.project or FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY],
+                table_name=feature_table.name,
            )
            response = self._job_service.StartStreamToOnlineIngestionJob(request)
            return RemoteStreamIngestionJob(
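Taken together, the client.py changes route an explicit project into every launcher call, falling back to the configured default when none is set. A minimal sketch of exercising the updated client; connection settings, table name, and dates are illustrative assumptions, not from this PR:

from datetime import datetime

from feast import Client

client = Client(core_url="localhost:6565")  # hypothetical deployment
feature_table = client.get_feature_table("driver_statistics")

# The configured project (or the default) is now forwarded to the Spark
# launcher instead of the previously hard-coded "default".
job = client.start_offline_to_online_ingestion(
    feature_table, datetime(2020, 10, 1), datetime(2020, 11, 1)
)
print(job.get_id())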
sdk/python/feast/job_service.py (39 changes: 29 additions & 10 deletions)

@@ -16,7 +16,9 @@
    JobStatus,
    JobType,
    ListJobsResponse,
+    StartOfflineToOnlineIngestionJobRequest,
    StartOfflineToOnlineIngestionJobResponse,
+    StartStreamToOnlineIngestionJobRequest,
    StartStreamToOnlineIngestionJobResponse,
)
from feast.data_source import DataSource
@@ -28,7 +30,10 @@
    StreamIngestionJob,
)
from feast.pyspark.launcher import (
+    get_job_by_id,
+    list_jobs,
    start_historical_feature_retrieval_job,
+    start_offline_to_online_ingestion,
    start_stream_to_online_ingestion,
)
from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
@@ -69,22 +74,27 @@ def _job_to_proto(self, spark_job: SparkJob) -> JobProto:

        return job

-    def StartOfflineToOnlineIngestionJob(self, request, context):
+    def StartOfflineToOnlineIngestionJob(
+        self, request: StartOfflineToOnlineIngestionJobRequest, context
+    ):
        """Start job to ingest data from offline store into online store"""
        feature_table = self.client.get_feature_table(
            request.table_name, request.project
        )
-        job = self.client.start_offline_to_online_ingestion(
-            feature_table,
-            request.start_date.ToDatetime(),
-            request.end_date.ToDatetime(),
+        job = start_offline_to_online_ingestion(
+            client=self.client,
+            project=request.project,
+            feature_table=feature_table,
+            start=request.start_date.ToDatetime(),
+            end=request.end_date.ToDatetime(),
        )
        return StartOfflineToOnlineIngestionJobResponse(id=job.get_id())

    def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
        """Produce a training dataset, return a job id that will provide a file reference"""
        job = start_historical_feature_retrieval_job(
            client=self.client,
+            project=request.project,
            entity_source=DataSource.from_proto(request.entity_source),
            feature_tables=self.client._get_feature_tables_from_feature_refs(
                list(request.feature_refs), request.project
@@ -99,30 +109,39 @@ def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
            id=job.get_id(), output_file_uri=output_file_uri
        )

-    def StartStreamToOnlineIngestionJob(self, request, context):
+    def StartStreamToOnlineIngestionJob(
+        self, request: StartStreamToOnlineIngestionJobRequest, context
+    ):
        """Start job to ingest data from stream into online store"""

        feature_table = self.client.get_feature_table(
            request.table_name, request.project
        )
        # TODO: add extra_jars to request
-        job = start_stream_to_online_ingestion(feature_table, [], self.client)
+        job = start_stream_to_online_ingestion(
+            client=self.client,
+            project=request.project,
+            feature_table=feature_table,
+            extra_jars=[],
+        )
        return StartStreamToOnlineIngestionJobResponse(id=job.get_id())

    def ListJobs(self, request, context):
        """List all types of jobs"""
-        jobs = self.client.list_jobs(include_terminated=request.include_terminated)
+        jobs = list_jobs(
+            include_terminated=request.include_terminated, client=self.client
+        )
        return ListJobsResponse(jobs=[self._job_to_proto(job) for job in jobs])

    def CancelJob(self, request, context):
        """Stop a single job"""
-        job = self.client.get_job_by_id(request.job_id)
+        job = get_job_by_id(request.job_id, client=self.client)
        job.cancel()
        return CancelJobResponse()

    def GetJob(self, request, context):
        """Get details of a single job"""
-        job = self.client.get_job_by_id(request.job_id)
+        job = get_job_by_id(request.job_id, client=self.client)
        return GetJobResponse(job=self._job_to_proto(job))


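For context, the requests these handlers receive are plain protobufs. A hedged sketch of constructing one the way the SDK's job-service branch does; the generated-module import path is an assumption:

from datetime import datetime

from feast.core.JobService_pb2 import (  # import path assumed
    StartOfflineToOnlineIngestionJobRequest,
)

request = StartOfflineToOnlineIngestionJobRequest(
    project="my_project",  # illustrative; this field is what the PR now honors
    table_name="driver_statistics",
)
# Dates travel as google.protobuf.Timestamp; FromDatetime converts them,
# mirroring the client-side branch shown earlier.
request.start_date.FromDatetime(datetime(2020, 10, 1))
request.end_date.FromDatetime(datetime(2020, 11, 1))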
sdk/python/feast/pyspark/launcher.py (24 changes: 16 additions & 8 deletions)

@@ -125,13 +125,15 @@ def _source_to_argument(source: DataSource):
    raise NotImplementedError(f"Unsupported Datasource: {type(source)}")


-def _feature_table_to_argument(client: "Client", feature_table: FeatureTable):
+def _feature_table_to_argument(
+    client: "Client", project: str, feature_table: FeatureTable
+):
    return {
        "features": [
            {"name": f.name, "type": ValueType(f.dtype).name}
            for f in feature_table.features
        ],
-        "project": "default",
+        "project": project,
        "name": feature_table.name,
        "entities": [
            {"name": n, "type": client.get_entity(n).value_type}
@@ -143,6 +145,7 @@ def _feature_table_to_argument(client: "Client", feature_table: FeatureTable):

def start_historical_feature_retrieval_spark_session(
    client: "Client",
+    project: str,
    entity_source: Union[FileSource, BigQuerySource],
    feature_tables: List[FeatureTable],
):
@@ -161,14 +164,15 @@ def start_historical_feature_retrieval_spark_session(
            for feature_table in feature_tables
        ],
        feature_tables_conf=[
-            _feature_table_to_argument(client, feature_table)
+            _feature_table_to_argument(client, project, feature_table)
            for feature_table in feature_tables
        ],
    )


def start_historical_feature_retrieval_job(
    client: "Client",
+    project: str,
    entity_source: Union[FileSource, BigQuerySource],
    feature_tables: List[FeatureTable],
    output_format: str,
@@ -187,7 +191,7 @@ def start_historical_feature_retrieval_job(
        entity_source=_source_to_argument(entity_source),
        feature_tables_sources=feature_sources,
        feature_tables=[
-            _feature_table_to_argument(client, feature_table)
+            _feature_table_to_argument(client, project, feature_table)
            for feature_table in feature_tables
        ],
        destination={"format": output_format, "path": output_path},
@@ -225,7 +229,11 @@ def replace_bq_table_with_joined_view(


def start_offline_to_online_ingestion(
-    feature_table: FeatureTable, start: datetime, end: datetime, client: "Client"
+    client: "Client",
+    project: str,
+    feature_table: FeatureTable,
+    start: datetime,
+    end: datetime,
) -> BatchIngestionJob:

    launcher = resolve_launcher(client._config)
Expand All @@ -234,7 +242,7 @@ def start_offline_to_online_ingestion(
        BatchIngestionJobParameters(
            jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR),
            source=_source_to_argument(feature_table.batch_source),
-            feature_table=_feature_table_to_argument(client, feature_table),
+            feature_table=_feature_table_to_argument(client, project, feature_table),
            start=start,
            end=end,
            redis_host=client._config.get(CONFIG_REDIS_HOST),
@@ -249,7 +257,7 @@


def start_stream_to_online_ingestion(
-    feature_table: FeatureTable, extra_jars: List[str], client: "Client"
+    client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str]
) -> StreamIngestionJob:

    launcher = resolve_launcher(client._config)
Expand All @@ -259,7 +267,7 @@ def start_stream_to_online_ingestion(
            jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR),
            extra_jars=extra_jars,
            source=_source_to_argument(feature_table.stream_source),
-            feature_table=_feature_table_to_argument(client, feature_table),
+            feature_table=_feature_table_to_argument(client, project, feature_table),
            redis_host=client._config.get(CONFIG_REDIS_HOST),
            redis_port=client._config.getint(CONFIG_REDIS_PORT),
            redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
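With project threaded through _feature_table_to_argument, the launcher-level functions can also be called directly with an explicit project. A minimal sketch, assuming a client whose config already points at a Spark launcher and an existing feature table (names and dates are illustrative):

from datetime import datetime

from feast import Client
from feast.pyspark.launcher import start_offline_to_online_ingestion

client = Client()  # assumes launcher/Redis options are set in the client config
table = client.get_feature_table("driver_statistics", project="my_project")

job = start_offline_to_online_ingestion(
    client=client,
    project="my_project",  # no longer silently replaced with "default"
    feature_table=table,
    start=datetime(2020, 10, 1),
    end=datetime(2020, 11, 1),
)
print(job.get_id())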