Add custom serialization support for pyarrow #2115
Conversation
Tests incoming - just wanted to get this up here for consideration in the meantime.
To be explicit, the objective would be to get something like a […]

So if I were to do this for a pandas dataframe I would probably pull the […]
[force-pushed from 5bac442 to 6b6af6c]
[force-pushed from 97ba62a to be5ce71]
So the solution where you scatter explicitly should work fine, but when working with coroutines you'll need to yield the scatter result. This shouldn't be an issue if you use the normal API. The rest of the tests here are related to #2110. It's also worth noting that this shouldn't be an issue for worker-to-worker transfers: if your tasks generate and then pass around Arrow objects then things will also be fine with this change. This is only an issue when you push data into Dask.
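For illustration, a minimal sketch of the two usages (`client` and `batch` are hypothetical names, and the coroutine style assumes the Tornado-based async API distributed used at the time):

```python
from tornado import gen

# Normal (blocking) API: the scatter result is a Future, no yield needed.
future = client.scatter(batch)

# Inside a coroutine, the scatter result itself must be yielded
# before the resulting future can be passed to tasks:
@gen.coroutine
def push(client, batch):
    future = yield client.scatter(batch)
    raise gen.Return(future)
```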
In the future, we will support pickling these objects. The return values of […]
[force-pushed from d496e2a to 911f71a]
I think this is good to go now - all the tests, including the scatter test, pass for me locally. The other […]. Until […]. If I've implemented it correctly I'd hope that it might be more efficient than pickling, at least until PEP 574 is passed and apache/arrow#2161 is merged.
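For context, a rough sketch of how such a serializer pair plugs into distributed's custom serialization hooks (the `register_serialization` entry point is assumed from distributed's documented extension API of the era; the function bodies mirror the review snippets below):

```python
import pyarrow as pa
from distributed.protocol.serialize import register_serialization

def serialize_batch(batch):
    # Write the record batch into an in-memory Arrow stream.
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    buf = sink.get_result()  # pyarrow's buffer accessor at the time of this PR
    header = {}
    frames = [buf]  # pa.Buffer exports the buffer protocol, so no extra copy
    return header, frames

def deserialize_batch(header, frames):
    reader = pa.RecordBatchStreamReader(pa.BufferReader(frames[0]))
    return reader.read_next_batch()

register_serialization(pa.RecordBatch, serialize_batch, deserialize_batch)
```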
If you don't mind giving me a chance to review, I'll have a closer look tomorrow.
LGTM aside from questions around sending `pyarrow.Buffer` directly.
        
          
distributed/protocol/arrow.py (Outdated)

```python
writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]
```
This causes an extra memory copy. Can frames contain objects exporting the buffer protocol?
That's a good question. It looks like it can with a small modification (pushed). Presumably this means that PyArrow also works with sockets and Tornado IOStreams. Hooray for consistent use of protocols.
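A quick way to verify the buffer-protocol claim (a hypothetical snippet, not part of the PR):

```python
import pyarrow as pa

buf = pa.py_buffer(b"some bytes")  # a pa.Buffer wrapping Python bytes
view = memoryview(buf)             # works because pa.Buffer exports the buffer protocol
assert bytes(view) == b"some bytes"
```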
```python
def deserialize_batch(header, frames):
    import pyarrow as pa
    blob = frames[0]
    reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))
```
I opened ARROW-2859 to see if we can get rid of this `pa.BufferReader` detail.
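Roughly, what ARROW-2859 asks for (hypothetical at the time of this PR):

```python
# Hypothetical simplification: let the stream reader accept a
# buffer-like object directly, without the pa.BufferReader wrapper.
reader = pa.RecordBatchStreamReader(blob)
```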
        
          
distributed/protocol/arrow.py (Outdated)

```python
writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]
```
ditto
Awesome. Is there something we can do to make the Python API for Buffer more conforming?
```python
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
```
One improvement on the Arrow side would be if RecordBatchStreamWriter were a context manager, as that would avoid the need for an explicit close.
A fair point. ARROW-2863
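For illustration, the shape such an API would take (hypothetical at the time; ARROW-2863 tracks it), reusing `sink` and `batch` from the snippet above:

```python
# Hypothetical: if RecordBatchStreamWriter implemented the context
# manager protocol, __exit__ would call close() automatically:
with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
    writer.write_batch(batch)
```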
Merging this in a few hours if there are no further comments.
No, this was due to ugliness on the Dask side.
Thanks @dhirschfeld! Merged.
