Skip to content

Commit

Permalink
Fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- committed Oct 10, 2024
1 parent 37e8c40 commit 10b7f37
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
22 changes: 22 additions & 0 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern "C" {
}

#include "duckdb/common/exception.hpp"
#include "duckdb/common/error_data.hpp"

#include <vector>
#include <string>
Expand Down Expand Up @@ -74,4 +75,25 @@ PostgresFunctionGuard(FuncType postgres_function, FuncArgs... args) {
}
}

template <typename FuncType, typename... FuncArgs>
const char*
DuckDBFunctionGuard(FuncType duckdb_function, FuncArgs... args) {
try {
duckdb_function(args...);
} catch (duckdb::Exception &ex) {
duckdb::ErrorData edata(ex.what());
return pstrdup(edata.Message().c_str());
} catch (std::exception &ex) {
const auto msg = ex.what();
if (msg[0] == '{') {
duckdb::ErrorData edata(ex.what());
return pstrdup(edata.Message().c_str());
} else {
return pstrdup(ex.what());
}
}

return nullptr;
}

} // namespace pgduckdb
39 changes: 30 additions & 9 deletions src/pgduckdb_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "pgduckdb/pgduckdb_types.hpp"
#include "pgduckdb/pgduckdb_duckdb.hpp"
#include "pgduckdb/pgduckdb_planner.hpp"
#include "pgduckdb/pgduckdb_utils.hpp"

/* global variables */
CustomScanMethods duckdb_scan_scan_methods;
Expand All @@ -36,9 +37,21 @@ typedef struct DuckdbScanState {

static void
CleanupDuckdbScanState(DuckdbScanState *state) {
MemoryContextReset(state->css.ss.ps.ps_ExprContext->ecxt_per_tuple_memory);
ExecClearTuple(state->css.ss.ss_ScanTupleSlot);

state->query_results.reset();
delete state->prepared_statement;
delete state->duckdb_connection;
state->current_data_chunk.reset();

if (state->prepared_statement) {
delete state->prepared_statement;
state->prepared_statement = nullptr;
}

if (state->duckdb_connection) {
delete state->duckdb_connection;
state->duckdb_connection = nullptr;
}
}

/* static callbacks */
Expand Down Expand Up @@ -106,12 +119,13 @@ ExecuteQuery(DuckdbScanState *state) {
std::ostringstream oss;
oss << "parameter '" << i << "' has an invalid type (" << pg_param->ptype << ") during query execution";
elog(ERROR, oss.str().c_str());
// throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, oss.str().c_str());
}
}

auto pending = prepared.PendingQuery(duckdb_params, true);
if (pending->HasError()) {
elog(ERROR, "DuckDB execute returned an error: %s", pending->GetError().c_str());
return pending->ThrowError();
}

duckdb::PendingExecutionResult execution_result;
Expand All @@ -124,16 +138,14 @@ ExecuteQuery(DuckdbScanState *state) {
// Wait for all tasks to terminate
executor.CancelTasks();
// Delete the scan state
CleanupDuckdbScanState(state);
// Process the interrupt on the Postgres side
ProcessInterrupts();
elog(ERROR, "Query cancelled");
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, "Query cancelled");
}
} while (!duckdb::PendingQueryResult::IsResultReady(execution_result));

if (execution_result == duckdb::PendingExecutionResult::EXECUTION_ERROR) {
CleanupDuckdbScanState(state);
elog(ERROR, "(PGDuckDB/ExecuteQuery) %s", pending->GetError().c_str());
return pending->ThrowError();
}

query_results = pending->Execute();
Expand All @@ -149,7 +161,11 @@ Duckdb_ExecCustomScan(CustomScanState *node) {

bool already_executed = duckdb_scan_state->is_executed;
if (!already_executed) {
ExecuteQuery(duckdb_scan_state);
auto err_msg = pgduckdb::DuckDBFunctionGuard(ExecuteQuery, duckdb_scan_state);
if (err_msg) {
Duckdb_EndCustomScan(node);
elog(ERROR, "(PGDuckDB/ExecuteQuery) %s", err_msg);
}
}

if (duckdb_scan_state->fetch_next) {
Expand Down Expand Up @@ -209,7 +225,12 @@ Duckdb_ReScanCustomScan(CustomScanState *node) {
void
Duckdb_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) {
DuckdbScanState *duckdb_scan_state = (DuckdbScanState *)node;
ExecuteQuery(duckdb_scan_state);
auto err_msg = pgduckdb::DuckDBFunctionGuard(ExecuteQuery, duckdb_scan_state);
if (err_msg) {
Duckdb_EndCustomScan(node);
elog(ERROR, "(PGDuckDB/Duckdb_ExecCustomScan) %s", err_msg);
}

auto chunk = duckdb_scan_state->query_results->Fetch();
if (!chunk || chunk->size() == 0) {
return;
Expand Down

0 comments on commit 10b7f37

Please sign in to comment.