
Conversation

@dhirschfeld
Contributor

No description provided.

@dhirschfeld
Contributor Author

Tests incoming - just wanted to get this up here for consideration in the meantime

@dhirschfeld
Contributor Author

c.c. @wesm, @xhochy - this may be of interest and you're certainly better qualified than me to comment on the implementation!

@mrocklin
Member

> c.c. @wesm, @xhochy - this may be of interest and you're certainly better qualified than me to comment on the implementation!

To be explicit, the objective would be to get something like a bytes, buffer, or ideally memoryview object out as cheaply as possible. A list of such objects would also be welcome if data isn't contiguous.
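(For context: a custom serializer in distributed splits an object into a small, msgpack-serializable header dict plus a list of frames, where each frame can be bytes or any object exporting the buffer protocol. A minimal illustrative sketch using the dask_serialize/dask_deserialize registries and a hypothetical Blob wrapper class:)

```python
from distributed.protocol import dask_serialize, dask_deserialize

class Blob:
    """Hypothetical wrapper around one contiguous chunk of memory."""
    def __init__(self, data):
        self.data = bytes(data)

@dask_serialize.register(Blob)
def serialize_blob(obj):
    header = {"length": len(obj.data)}   # small, msgpack-serializable metadata
    frames = [memoryview(obj.data)]      # zero-copy view; bytes or buffers also work
    return header, frames

@dask_deserialize.register(Blob)
def deserialize_blob(header, frames):
    return Blob(frames[0])
```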

@mrocklin
Member

So if I were to do this for a pandas dataframe I would probably pull the .data attribute from each block in the block manager separately, and then put a lot of metadata into the header.
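A rough sketch of that block-wise idea (illustrative only, not the actual dask implementation; it pokes at pandas BlockManager internals as they existed around pandas 0.23, and real code would also need index, column, and block-placement metadata in the header, plus special handling for object-dtype blocks):

```python
import numpy as np
import pandas as pd

def serialize_dataframe(df: pd.DataFrame):
    blocks = df._data.blocks  # BlockManager internals (df._mgr in newer pandas)
    header = {
        "dtypes": [str(b.dtype) for b in blocks],
        "shapes": [b.values.shape for b in blocks],
    }
    # One frame per block: the underlying ndarray's memory exposed via the
    # buffer protocol (copies only if a block isn't already C-contiguous).
    frames = [np.ascontiguousarray(b.values).data for b in blocks]
    return header, frames
```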

@mrocklin
Member

So the solution where you scatter explicitly should work fine, but when working with coroutines you'll need to yield the scatter result. This shouldn't be an issue if you use the normal API.

The rest of the tests here are related to #2110. It's also worth noting that this shouldn't be an issue for worker-to-worker transfers, so if your tasks generate and then pass around arrow objects, things will also be fine with this change. This is only an issue when you push data into Dask.
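(Illustratively, assuming a local client and `batch` being any pyarrow.RecordBatch, usage would look something like this:)

```python
from dask.distributed import Client

client = Client()  # local cluster, for illustration

# Pushing data in: scatter explicitly, then hand the future to tasks.
[batch_future] = client.scatter([batch])
n_rows = client.submit(lambda b: b.num_rows, batch_future).result()

# Inside a Tornado coroutine you would instead yield the scatter result:
#     batch_future, = yield client.scatter([batch])
```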

@xhochy commented on Jul 14, 2018

In the future, we will support pickling these objects. The return value of __reduce__ would be the necessary buffers packed into nested tuples. Would that be sufficient for dask, or do you need to have a flat list of buffers?
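(If a flat list turned out to be required, the nesting could be recorded in the header and the buffers flattened, roughly like this hypothetical helper, which is not part of pyarrow or dask:)

```python
def flatten_buffers(obj):
    """Recursively pull buffer-like leaves out of nested tuples.

    Returns (structure, buffers): a nested description that can go into the
    header, plus a flat list of buffers to use as frames.
    """
    if isinstance(obj, tuple):
        structure, buffers = [], []
        for item in obj:
            sub_structure, sub_buffers = flatten_buffers(item)
            structure.append(sub_structure)
            buffers.extend(sub_buffers)
        return ("tuple", structure), buffers
    return "buffer", [obj]  # leaf: one frame

# The inverse would walk the recorded structure and re-nest the frames in order.
```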

Remove tests which are known not to work currently (by design)
@dhirschfeld
Contributor Author

I think this is good to go now - all the tests, including the scatter test, pass for me locally:

[screenshot of the local test run showing all tests passing]

The other Client.submit tests have been removed as adding that functionality is being discussed in #2110.

Until arrow supports pickling itself, this allows passing arrow data structures around with dask, so I think it is useful new functionality.

If I've implemented it correctly, I'd hope it might be more efficient than pickling, at least until PEP 574 is accepted and apache/arrow#2161 is merged.

@wesm commented on Jul 16, 2018

If you don't mind giving me a chance to review, I'll have a closer look tomorrow.


@wesm left a comment


LGTM aside from questions around sending pyarrow.Buffer directly

writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]

This causes an extra memory copy. Can frames contain objects exporting the buffer protocol?

Member


That's a good question. It looks like it can, with a small modification (pushed). Presumably this means that PyArrow also works with sockets and Tornado IOStreams. Hooray for consistent use of protocols.
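(Concretely, the idea is to put the pyarrow.Buffer into the frame list as-is and let downstream consumers use the buffer protocol; a sketch:)

```python
buf = sink.get_result()   # pyarrow.Buffer (getvalue() in newer pyarrow)
header = {}
frames = [buf]            # no to_pybytes() copy needed

# Anything that consumes buffer-protocol objects can use the frame directly:
view = memoryview(frames[0])
assert view.nbytes == buf.size
```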

def deserialize_batch(header, frames):
import pyarrow as pa
blob = frames[0]
reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))

I opened ARROW-2859 to see if we can get rid of this pa.BufferReader detail
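(For reference, a sketch of what this looks like without the wrapper, assuming a newer pyarrow where pa.ipc.open_stream accepts buffer-like sources directly:)

```python
import pyarrow as pa

def deserialize_batch(header, frames):
    # open_stream accepts bytes/buffer-like objects, so no explicit
    # pa.BufferReader is needed.
    reader = pa.ipc.open_stream(frames[0])
    return reader.read_next_batch()
```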

writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]

ditto

@wesm commented on Jul 17, 2018

Awesome. Is there something we can do to make the Python API for Buffer more conforming?

sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
Contributor Author


One improvement on the arrow side would be if RecordBatchStreamWriter were a context manager, as that would avoid the need for an explicit close.
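(Something like the following, say; with-statement support was hypothetical at the time of this PR, see the ARROW issue below:)

```python
sink = pa.BufferOutputStream()
with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:  # hypothetical context manager
    writer.write_batch(batch)
# writer is closed automatically on exiting the block
buf = sink.get_result()
```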


A fair point. ARROW-2863

@mrocklin
Member

Merging this in a few hours if there are no further comments.

@mrocklin
Member

> Awesome. Is there something we can do to make the Python API for Buffer more conforming?

No, this was due to ugliness on the Dask side.

@mrocklin merged commit 82d51e1 into dask:master on Jul 17, 2018
@mrocklin
Member

Thanks @dhirschfeld ! Merged.

@dhirschfeld deleted the arrow-support branch on July 17, 2018 at 20:14
@dhirschfeld
Contributor Author

Thanks @mrocklin, @wesm & @xhochy - I'm excited about getting this support into dask! :)
