Skip to content

Commit 5197994

Browse files
committed
feat: introduce reconcile table procedure
Signed-off-by: WenyXu <wenymedia@gmail.com>
1 parent 02a5acd commit 5197994

File tree

20 files changed

+1190
-137
lines changed

20 files changed

+1190
-137
lines changed

src/api/src/v1/column_def.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use greptime_proto::v1::{
2424
};
2525
use snafu::ResultExt;
2626

27-
use crate::error::{self, Result};
27+
use crate::error::{self, ConvertColumnDefaultConstraintSnafu, Result};
2828
use crate::helper::ColumnDataTypeWrapper;
2929
use crate::v1::{ColumnDef, ColumnOptions, SemanticType};
3030

@@ -77,6 +77,48 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
7777
})
7878
}
7979

80+
/// Tries to construct a `ColumnDef` from the given `ColumnSchema`.
81+
///
82+
/// TODO(weny): Add tests for this function.
83+
pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) -> Result<ColumnDef> {
84+
let column_datatype =
85+
ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()).map(|w| w.to_parts())?;
86+
87+
let semantic_type = if column_schema.is_time_index() {
88+
SemanticType::Timestamp
89+
} else if is_primary_key {
90+
SemanticType::Tag
91+
} else {
92+
SemanticType::Field
93+
} as i32;
94+
let comment = column_schema
95+
.metadata()
96+
.get(COMMENT_KEY)
97+
.cloned()
98+
.unwrap_or_default();
99+
100+
let default_constraint = match column_schema.default_constraint() {
101+
None => vec![],
102+
Some(v) => v
103+
.clone()
104+
.try_into()
105+
.context(ConvertColumnDefaultConstraintSnafu {
106+
column: &column_schema.name,
107+
})?,
108+
};
109+
let options = options_from_column_schema(column_schema);
110+
Ok(ColumnDef {
111+
name: column_schema.name.clone(),
112+
data_type: column_datatype.0 as i32,
113+
is_nullable: column_schema.is_nullable(),
114+
default_constraint,
115+
semantic_type,
116+
comment,
117+
datatype_extension: column_datatype.1,
118+
options,
119+
})
120+
}
121+
80122
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
81123
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
82124
let mut options = ColumnOptions::default();

src/common/meta/src/ddl/alter_logical_tables/update_metadata.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
// limitations under the License.
1414

1515
use common_grpc_expr::alter_expr_to_request;
16-
use itertools::Itertools;
1716
use snafu::ResultExt;
1817
use table::metadata::{RawTableInfo, TableInfo};
1918

