feat: add system.parquet_files table #25225

Changes from 2 commits
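The PR wires a new system.parquet_files table into the system schema so that the parquet files persisted for a given table can be inspected with SQL. As the tests in this diff show, a query against it must filter on table_name, otherwise the stream yields table_name_predicate_error(). A minimal sketch of issuing such a query, assuming the query_executor and db_name constructed in the test setup shown further down (not a standalone program):

    // Sketch only: `query_executor` and `db_name` come from the test setup below.
    // The WHERE clause on `table_name` is required for this system table.
    let stream = query_executor
        .query(
            db_name,
            "SELECT * FROM system.parquet_files WHERE table_name = 'cpu'",
            None,
            crate::QueryKind::Sql,
            None,
            None,
        )
        .await
        .unwrap();
    // Collect the record batches; each row describes one persisted parquet file:
    // table_name | path | size_bytes | row_count | min_time | max_time
    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();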
@@ -1,5 +1,5 @@
//! module for query executor
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
use crate::system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA_NAME};
use crate::{QueryExecutor, QueryKind};
use arrow::array::{ArrayRef, Int64Builder, StringBuilder, StructArray};
use arrow::datatypes::SchemaRef;
@@ -345,10 +345,10 @@ impl<B: WriteBuffer> Database<B> {
        query_log: Arc<QueryLog>,
    ) -> Self {
        let system_schema_provider = Arc::new(SystemSchemaProvider::new(
            db_schema.name.to_string(),
            write_buffer.catalog(),
            Arc::clone(&db_schema.name),
            Arc::clone(&query_log),
            write_buffer.last_cache_provider(),
            write_buffer.queryable_buffer(),
        ));
        Self {
            db_schema,
@@ -462,14 +462,14 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {

    fn schema_names(&self) -> Vec<String> {
        debug!("Database as CatalogProvider::schema_names");
        vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA.to_string()]
        vec![DEFAULT_SCHEMA.to_string(), SYSTEM_SCHEMA_NAME.to_string()]
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        debug!(schema_name = %name, "Database as CatalogProvider::schema");
        match name {
            DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
            SYSTEM_SCHEMA => Some(Arc::clone(&self.system_schema_provider) as _),
            SYSTEM_SCHEMA_NAME => Some(Arc::clone(&self.system_schema_provider) as _),
            _ => None,
        }
    }
@@ -574,3 +574,193 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
        provider.scan(ctx, projection, &filters, limit).await
    }
}

#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc, time::Duration};

    use arrow::array::RecordBatch;
    use data_types::NamespaceName;
    use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
    use futures::TryStreamExt;
    use influxdb3_wal::WalConfig;
    use influxdb3_write::{
        persister::PersisterImpl, write_buffer::WriteBufferImpl, Bufferer, Level0Duration,
    };
    use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
    use iox_time::{MockProvider, Time};
    use metric::Registry;
    use object_store::{local::LocalFileSystem, ObjectStore};
    use parquet_file::storage::{ParquetStorage, StorageId};

    use crate::{
        query_executor::QueryExecutorImpl, system_tables::table_name_predicate_error, QueryExecutor,
    };

    fn make_exec(object_store: Arc<dyn ObjectStore>) -> Arc<Executor> {
        let metrics = Arc::new(metric::Registry::default());

        let parquet_store = ParquetStorage::new(
            Arc::clone(&object_store),
            StorageId::from("test_exec_storage"),
        );
        Arc::new(Executor::new_with_config_and_executor(
            ExecutorConfig {
                target_query_partitions: NonZeroUsize::new(1).unwrap(),
                object_stores: [&parquet_store]
                    .into_iter()
                    .map(|store| (store.id(), Arc::clone(store.object_store())))
                    .collect(),
                metric_registry: Arc::clone(&metrics),
                // Default to 1gb
                mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
            },
            DedicatedExecutor::new_testing(),
        ))
    }

    type TestWriteBuffer = WriteBufferImpl<MockProvider>;

    async fn setup() -> (
        Arc<TestWriteBuffer>,
        QueryExecutorImpl<TestWriteBuffer>,
        Arc<MockProvider>,
    ) {
        // Set up QueryExecutor
        let object_store =
            Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
        let persister = Arc::new(PersisterImpl::new(Arc::clone(&object_store) as _));
        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
        let executor = make_exec(object_store);
        let level_0_duration = Level0Duration::new_5m();
        let write_buffer = Arc::new(
            WriteBufferImpl::new(
                Arc::clone(&persister),
                Arc::clone(&time_provider),
                level_0_duration,
                Arc::clone(&executor),
                WalConfig {
                    level_0_duration: Duration::from_millis(100),

Review comment: I'm about to change this in a follow-on PR to use the Level0Duration type, which is only valid for 1m, 5m, or 10m durations. Can you update this test to use one of those?

Reply: This is addressed in ff6e9a4 by using a 60 second duration. So switching to a …

A sketch of what that change could look like follows the test module below.

                    max_write_buffer_size: 100,
                    flush_interval: Duration::from_millis(10),
                    snapshot_size: 1,
                },
            )
            .await
            .unwrap(),
        );
        let metrics = Arc::new(Registry::new());
        let df_config = Arc::new(Default::default());
        let query_executor = QueryExecutorImpl::new(
            write_buffer.catalog(),
            Arc::clone(&write_buffer),
            executor,
            metrics,
            df_config,
            10,
            10,
        );

        (write_buffer, query_executor, time_provider)
    }

    #[test_log::test(tokio::test)]
    async fn system_parquet_files_success() {
        let (write_buffer, query_executor, time_provider) = setup().await;
        // Perform some writes to multiple tables
        let db_name = "test_db";
        // perform writes over time to generate WAL files and some snapshots
        // the time provider is bumped to trick the system into persisting files:
        for i in 0..10 {
            let time = i * 10;
            let _ = write_buffer
                .write_lp(
                    NamespaceName::new(db_name).unwrap(),
                    "\
                    cpu,host=a,region=us-east usage=250\n\
                    mem,host=a,region=us-east usage=150000\n\
                    ",
                    Time::from_timestamp_nanos(time),
                    false,
                    influxdb3_write::Precision::Nanosecond,
                )
                .await
                .unwrap();

            time_provider.set(Time::from_timestamp(time + 1, 0).unwrap());
        }

        // bump time again and sleep briefly to ensure time to persist things
        time_provider.set(Time::from_timestamp(20, 0).unwrap());
        tokio::time::sleep(Duration::from_millis(500)).await;

        struct TestCase<'a> {
            query: &'a str,
            expected: &'a [&'a str],
        }

        let test_cases = [
            TestCase {
                query: "SELECT * FROM system.parquet_files WHERE table_name = 'cpu'",
                expected: &[
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                    "| table_name | path                                                | size_bytes | row_count | min_time | max_time |",
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                    "| cpu        | dbs/test_db/cpu/1970-01-01/00-00/0000000003.parquet | 2142       | 2         | 0        | 10       |",
                    "| cpu        | dbs/test_db/cpu/1970-01-01/00-00/0000000006.parquet | 2147       | 3         | 20       | 40       |",
                    "| cpu        | dbs/test_db/cpu/1970-01-01/00-00/0000000009.parquet | 2147       | 3         | 50       | 70       |",
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                ],
            },
            TestCase {
                query: "SELECT * FROM system.parquet_files WHERE table_name = 'mem'",
                expected: &[
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                    "| table_name | path                                                | size_bytes | row_count | min_time | max_time |",
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                    "| mem        | dbs/test_db/mem/1970-01-01/00-00/0000000003.parquet | 2142       | 2         | 0        | 10       |",
                    "| mem        | dbs/test_db/mem/1970-01-01/00-00/0000000006.parquet | 2147       | 3         | 20       | 40       |",
                    "| mem        | dbs/test_db/mem/1970-01-01/00-00/0000000009.parquet | 2147       | 3         | 50       | 70       |",
                    "+------------+-----------------------------------------------------+------------+-----------+----------+----------+",
                ],
            },
        ];

        for t in test_cases {
            let batch_stream = query_executor
                .query(db_name, t.query, None, crate::QueryKind::Sql, None, None)
                .await
                .unwrap();
            let batches: Vec<RecordBatch> = batch_stream.try_collect().await.unwrap();
            assert_batches_sorted_eq!(t.expected, &batches);
        }
    }

    #[tokio::test]
    async fn system_parquet_files_predicate_error() {
        let (write_buffer, query_executor, time_provider) = setup().await;
        // make some writes, so that we have a database that we can query against:
        let db_name = "test_db";
        let _ = write_buffer
            .write_lp(
                NamespaceName::new(db_name).unwrap(),
                "cpu,host=a,region=us-east usage=0.1 1",
                Time::from_timestamp_nanos(0),
                false,
                influxdb3_write::Precision::Nanosecond,
            )
            .await
            .unwrap();

        // Bump time to trick the persister into persisting to parquet:
        time_provider.set(Time::from_timestamp(60 * 10, 0).unwrap());

        // query without the `WHERE table_name =` clause to trigger the error:
        let query = "SELECT * FROM system.parquet_files";
        let stream = query_executor
            .query(db_name, query, None, crate::QueryKind::Sql, None, None)
            .await
            .unwrap();
        let error: DataFusionError = stream.try_collect::<Vec<RecordBatch>>().await.unwrap_err();
        assert_eq!(error.message(), table_name_predicate_error().message());
    }
}
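Regarding the review thread on the level_0_duration field above: a minimal sketch of the suggested change, assuming WalConfig keeps the same four fields used in this test. The 60 second value follows the reply in the thread; the eventual switch to the Level0Duration type mentioned by the reviewer is not shown here and is an assumption about the follow-on PR.

    // Hypothetical revision of the test's WAL configuration, per the review
    // thread above: use a level-0 duration that lines up with the durations
    // Level0Duration accepts (1m, 5m, or 10m) rather than an arbitrary 100ms.
    let wal_config = WalConfig {
        level_0_duration: Duration::from_secs(60), // 1 minute
        max_write_buffer_size: 100,
        flush_interval: Duration::from_millis(10),
        snapshot_size: 1,
    };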
Review comment: Unrelated to this PR, but we should update these tables to be influxdb3 tables. I logged #25227 to track.