Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c/driver/postgresql): Inital COPY Writer design #1110

Merged
merged 14 commits into from
Sep 28, 2023

Conversation

WillAyd
Copy link
Contributor

@WillAyd WillAyd commented Sep 26, 2023

This is a pre-cursor to #1093 ; figured it would be easier to work piece-wise rather than all at once.

This does not try to actually connect the statement.cc code to use this, but just gets the test case / general structure set up

@WillAyd WillAyd requested a review from lidavidm as a code owner September 26, 2023 20:51
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
return NANOARROW_OK;
}

// Write a value to a buffer without checking the buffer size. Advances
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put all of this code into postgres_copy_reader.h because it re-uses a lot of the same patterns and constants. Maybe we should rename this postgres_copy_io.h?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also factor out the headers into shared/nonshared parts at some point, I don't think it's a big deal either way.

children_[child_i]->Init(array_view_->children[child_i]);
}

ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Write method calls accept an index argument, which is a little different from the Reader setup. Instead of accessing by index, the Readers always call ArrowBufferAppend on the array they are building.

I think we could still do that here, it's just a little bit more complicated by the fact that there is no generator ArrowBufferGetNext or similar, so I figured just using index access was easier to start. Could be something larger I am overlooking

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think indices are fine, reading data is different than writing it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I think its just kind of confusing that we increment the buffer in the WriteUnchecked calls but also mix in index access here. May be the best we can do to start

return NANOARROW_OK;
}

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is necessary, just copied from the reader design

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not? That was to let us build partial results fitting roughly within some bound.

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can move towards ArrowBufferAppendUnsafe if ensure a proper buffer size up front. I think in the current protocol you need 19 bytes for the header, 2 bytes for the number of columns in each row, 4 bytes for each record to indicate the record length, n bits for every non-null record to contain its actual bytes and finally 4 bytes for the end message.

Something to investigate futher

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so one pass to figure out the buffer size, then another pass to actually copy data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have to think more about this. I don't think you need 2 passes though? I think this could be calculated up front

private:
PostgresCopyFieldTupleWriter root_writer_;
struct ArrowSchema* schema_;
std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit iffy on C++ constructs, but I think this is the easiest way to declare an pointer that owns data as a class member with C++11 compat. Apologies if I'm missing something easier

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Handle<ArrowArrayView> should work. I should really find the time to go write that adbc++ library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool that is a great idea. I think we would have to refactor the Handle to move from statement.cc to postgres_util.h so will tackle in another PR

result = writer_.WriteRecord(buffer, error);
} while (result == NANOARROW_OK);

// TODO: don't think we should do this here; the reader equivalent does
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT the reader implementation just moves through the message buffer. I mirrored that as well for the writer, but that means that trying to read the buffer after the fact requires knowing how many bytes were traversed and moving back there. Probably a better way to do this


// The last 4 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
for (size_t i = 0; i < sizeof(kTestPgCopyBoolean) - 4; i++) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately when we implement this in statement.cc I imagine we will build the buffer (maybe even in chunks) and send that via PQputCopyData . When all is said and done we would then do a PQputCopyEnd to send the last 4 bytes. Maybe we should make the end message a constant in the tests so it is clear what is part of the "data" versus the sentinel signaling the end of the buffer

Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a great start

private:
PostgresCopyFieldTupleWriter root_writer_;
struct ArrowSchema* schema_;
std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe Handle<ArrowArrayView> should work. I should really find the time to go write that adbc++ library.

return NANOARROW_OK;
}

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not? That was to let us build partial results fitting roughly within some bound.

children_[child_i]->Init(array_view_->children[child_i]);
}

ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think indices are fine, reading data is different than writing it

@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
return NANOARROW_OK;
}

// Write a value to a buffer without checking the buffer size. Advances
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also factor out the headers into shared/nonshared parts at some point, I don't think it's a big deal either way.

Comment on lines +130 to +135
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
buffer->data[0] = in;
buffer->data += sizeof(int8_t);
buffer->size_bytes += sizeof(int8_t);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I'm not sure this is necessary. Compilers understand memcpy, and I would guess that they optimize the generic above to the same as these specializations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The specializations exist because of the unsigned argument requirement forSwapNetworkToHost requirement, although that makes me realize these are incorrect as is

int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, so one pass to figure out the buffer size, then another pass to actually copy data?

@WillAyd
Copy link
Contributor Author

WillAyd commented Sep 28, 2023

I don't think the CI failures are related. Happy to have this merged now and work on more writers in follow ups if you'd like

@lidavidm
Copy link
Member

Yeah, the CI failures are #1088

@lidavidm lidavidm added this to the ADBC Libraries 0.8.0 milestone Sep 28, 2023
@lidavidm lidavidm merged commit 7226c0e into apache:main Sep 28, 2023
60 of 64 checks passed
@WillAyd WillAyd deleted the postgres-copy-writer branch September 28, 2023 17:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants