Flytekit Rust entrypoint #2307
- Faster performance and faster startup. In limited testing, more than a 3.8x speedup.
- Smaller footprint: our goal is to remove the object-store dependencies and replace them with Rust.
- Also remove the dependency on gRPC in Python and replace it with Rust.

This also introduces Rust more holistically in flytekit.

Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files

@@             Coverage Diff              @@
##           master    #2307       +/-   ##
===========================================
+ Coverage   75.95%   95.71%   +19.75%
===========================================
  Files         181       19      -162
  Lines       18295      560    -17735
  Branches     3788        0     -3788
===========================================
- Hits        13896      536    -13360
+ Misses       3807       24     -3783
+ Partials      592        0      -592

☔ View full report in Codecov by Sentry.
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
src/distribution.rs
Outdated
#[logfn_inputs(Info, fmt = "Downloading distribution from {} to {}")]
#[logfn(ok = "INFO", err = "ERROR")]
pub async fn download_unarchive_distribution(src: &Url, dst: &String) -> Result<(), Box<dyn std::error::Error>> {
Consider using the tracing framework (https://docs.rs/tracing/latest/tracing/): it's considered more modern and is more flexible in terms of what type of data can be collected and how it can be processed. If you'd like, I can share a minimal example of a tracing setup for CLI output.
Done!
@vlad-ivanov-name can you check
flyrs_test/test.sh
Outdated
export PYTHONPATH=`pwd`:$PYTHONPATH

TESTDATA_PATH=`pwd`/testdata

../target/debug/flyrs --inputs ${TESTDATA_PATH}/inputs.pb --output-prefix ${TESTDATA_PATH} --raw-output-data-prefix ${TESTDATA_PATH} --dynamic-addl-distro file://${TESTDATA_PATH}/schedule.tar.gz --dynamic-dest-dir . --resolver "flytekit.core.python_auto_container.default_task_resolver" -- task-module schedule task-name say_hello

cmp testdata/outputs.pb testdata/expected_outputs.pb || echo -e "----------- Outputs file comparision failed! ----------"
Consider writing a test inside the Rust code for this use case instead. Rust has excellent testing support, so it's as simple as annotating a function.
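A minimal sketch of what such an annotated test could look like, using only the standard library. The helper `files_match` and the test name are hypothetical (they are not from the PR); the helper only mirrors the `cmp` step of test.sh.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Hypothetical helper: true when both files hold identical bytes,
/// mirroring the `cmp` call in test.sh.
pub fn files_match(actual: &Path, expected: &Path) -> io::Result<bool> {
    Ok(fs::read(actual)? == fs::read(expected)?)
}

#[cfg(test)]
mod sketch_tests {
    use super::*;

    // Picked up by `cargo test`; no shell script needed.
    #[test]
    fn identical_files_match() {
        let dir = std::env::temp_dir();
        let a = dir.join("flyrs_sketch_a.pb");
        let b = dir.join("flyrs_sketch_b.pb");
        fs::write(&a, b"hello").unwrap();
        fs::write(&b, b"hello").unwrap();
        assert!(files_match(&a, &b).unwrap());
    }
}
```

Cargo integration tests placed under tests/ can also spawn the built binary with std::process::Command, which would cover the CLI path as well; that part is not shown here.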
hmm i also wanted to test the cli, but let me learn
src/executor.rs
Outdated
impl Display for ExecutorArgs {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ExecutorArgs {{ inputs: {}, output_prefix: {}, test: {}, raw_output_data_prefix: {}, resolver: {}, resolver_args: {:?}, checkpoint_path: {:?}, prev_checkpoint: {:?}, dynamic_addl_distro: {:?}, dynamic_dest_dir: {:?} }}",
The notation you've described here is more or less what the Debug derive outputs. So you can, for example, do write!(f, "{:?}", self)
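As a rough illustration of the suggestion above, the sketch below derives Debug and has Display delegate to it. The field list is trimmed for illustration and is an assumption; the real ExecutorArgs in the PR has more fields.

```rust
use std::fmt::{self, Display, Formatter};

// Field list trimmed for illustration; the real ExecutorArgs has more fields.
#[derive(Debug)]
pub struct ExecutorArgs {
    pub inputs: String,
    pub output_prefix: String,
    pub checkpoint_path: Option<String>,
}

impl Display for ExecutorArgs {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Reuse the derived Debug output instead of a hand-written format string.
        write!(f, "{:?}", self)
    }
}
```

One difference from the hand-written version: Debug quotes string fields, so `inputs` prints as "in.pb" with quotes rather than bare.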
src/executor.rs
Outdated
#[logfn_inputs(Info, fmt = "Invoking task with {}")]
#[logfn(ok = "INFO", err = "ERROR")]
pub async fn execute_task(args: &ExecutorArgs) -> Result<(), Box<dyn std::error::Error>>{
consider using anyhow (https://docs.rs/anyhow/latest/anyhow/) or miette (https://docs.rs/miette/latest/miette/) as error types: they provide useful facilities such as backtraces, additional error context, and user-friendly error messages
thank you
src/executor.rs
Outdated
if executor_args.dynamic_dest_dir.is_none() {
    return Err("Dynamic distro requires a destination directory".into());
}
let src_url = url::Url::parse(executor_args.dynamic_addl_distro.clone().unwrap().as_str())?;
same about unwrap -- you can use a match expression
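A possible shape for the match-based version, as a sketch only. `DistroArgs` and `distro_plan` are hypothetical names standing in for the two optional arguments in the PR, and plain strings replace the URL parsing so the example needs no external crates.

```rust
/// Hypothetical stand-in for the two optional CLI arguments in the PR.
pub struct DistroArgs {
    pub dynamic_addl_distro: Option<String>,
    pub dynamic_dest_dir: Option<String>,
}

/// Returns the (source, destination) pair when a distro is requested,
/// `None` when it is not, and an error when the destination is missing.
pub fn distro_plan(args: &DistroArgs) -> Result<Option<(&str, &str)>, String> {
    match (
        args.dynamic_addl_distro.as_deref(),
        args.dynamic_dest_dir.as_deref(),
    ) {
        (Some(src), Some(dst)) => Ok(Some((src, dst))),
        (Some(_), None) => Err("Dynamic distro requires a destination directory".to_string()),
        (None, _) => Ok(None),
    }
}
```

Matching on both options at once removes the separate is_none() check and the unwrap(), since every combination is handled in one place.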
@vlad-ivanov-name thank you for all the comments. As you can tell, I am a complete newbie and I will definitely try to adopt all your suggestions. If you think you can take over the PR and improve it, please feel free to :) I would love to hand it over.
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com> Co-authored-by: Ketan Umare <kumare3@users.noreply.github.com>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
👍 looks good overall, left a few comments
let store = store_box.0;

let src_path = Path::parse(src.path())?;
let tar_gz_stream = store.get(&src_path).await.unwrap();
you probably still want to use ? here:
- let tar_gz_stream = store.get(&src_path).await.unwrap();
+ let tar_gz_stream = store.get(&src_path).await?;
let tar_gz_stream = store.get(&src_path).await.unwrap();

// TODO figure out how to stream unarchive the tar.gz file
let tar_gz_data = tar_gz_stream.bytes().await.unwrap();
same
- let tar_gz_data = tar_gz_stream.bytes().await.unwrap();
+ let tar_gz_data = tar_gz_stream.bytes().await?;
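To show what the two suggestions change in behavior, here is a small standard-library sketch. `read_manifest` is a hypothetical function, and file reads stand in for the object-store calls, whose API is not reproduced here.

```rust
use std::error::Error;
use std::fs;
use std::path::Path;

/// Hypothetical example: each `?` hands the error back to the caller
/// instead of panicking the way `unwrap()` would.
pub fn read_manifest(path: &Path) -> Result<String, Box<dyn Error>> {
    let bytes = fs::read(path)?; // io::Error is converted into Box<dyn Error>
    let text = String::from_utf8(bytes)?; // FromUtf8Error is converted as well
    Ok(text)
}
```

With unwrap(), a missing file would abort the process with a panic; with ?, the caller receives an Err and can decide how to report it.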
debug!("Python path: {:?}", path);
debug!("Python version: {:?}", version);
debug!("Python modules: {:?}", keys);
there's a better syntax you can use here
- debug!("Python path: {:?}", path);
- debug!("Python version: {:?}", version);
- debug!("Python modules: {:?}", keys);
+ debug!(
+     path = ?path,
+     version = ?version,
+     modules = ?keys,
+     "debug python setup"
+ );
what this will achieve is structured tracing: for backends that support it, you won't just log a string but rather a map with fields. it's really useful when e.g. analyzing traces in the cloud
Wow you are indeed making me a better rust programmer - thank you appreciate it
fn debug_python_setup(py: Python) {
    if tracing::enabled!(tracing::Level::DEBUG) {
        let sys = PyModule::import_bound(py, "sys").unwrap();
instead of unwrap you can create another function that returns anyhow::Result, and here you can write something like
if let Ok((path, version, keys)) = get_python_info() {
    // log it
} else {
    tracing::error!("failed to get python info")
}
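The same pattern can be sketched with the standard library alone. Everything here is hypothetical: `PythonInfo` and the `version` parameter stand in for what the PR reads from `sys` through pyo3, a plain `Result<_, String>` replaces anyhow::Result, and the log line is returned as a string rather than emitted through tracing.

```rust
/// Hypothetical stand-in for the interpreter details gathered via pyo3.
pub struct PythonInfo {
    pub path: Vec<String>,
    pub version: String,
}

/// Hypothetical fallible helper; in the PR this would query `sys` through pyo3.
pub fn get_python_info(version: Option<&str>) -> Result<PythonInfo, String> {
    let version = version.ok_or_else(|| "sys.version unavailable".to_string())?;
    Ok(PythonInfo {
        path: vec![".".to_string()],
        version: version.to_string(),
    })
}

/// Returns the log line instead of printing it, so the behavior is easy to check.
pub fn debug_python_setup(version: Option<&str>) -> String {
    if let Ok(info) = get_python_info(version) {
        format!("python version={} path={:?}", info.version, info.path)
    } else {
        "failed to get python info".to_string()
    }
}
```

The fallible work lives in one helper that can use ?, and the caller decides once what to do on failure, so no unwrap() is needed in the debug path.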
pyo3::prepare_freethreaded_python();
let _ = Python::with_gil(|py| -> Result<()> {
    debug_python_setup(py);
    let entrypoint = PyModule::import_bound(py, "flytekit.bin.entrypoint").unwrap();
since you already return a result, ? would probably work instead of unwrap
#[tracing::instrument(err)]
pub async fn download_unarchive_distribution(src: &Url, dst: &String) -> Result<()> {
the dst type should probably be &str
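A short sketch of why `&str` is usually the more flexible parameter type. `dest_is_current_dir` is a hypothetical function used only to illustrate the signature; it is not part of the PR.

```rust
/// Hypothetical example: a `&str` parameter accepts string literals,
/// `&String` (through deref coercion), and string slices alike.
pub fn dest_is_current_dir(dst: &str) -> bool {
    dst == "." || dst == "./"
}
```

A caller holding a `String` can still pass `&owned`, while a `&String` parameter would force callers with a literal or a slice to allocate first.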
@kumare3 hi. Wanted to check when this is planned to be merged?
This needs more work, especially tooling, so it is not yet planned, but definitely later this year.
@kumare3 is there anything that outside contributors can help with so that the release of this feature will be faster? I have some experience working with pyo3
Hi @kdubovikov, thanks for your interest. I'm an outside contributor too. Recently I've been working on removing the dependencies on gRPC and protobuf in Python and replacing them with Rust. You can check the ongoing work here for flyte and flytekit. It's almost done; it's not an easy task, though. The Flytekit Rust entrypoint is also a challenging task with significant potential for performance gains. Perhaps we can figure out the blockers and share some pitfalls with each other. wdyt?
@austin362667, @kumare3, hi. Just a surface-level thought on the overall design. So, Flyte uses gRPC as its default transport, most likely because it is written in Go. Then we inherited this transport in the Python entrypoint and flytekit. Afterward, the community realized that gRPC clients in Python are prone to performance and stability issues in our specific use cases. So the proposed solution is to create native gRPC clients in Rust and wrap them as Python packages with PyO3. Am I correct with this chain of reasoning? @austin362667 By inspecting your PRs, I see what you mean when you say the task is not easy. You seem to employ quite deep workarounds for solving ref outliving. If I am correct, then I wanted to ask whether any alternative and probably simpler approaches were considered:
I am not saying that the current solution is bad in any way, just asking questions from a fresh perspective.
@kdubovikov Exactly! That's sort of the whole context and what we are doing now.
Actually, I think you're almost right about your suggestion. Refactoring gRPC into RESTful is another huge story, too. And, yes, it indeed mitigates the dependencies over Python
Let's say we want to stay in the gRPC architecture. It's true the transport layer is not CPU-bound, and I've already enabled non-blocking networking by leveraging
IMO, a better use case for introducing Rust is rewriting the entry point, just like what @kumare3 is trying here. Refactoring this component might have much more potential for performance gains.
gRPC is used not because of Golang, but because it is way faster than REST and offers type safety. The problem has been that gRPC Python support by Google has not been great. All the top projects in the world today use gRPC and I still prefer it. We are looking at migrating to an HTTP/2-compliant implementation of gRPC: Connect!
Yeah, no doubt speed and type safety are the two most important reasons to use gRPC!
I'm not sure on this one. Can you elaborate more? @kdubovikov
@kumare3 understood. I can see reasoning behind this. I prefer strong typing in remote calls as well. However, if Python is the primary client language and GRPC support there is not great, is it a solid reason for introducing another language (and a quite tricky one) to the project? In case of the Endpoint, why not implement it in Go, for example? Is it a drastic difference between calling the Task function from Go, compared to using PyO3 in Rust? If Go was used, probably we would achieve the same goals along with reducing the maintainability pressure by introducing a new language to the project. And feel free to tell me if I am not making much sense, I will stop debating on this particular point :)
@austin362667 this is a good point indeed. Do we know by how much the wheel size is reduced?
Although I do not have much context on the Rust gRPC implementation, from a K8s perspective, I do not think this is correct. First off, a sidecar container also runs in the same Pod as the main container, so the task pods will not be lighter. Besides, I don't think it is easier to implement it as a sidecar container. As far as I know, the main ways for a sidecar and the main container to communicate are via the network or shared volumes, so the sidecar container must either constantly watch for file system changes or serve as a translator between the main container and the Flyte API, capturing the network requests from the main container and translating them to gRPC. Either way, the implementation will not be easier. Feel free to tell me if you have a better implementation for the sidecar pattern. Maybe I misunderstand something.
@MortalHappiness I meant that the Task container will get lighter and possibly won't need extra gRPC dependencies, and the sidecar can act as a bridge and use any more efficient implementation that we consider to be faster. This way the task container won't need much apart from the user code itself, and the sidecar would take care of handling all of the platform functionality. However, I agree that adding a sidecar does not contribute much to the entrypoint performance by itself, and the problem of communication between the sidecar and task containers still must be solved. The key to this issue is optimizing the entrypoint cold start time, and the sidecar is more about decoupling one from another. So yes, let's dismiss the sidecar thread for now. It's just that using PyO3 and Rust as a workaround for the standard Google gRPC clients in Python does not look like a very common or widely accepted pattern that I have seen in other codebases (especially those that are not primarily in Rust), so I ask whether everyone involved is sure that introducing Rust to the project is the best possible solution considering all tradeoffs. Maybe I am not aware of other roadmap points that will broaden the usage of Rust for different use cases as well. Again, I am not saying that using Rust for creating Python gRPC client packages or the entrypoint script is wrong in any way, just mentioning that it adds a whole new layer of maintainability in the long term. The second use case is to use Rust for the Entrypoint script, but I am not entirely sure if it will be much different than using Go for the same: run task setup code and then run and monitor the Python process, communicating back to Flyte when necessary. In short
** Update **
@kdubovikov grpc is not used at runtime. This communication happens through blob store. As for the sidecar pattern - it is already supported in Flyte through raw container tasks. There are many disadvantages with it too. Check it out
Thanks for the context, @kumare3 . My first question remains: is there anything that can be done by outside contributors to speed up the release of this feature?
Tracking issue