
Python: Support serverless environments #8011

Closed
mr-brobot opened this issue Jul 7, 2023 · 5 comments
@mr-brobot (Contributor)

Feature Request / Improvement

Problem

PyIceberg currently relies on multiprocessing in Table.plan_files and the pyarrow interface. Unfortunately, multiprocessing's synchronization primitives are backed by shared memory (/dev/shm on Linux), which serverless runtimes like AWS Lambda and AWS Fargate do not provide. In effect, the reliance on /dev/shm assumes the user controls the host environment, which disqualifies use in serverless environments.

Apparently, one way to emulate the Lambda or Fargate container runtime locally is to run a container with --ipc="none". This disables the /dev/shm mount and causes the multiprocessing module to fail with the following error:

[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
  File "/var/task/app.py", line 27, in handler
    result = scan.to_arrow().slice_length(limit)
  File "/var/task/pyiceberg/table/__init__.py", line 819, in to_arrow
    self.plan_files(),
  File "/var/task/pyiceberg/table/__init__.py", line 776, in plan_files
    with ThreadPool() as pool:
  File "/var/lang/lib/python3.10/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/var/lang/lib/python3.10/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.10/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.10/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
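The failure is easy to reproduce outside PyIceberg: even though multiprocessing.pool.ThreadPool runs its workers as threads, it still allocates multiprocessing SemLock-backed queues at construction time, which is exactly the call that fails without /dev/shm. A minimal sketch:

```python
# multiprocessing.pool.ThreadPool uses threads for its workers, but its
# internal queues are built on multiprocessing SemLock primitives, which
# need shared memory (/dev/shm on Linux). Constructing the pool is enough
# to trigger "OSError: [Errno 38] Function not implemented" on runtimes
# without /dev/shm; on a normal host it works fine.
from multiprocessing.pool import ThreadPool

with ThreadPool(4) as pool:
    results = pool.map(lambda x: x * x, range(5))

print(results)  # [0, 1, 4, 9, 16] on hosts where /dev/shm is available
```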

An interesting note from the multiprocessing.pool.ThreadPool documentation:

A ThreadPool shares the same interface as Pool, which is designed around a pool of processes and predates the introduction of the concurrent.futures module [...] Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns concurrent.futures.Future instances that are compatible with many other libraries, including asyncio.
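Following that advice, the same fan-out can be written with concurrent.futures.ThreadPoolExecutor, which uses ordinary threading primitives and therefore never touches /dev/shm. In this sketch, fetch_manifest and the paths are hypothetical stand-ins for the per-file work in plan_files:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_manifest(path: str) -> str:
    # Stand-in for the I/O-bound, per-manifest work done during a scan.
    return path.upper()

paths = ["a.avro", "b.avro", "c.avro"]

# ThreadPoolExecutor is built on threading locks and conditions, not
# multiprocessing SemLock, so it works in Lambda/Fargate.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch_manifest, paths))

print(results)  # ['A.AVRO', 'B.AVRO', 'C.AVRO']
```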

Proposal

Perhaps PyIceberg should support multiple concurrency strategies, allowing the user to configure which is most appropriate for their runtime/resources.

Instead of using multiprocessing directly, we could depend on a concrete implementation of Executor from the concurrent.futures module wherever we need concurrency. The user could then select the appropriate implementation via configuration:

  • PYICEBERG__CONCURRENCY=process uses ProcessPoolExecutor (default, same as current implementation)
  • PYICEBERG__CONCURRENCY=thread uses ThreadPoolExecutor (appropriate for serverless environments)

This might even allow PyIceberg to support other concurrency models in the future, e.g., user-defined implementations of Executor.
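A minimal sketch of such a factory, assuming the proposed PYICEBERG__CONCURRENCY environment variable (the variable name and defaults here are part of this proposal, not an existing PyIceberg API):

```python
import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def get_executor() -> Executor:
    """Build the Executor selected by the proposed PYICEBERG__CONCURRENCY
    setting: 'process' (default, mirrors today's behavior) or 'thread'
    (safe for serverless runtimes without /dev/shm)."""
    strategy = os.environ.get("PYICEBERG__CONCURRENCY", "process")
    if strategy == "thread":
        return ThreadPoolExecutor()
    if strategy == "process":
        return ProcessPoolExecutor()
    raise ValueError(f"Unknown PYICEBERG__CONCURRENCY value: {strategy!r}")
```

Call sites would then write `with get_executor() as executor: ...` instead of constructing a pool directly, which also leaves room for user-supplied Executor implementations later.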

I reproduced this problem and have a fix on a fork, confirming that this approach at least works. Depending on feedback from the community, I can tidy things up and submit a PR. 😃

Query engine

None

@mr-brobot (Contributor, Author)

Alternatively, and perhaps preferably given Python's support for multi-processor environments, we could always try ProcessPoolExecutor and fall back to ThreadPoolExecutor. Basically, we always choose the best option available, and the user doesn't have to think about it.
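A hedged sketch of that fallback, assuming nothing beyond the standard library (the probe task is just one way to force the pool's lock machinery to initialize before committing to processes):

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def best_available_executor() -> Executor:
    """Prefer processes; fall back to threads where process synchronization
    is unavailable (e.g. no /dev/shm in AWS Lambda/Fargate)."""
    try:
        executor = ProcessPoolExecutor()
        # Pool construction may succeed lazily, so submit a trivial task to
        # force worker/lock initialization; without /dev/shm this raises
        # OSError: [Errno 38] Function not implemented.
        executor.submit(int, 0).result(timeout=30)
        return executor
    except OSError:
        return ThreadPoolExecutor()
```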

@jacopotagliabue

@mr-brobot this is FANTASTIC - I solved my problem with this a while ago by monkeypatching both plan_files and to_arrow:

DataScan.plan_files = _plan_files
DataScan.to_arrow = _to_arrow

and basically removing the offending libraries so it runs in a Lambda - I agree it would be great to have some PyArrow config to just handle this better.
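For anyone who needs the workaround today, the monkeypatch pattern looks roughly like the following. DataScan here is a stand-in for pyiceberg.table.DataScan, and _plan_files is a stand-in for a user-written, /dev/shm-free reimplementation; neither matches the real PyIceberg internals exactly:

```python
# Stand-in for pyiceberg.table.DataScan, whose plan_files fails without
# /dev/shm (OSError: [Errno 38] Function not implemented).
class DataScan:
    def plan_files(self):
        raise OSError(38, "Function not implemented")

# Hypothetical serial, thread-free replacement a user might write.
def _plan_files(self):
    return ["file-1.parquet"]

# The monkeypatch: rebind the method on the class before any scan runs.
DataScan.plan_files = _plan_files

scan = DataScan()
print(scan.plan_files())  # ['file-1.parquet']
```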

I can't check now (not at my laptop), but I'm pretty sure plan_files uses ThreadPool, and even that is still not allowed in AWS Lambda: did you test live in Lambda or just in Docker?

@mr-brobot (Contributor, Author)

@jacopotagliabue Great to hear I'm not alone in this! 😄

I have this working on my fork and successfully tested in AWS Lambda. There were two errors in Lambda, one from using multiprocessing.pool.ThreadPool and another from using multiprocessing.Value for synchronization in the pyarrow adapter, both related to the lack of a /dev/shm mount. Using concurrent.futures.ThreadPoolExecutor solves that scenario.
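As a concrete illustration of the second fix, a multiprocessing.Value-style shared counter can be replaced with plain threading primitives when all workers stay in one process. This Counter class is a hypothetical sketch, not the actual code in the PyArrow adapter:

```python
import threading

class Counter:
    """Thread-safe counter using only threading primitives, a possible
    stand-in for multiprocessing.Value when workers are threads in a
    single process (no /dev/shm required)."""

    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()

    def increment(self) -> int:
        with self._lock:
            self._value += 1
            return self._value
```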

Just finishing up some cleanup now. I plan to have a PR open tonight or early tomorrow.

@jacopotagliabue

Happy to test it on my lambda when ready to go!

@mr-brobot (Contributor, Author)

I'm doing some more research to continue the multi-processing conversation in a separate issue. But the primary problem stated in this issue, the ability to run PyIceberg in AWS Lambda / AWS Fargate, is solved. 😃
