Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Support serverless environments #8015

Closed
wants to merge 6 commits into from

Conversation

mr-brobot
Copy link
Contributor

Closes #8011

There's a lot of context on the related issue. Open to any feedback, from directional to the nittiest of picks. Despite being thin wrappers for built-in classes, pyiceberg.utils.concurrent lacks tests. Happy to address once I have the nod from folks on this approach.

@github-actions github-actions bot added the python label Jul 8, 2023
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @mr-brobot I think being able to run PyIceberg in a serverless environment is really cool

python/pyiceberg/utils/concurrent.py Outdated Show resolved Hide resolved
python/pyiceberg/utils/concurrent.py Outdated Show resolved Hide resolved
python/pyiceberg/utils/concurrent.py Outdated Show resolved Hide resolved
python/pyiceberg/utils/concurrent.py Outdated Show resolved Hide resolved
@@ -773,11 +779,11 @@ def plan_files(self) -> Iterable[FileScanTask]:
data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)

with ThreadPool() as pool:
with DynamicExecutor() as executor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is awesome. I have one more suggestion. What do you think of being able to set the executor through the config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! What were you thinking for the config key and accepted values? Currently, this uses PYICEBERG_CONCURRENCY_MODE where possible values are None, thread, or process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this works for me. Can you also add this to the docs under python/mkdocs/?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None doesn't seem to work on my end:

export PYICEBERG_CONCURRENCY_MODE=None   
python3 /tmp/vo.py               
Traceback (most recent call last):
  File "/tmp/vo.py", line 1, in <module>
    from pyiceberg.catalog import load_catalog
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/catalog/__init__.py", line 42, in <module>
    from pyiceberg.serializers import ToOutputFile
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/serializers.py", line 26, in <module>
    from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/table/__init__.py", line 72, in <module>
    from pyiceberg.utils.concurrent import DynamicExecutor
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/utils/concurrent.py", line 146, in <module>
    concurrency_mode = _concurrency_mode()
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/utils/concurrent.py", line 143, in _concurrency_mode
    raise ValueError(f"Invalid concurrency mode: {mode}")
ValueError: Invalid concurrency mode: None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies! I meant unset PYICEBERG_CONCURRENCY_MODE, which comes across as None in pyiceberg.utils.concurrent._concurrency_mode:

def _concurrency_mode() -> ConcurrencyMode:
    mode = Config().config.get("concurrency-mode")

    if mode is None:
        return "thread"

    if mode in ("thread", "process"):
        return mode  # type: ignore

    raise ValueError(f"Invalid concurrency mode: {mode}")

Will add some documentation!

@koenvo
Copy link

koenvo commented Jul 10, 2023

This looks very interesting and would solve some performance issues when switching to a process pool. It would be nice if a pool can be reused. Forking new processes for each new pool can be a costly operation

@mr-brobot
Copy link
Contributor Author

It would be nice if a pool can be reused. Forking new processes for each new pool can be a costly operation

@koenvo Reusing the executor is a good idea. I can create a separate issue for that and work on it next.

@mr-brobot mr-brobot requested a review from Fokko July 12, 2023 13:30
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last few small nits, apart from that, this looks great to me! 👍🏻

@@ -773,11 +779,11 @@ def plan_files(self) -> Iterable[FileScanTask]:
data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)

with ThreadPool() as pool:
with DynamicExecutor() as executor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this works for me. Can you also add this to the docs under python/mkdocs/?

python/pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
python/pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
python/pyiceberg/table/__init__.py Outdated Show resolved Hide resolved
@Fokko
Copy link
Contributor

Fokko commented Jul 13, 2023

Just out curiosity, where are you able to run multi-processing?

export PYICEBERG_CONCURRENCY_MODE=process
➜  docker-spark-iceberg git:(main) time python3 /tmp/vo.py                  
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 125, in _main
    prepare(preparation_data)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/runpy.py", line 268, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/vo.py", line 11, in <module>
    df = table.scan().to_arrow()
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/table/__init__.py", line 825, in to_arrow
    self.plan_files(),
  File "/Users/fokkodriesprong/Desktop/iceberg/python/pyiceberg/table/__init__.py", line 784, in plan_files
    *executor.map(
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 726, in map
    results = super().map(partial(_process_chunk, fn),
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 597, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 597, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 697, in submit
    self._adjust_process_count()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 675, in _adjust_process_count
    p.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

Where:

from pyiceberg.catalog import load_catalog

catalog = load_catalog('local')

table = catalog.load_table('nyc.taxis')

expected_rows = 15885533

for _ in range(10):
    df = table.scan().to_arrow()
    assert len(df) == expected_rows, f"Got {len(df)} rows, instead of {expected_rows}"

And local points to my local docker-spark-iceberg setup, so I don't have S3 latency.

@mr-brobot
Copy link
Contributor Author

mr-brobot commented Jul 13, 2023

Just out curiosity, where are you able to run multi-processing?

@Fokko I was on Ubuntu where the start method defaults to fork. Since Windows & macOS default to the spawn method, and that is considerably slower due to re-initialization of a new Python interpreter, I think we should advise against multi-processing on these platforms in our documentation and perhaps log a warning.

In fact, in a previous version of this PR, I thought to fall back to multi-threading when the start mode was spawn:

def _mp_pref() -> bool:
    """Returns whether multiprocessing is preferred."""
    start_method = get_start_method()

    if start_method != "fork":
        logger.debug("Multi-processing not preferred in current runtime: start_method=%s", start_method)
        return False

    return start_method == "fork"

I'll add some documentation after I do some testing with process spawning on Windows & macOS.

@mr-brobot
Copy link
Contributor Author

mr-brobot commented Jul 14, 2023

@Fokko After some more testing in other platforms, I'm of the opinion that multi-processing support should be considered separately from simply making PyIceberg work in serverless environments.

  1. Making PyIceberg work in serverless is simple. Replacing usage of multiprocessing.pool.ThreadPool and related synchronization concepts with concurrent.futures.ThreadPoolExecutor and the corresponding synchronization concepts in threading.
  2. Making PyIceberg support multi-processing adds variability across platforms and would require more care when introducing future changes (e.g., ensuring that everything parallelized by an executor can be pickled).

I'm going to close this and reopen a separate PR focused on (1). For (2), I will create yet another PR where I will dedicate more attention to testing all platforms and possibly some benchmarks that prove the ROI.

@corleyma
Copy link

I think that's a great call! Multiprocessing is tricky to get right, would be good to know the juice is worth the squeeze.

@mr-brobot mr-brobot deleted the lambda branch July 19, 2023 13:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Python: Support serverless environments
4 participants