Skip to content

Commit

Permalink
[Python] Distribute drivers as Python packages (apache#57)
Browse files Browse the repository at this point in the history
* [Python] Bundle Postgres driver as a Python package

* [Python] Bundle Postgres driver without Cython

* [Python] Add more tests

* Fix authors, remove redundant block
  • Loading branch information
lidavidm authored Aug 31, 2022
1 parent 62581c5 commit e95fb58
Show file tree
Hide file tree
Showing 18 changed files with 686 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.

[settings]
known_first_party = adbc_driver_manager
known_first_party = adbc_driver_manager, adbc_driver_postgres
profile = black
72 changes: 52 additions & 20 deletions c/driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ void SetError(struct AdbcError* error, const std::string& message) {

// Default stubs

// Fallback installed by FILL_DEFAULT when a driver leaves DatabaseSetOption
// unset: every argument is ignored and the call is reported as unimplemented.
AdbcStatusCode DatabaseSetOption(struct AdbcDatabase*, const char*, const char*,
                                 struct AdbcError* error) {
  return ADBC_STATUS_NOT_IMPLEMENTED;
}

// Default ConnectionCommit stub: does no work and reports the operation as
// not implemented.
AdbcStatusCode ConnectionCommit(struct AdbcConnection*, struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
Expand Down Expand Up @@ -147,6 +152,7 @@ struct TempDatabase {
std::string driver;
// Default name (see adbc.h)
std::string entrypoint = "AdbcDriverInit";
AdbcDriverInitFunc init_func = nullptr;
};

/// Temporary state while the database is being configured.
Expand Down Expand Up @@ -209,7 +215,7 @@ static AdbcStatusCode ReleaseDriver(struct AdbcDriver* driver, struct AdbcError*

AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
// Allocate a temporary structure to store options pre-Init
database->private_data = new TempDatabase;
database->private_data = new TempDatabase();
database->private_driver = nullptr;
return ADBC_STATUS_OK;
}
Expand All @@ -231,25 +237,48 @@ AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char*
return ADBC_STATUS_OK;
}

// Record an explicit driver init function on a database that has been created
// but not yet initialized.
//
// When a function has been stored here, AdbcDatabaseInit passes it to
// AdbcLoadDriverFromInitFunc instead of loading a driver library by name.
//
// Returns ADBC_STATUS_INVALID_STATE if the database already has a driver
// attached, or if it has no pre-Init state to store the function in;
// otherwise stores init_func and returns ADBC_STATUS_OK.
AdbcStatusCode AdbcDriverManagerDatabaseSetInitFunc(struct AdbcDatabase* database,
                                                    AdbcDriverInitFunc init_func,
                                                    struct AdbcError* error) {
  if (database->private_driver) {
    return ADBC_STATUS_INVALID_STATE;
  }
  // Same guard as AdbcDatabaseInit: without the TempDatabase allocated by
  // AdbcDatabaseNew there is nothing to write to, and dereferencing the null
  // pointer below would crash.
  if (!database->private_data) {
    return ADBC_STATUS_INVALID_STATE;
  }

  TempDatabase* args = reinterpret_cast<TempDatabase*>(database->private_data);
  args->init_func = init_func;
  return ADBC_STATUS_OK;
}

AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
if (!database->private_data) {
SetError(error, "Must call AdbcDatabaseNew first");
return ADBC_STATUS_INVALID_STATE;
}
TempDatabase* args = reinterpret_cast<TempDatabase*>(database->private_data);
if (args->driver.empty()) {
// Don't delete args here; caller should still call AdbcDatabaseRelease
if (args->init_func) {
// Do nothing
} else if (args->driver.empty()) {
SetError(error, "Must provide 'driver' parameter");
return ADBC_STATUS_INVALID_ARGUMENT;
}

database->private_driver = new AdbcDriver;
std::memset(database->private_driver, 0, sizeof(AdbcDriver));
size_t initialized = 0;
AdbcStatusCode status =
AdbcLoadDriver(args->driver.c_str(), args->entrypoint.c_str(), ADBC_VERSION_0_0_1,
database->private_driver, &initialized, error);
AdbcStatusCode status;
// So we don't confuse a driver into thinking it's initialized already
database->private_data = nullptr;
if (args->init_func) {
status = AdbcLoadDriverFromInitFunc(args->init_func, ADBC_VERSION_0_0_1,
database->private_driver, &initialized, error);
} else {
status =
AdbcLoadDriver(args->driver.c_str(), args->entrypoint.c_str(), ADBC_VERSION_0_0_1,
database->private_driver, &initialized, error);
}
if (status != ADBC_STATUS_OK) {
// Restore private_data so it will be released by AdbcDatabaseRelease
database->private_data = args;
if (database->private_driver->release) {
database->private_driver->release(database->private_driver, error);
}
Expand Down Expand Up @@ -585,16 +614,6 @@ const char* AdbcStatusCodeMessage(AdbcStatusCode code) {
AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
size_t count, struct AdbcDriver* driver,
size_t* initialized, struct AdbcError* error) {
#define FILL_DEFAULT(DRIVER, STUB) \
if (!DRIVER->STUB) { \
DRIVER->STUB = &STUB; \
}
#define CHECK_REQUIRED(DRIVER, STUB) \
if (!DRIVER->STUB) { \
SetError(error, "Driver does not implement required function Adbc" #STUB); \
return ADBC_STATUS_INTERNAL; \
}

AdbcDriverInitFunc init_func;
std::string error_message;

Expand Down Expand Up @@ -695,18 +714,31 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,

#endif // defined(_WIN32)

return AdbcLoadDriverFromInitFunc(init_func, count, driver, initialized, error);
}

AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, size_t count,
struct AdbcDriver* driver, size_t* initialized,
struct AdbcError* error) {
#define FILL_DEFAULT(DRIVER, STUB) \
if (!DRIVER->STUB) { \
DRIVER->STUB = &STUB; \
}
#define CHECK_REQUIRED(DRIVER, STUB) \
if (!DRIVER->STUB) { \
SetError(error, "Driver does not implement required function Adbc" #STUB); \
return ADBC_STATUS_INTERNAL; \
}

auto result = init_func(count, driver, initialized, error);
#if defined(_WIN32)
driver->private_manager = new ManagerDriverState{handle, driver->release};
driver->release = &ReleaseDriver;
#endif // defined(_WIN32)
if (result != ADBC_STATUS_OK) {
return result;
}

CHECK_REQUIRED(driver, DatabaseNew);
CHECK_REQUIRED(driver, DatabaseInit);
CHECK_REQUIRED(driver, DatabaseRelease);
FILL_DEFAULT(driver, DatabaseSetOption);

CHECK_REQUIRED(driver, ConnectionGetInfo);
CHECK_REQUIRED(driver, ConnectionNew);
Expand Down
32 changes: 32 additions & 0 deletions c/driver_manager/adbc_driver_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,38 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
size_t count, struct AdbcDriver* driver,
size_t* initialized, struct AdbcError* error);

/// \brief Common entry point for drivers via the driver manager.
///
/// The driver manager can fill in default implementations of some
/// ADBC functions for drivers. Drivers must implement a minimum level
/// of functionality for this to be possible, however, and some
/// functions must be implemented by the driver.
///
/// \param[in] init_func The driver initialization function
///   (entrypoint) to call.
/// \param[in] count The number of entries to initialize. Provides
///   backwards compatibility if the struct definition is changed.
/// \param[out] driver The table of function pointers to initialize.
/// \param[out] initialized How much of the table was actually
///   initialized (can be less than count).
/// \param[out] error An optional location to return an error message
///   if necessary.
/// \return The status returned by init_func if it fails, or
///   ADBC_STATUS_INTERNAL if the driver leaves a required function
///   unset.
ADBC_EXPORT
AdbcStatusCode AdbcLoadDriverFromInitFunc(AdbcDriverInitFunc init_func, size_t count,
struct AdbcDriver* driver, size_t* initialized,
struct AdbcError* error);

/// \brief Set the AdbcDriverInitFunc to use.
///
/// This is an extension to the ADBC API. The driver manager shims
/// the AdbcDatabase* functions to allow you to specify the
/// driver/entrypoint dynamically. This function lets you set the
/// entrypoint explicitly, for applications that can dynamically
/// load drivers on their own.
///
/// Call this after AdbcDatabaseNew and before AdbcDatabaseInit; when
/// an init function has been set, AdbcDatabaseInit uses it instead of
/// loading the library named by the 'driver' option.
///
/// \param[in] database The database to configure.
/// \param[in] init_func The driver initialization function to use.
/// \param[out] error An optional location to return an error message
///   if necessary.
/// \return ADBC_STATUS_INVALID_STATE if the database already has a
///   driver attached; ADBC_STATUS_OK once the function is stored.
ADBC_EXPORT
AdbcStatusCode AdbcDriverManagerDatabaseSetInitFunc(struct AdbcDatabase* database,
AdbcDriverInitFunc init_func,
struct AdbcError* error);

/// \brief Get a human-friendly description of a status code.
///
/// \param[in] code The status code to describe.
ADBC_EXPORT
const char* AdbcStatusCodeMessage(AdbcStatusCode code);
Expand Down
19 changes: 14 additions & 5 deletions c/drivers/postgres/postgres.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "connection.h"
#include "database.h"
#include "statement.h"
#include "util.h"

using adbcpq::PostgresConnection;
using adbcpq::PostgresDatabase;
Expand All @@ -50,6 +51,7 @@ using adbcpq::PostgresStatement;
// AdbcDatabase

namespace {
using adbcpq::SetError;
AdbcStatusCode PostgresDatabaseInit(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
Expand All @@ -59,7 +61,14 @@ AdbcStatusCode PostgresDatabaseInit(struct AdbcDatabase* database,

AdbcStatusCode PostgresDatabaseNew(struct AdbcDatabase* database,
struct AdbcError* error) {
if (!database || database->private_data) return ADBC_STATUS_INVALID_STATE;
if (!database) {
SetError(error, "database must not be null");
return ADBC_STATUS_INVALID_STATE;
}
if (database->private_data) {
SetError(error, "database is already initialized");
return ADBC_STATUS_INVALID_STATE;
}
auto impl = std::make_shared<PostgresDatabase>();
database->private_data = new std::shared_ptr<PostgresDatabase>(impl);
return ADBC_STATUS_OK;
Expand Down Expand Up @@ -115,7 +124,7 @@ AdbcStatusCode PostgresConnectionCommit(struct AdbcConnection* connection,

AdbcStatusCode PostgresConnectionGetInfo(struct AdbcConnection* connection,
uint32_t* info_codes, size_t info_codes_length,
struct AdbcStatement* statement,
struct ArrowArrayStream* stream,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
// if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
Expand Down Expand Up @@ -206,9 +215,9 @@ AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,

AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
uint32_t* info_codes, size_t info_codes_length,
struct AdbcStatement* statement,
struct ArrowArrayStream* stream,
struct AdbcError* error) {
return PostgresConnectionGetInfo(connection, info_codes, info_codes_length, statement,
return PostgresConnectionGetInfo(connection, info_codes, info_codes_length, stream,
error);
}

Expand Down Expand Up @@ -438,7 +447,7 @@ AdbcStatusCode AdbcDriverInit(size_t count, struct AdbcDriver* driver,
driver->DatabaseSetOption = PostgresDatabaseSetOption;

driver->ConnectionCommit = PostgresConnectionCommit;
// driver->ConnectionGetInfo = PostgresConnectionGetInfo;
driver->ConnectionGetInfo = PostgresConnectionGetInfo;
// driver->ConnectionGetObjects = PostgresConnectionGetObjects;
driver->ConnectionGetTableSchema = PostgresConnectionGetTableSchema;
// driver->ConnectionGetTableTypes = PostgresConnectionGetTableTypes;
Expand Down
2 changes: 1 addition & 1 deletion c/drivers/postgres/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ uint64_t ToNetworkInt64(int64_t v) { return htobe64(static_cast<uint64_t>(v)); }
} // namespace

int TupleReader::GetSchema(struct ArrowSchema* out) {
if (!result_) {
if (!schema_.release) {
last_error_ = "[libpq] Result set was already consumed or freed";
return EINVAL;
}
Expand Down
24 changes: 19 additions & 5 deletions python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ cdef extern from "adbc.h" nogil:
char[5] sqlstate
CAdbcErrorRelease release

cdef struct CAdbcDriver"AdbcDriver":
pass

cdef struct CAdbcDatabase"AdbcDatabase":
void* private_data

Expand All @@ -116,6 +119,12 @@ cdef extern from "adbc.h" nogil:
CAdbcStatusCode AdbcDatabaseInit(CAdbcDatabase* database, CAdbcError* error)
CAdbcStatusCode AdbcDatabaseRelease(CAdbcDatabase* database, CAdbcError* error)

# The driver init function reports success or failure through its status
# code, so declare the real return type rather than void.
ctypedef CAdbcStatusCode (*CAdbcDriverInitFunc "AdbcDriverInitFunc")(size_t, CAdbcDriver*, size_t*, CAdbcError*)
CAdbcStatusCode AdbcDriverManagerDatabaseSetInitFunc(
CAdbcDatabase* database,
CAdbcDriverInitFunc init_func,
CAdbcError* error)

CAdbcStatusCode AdbcConnectionCommit(
CAdbcConnection* connection,
CAdbcError* error)
Expand Down Expand Up @@ -454,11 +463,16 @@ cdef class AdbcDatabase(_AdbcHandle):
check_error(status, &c_error)

for key, value in kwargs.items():
key = key.encode("utf-8")
value = value.encode("utf-8")
c_key = key
c_value = value
status = AdbcDatabaseSetOption(&self.database, c_key, c_value, &c_error)
if key == "init_func":
status = AdbcDriverManagerDatabaseSetInitFunc(
&self.database, <CAdbcDriverInitFunc> (<uintptr_t> value), &c_error)
else:
key = key.encode("utf-8")
value = value.encode("utf-8")
c_key = key
c_value = value
status = AdbcDatabaseSetOption(
&self.database, c_key, c_value, &c_error)
check_error(status, &c_error)

with nogil:
Expand Down
6 changes: 5 additions & 1 deletion python/adbc_driver_manager/adbc_driver_manager/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,11 @@ def _prepare_execute(self, operation, parameters=None) -> None:
if operation != self._last_query:
self._last_query = operation
self._stmt.set_sql_query(operation)
self._stmt.prepare()
try:
self._stmt.prepare()
except NotSupportedError:
# Not all drivers support it
pass

if parameters:
rb = pyarrow.record_batch(
Expand Down
2 changes: 1 addition & 1 deletion python/adbc_driver_manager/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
name = "adbc_driver_manager"
version = "0.0.1-alpha.1"
description = ""
authors = ["David Li <li.davidm96@gmail.com>"]
authors = ["Apache Arrow Developers <dev@arrow.apache.org>"]
license = "Apache-2.0"
homepage = "https://arrow.apache.org"
repository = "https://github.com/apache/arrow-adbc"
Expand Down
20 changes: 20 additions & 0 deletions python/adbc_driver_postgres/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

adbc_driver_postgres/*.c
adbc_driver_postgres/*.cpp
build/
28 changes: 28 additions & 0 deletions python/adbc_driver_postgres/adbc_driver_postgres/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import importlib.resources

import adbc_driver_manager


def connect(uri: str) -> adbc_driver_manager.AdbcDatabase:
    """Open a low-level ADBC database handle for Postgres.

    Looks up the ``libadbc_driver_postgres.so`` resource inside this package
    and hands its filesystem path to the driver manager as the ``driver``
    option.

    Parameters
    ----------
    uri : str
        Passed through unchanged as the ``uri`` option of the database.

    Returns
    -------
    adbc_driver_manager.AdbcDatabase
        The database object constructed by the driver manager.
    """
    bundled_driver = importlib.resources.path(
        __package__, "libadbc_driver_postgres.so"
    )
    # Construct the database while the resource path is still guaranteed valid.
    with bundled_driver as driver_path:
        database = adbc_driver_manager.AdbcDatabase(driver=str(driver_path), uri=uri)
    return database
Loading

0 comments on commit e95fb58

Please sign in to comment.