Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,25 @@ add_custom_command(

# Flag the files listed in FLIGHT_GENERATED_PROTO_FILES as GENERATED, i.e.
# produced during the build rather than present in the source tree.
set_source_files_properties(${FLIGHT_GENERATED_PROTO_FILES} PROPERTIES GENERATED TRUE)

# Target that is part of the default build (ALL) and depends on the generated
# files; other targets can depend on flight_grpc_gen to order themselves after
# code generation.
add_custom_target(flight_grpc_gen ALL DEPENDS ${FLIGHT_GENERATED_PROTO_FILES})

# Note: we do not compile the generated Protobuf sources directly. Instead we
# compile them via protocol-internal.cc, which #includes Flight.pb.cc and
# Flight.grpc.pb.cc after installing some gRPC template overrides that enable
# Flight-specific optimizations. See the comments in protocol-internal.cc.
#
# Flight.pb.cc and Flight.grpc.pb.cc must therefore NOT be listed here:
# compiling them a second time as standalone translation units would define
# every generated symbol twice.
set(ARROW_FLIGHT_SRCS
    client.cc
    internal.cc
    protocol-internal.cc
    serialization-internal.cc
    server.cc
    types.cc)

add_arrow_lib(arrow_flight
SOURCES
${ARROW_FLIGHT_SRCS}
DEPENDENCIES
flight_grpc_gen
SHARED_LINK_LIBS
arrow_shared
${ARROW_FLIGHT_STATIC_LINK_LIBS}
Expand Down
21 changes: 6 additions & 15 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "arrow/flight/client.h"
#include "arrow/flight/protocol-internal.h"

#include <memory>
#include <sstream>
Expand All @@ -33,8 +34,6 @@
#include "arrow/type.h"
#include "arrow/util/logging.h"

#include "arrow/flight/Flight.grpc.pb.h"
#include "arrow/flight/Flight.pb.h"
#include "arrow/flight/internal.h"
#include "arrow/flight/serialization-internal.h"

Expand Down Expand Up @@ -74,13 +73,8 @@ class FlightStreamReader : public RecordBatchReader {
return Status::OK();
}

// For customizing read path for better memory/serialization efficiency
// XXX this cast is undefined behavior
auto custom_reader = reinterpret_cast<grpc::ClientReader<FlightData>*>(stream_.get());

// Explicitly specify the override to invoke - otherwise compiler
// may invoke through vtable (not updated by reinterpret_cast)
if (custom_reader->grpc::ClientReader<FlightData>::Read(&data)) {
// Pretend to be pb::FlightData and intercept in SerializationTraits
if (stream_->Read(reinterpret_cast<pb::FlightData*>(&data))) {
std::unique_ptr<ipc::Message> message;

// Validate IPC message
Expand Down Expand Up @@ -127,12 +121,9 @@ class FlightPutWriter::FlightPutWriterImpl : public ipc::RecordBatchWriter {
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
IpcPayload payload;
RETURN_NOT_OK(ipc::internal::GetRecordBatchPayload(batch, pool_, &payload));
// XXX this cast is undefined behavior
auto custom_writer = reinterpret_cast<grpc::ClientWriter<IpcPayload>*>(writer_.get());
// Explicitly specify the override to invoke - otherwise compiler
// may invoke through vtable (not updated by reinterpret_cast)
if (!custom_writer->grpc::ClientWriter<IpcPayload>::Write(payload,
grpc::WriteOptions())) {

if (!writer_->Write(*reinterpret_cast<const pb::FlightData*>(&payload),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we are praying that Write doesn't do anything with the FlightData pointer except pass it to SerializationTraits<FlightData>, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grpc::WriteOptions())) {
std::stringstream ss;
ss << "Could not write record batch to stream: "
<< rpc_->context.debug_error_string();
Expand Down
100 changes: 100 additions & 0 deletions cpp/src/arrow/flight/customize_protobuf.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <limits>
#include <memory>

#include "grpcpp/impl/codegen/config_protobuf.h"

// It is necessary to undefine this macro so that the protobuf
// SerializationTraits specialization is not declared in proto_utils.h. We've
// copied that specialization below and modified it to exclude
// protocol::FlightData from the default implementation so we can specialize
// for our faster serialization-deserialization path.
#undef GRPC_OPEN_SOURCE_PROTO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on this? I think it's easy to miss it or misunderstand why it's here. (as in "what happens if I remove this")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


#include "grpcpp/impl/codegen/proto_utils.h"

// Forward declarations only: the full definitions of these types are not
// visible in this header. They are named by the serialization hooks declared
// further down in namespace grpc.
namespace arrow {
namespace ipc {
namespace internal {

// Input type of grpc::FlightDataSerialize()
struct IpcPayload;

} // namespace internal
} // namespace ipc

namespace flight {

// Output type of grpc::FlightDataDeserialize(). Note this is a different type
// from protocol::FlightData declared just below, despite the shared name.
struct FlightData;

namespace protocol {

// The type that the SerializationTraits specializations in this header key on.
// NOTE(review): presumably the protobuf-generated message class (client.cc
// refers to it as pb::FlightData) -- confirm against Flight.pb.h.
class FlightData;

} // namespace protocol
} // namespace flight
} // namespace arrow

namespace grpc {

// NOTE: after these using-declarations, the unqualified name `FlightData`
// inside namespace grpc refers to arrow::flight::FlightData (the struct), NOT
// to arrow::flight::protocol::FlightData. The latter is always spelled out in
// full in this header.
using arrow::flight::FlightData;
using arrow::ipc::internal::IpcPayload;

class ByteBuffer;
class Status;

// Custom (de)serialization entry points. They are only declared here; the
// definitions live outside this header. They are called by the
// SerializationTraits specialization for arrow::flight::protocol::FlightData.
// NOTE(review): `own_buffer` presumably has the same meaning as the
// corresponding out-parameter of gRPC's SerializationTraits::Serialize --
// confirm against the definition.
Status FlightDataSerialize(const IpcPayload& msg, ByteBuffer* out, bool* own_buffer);
Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out);

// Default protobuf serializer: translates between protobuf objects and
// grpc_byte_buffers for every type T that derives from
// grpc::protobuf::Message, with the single exception of
// arrow::flight::protocol::FlightData, which gets its own specialization.
//
// This is the specialization that proto_utils.h would otherwise declare
// (suppressed in this header by undefining GRPC_OPEN_SOURCE_PROTO), copied
// here with the extra FlightData exclusion. More information about
// SerializationTraits can be found in
// include/grpcpp/impl/codegen/serialization_traits.h.
template <class T>
class SerializationTraits<
    T, typename std::enable_if<
           !std::is_same<arrow::flight::protocol::FlightData, T>::value &&
           std::is_base_of<grpc::protobuf::Message, T>::value>::type> {
 public:
  // Writes `message` into `output` via gRPC's generic protobuf writer
  static Status Serialize(const grpc::protobuf::Message& message, ByteBuffer* output,
                          bool* own_buffer) {
    return GenericSerialize<ProtoBufferWriter, T>(message, output, own_buffer);
  }

  // Parses `input` into `message` via gRPC's generic protobuf reader
  static Status Deserialize(ByteBuffer* input, grpc::protobuf::Message* message) {
    return GenericDeserialize<ProtoBufferReader, T>(input, message);
  }
};

// Specialization chosen only when T is arrow::flight::protocol::FlightData.
// It routes (de)serialization to the custom FlightDataSerialize() /
// FlightDataDeserialize() functions instead of the generic protobuf path.
//
// WARNING: the objects passed in here are not expected to be real protobuf
// messages. On the write path client.cc reinterpret_casts an IpcPayload to
// protocol::FlightData before handing it to gRPC, and Serialize() casts it
// straight back. This only works as long as gRPC does nothing with the
// object except forward it to SerializationTraits.
template <class T>
class SerializationTraits<
    T, typename std::enable_if<
           std::is_same<T, arrow::flight::protocol::FlightData>::value>::type> {
 public:
  // `msg` is really an IpcPayload in disguise (see the warning above)
  static Status Serialize(const grpc::protobuf::Message& msg, ByteBuffer* bb,
                          bool* own_buffer) {
    const IpcPayload* payload = reinterpret_cast<const IpcPayload*>(&msg);
    return FlightDataSerialize(*payload, bb, own_buffer);
  }

  // `msg` is reinterpreted as arrow::flight::FlightData (the unqualified name
  // resolves to the struct, not to protocol::FlightData).
  // NOTE(review): presumably the reader in client.cc passes such a struct
  // cast to protocol::FlightData* -- confirm against the caller.
  static Status Deserialize(ByteBuffer* buffer, grpc::protobuf::Message* msg) {
    FlightData* flight_data = reinterpret_cast<FlightData*>(msg);
    return FlightDataDeserialize(buffer, flight_data);
  }
};

} // namespace grpc
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "arrow/flight/internal.h"

#include "arrow/flight/customize_protobuf.h"

#include <memory>
#include <string>
#include <utility>
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/flight/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/util/macros.h"

#include "arrow/flight/Flight.grpc.pb.h"
#include "arrow/flight/Flight.pb.h"
#include "arrow/flight/protocol-internal.h"
#include "arrow/flight/types.h"

namespace arrow {
Expand Down
26 changes: 26 additions & 0 deletions cpp/src/arrow/flight/protocol-internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/protocol-internal.h"

// NOTE(wesm): Including .cc files in another .cc file would ordinarily be a
// no-no. We have customized the serialization path for FlightData, which is
// currently only possible through some preprocessor commands that need to be
// processed before either of these files is compiled. Because we don't want to
// edit the generated C++ files, we include them here and do our gRPC
// customizations in customize_protobuf.h, which protocol-internal.h includes
// first.
#include "arrow/flight/Flight.grpc.pb.cc" // NOLINT
#include "arrow/flight/Flight.pb.cc" // NOLINT
23 changes: 23 additions & 0 deletions cpp/src/arrow/flight/protocol-internal.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

// Need to include this first to get our gRPC customizations
#include "arrow/flight/customize_protobuf.h"

#include "arrow/flight/Flight.grpc.pb.h"
#include "arrow/flight/Flight.pb.h"
Loading