Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion datafusion-cli/examples/cli-session-context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_cli::{
cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions,
cli_context::CliSessionContext, exec::exec_from_repl,
object_storage::instrumented::InstrumentedObjectStoreRegistry,
print_options::PrintOptions,
};
use object_store::ObjectStore;

Expand Down Expand Up @@ -89,6 +91,7 @@ pub async fn main() {
quiet: false,
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
color: true,
instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

};

exec_from_repl(&my_ctx, &mut print_options).await.unwrap();
Expand Down
85 changes: 84 additions & 1 deletion datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Command {
SearchFunctions(String),
QuietMode(Option<bool>),
OutputFormat(Option<String>),
ObjectStoreProfileMode(Option<String>),
}

pub enum OutputFormat {
Expand Down Expand Up @@ -122,6 +123,29 @@ impl Command {
Self::OutputFormat(_) => exec_err!(
"Unexpected change output format, this should be handled outside"
),
Self::ObjectStoreProfileMode(mode) => {
if let Some(mode) = mode {
let profile_mode = mode
.parse()
.map_err(|_|
exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, enabled")
)?;
print_options
.instrumented_registry
.set_instrument_mode(profile_mode);
println!(
"ObjectStore Profile mode set to {}",
print_options.instrumented_registry.instrument_mode()
);
} else {
println!(
"ObjectStore Profile mode is {}",
print_options.instrumented_registry.instrument_mode()
);
}

Ok(())
}
}
}

Expand All @@ -140,11 +164,15 @@ impl Command {
Self::OutputFormat(_) => {
("\\pset [NAME [VALUE]]", "set table output option\n(format)")
}
Self::ObjectStoreProfileMode(_) => (
"\\object_store_profiling (disabled|enabled)",
"print or set object store profile mode",
),
}
}
}

const ALL_COMMANDS: [Command; 9] = [
const ALL_COMMANDS: [Command; 10] = [
Command::ListTables,
Command::DescribeTableStmt(String::new()),
Command::Quit,
Expand All @@ -154,6 +182,7 @@ const ALL_COMMANDS: [Command; 9] = [
Command::SearchFunctions(String::new()),
Command::QuietMode(None),
Command::OutputFormat(None),
Command::ObjectStoreProfileMode(None),
];

fn all_commands_info() -> RecordBatch {
Expand Down Expand Up @@ -204,6 +233,10 @@ impl FromStr for Command {
Self::OutputFormat(Some(subcommand.to_string()))
}
("pset", None) => Self::OutputFormat(None),
("object_store_profiling", Some(mode)) => {
Self::ObjectStoreProfileMode(Some(mode.to_string()))
}
("object_store_profiling", None) => Self::ObjectStoreProfileMode(None),
_ => return Err(()),
})
}
Expand Down Expand Up @@ -244,3 +277,53 @@ impl OutputFormat {
}
}
}

#[cfg(test)]
mod tests {
    use datafusion::prelude::SessionContext;

    use crate::{
        object_storage::instrumented::{
            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
        },
        print_options::MaxRows,
    };

    use super::*;

    /// Parses and executes the `object_store_profiling` command in three forms:
    /// without an argument, with a valid mode, and with an unknown mode.
    #[tokio::test]
    async fn command_execute_profile_mode() {
        let session = SessionContext::new();
        let registry = Arc::new(InstrumentedObjectStoreRegistry::new());
        let mut options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: false,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: registry,
        };

        // Without an argument the command succeeds and the mode stays at its default.
        let show_mode: Command = "object_store_profiling"
            .parse()
            .expect("expected parse to succeed");
        let shown = show_mode.execute(&session, &mut options).await;
        assert!(shown.is_ok());
        assert_eq!(
            options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::default()
        );

        // A valid mode argument is applied to the registry held by the print options.
        let enable: Command = "object_store_profiling enabled"
            .parse()
            .expect("expected parse to succeed");
        let enabled = enable.execute(&session, &mut options).await;
        assert!(enabled.is_ok());
        assert_eq!(
            options.instrumented_registry.instrument_mode(),
            InstrumentedObjectStoreMode::Enabled
        );

        // An unknown mode still parses as a command; the failure surfaces on execute.
        let bogus: Command = "object_store_profiling does_not_exist"
            .parse()
            .expect("expected parse to succeed");
        let rejected = bogus.execute(&session, &mut options).await;
        assert!(rejected.is_err());
    }
}
17 changes: 12 additions & 5 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{
FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion::execution::object_store::DefaultObjectStoreRegistry;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
Expand Down Expand Up @@ -149,6 +148,13 @@ struct Args {
value_parser(extract_disk_limit)
)]
disk_limit: Option<usize>,

#[clap(
long,
help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]",
default_value_t = InstrumentedObjectStoreMode::Disabled
)]
object_store_profiling: InstrumentedObjectStoreMode,
}

#[tokio::main]
Expand Down Expand Up @@ -210,10 +216,10 @@ async fn main_inner() -> Result<()> {
rt_builder = rt_builder.with_disk_manager_builder(builder);
}

let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(
Arc::new(DefaultObjectStoreRegistry::new()),
InstrumentedObjectStoreMode::default(),
));
let instrumented_registry = Arc::new(
InstrumentedObjectStoreRegistry::new()
.with_profile_mode(args.object_store_profiling),
);
rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());

