Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flytekit Rust entrypoint #2307

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft

Flytekit Rust entrypoint #2307

wants to merge 7 commits into from

Conversation

kumare3
Copy link
Contributor

@kumare3 kumare3 commented Mar 29, 2024

Tracking issue

  • Faster performance - faster startup. In limited testing more than 3.8x speedup
  • Smaller footprint - our goal is to remove object-store dependencies and replace with rust
  • Also remove dependency on grpc in python and only replace with rust

This also introduces rust more wholistically in flytekit

 - Faster performance - faster startup. In limited testing more than 3.8x speedup
 - Smaller footprint - our goal is to remove object-store dependencies and replace with rust
 - Also remove dependency on grpc in python and only replace with rust

This also introduces rust more wholistically in flytekit

Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Mar 29, 2024
@kumare3 kumare3 marked this pull request as draft March 29, 2024 05:44
Copy link

codecov bot commented Mar 29, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 95.71%. Comparing base (d32ce8f) to head (d34ce56).
Report is 27 commits behind head on master.

❗ Current head d34ce56 differs from pull request most recent head 06752b3. Consider uploading reports for the commit 06752b3 to get more accurate results

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2307       +/-   ##
===========================================
+ Coverage   75.95%   95.71%   +19.75%     
===========================================
  Files         181       19      -162     
  Lines       18295      560    -17735     
  Branches     3788        0     -3788     
===========================================
- Hits        13896      536    -13360     
+ Misses       3807       24     -3783     
+ Partials      592        0      -592     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>

#[logfn_inputs(Info, fmt = "Downloading distribution from {} to {}")]
#[logfn(ok = "INFO", err = "ERROR")]
pub async fn download_unarchive_distribution(src: &Url, dst: &String) -> Result<(), Box<dyn std::error::Error>> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using https://docs.rs/tracing/latest/tracing/ framework -- it's considered more modern and is more flexible in terms of what type of data can be collected and how it can be processed. If you'd like, I can share a minimal example of tracing setup for CLI output.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vlad-ivanov-name can you check

Comment on lines 1 to 7
export PYTHONPATH=`pwd`:$PYTHONPATH

TESTDATA_PATH=`pwd`/testdata

../target/debug/flyrs --inputs ${TESTDATA_PATH}/inputs.pb --output-prefix ${TESTDATA_PATH} --raw-output-data-prefix ${TESTDATA_PATH} --dynamic-addl-distro file://${TESTDATA_PATH}/schedule.tar.gz --dynamic-dest-dir . --resolver "flytekit.core.python_auto_container.default_task_resolver" -- task-module schedule task-name say_hello

cmp testdata/outputs.pb testdata/expected_outputs.pb || echo -e "----------- Outputs file comparision failed! ----------"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider writing a test inside Rust code for this usecase instead. Rust has excellent testing support, so it's as simple as annotating a function.

https://doc.rust-lang.org/book/ch11-01-writing-tests.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm i also wanted to test the cli, but let me learn

src/executor.rs Outdated

impl Display for ExecutorArgs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ExecutorArgs {{ inputs: {}, output_prefix: {}, test: {}, raw_output_data_prefix: {}, resolver: {}, resolver_args: {:?}, checkpoint_path: {:?}, prev_checkpoint: {:?}, dynamic_addl_distro: {:?}, dynamic_dest_dir: {:?} }}",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notation you've described here is more or less what Debug derive outputs. So you can for example do write!("{:?}", self)

src/executor.rs Outdated

#[logfn_inputs(Info, fmt = "Invoking task with {}")]
#[logfn(ok = "INFO", err = "ERROR")]
pub async fn execute_task(args: &ExecutorArgs) -> Result<(), Box<dyn std::error::Error>>{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using https://docs.rs/anyhow/latest/anyhow/ or https://docs.rs/miette/latest/miette/ as error types: they provide useful facilities such as backtrace, additional error context, and user-friendly error messages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you

src/executor.rs Outdated
if executor_args.dynamic_dest_dir.is_none() {
return Err("Dynamic distro requires a destination directory".into());
}
let src_url = url::Url::parse(executor_args.dynamic_addl_distro.clone().unwrap().as_str())?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same about unwrap -- you can use a match expression

@kumare3
Copy link
Contributor Author

kumare3 commented Apr 4, 2024

@vlad-ivanov-name thank you for all the comments. As you can tell I am complete newbie and I will definitely try to adapt all your suggestions. If you think you can take over the PR and improve it - please feel free too :) I would love to hand it over

Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
@wild-endeavor wild-endeavor changed the title [wip] Introducing Flytekit Rust entrypoint [flyrs] Flytekit Rust entrypoint Apr 22, 2024
kumare3 and others added 3 commits April 25, 2024 10:36
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Co-authored-by: Ketan Umare <kumare3@users.noreply.github.com>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Copy link

