Skip to content

Commit

Permalink
Merge branch 'main' into feature/update-pgwire-019
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 authored Jan 11, 2024
2 parents 53d6e24 + 7b7eb1b commit 9add4fa
Show file tree
Hide file tree
Showing 166 changed files with 5,481 additions and 2,084 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ github:
- horaedb
enabled_merge_buttons:
squash: true
merge: false
merge: true
rebase: true
protected_branches:
main:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ on:
push:
branches:
- main
- dev
paths-ignore:
- 'docs/**'
- 'etc/**'
Expand Down
41 changes: 26 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "92152841fc8a10729d6864d2873c62b7a334b2bd" }
horaedbproto = { git = "https://github.com/apache/incubator-horaedb-proto.git", rev = "19ece8f771fc0b3e8e734072cc3d8040de6c74cb" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand All @@ -119,10 +119,10 @@ hash_ext = { path = "components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
id_allocator = { path = "components/id_allocator" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query_influxql" }
influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "influxdb_influxql_parser" }
influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "iox_query" }
influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "a905863", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
Expand Down Expand Up @@ -187,6 +187,7 @@ tokio = { version = "1.29", features = ["full"] }
wal = { path = "src/wal" }
xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" }
zstd = { version = "0.12", default-features = false }
uuid = "1.6.1"
regex = "1"

# This profile optimizes for good runtime performance.
Expand Down
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ datafusion = { workspace = true }
future_ext = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
hash_ext = { workspace = true }
hex = { workspace = true }
horaedbproto = { workspace = true }
hyperloglog = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ impl SchedulerImpl {
runtime: Arc<Runtime>,
config: SchedulerConfig,
write_sst_max_buffer_size: usize,
min_flush_interval_ms: u64,
scan_options: ScanOptions,
) -> Self {
let (tx, rx) = mpsc::channel(config.schedule_channel_len);
Expand All @@ -324,6 +325,7 @@ impl SchedulerImpl {
max_ongoing_tasks: config.max_ongoing_tasks,
max_unflushed_duration: config.max_unflushed_duration.0,
write_sst_max_buffer_size,
min_flush_interval_ms,
scan_options,
limit: Arc::new(OngoingTaskLimit {
ongoing_tasks: AtomicUsize::new(0),
Expand Down Expand Up @@ -402,6 +404,7 @@ struct ScheduleWorker {
picker_manager: PickerManager,
max_ongoing_tasks: usize,
write_sst_max_buffer_size: usize,
min_flush_interval_ms: u64,
scan_options: ScanOptions,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
Expand Down Expand Up @@ -508,6 +511,7 @@ impl ScheduleWorker {
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
column_stats: Default::default(),
};
let scan_options = self.scan_options.clone();

Expand All @@ -526,7 +530,7 @@ impl ScheduleWorker {
}
let res = space_store
.compact_table(
request_id,
request_id.clone(),
&table_data,
&compaction_task,
scan_options,
Expand Down Expand Up @@ -667,6 +671,7 @@ impl ScheduleWorker {
space_store: self.space_store.clone(),
runtime: self.runtime.clone(),
write_sst_max_buffer_size: self.write_sst_max_buffer_size,
min_flush_interval_ms: Some(self.min_flush_interval_ms),
};

for table_data in &tables_buf {
Expand Down
45 changes: 28 additions & 17 deletions analytic_engine/src/instance/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
},
manifest::{ManifestRef, SnapshotRequest},
space::SpaceRef,
table::data::TableDataRef,
};

pub(crate) struct Closer {
Expand All @@ -50,13 +51,37 @@ impl Closer {
}
};

// Do flush before close for the fast recovery during the following opening.
// And it should not stop closing if flush fails.
if let Err(e) = self.flush(&table_data).await {
warn!(
"Ignore the failure to flush data before close, table:{}, table_id:{}, err:{e}",
table_data.name, table_data.id
);
}

// Table has been closed so remove it from the space.
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

// Table is already moved out of space, we should close it to stop background
// jobs.
table_data.set_closed();

info!(
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
);
Ok(())
}

async fn flush(&self, table_data: &TableDataRef) -> Result<()> {
// Flush table.
let opts = TableFlushOptions::default();
let mut serial_exec = table_data.serial_exec.lock().await;
let flush_scheduler = serial_exec.flush_scheduler();

self.flusher
.do_flush(flush_scheduler, &table_data, opts)
.do_flush(flush_scheduler, table_data, opts)
.await
.context(FlushTable {
space_id: self.space.id,
Expand All @@ -77,20 +102,6 @@ impl Closer {
.context(DoManifestSnapshot {
space_id: self.space.id,
table: &table_data.name,
})?;

// Table has been closed so remove it from the space.
let removed_table = self.space.remove_table(&request.table_name);
assert!(removed_table.is_some());

// Table is already moved out of space, we should close it to stop background
// jobs.
table_data.set_closed();

info!(
"table:{}-{} has been removed from the space_id:{}",
table_data.name, table_data.id, self.space.id
);
Ok(())
})
}
}
8 changes: 6 additions & 2 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ use table_engine::{
};
use wal::manager::WalLocation;

use super::open::{TableContext, TablesOfShardContext};
use crate::{
engine::build_space_id,
instance::{close::Closer, drop::Dropper, open::OpenTablesOfShardResult, Instance},
instance::{
close::Closer,
drop::Dropper,
open::{OpenTablesOfShardResult, TableContext, TablesOfShardContext},
Instance,
},
space::{MemSizeOptions, Space, SpaceAndTable, SpaceContext, SpaceId, SpaceRef},
};

Expand Down
Loading

0 comments on commit 9add4fa

Please sign in to comment.