Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

Expand All @@ -45,6 +46,7 @@
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "util/cpu_info.h"
#include "util/string_util.h"

namespace doris::config {
#include "common/compile_check_avoid_begin.h"
Expand Down Expand Up @@ -1082,6 +1084,29 @@ DEFINE_mInt32(segcompaction_wait_for_dbm_task_timeout_s, "3600"); // 1h
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");

// enable python udf
DEFINE_Bool(enable_python_udf_support, "true");
// python env mode, options: conda, venv
DEFINE_String(python_env_mode, "");
DEFINE_Validator(python_env_mode, [](const std::string& config) -> bool {
std::string lower_config = to_lower(config);
std::string_view trimmed_config = trim(lower_config);
if (trimmed_config.empty()) return true;
return (trimmed_config == "conda" || trimmed_config == "venv");
});
// root path of conda runtime, python_env_mode should be conda
DEFINE_String(python_conda_root_path, "");
// root path of venv runtime, python_env_mode should be venv
DEFINE_String(python_venv_root_path, "${DORIS_HOME}/lib/udf/python");
// python interpreter paths used by venv, e.g. /usr/bin/python3.7:/usr/bin/python3.6
DEFINE_String(python_venv_interpreter_paths, "");
// python deps index url
DEFINE_String(python_deps_index_url, "https://pypi.org/simple/");
// min number of python process
DEFINE_Int32(min_python_process_nums, "4");
// max number of python process
DEFINE_Int32(max_python_process_nums, "64");

// Set config randomly to check more issues in github workflow
DEFINE_Bool(enable_fuzzy_mode, "false");

Expand Down
17 changes: 17 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,23 @@ DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);

// enable python udf
DECLARE_Bool(enable_python_udf_support);
// python env mode, options: conda, venv
DECLARE_String(python_env_mode);
// root path of conda runtime, python_env_mode should be conda
DECLARE_String(python_conda_root_path);
// root path of venv runtime, python_env_mode should be venv
DECLARE_String(python_venv_root_path);
// python interpreter paths used by venv, e.g. /usr/bin/python3.7:/usr/bin/python3.6
DECLARE_String(python_venv_interpreter_paths);
// python deps index url
DECLARE_String(python_deps_index_url);
// min number of python process
DECLARE_Int32(min_python_process_nums);
// max number of python process
DECLARE_Int32(max_python_process_nums);

// Set config randomly to check more issues in github workflow
DECLARE_Bool(enable_fuzzy_mode);

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
#include "service/backend_options.h"
#include "service/backend_service.h"
#include "service/point_query_executor.h"
#include "udf/python/python_udf_server.h"
#include "util/bfd_parser.h"
#include "util/bit_util.h"
#include "util/brpc_client_cache.h"
Expand Down Expand Up @@ -889,6 +890,7 @@ void ExecEnv::destroy() {
_s_tracking_memory = false;

clear_storage_resource();
PythonUDFServerManager::instance().shutdown();
LOG(INFO) << "Doris exec envorinment is destoried.";
}

Expand Down
129 changes: 126 additions & 3 deletions be/src/runtime/user_function_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <glog/logging.h>
#include <minizip/unzip.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
Expand All @@ -41,6 +42,7 @@
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
#include "runtime/plugin/cloud_plugin_downloader.h"
#include "util/defer_op.h"
#include "util/dynamic_util.h"
#include "util/md5.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -88,6 +90,9 @@ struct UserFunctionCacheEntry {
// And this is used to indicate whether library is downloaded.
bool is_downloaded = false;

// Indicate if the zip file is unziped.
bool is_unziped = false;

// used to lookup a symbol
void* lib_handle = nullptr;

Expand Down Expand Up @@ -144,9 +149,12 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
lib_type = LibType::SO;
} else if (ends_with(file, ".jar")) {
lib_type = LibType::JAR;
} else if (ends_with(file, ".zip") && _check_cache_is_python_udf(dir, file)) {
lib_type = LibType::PY_ZIP;
} else {
return Status::InternalError(
"unknown library file format. the file type is not end with xxx.jar or xxx.so : " +
"unknown library file format. the file type is not end with xxx.jar or xxx.so"
" or xxx.zip : " +
file);
}

Expand Down Expand Up @@ -249,12 +257,117 @@ Status UserFunctionCache::_load_cache_entry(const std::string& url,
RETURN_IF_ERROR(_download_lib(url, entry));
}

if (!entry->is_unziped && entry->type == LibType::PY_ZIP) {
RETURN_IF_ERROR(_unzip_lib(entry->lib_file));
entry->lib_file = entry->lib_file.substr(0, entry->lib_file.size() - 4);
entry->is_unziped = true;
}

if (entry->type == LibType::SO) {
RETURN_IF_ERROR(_load_cache_entry_internal(entry));
} else if (entry->type != LibType::JAR) {
} else if (entry->type != LibType::JAR && entry->type != LibType::PY_ZIP) {
return Status::InvalidArgument(
"Unsupported lib type! Make sure your lib type is one of 'so' and 'jar'!");
"Unsupported lib type! Make sure your lib type is one of 'so' and 'jar' and "
"python 'zip'!");
}
return Status::OK();
}

Status UserFunctionCache::_check_cache_is_python_udf(const std::string& dir,
const std::string& file) {
const std::string& full_path = dir + "/" + file;
RETURN_IF_ERROR(_unzip_lib(full_path));
std::string unzip_dir = full_path.substr(0, full_path.size() - 4);

bool has_python_file = false;

auto scan_cb = [&has_python_file](const io::FileInfo& file) {
if (file.is_file && ends_with(file.file_name, ".py")) {
has_python_file = true;
return false; // Stop iteration once we find a Python file
}
return true;
};
RETURN_IF_ERROR(io::global_local_filesystem()->iterate_directory(unzip_dir, scan_cb));
if (!has_python_file) {
return Status::InternalError("No Python file found in the unzipped directory.");
}
return Status::OK();
}

Status UserFunctionCache::_unzip_lib(const std::string& zip_file) {
std::string unzip_dir = zip_file.substr(0, zip_file.size() - 4);
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(unzip_dir));

unzFile zip_file_handle = unzOpen(zip_file.c_str());
if (zip_file_handle == nullptr) {
return Status::InternalError("Failed to open zip file: " + zip_file);
}

Defer defer([&] { unzClose(zip_file_handle); });

unz_global_info global_info;
if (unzGetGlobalInfo(zip_file_handle, &global_info) != UNZ_OK) {
return Status::InternalError("Failed to get global info from zip file: " + zip_file);
}

for (uLong i = 0; i < global_info.number_entry; ++i) {
unz_file_info file_info;
char filename[256];
if (unzGetCurrentFileInfo(zip_file_handle, &file_info, filename, sizeof(filename), nullptr,
0, nullptr, 0) != UNZ_OK) {
return Status::InternalError("Failed to get file info from zip file: " + zip_file);
}

if (std::string(filename).find("__MACOSX") != std::string::npos) {
if ((i + 1) < global_info.number_entry) {
if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
return Status::InternalError("Failed to go to next file in zip: " + zip_file);
}
}
continue;
}

std::string full_filename = unzip_dir + "/" + filename;
if (full_filename.length() > PATH_MAX) {
return Status::InternalError(
fmt::format("File path {}... is too long, maximum path length is {}",
full_filename.substr(0, 50), PATH_MAX));
}

if (filename[strlen(filename) - 1] == '/') {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(full_filename));
} else {
if (unzOpenCurrentFile(zip_file_handle) != UNZ_OK) {
return Status::InternalError("Failed to open file in zip: " +
std::string(filename));
}

FILE* out = fopen(full_filename.c_str(), "wb");
if (out == nullptr) {
unzCloseCurrentFile(zip_file_handle);
return Status::InternalError("Failed to create file: " + full_filename);
}
char buffer[8192];
int bytes_read;
while ((bytes_read = unzReadCurrentFile(zip_file_handle, buffer, sizeof(buffer))) > 0) {
fwrite(buffer, bytes_read, 1, out);
}
fclose(out);
unzCloseCurrentFile(zip_file_handle);
if (bytes_read < 0) {
return Status::InternalError("Failed to read file in zip: " +
std::string(filename));
}
}

if ((i + 1) < global_info.number_entry) {
if (unzGoToNextFile(zip_file_handle) != UNZ_OK) {
return Status::InternalError("Failed to go to next file in zip: " + zip_file);
}
}
}

return Status::OK();
}

Expand Down Expand Up @@ -348,6 +461,8 @@ std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::st
ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum;
if (type == LibType::JAR) {
ss << '.' << file_name;
} else if (type == LibType::PY_ZIP) {
ss << '.' << file_name;
} else {
ss << ".so";
}
Expand All @@ -362,6 +477,14 @@ Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
return Status::OK();
}

Status UserFunctionCache::get_pypath(int64_t fid, const std::string& url,
const std::string& checksum, std::string* libpath) {
std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::PY_ZIP));
*libpath = entry->lib_file;
return Status::OK();
}