2019
use crate::ddl::alter_logical_tables::executor::AlterLogicalTablesExecutor;
2120
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
21+
use crate::ddl::utils::table_info::batch_update_table_info_values;
2222
use crate::error;
2323
use crate::error::{ConvertAlterTableRequestSnafu, Result};
2424
use crate::key::table_info::TableInfoValue;
@@ -48,25 +48,8 @@ impl AlterLogicalTablesProcedure {
4848

4949
pub(crate) async fn update_logical_tables_metadata(&mut self) -> Result<()> {
5050
let table_info_values = self.build_update_metadata()?;
51-
let manager = &self.context.table_metadata_manager;
52-
let chunk_size = manager.batch_update_table_info_value_chunk_size();
53-
if table_info_values.len() > chunk_size {
54-
let chunks = table_info_values
55-
.into_iter()
56-
.chunks(chunk_size)
57-
.into_iter()
58-
.map(|check| check.collect::<Vec<_>>())
59-
.collect::<Vec<_>>();
60-
for chunk in chunks {
61-
manager.batch_update_table_info_values(chunk).await?;
62-
}
63-
} else {
64-
manager
65-
.batch_update_table_info_values(table_info_values)
66-
.await?;
67-
}
68-
69-
Ok(())
51+
batch_update_table_info_values(&self.context.table_metadata_manager, table_info_values)
52+
.await
7053
}
7154

7255
pub(crate) fn build_update_metadata(

src/common/meta/src/ddl/alter_logical_tables/validator.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use store_api::storage::TableId;
2121
use table::table_reference::TableReference;
2222

2323
use crate::ddl::utils::table_id::get_all_table_ids_by_names;
24-
use crate::ddl::utils::table_info::get_all_table_info_values_by_table_ids;
24+
use crate::ddl::utils::table_info::{
25+
all_logical_table_routes_have_same_physical_id, get_all_table_info_values_by_table_ids,
26+
};
2527
use crate::error::{
2628
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu,
2729
TableRouteNotFoundSnafu,
@@ -146,23 +148,16 @@ impl<'a> AlterLogicalTableValidator<'a> {
146148
table_route_manager: &TableRouteManager,
147149
table_ids: &[TableId],
148150
) -> Result<()> {
149-
let table_routes = table_route_manager
150-
.table_route_storage()
151-
.batch_get(table_ids)
151+
let all_logical_table_routes_have_same_physical_id =
152+
all_logical_table_routes_have_same_physical_id(
153+
table_route_manager,
154+
table_ids,
155+
self.physical_table_id,
156+
)
152157
.await?;
153158

154-
let physical_table_id = self.physical_table_id;
155-
156-
let is_same_physical_table = table_routes.iter().all(|r| {
157-
if let Some(TableRouteValue::Logical(r)) = r {
158-
r.physical_table_id() == physical_table_id
159-
} else {
160-
false
161-
}
162-
});
163-
164159
ensure!(
165-
is_same_physical_table,
160+
all_logical_table_routes_have_same_physical_id,
166161
AlterLogicalTablesInvalidArgumentsSnafu {
167162
err_msg: "All the tasks should have the same physical table id"
168163
}

src/common/meta/src/ddl/alter_table/executor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,5 @@ fn build_new_table_info(
308308
"Built new table info: {:?} for table {}, table_id: {}",
309309
new_info.meta, table_name, table_id
310310
);
311-
312311
Ok(new_info)
313312
}

src/common/meta/src/ddl/create_table_template.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
2121
use store_api::storage::{RegionId, RegionNumber};
2222
use table::metadata::TableId;
2323

24-
use crate::error;
25-
use crate::error::Result;
24+
use crate::error::{self, Result};
2625
use crate::wal_options_allocator::prepare_wal_options;
2726

2827
pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<CreateRequest> {

src/common/meta/src/ddl/utils/table_info.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use itertools::Itertools;
1516
use snafu::OptionExt;
1617
use store_api::storage::TableId;
18+
use table::metadata::RawTableInfo;
1719
use table::table_reference::TableReference;
1820

1921
use crate::error::{Result, TableInfoNotFoundSnafu};
2022
use crate::key::table_info::{TableInfoManager, TableInfoValue};
21-
use crate::key::DeserializedValueWithBytes;
23+
use crate::key::table_route::{TableRouteManager, TableRouteValue};
24+
use crate::key::{DeserializedValueWithBytes, TableMetadataManager};
2225

2326
/// Get all table info values by table ids.
2427
///
@@ -42,3 +45,56 @@ pub(crate) async fn get_all_table_info_values_by_table_ids<'a>(
4245

4346
Ok(table_info_values)
4447
}
48+
49+
/// Checks if all the logical table routes have the same physical table id.
50+
pub(crate) async fn all_logical_table_routes_have_same_physical_id(
51+
table_route_manager: &TableRouteManager,
52+
table_ids: &[TableId],
53+
physical_table_id: TableId,
54+
) -> Result<bool> {
55+
let table_routes = table_route_manager
56+
.table_route_storage()
57+
.batch_get(table_ids)
58+
.await?;
59+
60+
let is_same_physical_table = table_routes.iter().all(|r| {
61+
if let Some(TableRouteValue::Logical(r)) = r {
62+
r.physical_table_id() == physical_table_id
63+
} else {
64+
false
65+
}
66+
});
67+
68+
Ok(is_same_physical_table)
69+
}
70+
71+
/// Batch updates the table info values.
72+
///
73+
/// The table info values are grouped into chunks, and each chunk is updated in a single transaction.
74+
///
75+
/// Returns an error if any table info value fails to update.
76+
pub(crate) async fn batch_update_table_info_values(
77+
table_metadata_manager: &TableMetadataManager,
78+
table_info_values: Vec<(DeserializedValueWithBytes<TableInfoValue>, RawTableInfo)>,
79+
) -> Result<()> {
80+
let chunk_size = table_metadata_manager.batch_update_table_info_value_chunk_size();
81+
if table_info_values.len() > chunk_size {
82+
let chunks = table_info_values
83+
.into_iter()
84+
.chunks(chunk_size)
85+
.into_iter()
86+
.map(|check| check.collect::<Vec<_>>())
87+
.collect::<Vec<_>>();
88+
for chunk in chunks {
89+
table_metadata_manager
90+
.batch_update_table_info_values(chunk)
91+
.await?;
92+
}
93+
} else {
94+
table_metadata_manager
95+
.batch_update_table_info_values(table_info_values)
96+
.await?;
97+
}
98+
99+
Ok(())
100+
}

src/common/meta/src/error.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,12 @@ pub enum Error {
878878
error: object_store::Error,
879879
},
880880

881+
#[snafu(display("Missing column ids"))]
882+
MissingColumnIds {
883+
#[snafu(implicit)]
884+
location: Location,
885+
},
886+
881887
#[snafu(display(
882888
"Missing column in column metadata: {}, table: {}, table_id: {}",
883889
column_name,
@@ -907,6 +913,14 @@ pub enum Error {
907913
table_name: String,
908914
table_id: TableId,
909915
},
916+
917+
#[snafu(display("Failed to convert column def, column: {}", column))]
918+
ConvertColumnDef {
919+
column: String,
920+
#[snafu(implicit)]
921+
location: Location,
922+
source: api::error::Error,
923+
},
910924
}
911925

912926
pub type Result<T> = std::result::Result<T, Error>;
@@ -928,6 +942,7 @@ impl ErrorExt for Error {
928942
NoLeader { .. } => StatusCode::TableUnavailable,
929943
ValueNotExist { .. }
930944
| ProcedurePoisonConflict { .. }
945+
| MissingColumnIds { .. }
931946
| MissingColumnInColumnMetadata { .. }
932947
| MismatchColumnId { .. } => StatusCode::Unexpected,
933948

@@ -1013,6 +1028,7 @@ impl ErrorExt for Error {
10131028
AbortProcedure { source, .. } => source.status_code(),
10141029
ConvertAlterTableRequest { source, .. } => source.status_code(),
10151030
PutPoison { source, .. } => source.status_code(),
1031+
ConvertColumnDef { source, .. } => source.status_code(),
10161032

10171033
ParseProcedureId { .. }
10181034
| InvalidNumTopics { .. }

src/common/meta/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ pub mod key;
3232
pub mod kv_backend;
3333
pub mod leadership_notifier;
3434
pub mod lock_key;
35-
pub mod maintenance;
3635
pub mod metrics;
3736
pub mod node_expiry_listener;
3837
pub mod node_manager;
3938
pub mod peer;
4039
pub mod poison_key;
4140
pub mod range_stream;
41+
pub mod reconciliation;
4242
pub mod region_keeper;
4343
pub mod region_registry;
4444
pub mod rpc;

src/common/meta/src/maintenance.rs

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/common/meta/src/maintenance/reconcile_table.rs renamed to src/common/meta/src/reconciliation.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
// TODO(weny): Remove it
16+
#[allow(dead_code)]
17+
pub(crate) mod reconcile_table;
1518
// TODO(weny): Remove it
1619
#[allow(dead_code)]
1720
pub(crate) mod utils;

0 commit comments

Comments
 (0)