Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,6 @@ pub trait TableContext: Send + Sync {
previous_snapshot: Option<Arc<TableSnapshot>>,
) -> Result<TableMetaTimestamps>;

fn clear_table_meta_timestamps_cache(&self);

fn get_read_block_thresholds(&self) -> BlockThresholds;
fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);

Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ async fn compact_table(
&compact_target.table,
)?;

ctx.clear_table_meta_timestamps_cache();

{
// do compact.
let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock {
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc) -> Result<()> {
.await?;
let table_id = table.get_id();

ctx.clear_table_meta_timestamps_cache();

let mut plans = Vec::new();

// Generate sync aggregating indexes.
Expand Down
37 changes: 17 additions & 20 deletions src/query/service/src/interpreters/interpreter_copy_into_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use databend_common_sql::executor::PhysicalPlan;
use databend_common_storage::StageFileInfo;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stage::StageTable;
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
use log::debug;
use log::info;

Expand Down Expand Up @@ -100,21 +101,8 @@ impl CopyIntoTableInterpreter {
&self,
table_info: TableInfo,
plan: &CopyIntoTablePlan,
table_meta_timestamps: TableMetaTimestamps,
) -> Result<(PhysicalPlan, Vec<UpdateStreamMetaReq>)> {
let to_table = self
.ctx
.get_table(
plan.catalog_info.catalog_name(),
&plan.database_name,
&plan.table_name,
)
.await?;
let snapshot = FuseTable::try_from_table(to_table.as_ref())?
.read_table_snapshot()
.await?;
let table_meta_timestamps = self
.ctx
.get_table_meta_timestamps(to_table.as_ref(), snapshot)?;
let mut update_stream_meta_reqs = vec![];
let (source, project_columns) = if let Some(ref query) = plan.query {
let query = if plan.enable_distributed {
Expand Down Expand Up @@ -244,6 +232,7 @@ impl CopyIntoTableInterpreter {
update_stream_meta: Vec<UpdateStreamMetaReq>,
deduplicated_label: Option<String>,
path_prefix: Option<String>,
table_meta_timestamps: TableMetaTimestamps,
) -> Result<()> {
let ctx = self.ctx.clone();
let to_table = ctx
Expand All @@ -264,11 +253,6 @@ impl CopyIntoTableInterpreter {
path_prefix,
)?;

let fuse_table = FuseTable::try_from_table(to_table.as_ref())?;
let table_meta_timestamps = ctx.get_table_meta_timestamps(
to_table.as_ref(),
fuse_table.read_table_snapshot().await?,
)?;
to_table.commit_insertion(
ctx.clone(),
main_pipeline,
Expand Down Expand Up @@ -379,9 +363,21 @@ impl Interpreter for CopyIntoTableInterpreter {
return self.on_no_files_to_copy().await;
}

let snapshot = FuseTable::try_from_table(to_table.as_ref())?
.read_table_snapshot()
.await?;
let table_meta_timestamps = self
.ctx
.get_table_meta_timestamps(to_table.as_ref(), snapshot)?;

let (physical_plan, update_stream_meta) = self
.build_physical_plan(to_table.get_table_info().clone(), &self.plan)
.build_physical_plan(
to_table.get_table_info().clone(),
&self.plan,
table_meta_timestamps,
)
.await?;

let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;

Expand All @@ -405,6 +401,7 @@ impl Interpreter for CopyIntoTableInterpreter {
update_stream_meta,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
self.plan.path_prefix.clone(),
table_meta_timestamps,
)
.await?;
}
Expand Down
6 changes: 5 additions & 1 deletion src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use chrono::Duration;
use databend_common_catalog::lock::LockTableOption;
use databend_common_catalog::table::TableExt;
use databend_common_exception::ErrorCode;
Expand All @@ -35,6 +36,7 @@ use databend_common_sql::plans::InsertValue;
use databend_common_sql::plans::Plan;
use databend_common_sql::NameResolutionContext;
use databend_common_storages_stage::build_streaming_load_pipeline;
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
use log::info;

use crate::interpreters::common::check_deduplicate_label;
Expand Down Expand Up @@ -117,7 +119,9 @@ impl Interpreter for InsertInterpreter {
self.ctx
.get_table_meta_timestamps(table.as_ref(), snapshot)?
} else {
Default::default()
// For non-fuse table, the table meta timestamps does not matter,
// just passes a placeholder value here
TableMetaTimestamps::new(None, Duration::hours(1))
};

let mut build_res = PipelineBuildResult::create();
Expand Down
5 changes: 4 additions & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use databend_common_sql::ScalarBinder;
use databend_common_storage::StageFileInfo;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
use databend_storages_common_table_meta::table::ClusterType;
use parking_lot::RwLock;
Expand Down Expand Up @@ -198,6 +199,7 @@ impl ReplaceInterpreter {
&self.plan.source,
self.plan.schema(),
&mut purge_info,
table_meta_timestamps,
)
.await?;
if let Some(s) = &select_ctx {
Expand Down Expand Up @@ -394,6 +396,7 @@ impl ReplaceInterpreter {
source: &'a InsertInputSource,
schema: DataSchemaRef,
purge_info: &mut Option<(Vec<StageFileInfo>, StageInfo, CopyIntoTableOptions)>,
table_meta_timestamps: TableMetaTimestamps,
) -> Result<ReplaceSourceCtx> {
match source {
InsertInputSource::Values(source) => self
Expand All @@ -413,7 +416,7 @@ impl ReplaceInterpreter {
let interpreter =
CopyIntoTableInterpreter::try_create(ctx.clone(), *copy_plan.clone())?;
let (physical_plan, _) = interpreter
.build_physical_plan(table_info, &copy_plan)
.build_physical_plan(table_info, &copy_plan, table_meta_timestamps)
.await?;

// TODO optimization: if copy_plan.stage_table_info.files_to_copy is None, there should be a short-cut plan
Expand Down
28 changes: 28 additions & 0 deletions src/query/service/src/interpreters/interpreter_txn_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::base::GlobalInstance;
use databend_common_exception::Result;
use databend_common_license::license::Feature::Vacuum;
use databend_common_license::license_manager::LicenseManagerSwitch;
use databend_common_meta_app::principal::StageInfo;
use databend_common_metrics::storage::metrics_inc_copy_purge_files_cost_milliseconds;
use databend_common_metrics::storage::metrics_inc_copy_purge_files_counter;
use databend_common_storage::init_stage_operator;
use databend_common_storages_fuse::commit_with_backoff;
use databend_common_storages_fuse::operations::vacuum_tables_from_info;
use databend_common_storages_fuse::TableContext;
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;
use databend_storages_common_io::Files;
use databend_storages_common_session::TxnManagerRef;
use log::error;
use log::info;
use log::warn;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -83,6 +89,28 @@ pub async fn execute_commit_statement(ctx: Arc<dyn TableContext>) -> Result<()>
for (stage_info, files) in need_purge_files {
try_purge_files(ctx.clone(), &stage_info, &files).await;
}

let tables_need_purge = { ctx.txn_mgr().lock().table_need_purge() };

if !tables_need_purge.is_empty() {
if LicenseManagerSwitch::instance()
.check_enterprise_enabled(ctx.get_license_key(), Vacuum)
.is_ok()
{
let handler: Arc<VacuumHandlerWrapper> = GlobalInstance::get();
let num_tables = tables_need_purge.len();
info!("Vacuuming {num_tables} tables after transaction commit");
if let Err(e) =
vacuum_tables_from_info(tables_need_purge, ctx.clone(), handler).await
{
warn!( "Failed to vacuum tables after transaction commit (best-effort operation): {e}");
} else {
info!( "{num_tables} tables vacuumed after transaction commit in a best-effort manner" );
}
} else {
warn!("EE feature is not enabled, vacuum after transaction commit is skipped");
}
}
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::Duration;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
use databend_common_storages_stage::StageSinkTable;
use databend_storages_common_table_meta::meta::TableMetaTimestamps;

use crate::pipelines::PipelineBuilder;

Expand All @@ -32,7 +34,11 @@ impl PipelineBuilder {
false,
)?;

// The stage table that copying into
let to_table = StageSinkTable::create(copy.info.clone(), copy.input_table_schema.clone())?;

// StageSinkTable needs not to hold the table meta timestamps invariants, just pass a dummy one
let dummy_table_meta_timestamps = TableMetaTimestamps::new(None, Duration::hours(1));
PipelineBuilder::build_append2table_with_commit_pipeline(
self.ctx.clone(),
&mut self.main_pipeline,
Expand All @@ -42,7 +48,7 @@ impl PipelineBuilder {
vec![],
false,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
Default::default(),
dummy_table_meta_timestamps,
)
}
}
Loading
Loading