@vlad-ivanov-name vlad-ivanov-name left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 looks good overall, left a few comments

let store = store_box.0;

let src_path = Path::parse(src.path())?;
let tar_gz_stream = store.get(&src_path).await.unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably still want to use ? here:

Suggested change
let tar_gz_stream = store.get(&src_path).await.unwrap();
let tar_gz_stream = store.get(&src_path).await?;

let tar_gz_stream = store.get(&src_path).await.unwrap();

// TODO figure out how to stream unarchive the tar.gz file
let tar_gz_data = tar_gz_stream.bytes().await.unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

Suggested change
let tar_gz_data = tar_gz_stream.bytes().await.unwrap();
let tar_gz_data = tar_gz_stream.bytes().await?;

Comment on lines +63 to +65
debug!("Python path: {:?}", path);
debug!("Python version: {:?}", version);
debug!("Python modules: {:?}", keys);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a better syntax you can use here

Suggested change
debug!("Python path: {:?}", path);
debug!("Python version: {:?}", version);
debug!("Python modules: {:?}", keys);
debug!(
path = ?path,
version = ?version,
modules = ?keys,
"debug python setup"
);

what this will achieve is structured tracing: for backends that support it, you won't just log a string but rather a map with fields. it's really useful when e.g. analyzing traces in cloud

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow you are indeed making me a better rust programmer - thank you appreciate it


