fs: implement a fsspec-based filesystem backend #119
Conversation
Oh, I totally forgot that PyDrive2 is still using travis 🙁 No wonder tests are not running in this PR.

@isidentical Would you be willing to convert CI to GitHub Actions? Maybe as a separate pre-requisite PR. Seems like the effort would be equal to or lower than resurrecting travis.
Yeah sure.
pydrive2/fs.py (Outdated)

```python
else:
    return parts[0], ""

@wrap_prop(threading.RLock())
```
Not sure if we have that case here, but if multiple threads are reading this, it would be beneficial to use a read-write lock (allowing multiple concurrent readers) when the cache is already populated. Not sure though if Python has one.
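Python's standard library indeed has no built-in read-write lock (only `Lock` and `RLock`). A minimal sketch of what the reviewer is suggesting, assuming a hypothetical `ReadWriteLock` class (illustrative only, not part of PyDrive2; writer starvation is possible with this simple scheme):

```python
import threading


class ReadWriteLock:
    """Minimal read-write lock sketch: many concurrent readers,
    exclusive writers. Illustrative only; the stdlib has no
    built-in equivalent."""

    def __init__(self):
        self._readers = 0
        self._counter_lock = threading.Lock()  # guards _readers
        self._writer_lock = threading.Lock()   # held while readers or a writer are active

    def acquire_read(self):
        with self._counter_lock:
            self._readers += 1
            if self._readers == 1:
                # First reader in blocks out writers.
                self._writer_lock.acquire()

    def release_read(self):
        with self._counter_lock:
            self._readers -= 1
            if self._readers == 0:
                # Last reader out lets writers in again.
                self._writer_lock.release()

    def acquire_write(self):
        self._writer_lock.acquire()

    def release_write(self):
        self._writer_lock.release()
```

With a populated cache, lookups would take the read side and only cache refreshes would take the write side, instead of serializing all readers behind a single `RLock`.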
pydrive2/fs.py (Outdated)

```python
def flush(self):
    self.buffer.flush()
    try:
        self.fs.upload_fobj(self.buffer, self.path)
```
just to double check - we need this to be append semantics, right? does it work this way?
> we need this to be append semantics, right? does it work this way?

No. This is just a simple wrapper around `upload_fobj()` that buffers all your `write()` calls and dispatches them at once. Even after `flush()` we forcefully close the file, so you cannot make any further changes that might lead to the expected `append()`-like semantics.
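A minimal sketch of the behavior being described, under stated assumptions: `BufferedUploadFile` and the `FakeFS` used below are hypothetical names for illustration, not the actual PR code, which only shares the buffer-then-`upload_fobj()` shape:

```python
import io


class BufferedUploadFile:
    """Sketch: buffer all write() calls in memory and dispatch them
    in a single upload_fobj() call on flush(), after which the
    buffer is forcefully closed, so there are no append semantics."""

    def __init__(self, fs, path):
        self.fs = fs
        self.path = path
        self.buffer = io.BytesIO()

    def write(self, data):
        # Raises ValueError ("I/O operation on closed file.")
        # once the buffer has been closed by flush().
        return self.buffer.write(data)

    def flush(self):
        self.buffer.flush()  # raises if already flushed + closed
        try:
            self.buffer.seek(0)
            self.fs.upload_fobj(self.buffer, self.path)
        finally:
            self.buffer.close()  # one-shot: no writes after flush

    def close(self):
        if not self.buffer.closed:
            self.flush()


class FakeFS:
    """Toy stand-in for the filesystem, for demonstration only."""

    def __init__(self):
        self.store = {}

    def upload_fobj(self, fobj, path):
        self.store[path] = fobj.read()
```

After `close()`, any further `write()` surfaces the closed-file error from the underlying `BytesIO`, which is the "we already signal it" point made later in this thread.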
I see. A few questions here:

- Should we test `self._closed` in `write` and raise if it's `True`? It's not a very common behavior to close on `flush`, right?
- `self.buffer.flush()`: do we need this?
- What will be happening on `gdrive_retry`? Do we handle this at all in the place where we depend on this? (Retries are usually important for tests to be stable.)
> should we test self._closed in write and raise if it's True? It's not a very common behavior to close on flush, right?

`buffer.flush()` actually checks for that, so we don't have to manage individual states.

> self.buffer.flush(): do we need this?

Answered above.

> what will be happening on gdrive_retry? do we handle this at all in the place where we depend on this? (retries are important usually for tests to be stable)

`upload_fobj()` is retried.
> buffer.flush() actually checks for that

Could you clarify, please? (I'm not sure we are on the same page :) )

> upload_fobj() is retried.

Yep, I see, `gdrive_upload_fobj` is retried inside `upload_fobj`.
> could you clarify please? (I'm not sure we are on the same page :) )

When we close our wrapper file (via `.close()`), we also close the buffer (we call `buffer.close()`). And if we try to call `buffer.flush()` after we close the file (and, due to that, the buffer itself), it will raise the proper `I/O operation on closed file` error.
Hmm... it looks like I am missing something still, sorry :)

My concern was about the following workflow:

```python
writer.flush()  # it closes itself, which is not expected (usually)
writer.write("more stuff")  # how does it behave now? should we signal that we are trying to write to a closed object?
```
You can't do that with the current approach. We only allow `flush()`ing once, and after the `flush()` we close the file (which is a bit unorthodox, though it was the easiest route to emulate the behavior, since generally you only flush once, during `close()` itself, so you don't actually need the file to be open for the rest of the flow).
Yep, so going back to the initial question: "should we signal that we are trying to write to a closed object?"
I think the answer is that we already do, at the first line of this function:

```python
>>> import io
>>> buf = io.BytesIO()
>>> buf.write(b"hey")  # first write data
3
>>> buf.flush(); buf.close()  # then flush + close the file
>>> buf.flush()  # try flushing again
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: I/O operation on closed file.
```

When you write stuff and then call `flush()` on our wrapper (which also calls `close()`, as I stated), the next `flush()` is automatically going to signal that, since its first line is `buf.flush()`, which raises the appropriate error.
@Mergifyio rebase

Command error: could not apply 3be2a30... fs: implement a fsspec-based filesystem backend

@Mergifyio rebase

Command
One minor discussion that is still happening, not a blocker
```python
with self.open(lpath) as stream:
    # IterStream objects don't support full-length
    # seek() calls, so we have to wrap the data with
    # an external buffer.
    buffer = io.BytesIO(stream.read())
    self.upload_fobj(buffer, rpath)
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this. This will fail if a file is bigger than the system RAM.
Suggested change:

```diff
-with self.open(lpath) as stream:
-    # IterStream objects don't support full-length
-    # seek() calls, so we have to wrap the data with
-    # an external buffer.
-    buffer = io.BytesIO(stream.read())
-    self.upload_fobj(buffer, rpath)
+with self.open(lpath) as stream:
+    self.upload_fobj(stream, rpath)
```

and fix `self.open` to return a proper stream.
This basically moves the already existing functionality from DVC to PyDrive2. There weren't any major changes, besides:

- `w` mode is now supported, but we first collect everything and then write, so the functionality should be used carefully. It was mainly needed for the tests.
- `cp_file()`, which basically does `upload_fobj()` to copy. And the `move()` method is now implemented through `cp` + `rm`.
- `info()` calls now have a `checksum` field.
- `get_file`/`put_file` APIs now support `fsspec.Callback`s.

Other changes (those I can remember):

- `ls()`/`find()` now return lists instead of generators.

Fixes #113