4242#include " cloud/cloud_warm_up_manager.h"
4343#include " cloud/config.h"
4444#include " cloud/pb_convert.h"
45- #include " cloud/schema_cloud_dictionary_cache.h"
4645#include " common/config.h"
4746#include " common/logging.h"
4847#include " common/status.h"
@@ -445,49 +444,6 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
445444 return Status::RpcError (" failed to {}: rpc timeout, last msg={}" , op_name, error_msg);
446445}
447446
448- Status fill_schema_with_dict (const RowsetMetaCloudPB& in, RowsetMetaPB* out,
449- const SchemaCloudDictionary& dict) {
450- std::unordered_map<int32_t , ColumnPB*> unique_id_map;
451- // init map
452- for (ColumnPB& column : *out->mutable_tablet_schema ()->mutable_column ()) {
453- unique_id_map[column.unique_id ()] = &column;
454- }
455- // column info
456- for (int i = 0 ; i < in.schema_dict_key_list ().column_dict_key_list_size (); ++i) {
457- int dict_key = in.schema_dict_key_list ().column_dict_key_list (i);
458- if (dict.column_dict ().find (dict_key) == dict.column_dict ().end ()) {
459- return Status::NotFound (" Not found entry {}" , dict_key);
460- }
461- const ColumnPB& dict_val = dict.column_dict ().at (dict_key);
462- ColumnPB& to_add = *out->mutable_tablet_schema ()->add_column ();
463- to_add = dict_val;
464- VLOG_DEBUG << " fill dict column " << dict_val.ShortDebugString ();
465- }
466-
467- // index info
468- for (int i = 0 ; i < in.schema_dict_key_list ().index_info_dict_key_list_size (); ++i) {
469- int dict_key = in.schema_dict_key_list ().index_info_dict_key_list (i);
470- if (dict.index_dict ().find (dict_key) == dict.index_dict ().end ()) {
471- return Status::NotFound (" Not found entry {}" , dict_key);
472- }
473- const doris::TabletIndexPB& dict_val = dict.index_dict ().at (dict_key);
474- *out->mutable_tablet_schema ()->add_index () = dict_val;
475- VLOG_DEBUG << " fill dict index " << dict_val.ShortDebugString ();
476- }
477-
478- // sparse column info
479- for (int i = 0 ; i < in.schema_dict_key_list ().sparse_column_dict_key_list_size (); ++i) {
480- int dict_key = in.schema_dict_key_list ().sparse_column_dict_key_list (i);
481- if (dict.column_dict ().find (dict_key) == dict.column_dict ().end ()) {
482- return Status::NotFound (" Not found entry {}" , dict_key);
483- }
484- const ColumnPB& dict_val = dict.column_dict ().at (dict_key);
485- *unique_id_map.at (dict_val.parent_unique_id ())->add_sparse_columns () = dict_val;
486- VLOG_DEBUG << " fill dict sparse column" << dict_val.ShortDebugString ();
487- }
488- return Status::OK ();
489- }
490-
491447} // namespace
492448
493449Status CloudMetaMgr::get_tablet_meta (int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
@@ -562,10 +518,6 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
562518 req.set_cumulative_point (tablet->cumulative_layer_point ());
563519 }
564520 req.set_end_version (-1 );
565- // backend side use schema dict in cache if enable cloud schema dict cache
566- req.set_schema_op (config::variant_use_cloud_schema_dict_cache
567- ? GetRowsetRequest::NO_DICT
568- : GetRowsetRequest::RETURN_DICT);
569521 VLOG_DEBUG << " send GetRowsetRequest: " << req.ShortDebugString ();
570522 auto start = std::chrono::steady_clock::now ();
571523 stub->get_rowset (&cntl, &req, &resp, nullptr );
@@ -766,30 +718,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
766718 existed_rowset->rowset_id ().to_string () == cloud_rs_meta_pb.rowset_id_v2 ()) {
767719 continue ; // Same rowset, skip it
768720 }
769- RowsetMetaPB meta_pb;
770- // Check if the rowset meta contains a schema dictionary key list.
771- if (cloud_rs_meta_pb.has_schema_dict_key_list () && !resp.has_schema_dict ()) {
772- // Use the locally cached dictionary.
773- RowsetMetaCloudPB copied_cloud_rs_meta_pb = cloud_rs_meta_pb;
774- CloudStorageEngine& engine =
775- ExecEnv::GetInstance ()->storage_engine ().to_cloud ();
776- {
777- wlock.unlock ();
778- RETURN_IF_ERROR (
779- engine.get_schema_cloud_dictionary_cache ()
780- .replace_dict_keys_to_schema (cloud_rs_meta_pb.index_id (),
781- &copied_cloud_rs_meta_pb));
782- wlock.lock ();
783- }
784- meta_pb = cloud_rowset_meta_to_doris (copied_cloud_rs_meta_pb);
785- } else {
786- // Otherwise, use the schema dictionary from the response (if available).
787- meta_pb = cloud_rowset_meta_to_doris (cloud_rs_meta_pb);
788- if (resp.has_schema_dict ()) {
789- RETURN_IF_ERROR (fill_schema_with_dict (cloud_rs_meta_pb, &meta_pb,
790- resp.schema_dict ()));
791- }
792- }
721+ RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris (cloud_rs_meta_pb);
793722 auto rs_meta = std::make_shared<RowsetMeta>();
794723 rs_meta->init_from_pb (meta_pb);
795724 RowsetSharedPtr rowset;
@@ -810,9 +739,6 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
810739 tablet->add_rowsets (std::move (rowsets), version_overlap, wlock,
811740 options.warmup_delta_data ||
812741 config::enable_warmup_immediately_on_new_rowset);
813- if (options.merge_schema ) {
814- RETURN_IF_ERROR (tablet->merge_rowsets_schema ());
815- }
816742 }
817743 tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms ();
818744 tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms ();
@@ -1052,21 +978,6 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
1052978
1053979 RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb ();
1054980 doris_rowset_meta_to_cloud (req.mutable_rowset_meta (), std::move (rs_meta_pb));
1055- // Replace schema dictionary keys based on the rowset's index ID to maintain schema consistency.
1056- CloudStorageEngine& engine = ExecEnv::GetInstance ()->storage_engine ().to_cloud ();
1057- // if not enable dict cache, then directly return true to avoid refresh
1058- Status replaced_st =
1059- config::variant_use_cloud_schema_dict_cache
1060- ? engine.get_schema_cloud_dictionary_cache ().replace_schema_to_dict_keys (
1061- rs_meta_pb.index_id (), req.mutable_rowset_meta ())
1062- : Status::OK ();
1063- // if the replaced_st is not ok and alse not NotFound, then we need to just return the replaced_st
1064- VLOG_DEBUG << " replace schema to dict keys, replaced_st: " << replaced_st.to_string ()
1065- << " , replaced_st.is<ErrorCode::NOT_FOUND>(): "
1066- << replaced_st.is <ErrorCode::NOT_FOUND>();
1067- if (!replaced_st.ok () && !replaced_st.is <ErrorCode::NOT_FOUND>()) {
1068- return replaced_st;
1069- }
1070981 Status st = retry_rpc (" commit rowset" , req, &resp, &MetaService_Stub::commit_rowset);
1071982 if (!st.ok () && resp.status ().code () == MetaServiceCode::ALREADY_EXISTED) {
1072983 if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta ()) {
@@ -1077,14 +988,6 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
1077988 }
1078989 return Status::AlreadyExist (" failed to commit rowset: {}" , resp.status ().msg ());
1079990 }
1080- // If dictionary replacement fails, it may indicate that the local schema dictionary is outdated.
1081- // Refreshing the dictionary here ensures that the rowset metadata is updated with the latest schema definitions,
1082- // which is critical for maintaining consistency between the rowset and its corresponding schema.
1083- if (replaced_st.is <ErrorCode::NOT_FOUND>()) {
1084- RETURN_IF_ERROR (
1085- engine.get_schema_cloud_dictionary_cache ().refresh_dict (rs_meta_pb.index_id ()));
1086- }
1087-
1088991 int64_t timeout_ms = -1 ;
1089992 // if the `job_id` is not empty, it means this rowset was produced by a compaction job.
1090993 if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty ()) {
@@ -1267,12 +1170,14 @@ Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) {
12671170 return retry_rpc (" commit restore job" , req, &resp, &MetaService_Stub::commit_restore_job);
12681171}
12691172
1270- Status CloudMetaMgr::finish_restore_job (const int64_t tablet_id) {
1271- VLOG_DEBUG << " finish restore job, tablet_id: " << tablet_id;
1173+ Status CloudMetaMgr::finish_restore_job (const int64_t tablet_id, bool is_completed) {
1174+ VLOG_DEBUG << " finish restore job, tablet_id: " << tablet_id
1175+ << " , is_completed: " << is_completed;
12721176 RestoreJobRequest req;
12731177 RestoreJobResponse resp;
12741178 req.set_cloud_unique_id (config::cloud_unique_id);
12751179 req.set_tablet_id (tablet_id);
1180+ req.set_is_completed (is_completed);
12761181
12771182 return retry_rpc (" finish restore job" , req, &resp, &MetaService_Stub::finish_restore_job);
12781183}
@@ -1685,33 +1590,5 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
16851590 return total_inverted_index_size;
16861591}
16871592
1688- Status CloudMetaMgr::get_schema_dict (int64_t index_id,
1689- std::shared_ptr<SchemaCloudDictionary>* schema_dict) {
1690- VLOG_DEBUG << " Sending GetSchemaDictRequest, index_id: " << index_id;
1691-
1692- // Create the request and response objects.
1693- GetSchemaDictRequest req;
1694- GetSchemaDictResponse resp;
1695- req.set_cloud_unique_id (config::cloud_unique_id);
1696- req.set_index_id (index_id);
1697-
1698- // Invoke RPC via the retry_rpc helper function.
1699- // It will call the MetaService_Stub::get_schema_dict method.
1700- Status st = retry_rpc (" get schema dict" , req, &resp, &MetaService_Stub::get_schema_dict);
1701- if (!st.ok ()) {
1702- return st;
1703- }
1704-
1705- // Optionally, additional checking of the response status can be done here.
1706- // For example, if the returned status code indicates a parsing or not found error,
1707- // you may return an error accordingly.
1708-
1709- // Copy the retrieved schema dictionary from the response.
1710- *schema_dict = std::make_shared<SchemaCloudDictionary>();
1711- (*schema_dict)->Swap (resp.mutable_schema_dict ());
1712- VLOG_DEBUG << " Successfully obtained schema dict, index_id: " << index_id;
1713- return Status::OK ();
1714- }
1715-
17161593#include " common/compile_check_end.h"
17171594} // namespace doris::cloud
0 commit comments