fn debug_python_setup(py: Python) {
if tracing::enabled!(tracing::Level::DEBUG) {
let sys = PyModule::import_bound(py, "sys").unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of unwrap you can create another function that returns anyhow::Result, and here you can write something like

let if let Ok((path, version, keys)) = get_python_info() {
    // log it
} else {
    tracing::error!("failed to get python info")
}

pyo3::prepare_freethreaded_python();
let _ = Python::with_gil(|py| -> Result<()> {
debug_python_setup(py);
let entrypoint = PyModule::import_bound(py, "flytekit.bin.entrypoint").unwrap();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you already return a result ? would probably work instead of unwrap



#[tracing::instrument(err)]
pub async fn download_unarchive_distribution(src: &Url, dst: &String) -> Result<()> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the dst type should probably be &str

@kdubovikov
Copy link

@kumare3 hi. Wanted to check when this is planned to be merged?

@kumare3
Copy link
Contributor Author

kumare3 commented Aug 17, 2024

This needs more work, especially tooling so - not yet planned but definitely later this year

@kdubovikov
Copy link

kdubovikov commented Aug 19, 2024

This needs more work, especially tooling so - not yet planned but definitely later this year

@kumare3 is there anything that outside contributors can help with so that the release of this feature will be faster? I have some experience working with pyo3

@austin362667
Copy link
Collaborator

Hi @kdubovikov thanks for your interest. I'm an outside contributor too.

I'm working on removing dependencies over grpc and protobuf in python and only replace them with rust recently. You can check the ongoing works here for flyte and flytekit. It's almost done; it's not an easy task, though.

The Flytekit Rust entrypoint is also a challenging task with significant potential for performance gains. Perhaps we can figure out the blockers and share some pitfalls with each other. wdyt?

@kdubovikov
Copy link

@austin362667 , @kumare3, hi. Just a surface-level thought on the overall design. So, Flyte uses GRPC as a default transport, most likely due to being written in Go. Then, we inherited this transport in Python Entrypoint and flytekit. Afterward, community has realized that GRPC clients in Python is bound to some performance and stability issues in our specific use cases. So, the proposed solution is to create native GRPC clients in Rust and wrap them as Python packages with PyO3. Am I correct with this chain of reasoning?

@austin362667 By inspecting your PRs I see what you say when you mention that the task is not easy. You seem to employ quite deep workarounds for solving ref outliving.

If I am, then I wanted to ask if any alternative and probably simpler approaches were considered:

  • What if Flyte API had a set of RESTful ports/endpoints in addition to the GRPC ones? Would it remove the performance issues in the Entrypoint? Do we know if the primary contributor to the slowness of the entrypoint is the GRPC client? I did not see much CPU-bound code in there. This approach would also remove GRPC dependencies from flytekit and make it more lightweight. asyncio could be used for communication to resolve any performance issues, and if the endpoint was over compressed HTTP/2 I suppose that the difference between GRPC and REST would be minimal. Just looking at the efforts needed to introduce Rust to the project along with additional GRPC client packaging using PyO3, I am wondering if simpler alternatives are viable? Are there any significant drawbacks that you see?
  • Was it considered encapsulating all communication with Flyte APIs into a sidecar, making Task pods lighter. There would be no need for entrypoint this way, or at least it would be a minimal setup script.

I am not saying that the current solution is bad in any way, just asking questions from a fresh perspective.

@austin362667
Copy link
Collaborator

austin362667 commented Aug 20, 2024

@kdubovikov Exactly! That's sort of the whole context and what we are doing now.

@kumare3 EDIT: Grpc is not used because of golang, but because of it being way faster than rest and type safety.

Actually, I think you're almost right about your suggestion.

Refactoring gRPC into RESTful is another huge story, too. And, yes, it indeed mitigates the dependencies over Python grpcio and protobuf.

Let's say if we want to stay in gRPC architecture. It's true the transport layer is not CPU-bounded, and I've already enabling non-blocking network by leveraging asyncio during this run / register asynchronously. So performance is not an issue or bottleneck any more. Another reason to introduce Rust in the Flytekit remote client is to reduce the wheel size, thereby decreasing the Docker image pull wait time for Flytekit when starting a pod. Am I missing anything? @pingsutw

IMO, a better use case for introducing Rust is rewriting the entry point, just like what @kumare3 is trying here. Refactor this component might have much more potential for performance gains.

@kumare3
Copy link
Contributor Author

kumare3 commented Aug 20, 2024

Grpc is not used because of golang, but because of it being way faster than rest and type safety. Problem has been Grpc python support by Google has been not great. All the top projects in the world today use Grpc and I still prefer it. We are looking migrating to an http2 compliant implementation of Grpc - connect!

@austin362667
Copy link
Collaborator

austin362667 commented Aug 20, 2024

Yeh, no doubt speed and type safety are the two most important reasons to use gRPC!

  1. Was it considered encapsulating all communication with Flyte APIs into a sidecar, making Task pods lighter. There would be no need for entrypoint this way, or at least it would be a minimal setup script.

I'm not sure on this one. Can you elaborate more? @kdubovikov
And wdyt? @MortalHappiness

@kdubovikov
Copy link

Grpc is not used because of golang, but because of it being way faster than rest and type safety. Problem has been Grpc python support by Google has been not great. All the top projects in the world today use Grpc and I still prefer it. We are looking migrating to an http2 compliant implementation of Grpc - connect!

@kumare3 understood. I can see reasoning behind this. I prefer strong typing in remote calls as well. However, if Python is the primary client language and GRPC support there is not great, is it a solid reason for introducing another language (and a quite tricky one) to the project?

In case of the Endpoint, why not implement it in Go, for example? Is it a drastic difference between calling the Task function from Go, compared to using PyO3 in Rust? If Go was used, probably we would achieve the same goals along with reducing the maintainability pressure by introducing a new language to the project.

And feel free to tell me if I am not making much sense, I will stop debating on this particular point :)

Another reason to introduce Rust in the Flytekit remote client is to reduce the wheel size, thereby decreasing the Docker image pull wait time for Flytekit when starting a pod.

@austin362667 this is a good point indeed. Do we know by how much the wheel size is reduced?

@MortalHappiness
Copy link
Member

MortalHappiness commented Aug 20, 2024

  1. Was it considered encapsulating all communication with Flyte APIs into a sidecar, making Task pods lighter. There would be no need for entrypoint this way, or at least it would be a minimal setup script.

@kdubovikov @austin362667

Although I do not have much context with the Rust gRPC implementation, from a K8s perspective, I do not think this is correct. First off, sidecar container also run in the same Pod with the main container, so the task pods will not be lighter. Besides, I don't think it is easier to implement it as a sidecar container. As far as I know, the main communication ways between sidecar and main container is via network or shared volumes, so the sidecar container must be either consistently watch for file system changes or serves as a translator between the main container and the Flyte API, by capturing the network requests from the main container and translate it to gRPC. In either way the implementation will not be easier.

Feel free to tell me if you have a better implementation for the sidecar pattern. Maybe I misunderstand something.

@kdubovikov
Copy link

kdubovikov commented Aug 20, 2024

  1. Was it considered encapsulating all communication with Flyte APIs into a sidecar, making Task pods lighter. There would be no need for entrypoint this way, or at least it would be a minimal setup script.

@kdubovikov @austin362667

Although I do not have much context with the Rust gRPC implementation, from a K8s perspective, I do not think this is correct. First off, sidecar container also run in the same Pod with the main container, so the task pods will not be lighter. Besides, I don't think it is easier to implement it as a sidecar container. As far as I know, the main communication ways between sidecar and main container is via network or shared volumes, so the sidecar container must be either consistently watch for file system changes or serves as a translator between the main container and the Flyte API, by capturing the network requests from the main container and translate it to gRPC. In either way the implementation will not be easier.

Feel free to tell me if you have a better implementation for the sidecar pattern. Maybe I misunderstand something.

@MortalHappiness I meant that the Task container will get lighter and possibly won't need extra GRPC dependencies, and the sidecar can act as a bridge and use any more efficient implementation that we consider to be faster. This way the task container won't need much except from the user code itself, and sidecar would take care of handling all of the platform functionality. However, I agree that adding a sidecar is not contributing much to the entrypoint performance by itself, and the problem of communication between sidecar and task containers still must be solved. The key to this issue is optimizing the entrypoint cold start time, and sidecar is more about decoupling one from another. So yes, let's dismiss the sidecar thread for now.

It's just that using PyO3 and Rust as a workaround for standard Google GRPC clients in Python is not looking as a very common or widely accepted pattern that I have seen in other codebases (especially those that are not primarily in Rust), so I ask if everyone involved are sure that introducing Rust to the project is the best possible solution considering all tradeoffs. Maybe I am not aware of other roadmap points that will broaden the usage of Rust for different use cases as well. Again, not saying that using Rust for creating Python GRPC client packages or entrypoint script is wrong in any way, just mentioning that it adds a whole new layer of maintainability in the long term.

The second usecase is to use Rust for the Entrypoint script, but I am not entirely sure if it will be much different than using Go for the same: run task setup code and then run and monitor the Python process, communicating back to Flyte when neccessary.

In short

  • Does Flyte has far-reaching plans to use Rust in the future for any other cases than Entrypoint?
  • If no, why won't we write entrypoint in Go, which is already the primary language of the platform and should give us performance benefits as well.

** Update **
I see the benefit of using PyO3 in the following

  • It embeds Python in the same process
  • No IPC is needed (no pipes / sockets)
  • Much more flexible in back-and-forth communication between Rust code and Python code

Go entrypoint benefits:

  • Does not introduce a new language to the project, better maintanability
  • Most likely will be almost as fast as Rust, but this needs checking
  • Needs IPC between Python and Endpoint processes, but the only things I am aware about are sending the startup arguments to the task and checking for exceptions or errors during execution, which is not much

@kumare3
Copy link
Contributor Author

kumare3 commented Aug 20, 2024

@kdubovikov grpc is not used at runtime. This communication happens through blob store.
The decision on a new language is dependent on what we can achieve with the new language.
The reason for rust is the amazing benefit of interoperability with many different languages. Goal is to build a common rust runtime that allows any language sdk.

As for the sidecar pattern - it is already supported in Flyte through raw container tasks. There are many disadvantages with it too. Check it out

@kdubovikov
Copy link

@kdubovikov grpc is not used at runtime. This communication happens through blob store. The decision on a new language is dependent on what we can achieve with the new language. The reason for rust is the amazing benefit of interoperability with many different languages. Goal is to build a common rust runtime that allows any language sdk.

As for the sidecar pattern - it is already supported in Flyte through raw container tasks. There are many disadvantages with it too. Check it out

Thanks for the context, @kumare3 . Go would not solve the interoperability for sure, if that's our goal. Then Rust it is 🦀 , closing this discussion.

My first question remains: is there anything that can be done by outside contributors to speed up the release of this feature?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:L This PR changes 100-499 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants