Skip to content

Commit 9e6e10b

Browse files
committed
Revert "fix: transient table no work as expected inside explicit transaction (databendlabs#18160)"
This reverts commit 461a1e4.
1 parent 461a1e4 commit 9e6e10b

File tree

11 files changed

+89
-136
lines changed

11 files changed

+89
-136
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ pub trait TableContext: Send + Sync {
357357
previous_snapshot: Option<Arc<TableSnapshot>>,
358358
) -> Result<TableMetaTimestamps>;
359359

360+
fn clear_table_meta_timestamps_cache(&self);
361+
360362
fn get_read_block_thresholds(&self) -> BlockThresholds;
361363
fn set_read_block_thresholds(&self, _thresholds: BlockThresholds);
362364

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ impl ReclusterTableInterpreter {
192192
push_downs: &mut Option<PushDownInfo>,
193193
hilbert_info: &mut Option<HilbertBuildInfo>,
194194
) -> Result<bool> {
195+
self.ctx.clear_table_meta_timestamps_cache();
195196
let start = SystemTime::now();
196197
let settings = self.ctx.get_settings();
197198

src/query/service/src/sessions/query_ctx.rs

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,55 +1532,53 @@ impl TableContext for QueryContext {
15321532
previous_snapshot: Option<Arc<TableSnapshot>>,
15331533
) -> Result<TableMetaTimestamps> {
15341534
let table_id = table.get_id();
1535-
{
1536-
// Defensively check that:
1537-
// For each table, the previous_snapshot's timestamp passed in is strictly increasing,
1538-
// Or put it another way, caller should always pass in the same or newer snapshot of the same table.
1539-
let table_snapshot_timestamp_history =
1540-
self.shared.get_table_snapshot_timestamp_history();
1541-
let mut history = table_snapshot_timestamp_history.lock();
1542-
let previous_snapshot_timestamp = previous_snapshot.as_ref().and_then(|s| s.timestamp);
1543-
if let Some(last_accessed) = history.get(&table_id) {
1544-
if last_accessed > &previous_snapshot_timestamp {
1545-
return Err(ErrorCode::Internal(
1546-
format!(
1547-
"[QUERY-CTX] Generating new table meta timestamps failed: table_id = {}, previous_snapshot_timestamp {:?} is lesser than the snapshot timestamp accessed last time {:?}",
1548-
table_id, previous_snapshot_timestamp, last_accessed
1549-
)
1550-
));
1551-
}
1535+
let cache = self.shared.get_table_meta_timestamps();
1536+
let cached_item = cache.lock().get(&table_id).copied();
1537+
1538+
match cached_item {
1539+
Some(ts) => Ok(ts),
1540+
None => {
1541+
let delta = {
1542+
let fuse_table = FuseTable::try_from_table(table)?;
1543+
let duration = if fuse_table.is_transient() {
1544+
Duration::from_secs(0)
1545+
} else {
1546+
let settings = self.get_settings();
1547+
let max_exec_time_secs = settings.get_max_execute_time_in_seconds()?;
1548+
if max_exec_time_secs != 0 {
1549+
Duration::from_secs(max_exec_time_secs)
1550+
} else {
1551+
// no limit, use retention period as delta
1552+
// prefer table-level retention setting.
1553+
match fuse_table.get_table_retention_period() {
1554+
None => {
1555+
Duration::from_days(settings.get_data_retention_time_in_days()?)
1556+
}
1557+
Some(v) => v,
1558+
}
1559+
}
1560+
};
1561+
1562+
chrono::Duration::from_std(duration).map_err(|e| {
1563+
ErrorCode::Internal(format!(
1564+
"[QUERY-CTX] Unable to construct delta duration of table meta timestamp: {e}",
1565+
))
1566+
})?
1567+
};
1568+
let ts = self.txn_mgr().lock().get_table_meta_timestamps(
1569+
table_id,
1570+
previous_snapshot,
1571+
delta,
1572+
);
1573+
cache.lock().insert(table_id, ts);
1574+
Ok(ts)
15521575
}
1553-
history.insert(table_id, previous_snapshot_timestamp);
1554-
drop(history)
15551576
}
1577+
}
15561578

1557-
let delta = {
1558-
let fuse_table = FuseTable::try_from_table(table)?;
1559-
let duration = if fuse_table.is_transient() {
1560-
Duration::from_secs(0)
1561-
} else {
1562-
let settings = self.get_settings();
1563-
let max_exec_time_secs = settings.get_max_execute_time_in_seconds()?;
1564-
if max_exec_time_secs != 0 {
1565-
Duration::from_secs(max_exec_time_secs)
1566-
} else {
1567-
// no limit, use retention period as delta
1568-
// prefer table-level retention setting.
1569-
match fuse_table.get_table_retention_period() {
1570-
None => Duration::from_days(settings.get_data_retention_time_in_days()?),
1571-
Some(v) => v,
1572-
}
1573-
}
1574-
};
1575-
1576-
chrono::Duration::from_std(duration).map_err(|e| {
1577-
ErrorCode::Internal(format!(
1578-
"[QUERY-CTX] Unable to construct delta duration of table meta timestamp: {e}",
1579-
))
1580-
})?
1581-
};
1582-
let ts = TableMetaTimestamps::new(previous_snapshot, delta);
1583-
Ok(ts)
1579+
fn clear_table_meta_timestamps_cache(&self) {
1580+
let cache = self.shared.get_table_meta_timestamps();
1581+
cache.lock().clear();
15841582
}
15851583

15861584
fn get_read_block_thresholds(&self) -> BlockThresholds {

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ use std::time::SystemTime;
2626

2727
use async_channel::Receiver;
2828
use async_channel::Sender;
29-
use chrono::DateTime;
30-
use chrono::Utc;
3129
use dashmap::DashMap;
3230
use databend_common_base::base::short_sql;
3331
use databend_common_base::base::Progress;
@@ -66,6 +64,7 @@ use databend_common_storage::StorageMetrics;
6664
use databend_common_storages_stream::stream_table::StreamTable;
6765
use databend_common_users::UserApiProvider;
6866
use databend_storages_common_table_meta::meta::Location;
67+
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
6968
use parking_lot::Mutex;
7069
use parking_lot::RwLock;
7170
use uuid::Uuid;
@@ -162,8 +161,7 @@ pub struct QueryContextShared {
162161
pub(in crate::sessions) query_cache_metrics: DataCacheMetrics,
163162

164163
pub(in crate::sessions) query_queued_duration: Arc<RwLock<Duration>>,
165-
pub(in crate::sessions) table_snapshot_timestamp_history:
166-
Arc<Mutex<HashMap<u64, Option<DateTime<Utc>>>>>,
164+
pub(in crate::sessions) table_meta_timestamps: Arc<Mutex<HashMap<u64, TableMetaTimestamps>>>,
167165

168166
pub(in crate::sessions) cluster_spill_progress: Arc<RwLock<HashMap<String, SpillProgress>>>,
169167
pub(in crate::sessions) spilled_files:
@@ -243,7 +241,7 @@ impl QueryContextShared {
243241
merge_into_join: Default::default(),
244242
multi_table_insert_status: Default::default(),
245243
query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))),
246-
table_snapshot_timestamp_history: Arc::new(Mutex::new(HashMap::new())),
244+
table_meta_timestamps: Arc::new(Mutex::new(HashMap::new())),
247245

248246
cluster_spill_progress: Default::default(),
249247
spilled_files: Default::default(),
@@ -842,10 +840,8 @@ impl QueryContextShared {
842840
nodes_peek_memory_usage
843841
}
844842

845-
pub fn get_table_snapshot_timestamp_history(
846-
&self,
847-
) -> Arc<Mutex<HashMap<u64, Option<DateTime<Utc>>>>> {
848-
self.table_snapshot_timestamp_history.clone()
843+
pub fn get_table_meta_timestamps(&self) -> Arc<Mutex<HashMap<u64, TableMetaTimestamps>>> {
844+
self.table_meta_timestamps.clone()
849845
}
850846

851847
pub fn get_pruned_partitions_stats(&self) -> HashMap<u32, PartStatistics> {

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,10 @@ impl TableContext for CtxDelegation {
979979
self.ctx.get_table_meta_timestamps(table, previous_snapshot)
980980
}
981981

982+
fn clear_table_meta_timestamps_cache(&self) {
983+
self.ctx.clear_table_meta_timestamps_cache();
984+
}
985+
982986
fn get_temp_table_prefix(&self) -> Result<String> {
983987
todo!()
984988
}

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,10 @@ impl TableContext for CtxDelegation {
891891
) -> Result<TableMetaTimestamps> {
892892
self.ctx.get_table_meta_timestamps(table, previous_snapshot)
893893
}
894+
895+
fn clear_table_meta_timestamps_cache(&self) {
896+
self.ctx.clear_table_meta_timestamps_cache();
897+
}
894898
}
895899

896900
#[derive(Clone, Debug)]

src/query/storages/common/session/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ databend-common-storage = { workspace = true }
1414
databend-storages-common-blocks = { workspace = true }
1515
databend-storages-common-table-meta = { workspace = true }
1616

17+
chrono = { workspace = true }
1718
log = { workspace = true }
1819
parking_lot = { workspace = true }
1920
serde = { workspace = true, features = ["derive"] }

src/query/storages/common/session/src/transaction.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::hash_map::Entry;
1516
use std::collections::BTreeMap;
1617
use std::collections::HashMap;
1718
use std::collections::HashSet;
1819
use std::sync::Arc;
1920

21+
use chrono::Duration;
2022
use databend_common_meta_app::principal::StageInfo;
2123
use databend_common_meta_app::schema::TableCopiedFileInfo;
2224
use databend_common_meta_app::schema::TableIdent;
@@ -28,6 +30,8 @@ use databend_common_meta_app::schema::UpdateTempTableReq;
2830
use databend_common_meta_app::schema::UpsertTableCopiedFileReq;
2931
use databend_common_meta_app::tenant::Tenant;
3032
use databend_common_meta_types::MatchSeq;
33+
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
34+
use databend_storages_common_table_meta::meta::TableSnapshot;
3135
use databend_storages_common_table_meta::table_id_ranges::is_temp_table_id;
3236
use parking_lot::Mutex;
3337
use serde::Deserialize;
@@ -62,6 +66,8 @@ pub struct TxnBuffer {
6266
stream_tables: HashMap<u64, StreamSnapshot>,
6367
need_purge_files: Vec<(StageInfo, Vec<String>)>,
6468

69+
pub table_meta_timestamps: HashMap<u64, TableMetaTimestamps>,
70+
6571
temp_table_desc_to_id: HashMap<String, u64>,
6672
mutated_temp_tables: HashMap<u64, TempTable>,
6773
}
@@ -358,6 +364,27 @@ impl TxnManager {
358364
std::mem::take(&mut self.txn_buffer.need_purge_files)
359365
}
360366

367+
pub fn get_table_meta_timestamps(
368+
&mut self,
369+
table_id: u64,
370+
previous_snapshot: Option<Arc<TableSnapshot>>,
371+
delta: Duration,
372+
) -> TableMetaTimestamps {
373+
if !self.is_active() {
374+
return TableMetaTimestamps::new(previous_snapshot, delta);
375+
}
376+
377+
let entry = self.txn_buffer.table_meta_timestamps.entry(table_id);
378+
match entry {
379+
Entry::Occupied(e) => *e.get(),
380+
Entry::Vacant(e) => {
381+
let timestamps = TableMetaTimestamps::new(previous_snapshot, delta);
382+
e.insert(timestamps);
383+
timestamps
384+
}
385+
}
386+
}
387+
361388
pub fn get_base_snapshot_location(&self, table_id: u64) -> Option<String> {
362389
self.txn_buffer
363390
.base_snapshot_location

tests/sqllogictests/suites/base/issues/issue_18160.test

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)