Skip to content

Commit

Permalink
feat: add write and query CLI sub-commands (#24671)
Browse files Browse the repository at this point in the history
* feat: add query and write cli for influxdb3

Adds two new sub-commands to the influxdb3 CLI:

- query: perform queries against the running server
- write: perform writes against the running server

Both share a common set of parameters for connecting to the database
which are managed in influxdb3/src/commands/common.rs.

Currently, query supports all underlying output formats, and can
write the output to a file on disk. It only supports SQL as the
query language, but will eventually also support InfluxQL.

Write supports line protocol for input and expects the source of
data to be from a file.
  • Loading branch information
hiltontj authored Feb 20, 2024
1 parent de102bc commit 6ce3165
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ influxdb3_server = { path = "../influxdb3_server" }
iox_time = { path = "../iox_time" }
iox_query = { path = "../iox_query" }
ioxd_common = { path = "../ioxd_common"}
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_write = { path = "../influxdb3_write" }
metric = { path = "../metric" }
object_store = { workspace = true }
Expand All @@ -33,11 +34,13 @@ libc = { version = "0.2" }
num_cpus = "1.16.0"
once_cell = { version = "1.18", features = ["parking_lot"] }
parking_lot = "0.12.1"
secrecy = "0.8.0"
thiserror = "1.0.48"
tikv-jemalloc-ctl = { version = "0.5.4", optional = true }
tikv-jemalloc-sys = { version = "0.5.4", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
tokio = { version = "1.32", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time", "io-std"] }
tokio-util = { version = "0.7.9" }
url = "2.5.0"
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
sha2 = "0.10.8"
Expand Down
23 changes: 23 additions & 0 deletions influxdb3/src/commands/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use clap::Parser;
use secrecy::Secret;
use url::Url;

#[derive(Debug, Parser)]
pub struct InfluxDb3Config {
/// The host URL of the running InfluxDB 3.0 server
#[clap(
short = 'h',
long = "host",
env = "INFLUXDB3_HOST_URL",
default_value = "http://127.0.0.1:8181"
)]
pub host_url: Url,

/// The database name to run the query against
#[clap(short = 'd', long = "dbname", env = "INFLUXDB3_DATABASE_NAME")]
pub database_name: String,

/// The token for authentication with the InfluxDB 3.0 server
#[clap(long = "token", env = "INFLUXDB3_AUTH_TOKEN")]
pub auth_token: Option<Secret<String>>,
}
161 changes: 161 additions & 0 deletions influxdb3/src/commands/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::str::Utf8Error;

use clap::{Parser, ValueEnum};
use secrecy::ExposeSecret;
use tokio::{
fs::OpenOptions,
io::{self, AsyncWriteExt},
};

use super::common::InfluxDb3Config;

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] influxdb3_client::Error),

#[error(transparent)]
Query(#[from] QueryError),

#[error("invlid UTF8 received from server: {0}")]
Utf8(#[from] Utf8Error),

#[error("io error: {0}")]
Io(#[from] io::Error),

#[error(
"must specify an output file path with `--output` parameter when formatting\
the output as `parquet`"
)]
NoOutputFileForParquet,
}

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, Parser)]
#[clap(visible_alias = "q", trailing_var_arg = true)]
pub struct Config {
/// Common InfluxDB 3.0 config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

/// The query language used to format the provided query string
#[clap(
value_enum,
long = "lang",
short = 'l',
default_value_t = QueryLanguage::Sql,
)]
language: QueryLanguage,

/// The format in which to output the query
///
/// If `--fmt` is set to `parquet`, then you must also specify an output
/// file path with `--output`.
#[clap(value_enum, long = "fmt", default_value = "pretty")]
output_format: Format,

/// Put all query output into `output`
#[clap(short = 'o', long = "output")]
output_file_path: Option<String>,

/// The query string to execute
query: Vec<String>,
}

#[derive(Debug, ValueEnum, Clone)]
#[clap(rename_all = "snake_case")]
enum Format {
Pretty,
Json,
Csv,
Parquet,
}

impl Format {
fn is_parquet(&self) -> bool {
matches!(self, Self::Parquet)
}
}

impl From<Format> for influxdb3_client::Format {
fn from(this: Format) -> Self {
match this {
Format::Pretty => Self::Pretty,
Format::Json => Self::Json,
Format::Csv => Self::Csv,
Format::Parquet => Self::Parquet,
}
}
}

