Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: place table datas into manifest, update them together #863

Merged
merged 13 commits into from
May 9, 2023
26 changes: 11 additions & 15 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use crate::{
serial_executor::TableOpSerialExecutor,
InstanceRef,
},
manifest::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate, MetaUpdateRequest},
manifest::meta_edit::{
AlterOptionsMeta, AlterSchemaMeta, MetaEdit, MetaEditRequest, MetaUpdate,
},
payload::WritePayload,
table::data::TableDataRef,
table_options,
Expand Down Expand Up @@ -120,27 +122,24 @@ impl<'a> Alterer<'a> {
);

// Write to Manifest
let update_req = {
let edit_req = {
let meta_update = MetaUpdate::AlterSchema(manifest_update);
MetaUpdateRequest {
MetaEditRequest {
shard_info: self.table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.instance
.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(WriteManifest {
space_id: self.table_data.space_id,
table: &self.table_data.name,
table_id: self.table_data.id,
})?;

// Update schema in memory.
self.table_data.set_schema(request.schema);

Ok(())
}

Expand Down Expand Up @@ -249,27 +248,24 @@ impl<'a> Alterer<'a> {
})?;

// Write to Manifest
let update_req = {
let edit_req = {
let meta_update = MetaUpdate::AlterOptions(manifest_update);
MetaUpdateRequest {
MetaEditRequest {
shard_info: self.table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.instance
.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(WriteManifest {
space_id: self.table_data.space_id,
table: &self.table_data.name,
table_id: self.table_data.id,
})?;

// Update memory status
self.table_data.set_table_options(table_opts);

Ok(())
}
}
66 changes: 26 additions & 40 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@

//! Create table logic of instance

use std::sync::Arc;

use common_util::error::BoxError;
use log::info;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::CreateTableRequest;

use crate::{
instance::{
engine::{CreateOpenFailedTable, CreateTableData, InvalidOptions, Result, WriteManifest},
engine::{CreateOpenFailedTable, InvalidOptions, Result, TableNotExist, WriteManifest},
Instance,
},
manifest::meta_update::{AddTableMeta, MetaUpdate, MetaUpdateRequest},
manifest::meta_edit::{AddTableMeta, MetaEdit, MetaEditRequest, MetaUpdate},
space::SpaceRef,
table::data::{TableData, TableDataRef},
table::data::{TableDataRef, TableShardInfo},
table_options,
};

Expand Down Expand Up @@ -51,50 +49,38 @@ impl Instance {
return Ok(table_data);
}

// Choose a write worker for this table
let (table_name, table_id) = (request.table_name.clone(), request.table_id);

let table_data = Arc::new(
TableData::new(
space.id,
request,
table_opts,
&self.file_purger,
self.preflush_write_buffer_size_ratio,
space.mem_usage_collector.clone(),
)
.context(CreateTableData {
space_id: space.id,
table: &table_name,
table_id,
})?,
);

// Store table info into meta
let update_req = {
// Store table info into meta both memory and storage.
let edit_req = {
let meta_update = MetaUpdate::AddTable(AddTableMeta {
space_id: space.id,
table_id: table_data.id,
table_name: table_data.name.clone(),
schema: table_data.schema(),
opts: table_data.table_options().as_ref().clone(),
table_id: request.table_id,
table_name: request.table_name.clone(),
schema: request.table_schema,
opts: table_opts,
});
MetaUpdateRequest {
shard_info: table_data.shard_info,
meta_update,
MetaEditRequest {
shard_info: TableShardInfo::new(request.shard_id),
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(WriteManifest {
.with_context(|| WriteManifest {
space_id: space.id,
table: &table_data.name,
table_id: table_data.id,
table: request.table_name.clone(),
table_id: request.table_id,
})?;

space.insert_table(table_data.clone());
Ok(table_data)
// Table is sure to exist here.
space
.find_table_by_id(request.table_id)
.with_context(|| TableNotExist {
msg: format!(
"table not exist, space_id:{}, table_id:{}, table_name:{}",
space.id, request.table_id, request.table_name
),
})
}
}
19 changes: 6 additions & 13 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
flush_compaction::{Flusher, TableFlushOptions},
SpaceStoreRef,
},
manifest::meta_update::{DropTableMeta, MetaUpdate, MetaUpdateRequest},
manifest::meta_edit::{DropTableMeta, MetaEdit, MetaEditRequest, MetaUpdate},
space::SpaceRef,
};

Expand Down Expand Up @@ -49,6 +49,7 @@ impl Dropper {
// Fixme(xikai): Trigger a force flush so that the data of the table in the wal
// is marked for deletable. However, the overhead of the flushing can
// be avoided.

let opts = TableFlushOptions::default();
let flush_scheduler = serial_exec.flush_scheduler();
self.flusher
Expand All @@ -61,35 +62,27 @@ impl Dropper {
})?;

// Store the dropping information into meta
let update_req = {
let edit_req = {
let meta_update = MetaUpdate::DropTable(DropTableMeta {
space_id: self.space.id,
table_id: table_data.id,
table_name: table_data.name.clone(),
});
MetaUpdateRequest {
MetaEditRequest {
shard_info: table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(WriteManifest {
space_id: self.space.id,
table: &table_data.name,
table_id: table_data.id,
})?;

// Set the table dropped after finishing flushing and storing drop table meta
// information.
table_data.set_dropped();

// Clear the memory status after updating manifest and clearing wal so that
// the drop is retryable if fails to update and clear.
self.space.remove_table(&table_data.name);

Ok(true)
}
}
6 changes: 5 additions & 1 deletion analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ pub enum Error {
OpenManifest {
source: crate::manifest::details::Error,
},

#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableNotExist { msg: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -240,7 +243,8 @@ impl From<Error> for table_engine::engine::Error {
| Error::EncodePayloads { .. }
| Error::CreateOpenFailedTable { .. }
| Error::DoManifestSnapshot { .. }
| Error::OpenManifest { .. } => Self::Unexpected {
| Error::OpenManifest { .. }
| Error::TableNotExist { .. } => Self::Unexpected {
source: Box::new(err),
},
}
Expand Down
50 changes: 19 additions & 31 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use crate::{
TableCompactionRequest,
},
instance::{self, serial_executor::TableFlushScheduler, SpaceStore, SpaceStoreRef},
manifest::meta_update::{AlterOptionsMeta, MetaUpdate, MetaUpdateRequest, VersionEditMeta},
manifest::meta_edit::{
AlterOptionsMeta, MetaEdit, MetaEditRequest, MetaUpdate, VersionEditMeta,
},
memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest},
row_iter::{
self,
Expand All @@ -53,7 +55,7 @@ use crate::{
table::{
data::{TableData, TableDataRef},
version::{FlushableMemTables, MemTableState, SamplingMemTable},
version_edit::{AddFile, DeleteFile, VersionEdit},
version_edit::{AddFile, DeleteFile},
},
table_options::StorageFormatHint,
};
Expand Down Expand Up @@ -231,25 +233,23 @@ impl Flusher {
let mut new_table_opts = (*table_data.table_options()).clone();
new_table_opts.segment_duration = Some(ReadableDuration(suggest_segment_duration));

let update_req = {
let edit_req = {
let meta_update = MetaUpdate::AlterOptions(AlterOptionsMeta {
space_id: table_data.space_id,
table_id: table_data.id,
options: new_table_opts.clone(),
});
MetaUpdateRequest {
MetaEditRequest {
shard_info: table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(StoreVersionEdit)?;

table_data.set_table_options(new_table_opts);

// Now the segment duration is applied, we can stop sampling and freeze the
// sampling memtable.
current_version.freeze_sampling();
Expand Down Expand Up @@ -415,36 +415,27 @@ impl FlushTask {
);

// Persist the flush result to manifest.
let update_req = {
let edit_req = {
let edit_meta = VersionEditMeta {
space_id: self.table_data.space_id,
table_id: self.table_data.id,
flushed_sequence,
files_to_add: files_to_level0.clone(),
files_to_delete: vec![],
mems_to_remove: mems_to_flush.ids(),
};
let meta_update = MetaUpdate::VersionEdit(edit_meta);
MetaUpdateRequest {
MetaEditRequest {
shard_info: self.table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.space_store
.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(StoreVersionEdit)?;

// Edit table version to remove dumped memtables.
let mems_to_remove = mems_to_flush.ids();
let edit = VersionEdit {
flushed_sequence,
mems_to_remove,
files_to_add: files_to_level0,
files_to_delete: vec![],
};
self.table_data.current_version().apply_edit(edit);

// Mark sequence <= flushed_sequence to be deleted.
let table_location = self.table_data.table_location();
let wal_location =
Expand Down Expand Up @@ -689,7 +680,8 @@ impl SpaceStore {
flushed_sequence: 0,
// Use the number of compaction inputs as the estimated number of files to add.
files_to_add: Vec::with_capacity(task.compaction_inputs.len()),
files_to_delete: Vec::new(),
files_to_delete: vec![],
mems_to_remove: vec![],
};

if task.expired.is_empty() && task.compaction_inputs.is_empty() {
Expand Down Expand Up @@ -722,22 +714,18 @@ impl SpaceStore {
.await?;
}

let update_req = {
let edit_req = {
let meta_update = MetaUpdate::VersionEdit(edit_meta.clone());
MetaUpdateRequest {
MetaEditRequest {
shard_info: table_data.shard_info,
meta_update,
meta_edit: MetaEdit::Update(meta_update),
}
};
self.manifest
.store_update(update_req)
.apply_edit(edit_req)
.await
.context(StoreVersionEdit)?;

// Apply to the table version.
let edit = edit_meta.into_version_edit();
table_data.current_version().apply_edit(edit);

Ok(())
}

Expand Down
Loading