feat(c/driver/postgresql): Inital COPY Writer design #1110
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
  return NANOARROW_OK;
}

// Write a value to a buffer without checking the buffer size. Advances
I put all of this code into postgres_copy_reader.h because it re-uses a lot of the same patterns and constants. Maybe we should rename this postgres_copy_io.h?
We could also factor out the headers into shared/nonshared parts at some point, I don't think it's a big deal either way.
  children_[child_i]->Init(array_view_->children[child_i]);
}

ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
The Write methods accept an index argument, which is a little different from the Reader setup. Instead of accessing by index, the Readers always call ArrowBufferAppend on the array they are building.
I think we could still do that here; it's just a little more complicated because there is no generator like ArrowBufferGetNext or similar, so I figured index access was easier to start with. Could be something larger I am overlooking.
I think indices are fine, reading data is different than writing it
Sounds good. I think it's just kind of confusing that we increment the buffer in the WriteUnchecked calls but also mix in index access here. May be the best we can do to start.
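To make the pattern discussed above concrete, here is a minimal sketch of an index-based field writer. It is not the PR's code and does not use nanoarrow: `Int32Column`, `AppendBigEndian`, and `WriteInt32Field` are hypothetical names, and a `std::vector<uint8_t>` stands in for the ArrowBuffer. The input is addressed by row index while the output only ever grows by appending.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one int32 column of an array view.
struct Int32Column {
  std::vector<int32_t> values;
  std::vector<bool> valid;  // false means the row is NULL
};

// Append a 32-bit value in network (big-endian) byte order.
inline void AppendBigEndian(std::vector<uint8_t>* out, uint32_t v) {
  for (int shift = 24; shift >= 0; shift -= 8) {
    out->push_back(static_cast<uint8_t>((v >> shift) & 0xFF));
  }
}

// Write the field at row `index`: a 4-byte length (-1 for NULL),
// followed by the value bytes when the row is not NULL.
inline void WriteInt32Field(const Int32Column& col, size_t index,
                            std::vector<uint8_t>* out) {
  if (!col.valid[index]) {
    AppendBigEndian(out, 0xFFFFFFFFu);  // length of -1 marks NULL
    return;
  }
  AppendBigEndian(out, static_cast<uint32_t>(sizeof(int32_t)));
  AppendBigEndian(out, static_cast<uint32_t>(col.values[index]));
}
```

A caller would loop over row indices and call `WriteInt32Field` once per row, which mirrors the mix of index access on the input and cursor-style appends on the output.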
  return NANOARROW_OK;
}

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
Not sure if this is necessary, just copied from the reader design
Probably not? That was to let us build partial results fitting roughly within some bound.
int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
  ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
I think we can move towards ArrowBufferAppendUnsafe if we ensure a proper buffer size up front. I think in the current protocol you need 19 bytes for the header, 2 bytes for the number of columns in each row, 4 bytes for each record to indicate the record length, n bytes for every non-null record to contain its actual data, and finally 4 bytes for the end message.
Something to investigate further.
ah, so one pass to figure out the buffer size, then another pass to actually copy data?
Have to think more about this. I don't think you need 2 passes though? I think this could be calculated up front
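As a rough illustration of sizing the buffer up front, the sketch below computes an upper bound for fixed-width columns from the row count alone, using the byte counts listed in the thread. `CopySizeUpperBound` and `column_widths` are hypothetical names, not part of the PR. The trailer length is left as a parameter because the comment above cites 4 bytes, while the PostgreSQL documentation describes the binary trailer as a single 16-bit word. NULL fields carry no value bytes, so the real output can be smaller than this bound; variable-width columns would still need their data lengths summed.

```cpp
#include <cstdint>
#include <vector>

// 11-byte signature + 4-byte flags + 4-byte header extension length.
constexpr int64_t kHeaderBytes = 19;

// Upper bound on the COPY payload size when every column has a fixed
// byte width. Each row costs 2 bytes for the field count, plus a
// 4-byte length prefix and the value bytes for each field.
inline int64_t CopySizeUpperBound(int64_t n_rows,
                                  const std::vector<int64_t>& column_widths,
                                  int64_t trailer_bytes) {
  int64_t row_bytes = 2;  // per-row field count
  for (int64_t width : column_widths) {
    row_bytes += 4 + width;  // length prefix + value
  }
  return kHeaderBytes + n_rows * row_bytes + trailer_bytes;
}
```

With a bound like this reserved once, unchecked appends become safe for the fixed-width case without a separate pass over the data.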
 private:
  PostgresCopyFieldTupleWriter root_writer_;
  struct ArrowSchema* schema_;
  std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};
I'm a bit iffy on C++ constructs, but I think this is the easiest way to declare a pointer that owns data as a class member with C++11 compat. Apologies if I'm missing something easier.
I believe Handle<ArrowArrayView> should work. I should really find the time to go write that adbc++ library.
Cool, that is a great idea. I think we would have to refactor the Handle to move it from statement.cc to postgres_util.h, so I will tackle that in another PR.
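For readers unfamiliar with the idea, the following is a minimal sketch of a by-value owning wrapper for a C struct. It is not the Handle defined in statement.cc; `OwnedHandle`, `FakeView`, and `FakeViewReset` are hypothetical names used only to show the general RAII shape that avoids a heap-allocated `std::unique_ptr` member.

```cpp
#include <cstring>

// Hypothetical C-style struct standing in for ArrowArrayView.
struct FakeView {
  int n_children;
  bool* released_flag;  // lets a caller observe the cleanup call
};

// Hypothetical cleanup function standing in for the C API's reset call.
inline void FakeViewReset(FakeView* view) {
  if (view->released_flag != nullptr) *view->released_flag = true;
}

// Holds a C struct by value, zero-initializes it, and runs the given
// cleanup function when the wrapper goes out of scope.
template <typename T, void (*Release)(T*)>
struct OwnedHandle {
  T value;

  OwnedHandle() { std::memset(&value, 0, sizeof(value)); }
  ~OwnedHandle() { Release(&value); }

  // Non-copyable so the cleanup function runs exactly once.
  OwnedHandle(const OwnedHandle&) = delete;
  OwnedHandle& operator=(const OwnedHandle&) = delete;

  T* operator->() { return &value; }
  T* get() { return &value; }
};
```

A class could then hold an `OwnedHandle<FakeView, FakeViewReset>` member directly, and the struct is cleaned up when the owning object is destroyed.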
  result = writer_.WriteRecord(buffer, error);
} while (result == NANOARROW_OK);

// TODO: don't think we should do this here; the reader equivalent does
AFAICT the reader implementation just moves through the message buffer. I mirrored that as well for the writer, but that means that trying to read the buffer after the fact requires knowing how many bytes were traversed and moving back there. Probably a better way to do this

// The last 4 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
for (size_t i = 0; i < sizeof(kTestPgCopyBoolean) - 4; i++) {
Ultimately, when we implement this in statement.cc, I imagine we will build the buffer (maybe even in chunks) and send that via PQputCopyData. When all is said and done we would then do a PQputCopyEnd to send the last 4 bytes. Maybe we should make the end message a constant in the tests so it is clear what is part of the "data" versus the sentinel signaling the end of the buffer.
This looks like a great start
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
  buffer->data[0] = in;
  buffer->data += sizeof(int8_t);
  buffer->size_bytes += sizeof(int8_t);
}
FWIW, I'm not sure this is necessary. Compilers understand memcpy, and I would guess that they optimize the generic above to the same as these specializations.
The specializations exist because of the unsigned argument requirement for SwapNetworkToHost, although that makes me realize these are incorrect as is.
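One way a single generic could cover signed types without per-type specializations is to convert to the matching unsigned type before producing network-order bytes. The sketch below is an assumption-laden illustration, not the PR's WriteUnsafe: `WriteBigEndianUnsafe` is a hypothetical name, it writes through a separate cursor pointer rather than an ArrowBuffer, and it builds the big-endian bytes with shifts instead of calling SwapNetworkToHost.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>

// Write `in` at `*cursor` in network (big-endian) byte order and advance
// the cursor. No bounds checking: the caller must have reserved space.
template <typename T>
inline void WriteBigEndianUnsafe(uint8_t** cursor, T in) {
  static_assert(std::is_integral<T>::value && !std::is_same<T, bool>::value,
                "only non-bool integral types are supported");
  using U = typename std::make_unsigned<T>::type;
  // Signed-to-unsigned conversion is well-defined (modular arithmetic).
  const U bits = static_cast<U>(in);
  uint8_t bytes[sizeof(U)];
  for (size_t i = 0; i < sizeof(U); i++) {
    bytes[i] = static_cast<uint8_t>(bits >> (8 * (sizeof(U) - 1 - i)));
  }
  std::memcpy(*cursor, bytes, sizeof(U));
  *cursor += sizeof(U);
}
```

Keeping the write cursor separate from the buffer's base pointer also sidesteps the question of whether advancing `buffer->data` is safe for later reads or frees.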
I don't think the CI failures are related. Happy to have this merged now and work on more writers in follow-ups if you'd like.
Yeah, the CI failures are #1088
This is a precursor to #1093; figured it would be easier to work piecewise rather than all at once.
This does not try to actually connect the statement.cc code to use this, but just gets the test case / general structure set up