let runtime_env = rt_builder.build_arc()?;
Expand Down Expand Up @@ -243,6 +249,7 @@ async fn main_inner() -> Result<()> {
quiet: args.quiet,
maxrows: args.maxrows,
color: args.color,
instrumented_registry: Arc::clone(&instrumented_registry),
};

let commands = args.command;
Expand Down
89 changes: 66 additions & 23 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ use std::{
};

use async_trait::async_trait;
use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry};
use datafusion::{
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
};
use parking_lot::RwLock;
use url::Url;

/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect
/// profiling data. Collecting profiling data will have a small negative impact on both CPU and
/// memory usage. Default is `Disabled`
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling
/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled`
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum InstrumentedObjectStoreMode {
/// Disable collection of profiling data
Expand Down Expand Up @@ -75,7 +78,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
/// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
struct InstrumentedObjectStore {
pub struct InstrumentedObjectStore {
inner: Arc<dyn ObjectStore>,
instrument_mode: AtomicU8,
}
Expand All @@ -88,6 +91,10 @@ impl InstrumentedObjectStore {
instrument_mode,
}
}

/// Replaces the profiling mode of this store.
///
/// The mode is written as its `u8` discriminant with `Ordering::Relaxed`.
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
    let encoded = mode as u8;
    self.instrument_mode.store(encoded, Ordering::Relaxed);
}
}

impl fmt::Display for InstrumentedObjectStore {
Expand Down Expand Up @@ -150,23 +157,53 @@ impl ObjectStore for InstrumentedObjectStore {
}
}

/// Provides access to [`ObjectStore`] instances that record requests for reporting
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
inner: Arc<dyn ObjectStoreRegistry>,
instrument_mode: InstrumentedObjectStoreMode,
instrument_mode: AtomicU8,
stores: RwLock<Vec<Arc<InstrumentedObjectStore>>>,
}

impl Default for InstrumentedObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}

impl InstrumentedObjectStoreRegistry {
/// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
/// [`ObjectStoreRegistry`]
pub fn new(
registry: Arc<dyn ObjectStoreRegistry>,
default_mode: InstrumentedObjectStoreMode,
) -> Self {
pub fn new() -> Self {
Self {
inner: registry,
instrument_mode: default_mode,
inner: Arc::new(DefaultObjectStoreRegistry::new()),
instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8),
stores: RwLock::new(Vec::new()),
}
}

/// Builder-style setter: consumes the registry, records `mode` as its profiling mode and
/// hands the registry back.
///
/// Only the registry's own mode value is written here; the `stores` list is not touched.
pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self {
    let encoded = mode as u8;
    self.instrument_mode.store(encoded, Ordering::Relaxed);
    self
}

/// Returns handles to every [`InstrumentedObjectStore`] tracked by this
/// [`InstrumentedObjectStoreRegistry`].
///
/// The result is a copy of the tracked list taken under the read lock.
pub fn stores(&self) -> Vec<Arc<InstrumentedObjectStore>> {
    let tracked = self.stores.read();
    tracked.iter().map(Arc::clone).collect()
}

/// Returns the current [`InstrumentedObjectStoreMode`] for this
/// [`InstrumentedObjectStoreRegistry`]
pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode {
self.instrument_mode.load(Ordering::Relaxed).into()
}

/// Updates the [`InstrumentedObjectStoreMode`] of this [`InstrumentedObjectStoreRegistry`]
/// and pushes the same mode to every store currently in its `stores` list.
///
/// The registry's own value is written first, then the list is walked under the read lock.
// NOTE(review): `register_store` reads the mode before it appends to `stores`, so a store
// registered concurrently with this call might keep the previous mode. Confirm whether
// that ordering matters for callers.
pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
    self.instrument_mode.store(mode as u8, Ordering::Relaxed);
    self.stores
        .read()
        .iter()
        .for_each(|store| store.set_instrument_mode(mode));
}
}
Expand All @@ -177,8 +214,10 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let mode = AtomicU8::new(self.instrument_mode as u8);
let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode));
let mode = self.instrument_mode.load(Ordering::Relaxed);
let instrumented =
Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode)));
self.stores.write().push(Arc::clone(&instrumented));
self.inner.register_store(url, instrumented)
}

Expand All @@ -189,8 +228,6 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {

#[cfg(test)]
mod tests {
use datafusion::execution::object_store::DefaultObjectStoreRegistry;

use super::*;

#[test]
Expand Down Expand Up @@ -219,17 +256,23 @@ mod tests {

#[test]
fn instrumented_registry() {
let reg = Arc::new(InstrumentedObjectStoreRegistry::new(
Arc::new(DefaultObjectStoreRegistry::new()),
InstrumentedObjectStoreMode::default(),
));
let store = object_store::memory::InMemory::new();
let mut reg = InstrumentedObjectStoreRegistry::new();
assert!(reg.stores().is_empty());
assert_eq!(
reg.instrument_mode(),
InstrumentedObjectStoreMode::default()
);

reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Enabled);
assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Enabled);

let store = object_store::memory::InMemory::new();
let url = "mem://test".parse().unwrap();
let registered = reg.register_store(&url, Arc::new(store));
assert!(registered.is_none());

let fetched = reg.get_store(&url);
assert!(fetched.is_ok())
assert!(fetched.is_ok());
assert_eq!(reg.stores().len(), 1);
}
}
Loading