#[derive(Debug, ValueEnum, Clone)]
enum QueryLanguage {
Sql,
}

pub(crate) async fn command(config: Config) -> Result<()> {
let InfluxDb3Config {
host_url,
database_name,
auth_token,
} = config.influxdb3_config;
let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}

let query = parse_query(config.query)?;

// make the query using the client
let mut resp_bytes = match config.language {
QueryLanguage::Sql => {
client
.api_v3_query_sql(database_name, query)
.format(config.output_format.clone().into())
.send()
.await?
}
};

// write to file if output path specified
if let Some(path) = &config.output_file_path {
let mut f = OpenOptions::new()
.write(true)
.create(true)
.open(path)
.await?;
f.write_all_buf(&mut resp_bytes).await?;
} else {
if config.output_format.is_parquet() {
Err(Error::NoOutputFileForParquet)?
}
println!("{}", std::str::from_utf8(&resp_bytes)?);
}

Ok(())
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum QueryError {
#[error("no query provided")]
NoQuery,

#[error(
"ensure that a single query string is provided as the final \
argument, enclosed in quotes"
)]
MoreThanOne,
}

/// Parse the user-inputted query string
fn parse_query(mut input: Vec<String>) -> Result<String> {
if input.is_empty() {
Err(QueryError::NoQuery)?
}
if input.len() > 1 {
Err(QueryError::MoreThanOne)?
} else {
Ok(input.remove(0))
}
}
65 changes: 65 additions & 0 deletions influxdb3/src/commands/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use clap::Parser;
use secrecy::ExposeSecret;
use tokio::{
fs::File,
io::{self, AsyncReadExt},
};

use super::common::InfluxDb3Config;

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error(transparent)]
Client(#[from] influxdb3_client::Error),

#[error("error reading file: {0}")]
Io(#[from] io::Error),
}

pub(crate) type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, Parser)]
#[clap(visible_alias = "w", trailing_var_arg = true)]
pub struct Config {
/// Common InfluxDB 3.0 config
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

/// File path to load the write data from
///
/// Currently, only files containing line protocol are supported.
#[clap(short = 'f', long = "file")]
file_path: String,

/// Flag to request the server accept partial writes
///
/// Invalid lines in the input data will be ignored by the server.
#[clap(long = "accept-partial")]
accept_partial_writes: bool,
}

pub(crate) async fn command(config: Config) -> Result<()> {
let InfluxDb3Config {
host_url,
database_name,
auth_token,
} = config.influxdb3_config;
let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}

let mut f = File::open(config.file_path).await?;
let mut writes = Vec::new();
f.read_to_end(&mut writes).await?;

let mut req = client.api_v3_write_lp(database_name);
if config.accept_partial_writes {
req = req.accept_partial(true);
}
req.body(writes).send().await?;

println!("success");

Ok(())
}
25 changes: 25 additions & 0 deletions influxdb3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ use trogging::{
};

mod commands {
pub(crate) mod common;
pub mod create;
pub mod query;
pub mod serve;
pub mod write;
}

#[cfg(all(not(feature = "heappy"), feature = "jemalloc_replacing_malloc"))]
Expand Down Expand Up @@ -79,11 +82,21 @@ struct Config {
command: Option<Command>,
}

// Ignoring clippy here since this enum is just used for running
// the CLI command
#[allow(clippy::large_enum_variant)]
#[derive(Debug, clap::Parser)]
#[allow(clippy::large_enum_variant)]
enum Command {
/// Run the InfluxDB 3.0 server
Serve(commands::serve::Config),

/// Perform a query against a running InfluxDB 3.0 server
Query(commands::query::Config),

/// Perform a set of writes to a running InfluxDB 3.0 server
Write(commands::write::Config),

/// Create new resources
Create(commands::create::Config),
}
Expand Down Expand Up @@ -118,6 +131,18 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Query(config)) => {
if let Err(e) = commands::query::command(config).await {
eprintln!("Query command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Write(config)) => {
if let Err(e) = commands::write::command(config).await {
eprintln!("Write command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::Create(config)) => {
if let Err(e) = commands::create::command(config) {
eprintln!("Create command failed: {e}");
Expand Down

0 comments on commit 6ce3165

Please sign in to comment.