Skip to content

Commit

Permalink
feat: use string for request id (#1349)
Browse files Browse the repository at this point in the history
## Rationale
Close #1178

## Detailed Changes
- Use a string to represent `request_id`; it is a UUID v4, i.e. a random string
such as `575c02e1-cd92-4c35-a5f3-353781163e93`
  - https://docs.rs/uuid/1.6.1/uuid/struct.Uuid.html#method.new_v4

## Test Plan
Tested manually.
  • Loading branch information
jiacai2050 authored Dec 7, 2023
1 parent 22b724c commit 46f54f4
Show file tree
Hide file tree
Showing 25 changed files with 153 additions and 129 deletions.
57 changes: 35 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.23"
ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "2c60e05" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
Expand Down Expand Up @@ -183,6 +183,7 @@ tokio = { version = "1.29", features = ["full"] }
wal = { path = "src/wal" }
xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" }
zstd = { version = "0.12", default-features = false }
uuid = "1.6.1"
regex = "1"

# This profile optimizes for good runtime performance.
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ impl ScheduleWorker {
}
let res = space_store
.compact_table(
request_id,
request_id.clone(),
&table_data,
&compaction_task,
scan_options,
Expand Down
20 changes: 11 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl FlushTask {
// Start flush duration timer.
let local_metrics = self.table_data.metrics.local_flush_metrics();
let _timer = local_metrics.start_flush_timer();
self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder)
self.dump_memtables(request_id.clone(), &mems_to_flush, flush_req.need_reorder)
.await
.box_err()
.context(FlushJobWithCause {
Expand Down Expand Up @@ -421,7 +421,7 @@ impl FlushTask {
if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
if let Some(seq) = self
.dump_sampling_memtable(
request_id,
request_id.clone(),
sampling_mem,
&mut files_to_level0,
need_reorder,
Expand All @@ -436,7 +436,7 @@ impl FlushTask {
}
}
for mem in &mems_to_flush.memtables {
let file = self.dump_normal_memtable(request_id, mem).await?;
let file = self.dump_normal_memtable(request_id.clone(), mem).await?;
if let Some(file) = file {
let sst_size = file.size;
files_to_level0.push(AddFile {
Expand Down Expand Up @@ -565,6 +565,7 @@ impl FlushTask {
let store = self.space_store.clone();
let storage_format_hint = self.table_data.table_options().storage_format_hint;
let sst_write_options = sst_write_options.clone();
let request_id = request_id.clone();

// spawn build sst
let handler = self.runtime.spawn(async move {
Expand Down Expand Up @@ -785,7 +786,7 @@ impl SpaceStore {
}

for files in task.expired() {
self.delete_expired_files(table_data, request_id, files, &mut edit_meta);
self.delete_expired_files(table_data, &request_id, files, &mut edit_meta);
}

info!(
Expand All @@ -798,7 +799,7 @@ impl SpaceStore {

for input in inputs {
self.compact_input_files(
request_id,
request_id.clone(),
table_data,
input,
scan_options.clone(),
Expand Down Expand Up @@ -874,7 +875,7 @@ impl SpaceStore {

info!(
"Instance try to compact table, table:{}, table_id:{}, request_id:{}, input_files:{:?}",
table_data.name, table_data.id, request_id, input.files,
table_data.name, table_data.id, &request_id, input.files,
);

// The schema may be modified during compaction, so we acquire it first and use
Expand Down Expand Up @@ -905,6 +906,7 @@ impl SpaceStore {
let space_id = table_data.space_id;
let table_id = table_data.id;
let sequence = table_data.last_sequence();
let request_id = request_id.clone();
let mut builder = MergeBuilder::new(MergeConfig {
request_id,
metrics_collector: None,
Expand Down Expand Up @@ -933,7 +935,7 @@ impl SpaceStore {

let record_batch_stream = if table_options.need_dedup() {
row_iter::record_batch_with_key_iter_to_stream(DedupIterator::new(
request_id,
request_id.clone(),
merge_iter,
iter_options,
))
Expand Down Expand Up @@ -978,7 +980,7 @@ impl SpaceStore {
})?;

let sst_info = sst_writer
.write(request_id, &sst_meta, record_batch_stream)
.write(request_id.clone(), &sst_meta, record_batch_stream)
.await
.box_err()
.with_context(|| WriteSst {
Expand Down Expand Up @@ -1038,7 +1040,7 @@ impl SpaceStore {
pub(crate) fn delete_expired_files(
&self,
table_data: &TableData,
request_id: RequestId,
request_id: &RequestId,
expired: &ExpiredFiles,
edit_meta: &mut VersionEditMeta,
) {
Expand Down
6 changes: 3 additions & 3 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl Instance {
.metrics_collector
.span(format!("{MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX}_{idx}"));
let merge_config = MergeConfig {
request_id: request.request_id,
request_id: request.request_id.clone(),
metrics_collector: Some(metrics_collector),
deadline: request.opts.deadline,
space_id: table_data.space_id,
Expand All @@ -230,7 +230,7 @@ impl Instance {
table: &table_data.name,
})?;
let dedup_iter =
DedupIterator::new(request.request_id, merge_iter, iter_options.clone());
DedupIterator::new(request.request_id.clone(), merge_iter, iter_options.clone());

iters.push(dedup_iter);
}
Expand Down Expand Up @@ -263,7 +263,7 @@ impl Instance {
.metrics_collector
.span(format!("{CHAIN_ITER_METRICS_COLLECTOR_NAME_PREFIX}_{idx}"));
let chain_config = ChainConfig {
request_id: request.request_id,
request_id: request.request_id.clone(),
metrics_collector: Some(metrics_collector),
deadline: request.opts.deadline,
num_streams_to_prefetch: self.scan_options.num_streams_to_prefetch,
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/src/merge_memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl MergeMemTableBench {
let request_id = RequestId::next_id();
let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
let mut builder = MergeBuilder::new(MergeConfig {
request_id,
request_id: request_id.clone(),
metrics_collector: None,
deadline: None,
space_id,
Expand All @@ -181,7 +181,8 @@ impl MergeMemTableBench {
let mut batch_num = 0;

if self.dedup {
let mut dedup_iter = DedupIterator::new(request_id, merge_iter, iter_options);
let mut dedup_iter =
DedupIterator::new(request_id.clone(), merge_iter, iter_options);
while let Some(batch) = dedup_iter.next_batch().await.unwrap() {
let num_rows = batch.num_rows();
total_rows += num_rows;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/merge_sst_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl MergeSstBench {
let request_id = RequestId::next_id();
let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
let mut builder = MergeBuilder::new(MergeConfig {
request_id,
request_id: request_id.clone(),
metrics_collector: None,
deadline: None,
space_id,
Expand Down
Loading

0 comments on commit 46f54f4

Please sign in to comment.