Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix task linking #15418

Merged
merged 1 commit into from
Sep 18, 2024
Merged

Fix task linking #15418

merged 1 commit into from
Sep 18, 2024

Conversation

desertaxle
Copy link
Member

@desertaxle desertaxle commented Sep 18, 2024

Closes #15391

In #14651 we started tracking task run states with a WeakValueDictionary to avoid memory bloat in flow with many tasks. Using a WeakValueDictionary can lead task run states to be garbage collected too soon which remove the information necessary to consistently show data dependencies between tasks.

This PR goes back to using a dict to track task run states, but drops the return value from the user's code to avoid memory bloat.

Using the following script, we can see that memory usage stays roughly consistent (or even improves a bit) before and after this change:

from memory_profiler import profile

from prefect import flow, task


@task
def with_task():
    return [{"abc": "123"} for _ in range(10_000)]

@flow
@profile
def some_flow(n: int = 100):
    for _ in range(n):
        with_task()
    print("DONE")


@profile
def main():
    some_flow(n=100)


if __name__ == "__main__":
    main()

Here are the results:

Before
n=100 w/ WeakValueDictionary

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    10    133.5 MiB    133.5 MiB           1   @flow
    11                                         @profile
    12                                         def some_flow(n: int = 100):
    13    138.6 MiB  -1302.8 MiB         101       for _ in range(n):
    14    138.6 MiB  -1297.7 MiB         100           with_task()
    15    125.7 MiB    -12.9 MiB           1       print("DONE")

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    18    129.2 MiB    129.2 MiB           1   @profile
    19                                         def main():
    20    125.8 MiB     -3.3 MiB           1       some_flow(n=100)

n=1000 w/ WeakValueDictionary

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    10    133.0 MiB    133.0 MiB           1   @flow
    11                                         @profile
    12                                         def some_flow(n: int = 100):
    13    141.2 MiB -19463.3 MiB        1001       for _ in range(n):
    14    141.2 MiB -19455.5 MiB        1000           with_task()
    15    120.8 MiB    -20.5 MiB           1       print("DONE")

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    18    128.5 MiB    128.5 MiB           1   @profile
    19                                         def main():
    20    121.1 MiB     -7.5 MiB           1       some_flow(n=1000)

After
n=100 w/ dict

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    10    133.5 MiB    133.5 MiB           1   @flow
    11                                         @profile
    12                                         def some_flow(n: int = 100):
    13    140.5 MiB  -5924.6 MiB         101       for _ in range(n):
    14    140.5 MiB  -5917.7 MiB         100           with_task()
    15     67.2 MiB    -73.3 MiB           1       print("DONE")

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    18    129.5 MiB    129.5 MiB           1   @profile
    19                                         def main():
    20     69.4 MiB    -60.1 MiB           1       some_flow(n=100)

n=1000 w/ dict

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    10    133.9 MiB    133.9 MiB           1   @flow
    11                                         @profile
    12                                         def some_flow(n: int = 100):
    13    133.9 MiB -42578.9 MiB        1001       for _ in range(n):
    14    130.2 MiB -48355.7 MiB        1000           with_task()
    15     64.5 MiB    -69.4 MiB           1       print("DONE")

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    18    131.6 MiB    131.6 MiB           1   @profile
    19                                         def main():
    20     66.9 MiB    -64.6 MiB           1       some_flow(n=1000)

Alternatives considered

I explored using weakref.finalizer to try to intelligently remove tracked task run states when the associate return value is garbage collection, but weak references cannot be created for all objects (like int or str) so unfortunately, that approach didn't work.

@github-actions github-actions bot added the bug Something isn't working label Sep 18, 2024
Copy link

codspeed-hq bot commented Sep 18, 2024

CodSpeed Performance Report

Merging #15418 will not alter performance

Comparing fix-task-linkingwq (c8192e0) with main (9d78f77)

Summary

✅ 3 untouched benchmarks

@desertaxle desertaxle merged commit d0ba1d2 into main Sep 18, 2024
34 checks passed
@desertaxle desertaxle deleted the fix-task-linkingwq branch September 18, 2024 18:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Tasks connections are not connecting properly in UI
2 participants