Python UDF in Ingestion being used for feature validation #1234
Conversation
/test test-end-to-end
/test test-end-to-end-gcp
/test test-end-to-end-aws
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
@@ -332,6 +332,7 @@ def _stream_ingestion_step(
         ],
         "Args": ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
         + jars_args
+        + ["--conf", "spark.yarn.isPython=true"]
Is there any downside to doing this? Do you know why Spark doesn't always set it to true?
It's yarn-specific and, from what I found, it's mostly used to enable distribution of python-related files (like pyspark.zip) to yarn workers. It's set by spark-submit when the main file is a py-file, which is not the case for our IngestionJob.
Makes sense. Maybe add a comment there for future generations?
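To make the diff above concrete, here is a hedged sketch of how the step args end up assembled; `jars_args` and the bucket path are illustrative placeholders, not values from the PR.

```python
# Illustrative sketch of the EMR step args from the diff above.
# jars_args and the s3 path are hypothetical, not from the PR.
jars_args = ["--jars", "s3://example-bucket/ingestion-extra.jar"]

step_args = (
    ["spark-submit", "--class", "feast.ingestion.IngestionJob"]
    + jars_args
    # yarn-specific: makes YARN ship python support files (e.g. pyspark.zip)
    # to workers even though the main artifact is a jar, not a .py file
    + ["--conf", "spark.yarn.isPython=true"]
)
```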
spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala
@@ -45,19 +45,19 @@ lint-java:
	${MVN} --no-transfer-progress spotless:check

test-java:
-	${MVN} --no-transfer-progress test
+	${MVN} --no-transfer-progress -DskipITs=true test
why? (i'm not super familiar with our java test machinery, does it now skip some tests it didn't skip before?)
TL;DR: mvn test should run only unit tests; everything else should be skipped.
We have two toggles: skipITs (skip Integration Tests) and skipUTs (skip Unit Tests). We need them to run those test suites separately. Before, there was no need for skipITs, since in the maven pipeline test runs before verify (used for ITs). But in the spark part we have some additional steps (the generate-test-source phase) that are required only by integration tests and aren't needed by unit tests (see spark/ingestion/pom.xml). To skip those steps I added this flag here.
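A sketch of the two invocations implied by the Makefile change and the comment above; the skipUTs wiring for the integration-test run is an assumption based on this thread, and the exact behavior lives in spark/ingestion/pom.xml.

```python
# Sketch of the two separate test invocations described above.
# The skipUTs/verify pairing is an assumption from the discussion,
# not taken verbatim from the Makefile.
unit_test_cmd = ["mvn", "--no-transfer-progress", "-DskipITs=true", "test"]
integration_test_cmd = ["mvn", "--no-transfer-progress", "-DskipUTs=true", "verify"]
```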
from feast import Client, FeatureTable

GE_PACKED_ARCHIVE = "https://storage.googleapis.com/feast-jobs/spark/validation/pylibs-ge-%(platform)s.tar.gz"
How does this work from a distribution perspective? I worry that if this is not in any way tied to the Feast version, we can't upgrade GE without breaking older versions of the ingestion job.
Maybe we don't have to address this now, especially while this is contrib, but sometime down the road we probably need to pin the version of this tarball to a Feast version somehow.
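For context on the snippet above: the URL is a %-style template with a platform placeholder. A hedged sketch of how it might be resolved follows; the actual key the SDK substitutes is not shown in this thread, so the value below is hypothetical.

```python
import platform

GE_PACKED_ARCHIVE = (
    "https://storage.googleapis.com/feast-jobs/spark/validation/"
    "pylibs-ge-%(platform)s.tar.gz"
)

# Hypothetical resolution of the %(platform)s placeholder, e.g. "linux-x86_64".
# The real substitution used by the SDK is not visible in this thread.
archive_url = GE_PACKED_ARCHIVE % {
    "platform": f"{platform.system().lower()}-{platform.machine()}"
}
```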
I know that smells. But since the feature is experimental I think it's ok for now.
As an option for the future, we could put this archive inside the ingestion jar or docker image (jobservice).
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com> Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: khorshuheng, oavdeev, pyalex The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment
What this PR does / why we need it:
This PR introduces an experimental feature that enables using custom python code inside the ingestion job. See #1230 for motivation.
Technical details:
The ingestion job detects the _validationUDF label and uses the provided code for feature validation. The code is called right after reading data from the source and before writing it to the store. IngestionJob can optionally (set by flag) drop rows that do not pass validation.
Limitations:
Since the python pickling happens on the customer's machine and unpickling on the Spark worker, there might be pickle protocol incompatibilities if the customer's and worker's python versions differ. Ideally, to avoid that, the minor python versions should match (e.g. both 3.7). However, tests confirmed that a lower version on the SDK side is fine (3.6 on SDK, 3.7 on worker), but not the other way around.
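One way to reason about the version constraint above: newer pythons default to newer pickle protocols that older workers cannot read (protocol 5 appeared in python 3.8). A minimal, hypothetical guard on the SDK side could cap the protocol; the worker version assumed here is illustrative, not from the PR.

```python
import pickle

# Hypothetical guard: cap the pickle protocol at what the worker's python
# supports. Protocol 5 appeared in python 3.8, so a 3.7 worker needs <= 4.
WORKER_MAX_PROTOCOL = 4  # assumption about the worker runtime

def dumps_for_worker(obj):
    # Never exceed the worker's ceiling, even on a newer SDK-side python.
    proto = min(pickle.HIGHEST_PROTOCOL, WORKER_MAX_PROTOCOL)
    return pickle.dumps(obj, protocol=proto)

payload = dumps_for_worker({"feature": "valid"})
```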
Which issue(s) this PR fixes:
Fixes #1230
Does this PR introduce a user-facing change?: