Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ def cluster
end
end

def shutdown
return unless @cluster

@cluster.disconnect
@cluster = nil
end

at_exit { shutdown }

def default_collection(bucket_name = "default")
@collections ||= {}
@collections[bucket_name] ||= cluster.bucket(bucket_name).default_collection
Expand Down
2 changes: 1 addition & 1 deletion ext/couchbase
200 changes: 189 additions & 11 deletions ext/couchbase.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <asio.hpp>
#include <spdlog/cfg/env.h>
#include <spdlog/sinks/base_sink.h>
#include <spdlog/spdlog.h>

#include <snappy.h>
Expand All @@ -32,6 +33,7 @@

#include <core/cluster.hxx>
#include <core/design_document_namespace_fmt.hxx>
#include <core/logger/configuration.hxx>
#include <core/operations.hxx>
#include <core/operations/management/analytics.hxx>
#include <core/operations/management/bucket.hxx>
Expand Down Expand Up @@ -182,10 +184,11 @@ cb_binary_new(VALUE str)
return couchbase::core::utils::to_binary(static_cast<const char*>(RSTRING_PTR(str)), static_cast<std::size_t>(RSTRING_LEN(str)));
}

template<typename StringLike>
static inline VALUE
cb_str_new(const std::string& str)
cb_str_new(const StringLike str)
{
return rb_external_str_new(str.data(), static_cast<long>(str.size()));
return rb_external_str_new(std::data(str), static_cast<long>(std::size(str)));
}

static inline VALUE
Expand All @@ -200,6 +203,12 @@ cb_str_new(const std::byte* data, std::size_t size)
return rb_external_str_new(reinterpret_cast<const char*>(data), static_cast<long>(size));
}

static inline VALUE
cb_str_new(const char* data)
{
return rb_external_str_new_cstr(data);
}

static inline VALUE
cb_str_new(const std::optional<std::string>& str)
{
Expand Down Expand Up @@ -266,6 +275,132 @@ init_versions(VALUE mCouchbase)
std::string_view(RSTRING_PTR(build_info), static_cast<std::size_t>(RSTRING_LEN(build_info))));
}

template<typename Mutex>
class ruby_logger_sink : public spdlog::sinks::base_sink<Mutex>
{
public:
explicit ruby_logger_sink(VALUE ruby_logger)
: ruby_logger_{ ruby_logger }
{
}

void flush_deferred_messages()
{
std::lock_guard<Mutex> lock(spdlog::sinks::base_sink<Mutex>::mutex_);
auto messages_ = std::move(deferred_messages_);
while (!messages_.empty()) {
write_message(messages_.front());
messages_.pop();
}
}

static VALUE map_log_level(spdlog::level::level_enum level)
{
switch (level) {
case spdlog::level::trace:
return rb_id2sym(rb_intern("trace"));
case spdlog::level::debug:
return rb_id2sym(rb_intern("debug"));
case spdlog::level::info:
return rb_id2sym(rb_intern("info"));
case spdlog::level::warn:
return rb_id2sym(rb_intern("warn"));
case spdlog::level::err:
return rb_id2sym(rb_intern("error"));
case spdlog::level::critical:
return rb_id2sym(rb_intern("critical"));
case spdlog::level::off:
return rb_id2sym(rb_intern("off"));
default:
break;
}
return Qnil;
}

protected:
struct log_message_for_ruby {
spdlog::level::level_enum level;
spdlog::log_clock::time_point time;
size_t thread_id;
std::string payload;
const char* filename;
int line;
const char* funcname;
};

void sink_it_(const spdlog::details::log_msg& msg) override
{
deferred_messages_.emplace(log_message_for_ruby{
msg.level,
msg.time,
msg.thread_id,
{ msg.payload.begin(), msg.payload.end() },
msg.source.filename,
msg.source.line,
msg.source.funcname,
});
}

void flush_() override
{
/* do nothing here, the flush will be initiated by the SDK */
}

private:
struct argument_pack {
VALUE logger;
const log_message_for_ruby& msg;
};

static VALUE invoke_log(VALUE arg)
{
auto* args = reinterpret_cast<argument_pack*>(arg);
const auto& msg = args->msg;

VALUE filename = Qnil;
if (msg.filename != nullptr) {
filename = cb_str_new(msg.filename);
}
VALUE line = Qnil;
if (msg.line > 0) {
line = ULL2NUM(msg.line);
}
VALUE function_name = Qnil;
if (msg.funcname != nullptr) {
function_name = cb_str_new(msg.funcname);
}
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(msg.time.time_since_epoch());
auto nanoseconds = msg.time.time_since_epoch() - seconds;
return rb_funcall(args->logger,
rb_intern("log"),
8,
map_log_level(msg.level),
ULL2NUM(msg.thread_id),
ULL2NUM(seconds.count()),
ULL2NUM(nanoseconds.count()),
cb_str_new(msg.payload),
filename,
line,
function_name);
}

void write_message(const log_message_for_ruby& msg)
{
if (NIL_P(ruby_logger_)) {
return;
}
argument_pack args{ ruby_logger_, msg };
rb_rescue(invoke_log, reinterpret_cast<VALUE>(&args), nullptr, Qnil);
}

VALUE ruby_logger_{ Qnil };
std::queue<log_message_for_ruby> deferred_messages_{};
};

using ruby_logger_sink_ptr = std::shared_ptr<ruby_logger_sink<std::mutex>>;

static ruby_logger_sink_ptr cb_global_sink{ nullptr };

struct cb_backend_data {
std::unique_ptr<asio::io_context> ctx;
std::shared_ptr<couchbase::core::cluster> cluster;
Expand Down Expand Up @@ -1064,6 +1199,9 @@ cb_wait_for_future(Future&& f) -> decltype(f.get())
&arg,
nullptr,
nullptr);
if (cb_global_sink) {
cb_global_sink->flush_deferred_messages();
}
return std::move(arg.res);
}

Expand Down Expand Up @@ -1373,6 +1511,9 @@ cb_Backend_close(VALUE self)
cb_backend_data* backend = nullptr;
TypedData_Get_Struct(self, cb_backend_data, &cb_backend_type, backend);
cb_backend_close(backend);
if (cb_global_sink) {
cb_global_sink->flush_deferred_messages();
}
return Qnil;
}

Expand Down Expand Up @@ -7265,9 +7406,8 @@ cb_Backend_document_view(VALUE self, VALUE bucket_name, VALUE design_document_na
}

static VALUE
cb_Backend_set_log_level(VALUE self, VALUE log_level)
cb_Backend_set_log_level(VALUE /* self */, VALUE log_level)
{
(void)self;
Check_Type(log_level, T_SYMBOL);
if (ID type = rb_sym2id(log_level); type == rb_intern("trace")) {
spdlog::set_level(spdlog::level::trace);
Expand All @@ -7291,9 +7431,8 @@ cb_Backend_set_log_level(VALUE self, VALUE log_level)
}

static VALUE
cb_Backend_get_log_level(VALUE self)
cb_Backend_get_log_level(VALUE /* self */)
{
(void)self;
switch (spdlog::get_level()) {
case spdlog::level::trace:
return rb_id2sym(rb_intern("trace"));
Expand All @@ -7315,6 +7454,43 @@ cb_Backend_get_log_level(VALUE self)
return Qnil;
}

static VALUE
cb_Backend_install_logger_shim(VALUE self, VALUE logger, VALUE log_level)
{
couchbase::core::logger::reset();
rb_iv_set(self, "@__logger_shim", logger);
if (NIL_P(logger)) {
return Qnil;
}
Check_Type(log_level, T_SYMBOL);
couchbase::core::logger::level level{ couchbase::core::logger::level::off };
if (ID type = rb_sym2id(log_level); type == rb_intern("trace")) {
level = couchbase::core::logger::level::trace;
} else if (type == rb_intern("debug")) {
level = couchbase::core::logger::level::debug;
} else if (type == rb_intern("info")) {
level = couchbase::core::logger::level::info;
} else if (type == rb_intern("warn")) {
level = couchbase::core::logger::level::warn;
} else if (type == rb_intern("error")) {
level = couchbase::core::logger::level::err;
} else if (type == rb_intern("critical")) {
level = couchbase::core::logger::level::critical;
} else {
rb_iv_set(self, "__logger_shim", Qnil);
return Qnil;
}

auto sink = std::make_shared<ruby_logger_sink<std::mutex>>(logger);
couchbase::core::logger::configuration configuration;
configuration.console = false;
configuration.log_level = level;
configuration.sink = sink;
couchbase::core::logger::create_file_logger(configuration);
cb_global_sink = sink;
return Qnil;
}

static VALUE
cb_Backend_snappy_compress(VALUE self, VALUE data)
{
Expand Down Expand Up @@ -7524,6 +7700,7 @@ init_backend(VALUE mCouchbase)
rb_define_singleton_method(cBackend, "parse_connection_string", VALUE_FUNC(cb_Backend_parse_connection_string), 1);
rb_define_singleton_method(cBackend, "set_log_level", VALUE_FUNC(cb_Backend_set_log_level), 1);
rb_define_singleton_method(cBackend, "get_log_level", VALUE_FUNC(cb_Backend_get_log_level), 0);
rb_define_singleton_method(cBackend, "install_logger_shim", VALUE_FUNC(cb_Backend_install_logger_shim), 2);
rb_define_singleton_method(cBackend, "snappy_compress", VALUE_FUNC(cb_Backend_snappy_compress), 1);
rb_define_singleton_method(cBackend, "snappy_uncompress", VALUE_FUNC(cb_Backend_snappy_uncompress), 1);
rb_define_singleton_method(cBackend, "leb128_encode", VALUE_FUNC(cb_Backend_leb128_encode), 1);
Expand All @@ -7536,14 +7713,15 @@ init_backend(VALUE mCouchbase)
void
init_logger()
{
couchbase::core::logger::create_console_logger();
if (auto env_val = spdlog::details::os::getenv("COUCHBASE_BACKEND_LOG_LEVEL"); !env_val.empty()) {
couchbase::core::logger::set_log_levels(couchbase::core::logger::level_from_str(env_val));
}

if (auto env_val = spdlog::details::os::getenv("COUCHBASE_BACKEND_DONT_INSTALL_TERMINATE_HANDLER"); env_val.empty()) {
couchbase::core::platform::install_backtrace_terminate_handler();
}
if (auto env_val = spdlog::details::os::getenv("COUCHBASE_BACKEND_DONT_USE_BUILTIN_LOGGER"); env_val.empty()) {
couchbase::core::logger::create_console_logger();
if (env_val = spdlog::details::os::getenv("COUCHBASE_BACKEND_LOG_LEVEL"); !env_val.empty()) {
couchbase::core::logger::set_log_levels(couchbase::core::logger::level_from_str(env_val));
}
}
}

extern "C" {
Expand Down
43 changes: 43 additions & 0 deletions lib/couchbase/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require "logger"

module Couchbase
# Set log level
#
Expand Down Expand Up @@ -39,4 +41,45 @@ def self.log_level=(level)
def self.log_level
Backend.get_log_level
end

# Return logger associated with the library
def self.logger
@logger # rubocop:disable ThreadSafety/InstanceVariableInClassMethod
end

# Associate logger with the library
#
# The log messages, that are generated by extension might come with out of order timestamps, in order to reduce number
# of switches between Ruby and Native code.
#
# @param [Logger] logger an object implementing logging interface, e.g. stdlib Logger, or something that responds to
# "level"-methods like +#debug+, +#error+, etc.
# @param [Class] adapter_class custom implementation of the logger adapter interface between extension and ruby code.
# See {Utils::StdlibLoggerAdapter} and {Utils::GenericLoggerAdapter}
# @param [Boolean] verbose if true, the message will also include source code location, where the message was
# generated (if available)
# @param [Symbol] log level, see {#log_level=} for allowed values
#
# @example Specify custom logger and limit core messages to debug level
# Couchbase.set_logger(Logger.new(STDERR), level: :debug)
#
# @since 3.3.1
def self.set_logger(logger, adapter_class: nil, verbose: false, level: :info)
@logger = logger # rubocop:disable ThreadSafety/InstanceVariableInClassMethod
if @logger.nil? # rubocop:disable ThreadSafety/InstanceVariableInClassMethod
Backend.install_logger_shim(nil)
return
end
shim =
if adapter_class
adapter_class
elsif logger.is_a?(::Logger)
require "couchbase/utils/stdlib_logger_adapter"
Utils::StdlibLoggerAdapter
else
require "couchbase/utils/generic_logger_adapter"
Utils::GenericLoggerAdapter
end
Backend.install_logger_shim(shim.new(logger, verbose: verbose), level)
end
end
Loading