From 6ce3165aacafa8232e3ebfb4927beee921d0541b Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Tue, 20 Feb 2024 16:14:19 -0500 Subject: [PATCH] feat: add write and query CLI sub-commands (#24671) * feat: add query and write cli for influxdb3 Adds two new sub-commands to the influxdb3 CLI: - query: perform queries against the running server - write: perform writes against the running server Both share a common set of parameters for connecting to the database which are managed in influxdb3/src/commands/common.rs. Currently, query supports all underlying output formats, and can write the output to a file on disk. It only supports SQL as the query language, but will eventually also support InfluxQL. Write supports line protocol for input and expects the source of data to be from a file. --- Cargo.lock | 3 + influxdb3/Cargo.toml | 3 + influxdb3/src/commands/common.rs | 23 +++++ influxdb3/src/commands/query.rs | 161 +++++++++++++++++++++++++++++++ influxdb3/src/commands/write.rs | 65 +++++++++++++ influxdb3/src/main.rs | 25 +++++ 6 files changed, 280 insertions(+) create mode 100644 influxdb3/src/commands/common.rs create mode 100644 influxdb3/src/commands/query.rs create mode 100644 influxdb3/src/commands/write.rs diff --git a/Cargo.lock b/Cargo.lock index 0bb208a651e..73ad910c7ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2600,6 +2600,7 @@ dependencies = [ "console-subscriber", "dotenvy", "hex", + "influxdb3_client", "influxdb3_server", "influxdb3_write", "iox_query", @@ -2615,6 +2616,7 @@ dependencies = [ "parking_lot 0.12.1", "parquet_file", "reqwest", + "secrecy", "sha2", "thiserror", "tikv-jemalloc-ctl", @@ -2625,6 +2627,7 @@ dependencies = [ "trace", "trace_exporters", "trogging", + "url", "uuid", "workspace-hack", ] diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 704e0efce2a..ff952ab4b26 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -13,6 +13,7 @@ influxdb3_server = { path = "../influxdb3_server" } iox_time = { path = "../iox_time" } iox_query = { path = "../iox_query" } ioxd_common = { path = "../ioxd_common"} +influxdb3_client = { path = "../influxdb3_client" } influxdb3_write = { path = "../influxdb3_write" } metric = { path = "../metric" } object_store = { workspace = true } @@ -33,11 +34,13 @@ libc = { version = "0.2" } num_cpus = "1.16.0" once_cell = { version = "1.18", features = ["parking_lot"] } parking_lot = "0.12.1" +secrecy = "0.8.0" thiserror = "1.0.48" tikv-jemalloc-ctl = { version = "0.5.4", optional = true } tikv-jemalloc-sys = { version = "0.5.4", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] } tokio = { version = "1.32", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "io-std"] } tokio-util = { version = "0.7.9" } +url = "2.5.0" uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } sha2 = "0.10.8" diff --git a/influxdb3/src/commands/common.rs b/influxdb3/src/commands/common.rs new file mode 100644 index 00000000000..7a443fd9e25 --- /dev/null +++ b/influxdb3/src/commands/common.rs @@ -0,0 +1,23 @@ +use clap::Parser; +use secrecy::Secret; +use url::Url; + +#[derive(Debug, Parser)] +pub struct InfluxDb3Config { + /// The host URL of the running InfluxDB 3.0 server + #[clap( + short = 'h', + long = "host", + env = "INFLUXDB3_HOST_URL", + default_value = "http://127.0.0.1:8181" + )] + pub host_url: Url, + + /// The database name to run the query against + #[clap(short = 'd', long = "dbname", env = "INFLUXDB3_DATABASE_NAME")] + pub database_name: String, + + /// The token for authentication with the InfluxDB 3.0 server + #[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")] + pub auth_token: Option>, +} diff --git a/influxdb3/src/commands/query.rs b/influxdb3/src/commands/query.rs new file mode 100644 index 00000000000..cfdb8c74957 --- /dev/null +++ b/influxdb3/src/commands/query.rs @@ -0,0 +1,161 @@ +use std::str::Utf8Error; + +use clap::{Parser, ValueEnum}; +use secrecy::ExposeSecret; +use tokio::{ + fs::OpenOptions, + io::{self, AsyncWriteExt}, +}; + +use super::common::InfluxDb3Config; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error(transparent)] + Client(#[from] influxdb3_client::Error), + + #[error(transparent)] + Query(#[from] QueryError), + + #[error("invlid UTF8 received from server: {0}")] + Utf8(#[from] Utf8Error), + + #[error("io error: {0}")] + Io(#[from] io::Error), + + #[error( + "must specify an output file path with `--output` parameter when formatting\ + the output as `parquet`" + )] + NoOutputFileForParquet, +} + +pub type Result = std::result::Result; + +#[derive(Debug, Parser)] +#[clap(visible_alias = "q", trailing_var_arg = true)] +pub struct Config { + /// Common InfluxDB 3.0 config + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// The query language used to format the provided query string + #[clap( + value_enum, + long = "lang", + short = 'l', + default_value_t = QueryLanguage::Sql, + )] + language: QueryLanguage, + + /// The format in which to output the query + /// + /// If `--fmt` is set to `parquet`, then you must also specify an output + /// file path with `--output`. + #[clap(value_enum, long = "fmt", default_value = "pretty")] + output_format: Format, + + /// Put all query output into `output` + #[clap(short = 'o', long = "output")] + output_file_path: Option, + + /// The query string to execute + query: Vec, +} + +#[derive(Debug, ValueEnum, Clone)] +#[clap(rename_all = "snake_case")] +enum Format { + Pretty, + Json, + Csv, + Parquet, +} + +impl Format { + fn is_parquet(&self) -> bool { + matches!(self, Self::Parquet) + } +} + +impl From for influxdb3_client::Format { + fn from(this: Format) -> Self { + match this { + Format::Pretty => Self::Pretty, + Format::Json => Self::Json, + Format::Csv => Self::Csv, + Format::Parquet => Self::Parquet, + } + } +} + +#[derive(Debug, ValueEnum, Clone)] +enum QueryLanguage { + Sql, +} + +pub(crate) async fn command(config: Config) -> Result<()> { + let InfluxDb3Config { + host_url, + database_name, + auth_token, + } = config.influxdb3_config; + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + + let query = parse_query(config.query)?; + + // make the query using the client + let mut resp_bytes = match config.language { + QueryLanguage::Sql => { + client + .api_v3_query_sql(database_name, query) + .format(config.output_format.clone().into()) + .send() + .await? + } + }; + + // write to file if output path specified + if let Some(path) = &config.output_file_path { + let mut f = OpenOptions::new() + .write(true) + .create(true) + .open(path) + .await?; + f.write_all_buf(&mut resp_bytes).await?; + } else { + if config.output_format.is_parquet() { + Err(Error::NoOutputFileForParquet)? + } + println!("{}", std::str::from_utf8(&resp_bytes)?); + } + + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum QueryError { + #[error("no query provided")] + NoQuery, + + #[error( + "ensure that a single query string is provided as the final \ + argument, enclosed in quotes" + )] + MoreThanOne, +} + +/// Parse the user-inputted query string +fn parse_query(mut input: Vec) -> Result { + if input.is_empty() { + Err(QueryError::NoQuery)? + } + if input.len() > 1 { + Err(QueryError::MoreThanOne)? + } else { + Ok(input.remove(0)) + } +} diff --git a/influxdb3/src/commands/write.rs b/influxdb3/src/commands/write.rs new file mode 100644 index 00000000000..ad7556e98f6 --- /dev/null +++ b/influxdb3/src/commands/write.rs @@ -0,0 +1,65 @@ +use clap::Parser; +use secrecy::ExposeSecret; +use tokio::{ + fs::File, + io::{self, AsyncReadExt}, +}; + +use super::common::InfluxDb3Config; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + #[error(transparent)] + Client(#[from] influxdb3_client::Error), + + #[error("error reading file: {0}")] + Io(#[from] io::Error), +} + +pub(crate) type Result = std::result::Result; + +#[derive(Debug, Parser)] +#[clap(visible_alias = "w", trailing_var_arg = true)] +pub struct Config { + /// Common InfluxDB 3.0 config + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, + + /// File path to load the write data from + /// + /// Currently, only files containing line protocol are supported. + #[clap(short = 'f', long = "file")] + file_path: String, + + /// Flag to request the server accept partial writes + /// + /// Invalid lines in the input data will be ignored by the server. + #[clap(long = "accept-partial")] + accept_partial_writes: bool, +} + +pub(crate) async fn command(config: Config) -> Result<()> { + let InfluxDb3Config { + host_url, + database_name, + auth_token, + } = config.influxdb3_config; + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + + let mut f = File::open(config.file_path).await?; + let mut writes = Vec::new(); + f.read_to_end(&mut writes).await?; + + let mut req = client.api_v3_write_lp(database_name); + if config.accept_partial_writes { + req = req.accept_partial(true); + } + req.body(writes).send().await?; + + println!("success"); + + Ok(()) +} diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 683fc45c424..6b9058b0a58 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -25,8 +25,11 @@ use trogging::{ }; mod commands { + pub(crate) mod common; pub mod create; + pub mod query; pub mod serve; + pub mod write; } #[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))] @@ -79,11 +82,21 @@ struct Config { command: Option, } +// Ignoring clippy here since this enum is just used for running +// the CLI command +#[allow(clippy::large_enum_variant)] #[derive(Debug, clap::Parser)] #[allow(clippy::large_enum_variant)] enum Command { /// Run the InfluxDB 3.0 server Serve(commands::serve::Config), + + /// Perform a query against a running InfluxDB 3.0 server + Query(commands::query::Config), + + /// Perform a set of writes to a running InfluxDB 3.0 server + Write(commands::write::Config), + /// Create new resources Create(commands::create::Config), } @@ -118,6 +131,18 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } + Some(Command::Query(config)) => { + if let Err(e) = commands::query::command(config).await { + eprintln!("Query command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } + Some(Command::Write(config)) => { + if let Err(e) = commands::write::command(config).await { + eprintln!("Write command failed: {e}"); + std::process::exit(ReturnCode::Failure as _) + } + } Some(Command::Create(config)) => { if let Err(e) = commands::create::command(config) { eprintln!("Create command failed: {e}");