feat: add system.parquet_files table #25225

Merged · 3 commits · Aug 8, 2024
Changes from 2 commits
44 changes: 44 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -91,6 +91,7 @@ sha2 = "0.10.8"
snap = "1.0.0"
sqlparser = "0.48.0"
sysinfo = "0.30.8"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.35", features = ["full"] }
tokio-util = "0.7.9"
25 changes: 13 additions & 12 deletions influxdb3/tests/server/flight.rs
@@ -108,18 +108,19 @@ async fn flight() -> Result<(), influxdb3_client::Error> {

assert_batches_sorted_eq!(
[
"+--------------+--------------------+-------------+------------+",
"| catalog_name | db_schema_name | table_name | table_type |",
"+--------------+--------------------+-------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
"| public | system | last_caches | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+-------------+------------+",
"+--------------+--------------------+---------------+------------+",
"| catalog_name | db_schema_name | table_name | table_type |",
"+--------------+--------------------+---------------+------------+",
"| public | information_schema | columns | VIEW |",
"| public | information_schema | df_settings | VIEW |",
"| public | information_schema | schemata | VIEW |",
"| public | information_schema | tables | VIEW |",
"| public | information_schema | views | VIEW |",
"| public | iox | cpu | BASE TABLE |",
Review comment (Member): unrelated to this PR, but we should update these tables to be influxdb3 tables. I logged #25227 to track.

"| public | system | last_caches | BASE TABLE |",
"| public | system | parquet_files | BASE TABLE |",
"| public | system | queries | BASE TABLE |",
"+--------------+--------------------+---------------+------------+",
],
&batches
);
1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
@@ -79,5 +79,6 @@ test_helpers.workspace = true
# crates.io crates
http.workspace = true
hyper.workspace = true
test-log.workspace = true
urlencoding.workspace = true
pretty_assertions.workspace = true
200 changes: 195 additions & 5 deletions influxdb3_server/src/query_executor.rs
@@ -1,5 +1,5 @@
//! module for query executor
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA_NAME};
use crate::{QueryExecutor, QueryKind};
use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray};
use arrow::datatypes::SchemaRef;
@@ -345,10 +345,10 @@ impl<B: WriteBuffer> Database<B> {
query_log: Arc<QueryLog>,
) -> Self {
let system_schema_provider = Arc::new(SystemSchemaProvider::new(
db_schema.name.to_string(),
write_buffer.catalog(),
Arc::clone(&db_schema.name),
Arc::clone(&query_log),
write_buffer.last_cache_provider(),
write_buffer.queryable_buffer(),
));
Self {
db_schema,
@@ -462,14 +462,14 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {

fn schema_names(&self) -> Vec<String> {
debug!("Database as CatalogProvider::schema_names");
vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()]
vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA_NAME.to_string()]
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
debug!(schema_name = %name, "Database as CatalogProvider::schema");
match name {
DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
SYSTEM_SCHEMA => Some(Arc::clone(&self.system_schema_provider) as _),
SYSTEM_SCHEMA_NAME => Some(Arc::clone(&self.system_schema_provider) as _),
_ => None,
}
}
@@ -574,3 +574,193 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
provider.scan(ctx, projection, &filters, limit).await
}
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use arrow::array::RecordBatch;
use data_types::NamespaceName;
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
use futures::TryStreamExt;
use influxdb3_wal::WalConfig;
use influxdb3_write::{
persister::PersisterImpl, write_buffer::WriteBufferImpl, Bufferer, Level0Duration,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use metric::Registry;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};

use crate::{
query_executor::QueryExecutorImpl, system_tables::table_name_predicate_error, QueryExecutor,
};

fn make_exec(object_store: Arc<dyn ObjectStore>) -> Arc<Executor> {
let metrics = Arc::new(metric::Registry::default());

let parquet_store = ParquetStorage::new(
Arc::clone(&object_store),
StorageId::from("test_exec_storage"),
);
Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
// Default to 1gb
mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
},
DedicatedExecutor::new_testing(),
))
}

type TestWriteBuffer = WriteBufferImpl<MockProvider>;

async fn setup() -> (
Arc<TestWriteBuffer>,
QueryExecutorImpl<TestWriteBuffer>,
Arc<MockProvider>,
) {
// Set up QueryExecutor
let object_store =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store) as _));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let executor = make_exec(object_store);
let level_0_duration = Level0Duration::new_5m();
let write_buffer = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&time_provider),
level_0_duration,
Arc::clone(&executor),
WalConfig {
level_0_duration: Duration::from_millis(100),
Review comment (Member): I'm about to change this in a follow-on PR to use the Level0Duration type, which is only valid for 1m, 5m, or 10m durations. Can you update this test to use one of those?

Reply (@hiltontj, Contributor Author, Aug 7, 2024): This is addressed in ff6e9a4 by using a 60-second duration, so switching to Level0Duration::1m should be a straightforward change.
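
For illustration, a minimal sketch of that adjustment, assuming the follow-on change keeps the same WalConfig fields used in this test and only swaps the level-0 duration for a value that lines up with a valid Level0Duration variant (the 60-second figure comes from the reply above; the other fields mirror the test setup in this diff):

```rust
// Hypothetical sketch, not part of this diff: the same WalConfig as in the
// test above, with a 60-second level-0 duration so it matches the 1m
// Level0Duration variant.
WalConfig {
    level_0_duration: Duration::from_secs(60),
    max_write_buffer_size: 100,
    flush_interval: Duration::from_millis(10),
    snapshot_size: 1,
}
```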

max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
)
.await
.unwrap(),
);
let metrics = Arc::new(Registry::new());
let df_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
executor,
metrics,
df_config,
10,
10,
);

(write_buffer, query_executor, time_provider)
}

#[test_log::test(tokio::test)]
async fn system_parquet_files_success() {
let (write_buffer, query_executor, time_provider) = setup().await;
// Perform some writes to multiple tables
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
// the time provider is bumped to trick the system into persisting files:
for i in 0..10 {
let time = i * 10;
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"\
cpu,host=a,region=us-east usage=250\n\
mem,host=a,region=us-east usage=150000\n\
",
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());
}

// bump time again and sleep briefly to ensure time to persist things
time_provider.set(Time::from_timestamp(20, 0).unwrap());
tokio::time::sleep(Duration::from_millis(500)).await;

struct TestCase<'a> {
query: &'a str,
expected: &'a [&'a str],
}

let test_cases = [
TestCase {
query: "SELECT * FROM system.parquet_files WHERE table_name = 'cpu'",
expected: &[
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
"| table_name | path | size_bytes | row_count | min_time | max_time |",
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
"| cpu | dbs/test_db/cpu/1970-01-01/00-00/0000000003.parquet | 2142 | 2 | 0 | 10 |",
"| cpu | dbs/test_db/cpu/1970-01-01/00-00/0000000006.parquet | 2147 | 3 | 20 | 40 |",
"| cpu | dbs/test_db/cpu/1970-01-01/00-00/0000000009.parquet | 2147 | 3 | 50 | 70 |",
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
],
},
TestCase {
query: "SELECT * FROM system.parquet_files WHERE table_name = 'mem'",
expected: &[
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
"| table_name | path | size_bytes | row_count | min_time | max_time |",
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
"| mem | dbs/test_db/mem/1970-01-01/00-00/0000000003.parquet | 2142 | 2 | 0 | 10 |",
"| mem | dbs/test_db/mem/1970-01-01/00-00/0000000006.parquet | 2147 | 3 | 20 | 40 |",
"| mem | dbs/test_db/mem/1970-01-01/00-00/0000000009.parquet | 2147 | 3 | 50 | 70 |",
"+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
],
},
];

for t in test_cases {
let batch_stream = query_executor
.query(db_name, t.query, None, crate::QueryKind::Sql, None, None)
.await
.unwrap();
let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
}
}

#[tokio::test]
async fn system_parquet_files_predicate_error() {
let (write_buffer, query_executor, time_provider) = setup().await;
// make some writes, so that we have a database that we can query against:
let db_name = "test_db";
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"cpu,host=a,region=us-east usage=0.1 1",
Time::from_timestamp_nanos(0),
false,
influxdb3_write::Precision::Nanosecond,
)
.await
.unwrap();

// Bump time to trick the persister into persisting to parquet:
time_provider.set(Time::from_timestamp(60 * 10, 0).unwrap());

// query without the `WHERE table_name =` clause to trigger the error:
let query = "SELECT * FROM system.parquet_files";
let stream = query_executor
.query(db_name, query, None, crate::QueryKind::Sql, None, None)
.await
.unwrap();
let error: DataFusionError = stream.try_collect::<Vec<RecordBatch>>().await.unwrap_err();
assert_eq!(error.message(), table_name_predicate_error().message());
}
}
6 changes: 3 additions & 3 deletions influxdb3_server/src/system_tables/last_caches.rs
@@ -9,15 +9,15 @@ use influxdb3_write::last_cache::LastCacheProvider;
use iox_system_tables::IoxSystemTable;

pub(super) struct LastCachesTable {
db_name: String,
db_name: Arc<str>,
schema: SchemaRef,
provider: Arc<LastCacheProvider>,
}

impl LastCachesTable {
pub(super) fn new(db: impl Into<String>, provider: Arc<LastCacheProvider>) -> Self {
pub(super) fn new(db_name: Arc<str>, provider: Arc<LastCacheProvider>) -> Self {
Self {
db_name: db.into(),
db_name,
schema: last_caches_schema(),
provider,
}