fs: implement a fsspec-based filesystem backend #119


Merged

merged 6 commits into master from fsspec on Aug 9, 2021

Conversation

@isidentical (Contributor) commented Aug 3, 2021

This basically moves the existing functionality from DVC into PyDrive2. There weren't any major changes, besides:

  • Opening files in w mode is now supported, but we first collect everything in a buffer and only then write, so the functionality should be used carefully. It was mainly needed for the tests.
  • Implementation of cp_file(), which basically does an upload_fobj() to copy. The move() method is now implemented through cp + rm.
  • A couple of places used to pass size as a string; those values are now cast to integers.
  • info() results now include a checksum field.
  • get_file/put_file APIs now support fsspec.Callbacks.

Other changes (those I can remember):

  • Paths are now strings
  • ls()/find() now return lists instead of generators (see the usage sketch below)

Fixes #113
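
A rough usage sketch of the new backend (hedged: the auth flow, base path form, and file names below are illustrative assumptions, not taken from this PR):

from pydrive2.auth import GoogleAuth
from pydrive2.fs import GDriveFileSystem

auth = GoogleAuth()
auth.LocalWebserverAuth()  # any completed PyDrive2 auth flow works here

fs = GDriveFileSystem("root", auth)  # "root" as the base path is an assumption

print(fs.ls("root"))  # now returns a list of string paths

with fs.open("root/hello.txt", "wb") as f:  # "w" modes buffer and upload on close
    f.write(b"hello!")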

@isidentical isidentical changed the title fs: implement a fsspec-based filesystem backend [WIP] fs: implement a fsspec-based filesystem backend Aug 3, 2021
@efiop (Contributor) commented Aug 3, 2021

Oh, I totally forgot that PyDrive2 is still using Travis 🙁 No wonder tests are not running in this PR.

@efiop (Contributor) commented Aug 3, 2021

@isidentical Would you be willing to convert CI to GitHub Actions? Maybe as a separate prerequisite PR. It seems like the effort would be equal to or lower than resurrecting Travis.

@isidentical (Contributor, Author) commented Aug 3, 2021 via email

pydrive2/fs.py Outdated
        else:
            return parts[0], ""

    @wrap_prop(threading.RLock())
Member:

Not sure if we have that case here, but if multiple threads are reading this, it would be beneficial to use a write lock that still allows multiple readers (a readers-writer lock) once the cache is already populated. Not sure though if Python has one.
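
For reference, a minimal readers-writer lock can be sketched in pure Python (the stdlib has no built-in one; the class and method names here are made up):

import threading

class RWLock:
    """Many concurrent readers, one exclusive writer."""

    def __init__(self):
        self._readers = 0
        self._counter_lock = threading.Lock()  # guards the reader count
        self._write_lock = threading.Lock()    # held while writing

    def acquire_read(self):
        with self._counter_lock:
            self._readers += 1
            if self._readers == 1:
                self._write_lock.acquire()  # first reader blocks writers

    def release_read(self):
        with self._counter_lock:
            self._readers -= 1
            if self._readers == 0:
                self._write_lock.release()  # last reader unblocks writers

    def acquire_write(self):
        self._write_lock.acquire()

    def release_write(self):
        self._write_lock.release()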

pydrive2/fs.py Outdated
    def flush(self):
        self.buffer.flush()
        try:
            self.fs.upload_fobj(self.buffer, self.path)
Member:

Just to double check: we need this to be append semantics, right? Does it work this way?

Contributor (Author):

we need this to be append semantics, right? does it work this way?

No. This is just a simple wrapper around upload_fobj() that buffers all your write() calls and dispatches them at once. Even after flush() we forcefully close the file, so you cannot make any further changes that might lead you to expect append()-like semantics.
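
In other words, the wrapper behaves roughly like this sketch (simplified and hypothetical; only the flush() body above is from the actual diff, the rest is an assumption):

import io

class UploadOnCloseFile:
    """Buffer all write() calls in memory; upload once, on flush()."""

    def __init__(self, fs, path):
        self.fs = fs          # filesystem providing upload_fobj()
        self.path = path
        self.buffer = io.BytesIO()

    def write(self, data):
        # raises ValueError once the buffer has been closed
        return self.buffer.write(data)

    def flush(self):
        self.buffer.flush()   # raises "I/O operation on closed file" on reuse
        self.buffer.seek(0)   # rewind so upload_fobj() reads from the start
        try:
            self.fs.upload_fobj(self.buffer, self.path)
        finally:
            self.buffer.close()  # one-shot: further write()/flush() raise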

Member:

I see. A few questions here:

  • should we test self._closed in write() and raise if it's True? It's not very common behavior to close on flush, right?
  • self.buffer.flush(): do we need this?
  • what happens with gdrive_retry? Do we handle this at all in the place where we depend on it? (retries are usually important for tests to be stable)

Contributor (Author):

should we test self._closed in write() and raise if it's True? It's not very common behavior to close on flush, right?

buffer.flush() actually checks for that, so that we don't have to manage individual states

self.buffer.flush(): do we need this?

answered above

what happens with gdrive_retry? Do we handle this at all in the place where we depend on it? (retries are usually important for tests to be stable)

upload_fobj() is retried.

Member:

buffer.flush() actually checks for that,

Could you clarify please? (I'm not sure we are on the same page :) )

upload_fobj() is retried.

yep, I see, gdrive_upload_fobj is retried inside the upload_fobj

Contributor (Author):

Could you clarify please? (I'm not sure we are on the same page :) )

When we close our wrapper file (via .close()), we also close the buffer (we call buffer.close()). And if we try to call buffer.flush() after we close the file (and, due to that, the buffer itself), it will raise the proper `I/O operation on closed file` error.

Member:

Hmm ... it looks like I am still missing something, sorry :)

My concern was about the following workflow:

writer.flush() # it closes itself which is not expected (usually)
writer.write("more stuff") # how does it behave now? should we signal that we are trying to write to a closed object?

Contributor (Author):

You can't do that with the current approach. We only allow flush()ing once, and after the flush() we close the file. That is a bit unorthodox, but it was the easiest route to emulate the behavior: generally you only flush once, during close() itself, so you don't actually need the file to stay open for the rest of the flow.

Member:

Yep, so going back to the initial question: "should we signal that we are trying to write to a closed object?"

Contributor (Author):

I think the answer is that we already do, at the first line of this function:

>>> import io
>>> buf = io.BytesIO()
>>> buf.write(b"hey") # first write data
3
>>> buf.flush(); buf.close() # then flush + close the file
>>> buf.flush() # try flushing again
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: I/O operation on closed file.

When you write stuff and then call flush() on our wrapper (which also calls close(), as I stated), the next flush is automatically going to signal that, since its first line is the buffer's flush(), which raises the appropriate error.

@efiop (Contributor) commented Aug 6, 2021

@Mergifyio rebase

mergify bot commented Aug 6, 2021

Command rebase: failure

Base branch update has failed
Git reported the following error:

Rebasing (1/4)

error: could not apply 3be2a30... fs: implement a fsspec-based filesystem backend

Resolve all conflicts manually, mark them as resolved with
"git add/rm <conflicted_files>", then run "git rebase --continue".
You can instead skip this commit: run "git rebase --skip".
To abort and get back to the state before "git rebase", run "git rebase --abort".
Could not apply 3be2a30... fs: implement a fsspec-based filesystem backend
CONFLICT (modify/delete): scripts/ci/install.sh deleted in HEAD and modified in 3be2a30 (fs: implement a fsspec-based filesystem backend). Version 3be2a30 (fs: implement a fsspec-based filesystem backend) of scripts/ci/install.sh left in tree.


err-code: DD704

@efiop (Contributor) commented Aug 6, 2021

@Mergifyio rebase

mergify bot commented Aug 6, 2021

Command rebase: failure

Pull request can't be updated with latest base branch changes
GitHub App like Mergify are not allowed to rebase pull request where .github/workflows is changed.
This pull request must be rebased manually.
err-code: 83DB3

@isidentical isidentical changed the title [WIP] fs: implement a fsspec-based filesystem backend fs: implement a fsspec-based filesystem backend Aug 9, 2021
@isidentical isidentical requested a review from shcheklein August 9, 2021 11:17
@isidentical isidentical requested a review from efiop August 9, 2021 11:17
@shcheklein (Member) left a comment:

One minor discussion that is still happening, not a blocker

@efiop efiop merged commit 0d6d6d7 into master Aug 9, 2021
@efiop efiop deleted the fsspec branch August 9, 2021 21:18
Comment on lines +360 to +365
with self.open(lpath) as stream:
    # IterStream objects don't support full-length
    # seek() calls, so we have to wrap the data with
    # an external buffer.
    buffer = io.BytesIO(stream.read())
    self.upload_fobj(buffer, rpath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this. This will fail if a file is bigger than the system RAM.

Suggested change

-with self.open(lpath) as stream:
-    # IterStream objects don't support full-length
-    # seek() calls, so we have to wrap the data with
-    # an external buffer.
-    buffer = io.BytesIO(stream.read())
-    self.upload_fobj(buffer, rpath)
+with self.open(lpath) as stream:
+    self.upload_fobj(stream, rpath)

and fix self.open to return a proper stream.
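
Alternatively, if the upload machinery genuinely needs a seekable object, the data could be spooled to disk past a size threshold instead of being held in RAM. A sketch of that idea (not a concrete proposal for this PR; the function name and the 50 MB threshold are arbitrary):

import shutil
import tempfile

def copy_via_spool(fs, lpath, rpath, max_memory=50 * 1024 * 1024):
    """Copy lpath to rpath without holding the whole file in RAM:
    data beyond max_memory is transparently spilled to a temporary
    file on disk, and the result is still seekable for the upload."""
    with fs.open(lpath) as stream:
        with tempfile.SpooledTemporaryFile(max_size=max_memory) as buffer:
            shutil.copyfileobj(stream, buffer)
            buffer.seek(0)
            fs.upload_fobj(buffer, rpath)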
