Optional IngestionJob parameters passed by Spark Launcher #1130

Merged (22 commits) on Nov 4, 2020
Changes from 1 commit
commit 6c7a4041601f319516b016759f7c8b97c5bbdcb4: optional job parameters
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Nov 4, 2020
infra/scripts/test-docker-compose.sh: 2 changes (1 addition & 1 deletion)

@@ -62,4 +62,4 @@ docker exec \
   -e DISABLE_SERVICE_FIXTURES=true \
   -e DISABLE_FEAST_SERVICE_FIXTURES=true \
   feast_jupyter_1 bash \
-  -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ -m "not bq" --ingestion-jar gs://feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --kafka-brokers kafka:9092'
+  -c 'cd /feast/tests && python -m pip install -r requirements.txt && pytest e2e/ -m "not bq" --ingestion-jar https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-${FEAST_VERSION}.jar --redis-url redis:6379 --core-url core:6565 --serving-url online_serving:6566 --kafka-brokers kafka:9092'
sdk/python/feast/constants.py: 9 changes (8 additions & 1 deletion)

@@ -88,6 +88,12 @@ class AuthProvider(Enum):
 CONFIG_REDIS_PORT = "redis_port"
 CONFIG_REDIS_SSL = "redis_ssl"

+CONFIG_STATSD_HOST = "statsd_host"
+CONFIG_STATSD_PORT = "statsd_port"
+
+CONFIG_DEADLETTER_PATH = "deadletter_path"
+CONFIG_STENCIL_URL = "stencil_url"
+
 CONFIG_SPARK_EMR_REGION = "emr_region"
 CONFIG_SPARK_EMR_CLUSTER_ID = "emr_cluster_id"
 CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path"
@@ -129,7 +135,8 @@ class AuthProvider(Enum):
     # Authentication Provider - Google OpenID/OAuth
     CONFIG_AUTH_PROVIDER: "google",
     CONFIG_SPARK_LAUNCHER: "dataproc",
-    CONFIG_SPARK_INGESTION_JOB_JAR: "gs://feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar",
+    CONFIG_SPARK_INGESTION_JOB_JAR: "https://storage.googleapis.com/feast-jobs/spark/"
+    "ingestion/feast-ingestion-spark-develop.jar",
     CONFIG_SPARK_STANDALONE_MASTER: "local[*]",
     CONFIG_REDIS_HOST: "localhost",
     CONFIG_REDIS_PORT: "6379",
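Note: the four new keys are plain string options alongside the existing Redis settings, so they can be supplied through the same Config object the launcher reads from. A minimal sketch; all values are illustrative placeholders rather than shipped defaults, and the options-dict constructor is assumed from how Config is used elsewhere in the SDK:

from feast.config import Config

# Hypothetical values; statsd_host and statsd_port are meant to be set
# together so that a --statsd argument can be emitted downstream.
config = Config(
    options={
        "statsd_host": "statsd.monitoring.svc",          # CONFIG_STATSD_HOST
        "statsd_port": "8125",                           # CONFIG_STATSD_PORT
        "deadletter_path": "gs://my-bucket/deadletter",  # CONFIG_DEADLETTER_PATH
        "stencil_url": "http://stencil.example.com",     # CONFIG_STENCIL_URL
    }
)
assert config.get("statsd_host") == "statsd.monitoring.svc"
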
sdk/python/feast/pyspark/abc.py: 140 changes (95 additions & 45 deletions)

@@ -290,39 +290,41 @@ def get_output_file_uri(self, timeout_sec=None, block=True):
         raise NotImplementedError


-class BatchIngestionJobParameters(SparkJobParameters):
+class IngestionJobParameters(SparkJobParameters):
     def __init__(
         self,
         feature_table: Dict,
         source: Dict,
-        start: datetime,
-        end: datetime,
         jar: str,
         redis_host: str,
         redis_port: int,
         redis_ssl: bool,
+        statsd_host: Optional[str] = None,
+        statsd_port: Optional[int] = None,
+        deadletter_path: Optional[str] = None,
+        stencil_url: Optional[str] = None,
     ):
         self._feature_table = feature_table
         self._source = source
-        self._start = start
-        self._end = end
         self._jar = jar
         self._redis_host = redis_host
         self._redis_port = redis_port
         self._redis_ssl = redis_ssl
-
-    def get_name(self) -> str:
-        return (
-            f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}-"
-            f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
-        )
-
-    def get_job_type(self) -> SparkJobType:
-        return SparkJobType.BATCH_INGESTION
+        self._statsd_host = statsd_host
+        self._statsd_port = statsd_port
+        self._deadletter_path = deadletter_path
+        self._stencil_url = stencil_url

     def _get_redis_config(self):
         return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)

+    def _get_statsd_config(self):
+        return (
+            dict(host=self._statsd_host, port=self._statsd_port)
+            if self._statsd_host
+            else None
+        )
+
     def get_feature_table_name(self) -> str:
         return self._feature_table["name"]

@@ -333,23 +335,79 @@ def get_class_name(self) -> Optional[str]:
return "feast.ingestion.IngestionJob"

def get_arguments(self) -> List[str]:
return [
"--mode",
"offline",
args = [
"--feature-table",
json.dumps(self._feature_table),
"--source",
json.dumps(self._source),
"--redis",
json.dumps(self._get_redis_config()),
]

if self._get_statsd_config():
args.extend(["--statsd", json.dumps(self._get_statsd_config())])

if self._deadletter_path:
args.extend(["--deadletter-path", self._deadletter_path])

if self._stencil_url:
args.extend(["--stencil-url", self._stencil_url])

return args


class BatchIngestionJobParameters(IngestionJobParameters):
def __init__(
self,
feature_table: Dict,
source: Dict,
start: datetime,
end: datetime,
jar: str,
redis_host: str,
redis_port: int,
redis_ssl: bool,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
stencil_url: Optional[str] = None,
):
super().__init__(
feature_table,
source,
jar,
redis_host,
redis_port,
redis_ssl,
statsd_host,
statsd_port,
deadletter_path,
stencil_url,
)
self._start = start
self._end = end

def get_name(self) -> str:
return (
f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}-"
f"{self._start.strftime('%Y-%m-%d')}-{self._end.strftime('%Y-%m-%d')}"
)

def get_job_type(self) -> SparkJobType:
return SparkJobType.BATCH_INGESTION

def get_arguments(self) -> List[str]:
return super().get_arguments() + [
"--mode",
"offline",
"--start",
self._start.strftime("%Y-%m-%dT%H:%M:%S"),
"--end",
self._end.strftime("%Y-%m-%dT%H:%M:%S"),
"--redis",
json.dumps(self._get_redis_config()),
]


class StreamIngestionJobParameters(SparkJobParameters):
class StreamIngestionJobParameters(IngestionJobParameters):
def __init__(
self,
feature_table: Dict,
@@ -359,46 +417,38 @@ def __init__(
         redis_host: str,
         redis_port: int,
         redis_ssl: bool,
+        statsd_host: Optional[str] = None,
+        statsd_port: Optional[int] = None,
+        deadletter_path: Optional[str] = None,
+        stencil_url: Optional[str] = None,
     ):
-        self._feature_table = feature_table
-        self._source = source
-        self._jar = jar
+        super().__init__(
+            feature_table,
+            source,
+            jar,
+            redis_host,
+            redis_port,
+            redis_ssl,
+            statsd_host,
+            statsd_port,
+            deadletter_path,
+            stencil_url,
+        )
         self._extra_jars = extra_jars
-        self._redis_host = redis_host
-        self._redis_port = redis_port
-        self._redis_ssl = redis_ssl

     def get_name(self) -> str:
         return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"

     def get_job_type(self) -> SparkJobType:
         return SparkJobType.STREAM_INGESTION

-    def _get_redis_config(self):
-        return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)
-
-    def get_feature_table_name(self) -> str:
-        return self._feature_table["name"]
-
-    def get_main_file_path(self) -> str:
-        return self._jar
-
     def get_extra_jar_paths(self) -> List[str]:
         return self._extra_jars

-    def get_class_name(self) -> Optional[str]:
-        return "feast.ingestion.IngestionJob"
-
     def get_arguments(self) -> List[str]:
-        return [
+        return super().get_arguments() + [
             "--mode",
             "online",
-            "--feature-table",
-            json.dumps(self._feature_table),
-            "--source",
-            json.dumps(self._source),
-            "--redis",
-            json.dumps(self._get_redis_config()),
         ]


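To make the composition concrete, here is a hedged sketch of the argument list a BatchIngestionJobParameters now produces: the shared flags come from IngestionJobParameters.get_arguments(), each optional flag is appended only when its value is set, and the batch-specific flags follow. The feature-table and source dicts are simplified placeholders:

from datetime import datetime

from feast.pyspark.abc import BatchIngestionJobParameters

params = BatchIngestionJobParameters(
    feature_table={"name": "driver_stats"},         # placeholder table spec
    source={"file": {"path": "gs://bucket/data"}},  # placeholder source spec
    start=datetime(2020, 10, 1),
    end=datetime(2020, 10, 2),
    jar="https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar",
    redis_host="localhost",
    redis_port=6379,
    redis_ssl=False,
    deadletter_path="gs://bucket/deadletter",  # the only optional parameter set here
)

# Base args first, --deadletter-path only because it was set, then the
# batch-specific --mode/--start/--end suffix:
print(params.get_arguments())
# ['--feature-table', '{"name": "driver_stats"}',
#  '--source', '{"file": {"path": "gs://bucket/data"}}',
#  '--redis', '{"host": "localhost", "port": 6379, "ssl": false}',
#  '--deadletter-path', 'gs://bucket/deadletter',
#  '--mode', 'offline',
#  '--start', '2020-10-01T00:00:00',
#  '--end', '2020-10-02T00:00:00']
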
sdk/python/feast/pyspark/launcher.py: 35 changes (14 additions & 21 deletions)

@@ -1,11 +1,9 @@
-import shutil
-import tempfile
 from datetime import datetime
 from typing import TYPE_CHECKING, List, Union
-from urllib.parse import urlparse

 from feast.config import Config
 from feast.constants import (
+    CONFIG_DEADLETTER_PATH,
     CONFIG_REDIS_HOST,
     CONFIG_REDIS_PORT,
     CONFIG_REDIS_SSL,
@@ -21,6 +19,9 @@
     CONFIG_SPARK_LAUNCHER,
     CONFIG_SPARK_STAGING_LOCATION,
     CONFIG_SPARK_STANDALONE_MASTER,
+    CONFIG_STATSD_HOST,
+    CONFIG_STATSD_PORT,
+    CONFIG_STENCIL_URL,
 )
 from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource
 from feast.feature_table import FeatureTable
@@ -35,7 +36,6 @@
     StreamIngestionJobParameters,
 )
 from feast.staging.entities import create_bq_view_of_joined_features_and_entities
-from feast.staging.storage_client import get_staging_client
 from feast.value_type import ValueType

 if TYPE_CHECKING:
@@ -224,36 +224,26 @@ def replace_bq_table_with_joined_view(
     )


-def _download_jar(remote_jar: str) -> str:
-    remote_jar_parts = urlparse(remote_jar)
-
-    local_temp_jar = tempfile.NamedTemporaryFile(suffix=".jar", delete=False)
-    with local_temp_jar:
-        shutil.copyfileobj(
-            get_staging_client(remote_jar_parts.scheme).download_file(remote_jar_parts),
-            local_temp_jar,
-        )
-
-    return local_temp_jar.name
-
-
 def start_offline_to_online_ingestion(
     feature_table: FeatureTable, start: datetime, end: datetime, client: "Client"
 ) -> BatchIngestionJob:

     launcher = resolve_launcher(client._config)
-    local_jar_path = _download_jar(client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR))

     return launcher.offline_to_online_ingestion(
         BatchIngestionJobParameters(
-            jar=local_jar_path,
+            jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR),
             source=_source_to_argument(feature_table.batch_source),
             feature_table=_feature_table_to_argument(client, feature_table),
             start=start,
             end=end,
             redis_host=client._config.get(CONFIG_REDIS_HOST),
             redis_port=client._config.getint(CONFIG_REDIS_PORT),
             redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
+            statsd_host=client._config.get(CONFIG_STATSD_HOST),
+            statsd_port=client._config.getint(CONFIG_STATSD_PORT),
+            deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH),
+            stencil_url=client._config.get(CONFIG_STENCIL_URL),
         )
     )

@@ -263,17 +253,20 @@ def start_stream_to_online_ingestion(
 ) -> StreamIngestionJob:

     launcher = resolve_launcher(client._config)
-    local_jar_path = _download_jar(client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR))

     return launcher.start_stream_to_online_ingestion(
         StreamIngestionJobParameters(
-            jar=local_jar_path,
+            jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR),
             extra_jars=extra_jars,
             source=_source_to_argument(feature_table.stream_source),
             feature_table=_feature_table_to_argument(client, feature_table),
             redis_host=client._config.get(CONFIG_REDIS_HOST),
             redis_port=client._config.getint(CONFIG_REDIS_PORT),
             redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL),
+            statsd_host=client._config.get(CONFIG_STATSD_HOST),
+            statsd_port=client._config.getint(CONFIG_STATSD_PORT),
+            deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH),
+            stencil_url=client._config.get(CONFIG_STENCIL_URL),
         )
     )

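With _download_jar removed, the launcher hands the remote jar URI straight to the underlying Spark launcher, which is why the default CONFIG_SPARK_INGESTION_JOB_JAR above became a plain https:// URL. A hedged usage sketch; the table name is hypothetical and a configured Feast deployment is assumed:

from datetime import datetime

from feast import Client
from feast.pyspark.launcher import start_offline_to_online_ingestion

# The client config now carries the jar URI plus the optional statsd,
# deadletter, and stencil settings; no local download of the jar happens.
client = Client()
table = client.get_feature_table("driver_stats")  # hypothetical table

job = start_offline_to_online_ingestion(
    table, start=datetime(2020, 10, 1), end=datetime(2020, 10, 2), client=client
)
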
@@ -60,6 +60,14 @@ trait BasePipeline {
       case None => ()
     }

+    jobConfig.stencilURL match {
+      case Some(url: String) =>
+        conf
+          .set("feast.ingestion.registry.proto.kind", "local")
+          .set("feast.ingestion.registry.proto.url", url)
+      case None => ()
+    }
+
     SparkSession
       .builder()
       .config(conf)
@@ -71,6 +71,8 @@ object IngestionJob {
     opt[String](name = "deadletter-path")
       .action((x, c) => c.copy(deadLetterPath = Some(x)))

+    opt[String](name = "stencil-url")
+      .action((x, c) => c.copy(stencilURL = Some(x)))
   }

   def main(args: Array[String]): Unit = {
@@ -102,5 +102,6 @@ case class IngestionJobConfig(
     endTime: DateTime = DateTime.now(),
     store: StoreConfig = RedisConfig("localhost", 6379, false),
     metrics: Option[MetricConfig] = None,
-    deadLetterPath: Option[String] = None
+    deadLetterPath: Option[String] = None,
+    stencilURL: Option[String] = None
 )
tests/e2e/conftest.py: 3 changes (0 additions & 3 deletions)

@@ -6,9 +6,6 @@
 def pytest_addoption(parser):
     parser.addoption("--core-url", action="store", default="localhost:6565")
     parser.addoption("--serving-url", action="store", default="localhost:6566")
-    parser.addoption(
-        "--gcs_path", action="store", default="gs://feast-templocation-kf-feast/"
-    )
     parser.addoption("--kafka-brokers", action="store", default="localhost:9092")

     parser.addoption("--env", action="store", help="local|aws|gcloud", default="local")