Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/cmake_modules/FindArrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
NO_DEFAULT_PATH)
get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)

if (PYARROW_BUILD_FLIGHT)
find_library(ARROW_FLIGHT_LIB_PATH NAMES arrow_flight
PATHS
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
get_filename_component(ARROW_FLIGHT_LIBS ${ARROW_FLIGHT_LIB_PATH} DIRECTORY)
endif()

if (MSVC)
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")

Expand All @@ -101,19 +109,25 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME arrow)
set(ARROW_PYTHON_LIB_NAME arrow_python)
set(ARROW_FLIGHT_LIB_NAME arrow_flight)
if (MSVC)
set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_PYTHON_STATIC_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_FLIGHT_STATIC_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_SHARED_LIB ${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB ${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_FLIGHT_SHARED_LIB ${ARROW_FLIGHT_SHARED_LIBS}/${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib)
set(ARROW_PYTHON_SHARED_IMP_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib)
set(ARROW_FLIGHT_SHARED_IMP_LIB ${ARROW_FLIGHT_LIBS}/${ARROW_FLIGHT_LIB_NAME}.lib)
else()
set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a)
set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a)
set(ARROW_FLIGHT_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}.a)

set(ARROW_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_FLIGHT_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_FLIGHT_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
endif()
endif()

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ add_custom_target(arrow_flight)
arrow_install_all_headers("arrow/flight")

set(ARROW_FLIGHT_STATIC_LINK_LIBS
protobuf_static
${PROTOBUF_LIBRARY}
grpc_grpcpp_static
grpc_grpc_static
grpc_gpr_static
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,25 @@ set(ARROW_PYTHON_SRCS
pyarrow.cc
serialize.cc)

if(ARROW_FLIGHT)
set(ARROW_PYTHON_SRCS ${ARROW_PYTHON_SRCS} flight.cc)
endif()

if("${COMPILER_FAMILY}" STREQUAL "clang")
set_property(SOURCE pyarrow.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-cast-qual ")
endif()

set(ARROW_PYTHON_SHARED_LINK_LIBS arrow_shared ${PYTHON_OTHER_LIBS})

if(ARROW_FLIGHT)
# Must link shared: we don't want to link more than one copy of gRPC
# into the eventual Cython shared object, otherwise gRPC calls fail
# with weird errors due to multiple copies of global static state
# (The other solution is to link gRPC shared everywhere instead of
# statically only in Flight)
set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} arrow_flight_shared)
endif()

if(WIN32)
set(ARROW_PYTHON_SHARED_LINK_LIBS ${ARROW_PYTHON_SHARED_LINK_LIBS} ${PYTHON_LIBRARIES})
endif()
Expand Down
86 changes: 86 additions & 0 deletions cpp/src/arrow/python/flight.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <utility>

#include "arrow/flight/internal.h"
#include "arrow/python/flight.h"

namespace arrow {
namespace py {
namespace flight {

PyFlightServer::PyFlightServer(PyObject* server, PyFlightServerVtable vtable)
: vtable_(vtable) {
Py_INCREF(server);
server_.reset(server);
}

Status PyFlightServer::ListFlights(
const arrow::flight::Criteria* criteria,
std::unique_ptr<arrow::flight::FlightListing>* listings) {
return Status::NotImplemented("NYI");
}

Status PyFlightServer::GetFlightInfo(const arrow::flight::FlightDescriptor& request,
std::unique_ptr<arrow::flight::FlightInfo>* info) {
PyAcquireGIL lock;
vtable_.get_flight_info(server_.obj(), request, info);
return CheckPyError();
}

Status PyFlightServer::DoGet(const arrow::flight::Ticket& request,
std::unique_ptr<arrow::flight::FlightDataStream>* stream) {
PyAcquireGIL lock;
vtable_.do_get(server_.obj(), request, stream);
return CheckPyError();
}

Status PyFlightServer::DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> reader) {
PyAcquireGIL lock;
vtable_.do_put(server_.obj(), std::move(reader));
return CheckPyError();
}

Status PyFlightServer::DoAction(const arrow::flight::Action& action,
std::unique_ptr<arrow::flight::ResultStream>* result) {
return Status::NotImplemented("NYI");
}

Status PyFlightServer::ListActions(std::vector<arrow::flight::ActionType>* actions) {
return Status::NotImplemented("NYI");
}

Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
uint64_t total_records, uint64_t total_bytes,
std::unique_ptr<arrow::flight::FlightInfo>* out) {
arrow::flight::FlightInfo::Data flight_data;
RETURN_NOT_OK(arrow::flight::internal::SchemaToString(*schema, &flight_data.schema));
flight_data.descriptor = descriptor;
flight_data.endpoints = endpoints;
flight_data.total_records = total_records;
flight_data.total_bytes = total_bytes;
arrow::flight::FlightInfo value(flight_data);
*out = std::unique_ptr<arrow::flight::FlightInfo>(new arrow::flight::FlightInfo(value));
return Status::OK();
}

} // namespace flight
} // namespace py
} // namespace arrow
79 changes: 79 additions & 0 deletions cpp/src/arrow/python/flight.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PYARROW_FLIGHT_H
#define PYARROW_FLIGHT_H