std::vector<std::string> UserFunctionCache::_split_string_by_checksum(const std::string& file) {
std::vector<std::string> result;

Expand Down
13 changes: 12 additions & 1 deletion be/src/runtime/user_function_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct UserFunctionCacheEntry;
// with id, this function library is valid. And when user wants to
// change its implementation(URL), Doris will generate a new function
// id.
enum class LibType { JAR, SO };
enum class LibType { JAR, SO, PY_ZIP };

class UserFunctionCache {
public:
Expand All @@ -59,13 +59,24 @@ class UserFunctionCache {
Status get_jarpath(int64_t fid, const std::string& url, const std::string& checksum,
std::string* libpath);

Status get_pypath(int64_t fid, const std::string& url, const std::string& checksum,
std::string* libpath);

private:
Status _load_cached_lib();
Status _load_entry_from_lib(const std::string& dir, const std::string& file);
Status _get_cache_entry(int64_t fid, const std::string& url, const std::string& checksum,
std::shared_ptr<UserFunctionCacheEntry>& output_entry, LibType type);
Status _load_cache_entry(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
Status _download_lib(const std::string& url, std::shared_ptr<UserFunctionCacheEntry> entry);
/**
* Unzip the python udf user file.
*/
Status _unzip_lib(const std::string& file);
/**
* Check if the cache file is python udf.
*/
Status _check_cache_is_python_udf(const std::string& dir, const std::string& file);
Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry> entry);

std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type,
Expand Down
55 changes: 55 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <fcntl.h>
#include <fmt/core.h>
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
!defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
#include <gperftools/malloc_extension.h> // IWYU pragma: keep
Expand Down Expand Up @@ -76,9 +77,11 @@
#include "service/backend_service.h"
#include "service/brpc_service.h"
#include "service/http_service.h"
#include "udf/python/python_env.h"
#include "util/debug_util.h"
#include "util/disk_info.h"
#include "util/mem_info.h"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_server.h"
#include "util/uid_util.h"
Expand Down Expand Up @@ -499,6 +502,58 @@ int main(int argc, char** argv) {
}
}

if (doris::config::enable_python_udf_support) {
if (std::string python_udf_root_path =
fmt::format("{}/lib/udf/python", std::getenv("DORIS_HOME"));
!std::filesystem::exists(python_udf_root_path)) {
std::filesystem::create_directories(python_udf_root_path);
}
std::string python_env_mode = doris::config::python_env_mode;
if (doris::trim(doris::to_lower(python_env_mode)) == "conda") {
if (doris::trim(doris::config::python_conda_root_path).empty()) {
LOG(ERROR) << "Python conda root path is empty, please set python_conda_root_path "
"or set enable_python_udf_support to false";
exit(1);
} else {
LOG(INFO) << "Doris backend python version manager is initialized. Python conda "
"root path: "
<< doris::config::python_conda_root_path;
}
status = doris::PythonVersionManager::instance().init(
doris::PythonEnvType::CONDA, doris::config::python_conda_root_path, "");
} else if (doris::trim(doris::to_lower(python_env_mode)) == "venv") {
if (doris::trim(doris::config::python_venv_root_path).empty()) {
LOG(ERROR) << "Python venv root path is empty, please set python_venv_root_path or "
"set enable_python_udf_support to false";
exit(1);
} else if (doris::trim(doris::config::python_venv_interpreter_paths).empty()) {
LOG(ERROR) << "Python interpreter paths is empty, please set "
"python_venv_interpreter_paths or set enable_python_udf_support to "
"false";
exit(1);
} else {
LOG(INFO) << "Doris backend python version manager is initialized. Python venv "
"root path: "
<< doris::config::python_venv_root_path << ", python interpreter paths: "
<< doris::config::python_venv_interpreter_paths;
}
status = doris::PythonVersionManager::instance().init(
doris::PythonEnvType::VENV, doris::config::python_venv_root_path,
doris::config::python_venv_interpreter_paths);
} else {
status = Status::InvalidArgument(
"Python env mode is empty, should be conda or venv. If you don't want to "
"enable the Python UDF function, please set enable_python_udf_support to "
"false");
}
if (!status.ok()) {
LOG(ERROR) << "Failed to initialize python version manager: " << status;
exit(1);
} else {
LOG(INFO) << doris::PythonVersionManager::instance().to_string();
}
}

// Doris own signal handler must be register after jvm is init.
// Or our own sig-handler for SIGINT & SIGTERM will not be chained ...
// https://www.oracle.com/java/technologies/javase/signals.html
Expand Down
8 changes: 7 additions & 1 deletion be/src/udf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/udf")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/udf")

set(UDF_SOURCES udf.cpp)

file(GLOB PYTHON_UDF_SOURCES "python/*.cpp")

list(APPEND UDF_SOURCES ${PYTHON_UDF_SOURCES})

# Build this library twice. Once to be linked into the main Doris. This version
# can have dependencies on our other libs. The second version is shipped as part
# of the UDF sdk, which can't use other libs.
add_library(Udf STATIC udf.cpp)
add_library(Udf STATIC ${UDF_SOURCES})
Loading