#include <memory>
#include <vector>

#include "arrow/flight/api.h"
#include "arrow/python/common.h"
#include "arrow/python/config.h"

namespace arrow {

namespace py {

namespace flight {

/// \brief A table of function pointers for calling from C++ into
/// Python.
class ARROW_PYTHON_EXPORT PyFlightServerVtable {
public:
std::function<void(PyObject*, const arrow::flight::FlightDescriptor&,
std::unique_ptr<arrow::flight::FlightInfo>*)>
get_flight_info;
std::function<void(PyObject*, std::unique_ptr<arrow::flight::FlightMessageReader>)>
do_put;
std::function<void(PyObject*, const arrow::flight::Ticket&,
std::unique_ptr<arrow::flight::FlightDataStream>*)>
do_get;
};

class ARROW_PYTHON_EXPORT PyFlightServer : public arrow::flight::FlightServerBase {
public:
explicit PyFlightServer(PyObject* server, PyFlightServerVtable vtable);

Status ListFlights(const arrow::flight::Criteria* criteria,
std::unique_ptr<arrow::flight::FlightListing>* listings) override;
Status GetFlightInfo(const arrow::flight::FlightDescriptor& request,
std::unique_ptr<arrow::flight::FlightInfo>* info) override;
Status DoGet(const arrow::flight::Ticket& request,
std::unique_ptr<arrow::flight::FlightDataStream>* stream) override;
Status DoPut(std::unique_ptr<arrow::flight::FlightMessageReader> reader) override;
Status DoAction(const arrow::flight::Action& action,
std::unique_ptr<arrow::flight::ResultStream>* result) override;
Status ListActions(std::vector<arrow::flight::ActionType>* actions) override;

private:
OwnedRefNoGIL server_;
PyFlightServerVtable vtable_;
};

ARROW_PYTHON_EXPORT
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
uint64_t total_records, uint64_t total_bytes,
std::unique_ptr<arrow::flight::FlightInfo>* out);

} // namespace flight
} // namespace py
} // namespace arrow

#endif // PYARROW_FLIGHT_H
25 changes: 25 additions & 0 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ endif()
# Top level cmake dir
if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PYARROW_BUILD_CUDA "Build the PyArrow CUDA support" OFF)
option(PYARROW_BUILD_FLIGHT "Build the PyArrow Flight integration" OFF)
option(PYARROW_BUILD_GANDIVA "Build the PyArrow Gandiva integration" OFF)
option(PYARROW_BUILD_PARQUET "Build the PyArrow Parquet integration" OFF)
option(PYARROW_PARQUET_USE_SHARED "Rely on parquet shared libraries where relevant" ON)
Expand Down Expand Up @@ -191,6 +192,10 @@ include_directories(SYSTEM ${NUMPY_INCLUDE_DIRS} ${PYTHON_INCLUDE_DIRS} src)
# Dependencies
#

if(PYARROW_BUILD_FLIGHT)
set(ARROW_FLIGHT TRUE)
endif()

# Arrow
find_package(Arrow REQUIRED)
include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
Expand Down Expand Up @@ -352,9 +357,15 @@ endif()
if(MSVC)
add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_IMP_LIB})
add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_IMP_LIB})
if(PYARROW_BUILD_FLIGHT)
add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_IMP_LIB})
endif()
else()
add_thirdparty_lib(arrow SHARED_LIB ${ARROW_SHARED_LIB})
add_thirdparty_lib(arrow_python SHARED_LIB ${ARROW_PYTHON_SHARED_LIB})
if(PYARROW_BUILD_FLIGHT)
add_thirdparty_lib(arrow_flight SHARED_LIB ${ARROW_FLIGHT_SHARED_LIB})
endif()
endif()

#
Expand Down Expand Up @@ -474,6 +485,20 @@ if(PYARROW_BUILD_ORC)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _orc)
endif()

# Flight
if(PYARROW_BUILD_FLIGHT)
if(PYARROW_BUNDLE_ARROW_CPP)
# TODO:
message(FATAL_ERROR "Not yet implemented: bundling arrow-flight in pyarrow")
endif()
# We do NOT want to link gRPC or any other Flight dependency
# here. Linking more than one copy leads to odd runtime errors due
# to multiple copies of static global state. Thus we also need to
# link Flight as a shared object.
set(LINK_LIBS ${LINK_LIBS} arrow_flight_shared)
set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _flight)
endif()

# Gandiva
if(PYARROW_BUILD_GANDIVA)
find_package(Gandiva)
Expand Down
Loading