@@ -35,6 +35,7 @@ use databend_common_sql::Planner;
3535use databend_common_storage:: DEFAULT_HISTOGRAM_BUCKETS ;
3636use databend_common_storages_factory:: NavigationPoint ;
3737use databend_common_storages_factory:: Table ;
38+ use databend_common_storages_fuse:: operations:: AnalyzeLightMutator ;
3839use databend_common_storages_fuse:: operations:: HistogramInfoSink ;
3940use databend_common_storages_fuse:: FuseTable ;
4041use databend_storages_common_index:: Index ;
@@ -109,66 +110,71 @@ impl Interpreter for AnalyzeTableInterpreter {
109110 Err ( _) => return Ok ( PipelineBuildResult :: create ( ) ) ,
110111 } ;
111112
112- let r = table. read_table_snapshot ( ) . await ;
113- let snapshot_opt = match r {
114- Err ( e) => return Err ( e) ,
115- Ok ( v) => v,
113+ let Some ( snapshot) = table. read_table_snapshot ( ) . await ? else {
114+ return Ok ( PipelineBuildResult :: create ( ) ) ;
116115 } ;
117116
118- if let Some ( snapshot) = snapshot_opt {
119- // plan sql
120- let _table_info = table. get_table_info ( ) ;
117+ if !self . plan . full {
118+ let operator = table. get_operator ( ) ;
119+ let cluster_key_id = table. cluster_key_id ( ) ;
120+ let mut mutator =
121+ AnalyzeLightMutator :: create ( self . ctx . clone ( ) , operator, snapshot, cluster_key_id) ;
122+ mutator. target_select ( ) . await ?;
123+ mutator. try_commit ( table) . await ?;
124+ return Ok ( PipelineBuildResult :: create ( ) ) ;
125+ }
121126
122- let table_statistics = table
123- . read_table_snapshot_statistics ( Some ( & snapshot) )
124- . await ?;
127+ // plan sql
128+ let _table_info = table. get_table_info ( ) ;
125129
126- let ( is_full, temporal_str) = if let Some ( table_statistics) = & table_statistics {
127- let is_full = match table
128- . navigate_to_point (
129- & NavigationPoint :: SnapshotID (
130- table_statistics. snapshot_id . simple ( ) . to_string ( ) ,
131- ) ,
132- self . ctx . clone ( ) . get_abort_checker ( ) ,
133- )
130+ let table_statistics = table
131+ . read_table_snapshot_statistics ( Some ( & snapshot) )
132+ . await ?;
133+
134+ let ( is_full, temporal_str) = if let Some ( table_statistics) = & table_statistics {
135+ let is_full = match table
136+ . navigate_to_point (
137+ & NavigationPoint :: SnapshotID ( table_statistics. snapshot_id . simple ( ) . to_string ( ) ) ,
138+ self . ctx . clone ( ) . get_abort_checker ( ) ,
139+ )
140+ . await
141+ {
142+ Ok ( t) => !t
143+ . read_table_snapshot ( )
134144 . await
135- {
136- Ok ( t) => !t
137- . read_table_snapshot ( )
138- . await
139- . is_ok_and ( |s| s. is_some_and ( |s| s. prev_table_seq . is_some ( ) ) ) ,
140- Err ( _) => true ,
141- } ;
145+ . is_ok_and ( |s| s. is_some_and ( |s| s. prev_table_seq . is_some ( ) ) ) ,
146+ Err ( _) => true ,
147+ } ;
142148
143- let temporal_str = if is_full {
144- format ! ( "AT (snapshot => '{}')" , snapshot. snapshot_id. simple( ) )
145- } else {
146- // analyze only need to collect the added blocks.
147- let table_alias = format ! ( "_change_insert${:08x}" , Utc :: now( ) . timestamp( ) ) ;
148- format ! (
149+ let temporal_str = if is_full {
150+ format ! ( "AT (snapshot => '{}')" , snapshot. snapshot_id. simple( ) )
151+ } else {
152+ // analyze only need to collect the added blocks.
153+ let table_alias = format ! ( "_change_insert${:08x}" , Utc :: now( ) . timestamp( ) ) ;
154+ format ! (
149155 "CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}" ,
150156 table_statistics. snapshot_id. simple( ) ,
151157 snapshot. snapshot_id. simple( ) ,
152158 )
153- } ;
154- ( is_full, temporal_str)
155- } else {
156- (
157- true ,
158- format ! ( "AT (snapshot => '{}')" , snapshot. snapshot_id. simple( ) ) ,
159- )
160159 } ;
160+ ( is_full, temporal_str)
161+ } else {
162+ (
163+ true ,
164+ format ! ( "AT (snapshot => '{}')" , snapshot. snapshot_id. simple( ) ) ,
165+ )
166+ } ;
161167
162- let quote = self
163- . ctx
164- . get_settings ( )
165- . get_sql_dialect ( ) ?
166- . default_ident_quote ( ) ;
168+ let quote = self
169+ . ctx
170+ . get_settings ( )
171+ . get_sql_dialect ( ) ?
172+ . default_ident_quote ( ) ;
167173
168- // 0.01625 --> 12 buckets --> 4K size per column
169- // 1.04 / math.sqrt(1<<12) --> 0.01625
170- const DISTINCT_ERROR_RATE : f64 = 0.01625 ;
171- let ndv_select_expr = snapshot
174+ // 0.01625 --> 12 buckets --> 4K size per column
175+ // 1.04 / math.sqrt(1<<12) --> 0.01625
176+ const DISTINCT_ERROR_RATE : f64 = 0.01625 ;
177+ let ndv_select_expr = snapshot
172178 . schema
173179 . fields ( )
174180 . iter ( )
@@ -182,22 +188,22 @@ impl Interpreter for AnalyzeTableInterpreter {
182188 } )
183189 . join ( ", " ) ;
184190
185- let sql = format ! (
186- "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}" ,
187- plan. database, plan. table,
188- ) ;
191+ let sql = format ! (
192+ "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}" ,
193+ plan. database, plan. table,
194+ ) ;
189195
190- info ! ( "Analyze via sql: {sql}" ) ;
196+ info ! ( "Analyze via sql: {sql}" ) ;
191197
192- let ( physical_plan, bind_context) = self . plan_sql ( sql) . await ?;
193- let mut build_res =
194- build_query_pipeline_without_render_result_set ( & self . ctx , & physical_plan) . await ?;
195- // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
196- // It's possible to OOM if the table is too large and spilling isn't enabled.
197- // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
198- let mut histogram_info_receivers = HashMap :: new ( ) ;
199- if self . ctx . get_settings ( ) . get_enable_analyze_histogram ( ) ? {
200- let histogram_sqls = table
198+ let ( physical_plan, bind_context) = self . plan_sql ( sql) . await ?;
199+ let mut build_res =
200+ build_query_pipeline_without_render_result_set ( & self . ctx , & physical_plan) . await ?;
201+ // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
202+ // It's possible to OOM if the table is too large and spilling isn't enabled.
203+ // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
204+ let mut histogram_info_receivers = HashMap :: new ( ) ;
205+ if self . ctx . get_settings ( ) . get_enable_analyze_histogram ( ) ? {
206+ let histogram_sqls = table
201207 . schema ( )
202208 . fields ( )
203209 . iter ( )
@@ -222,50 +228,47 @@ impl Interpreter for AnalyzeTableInterpreter {
222228 )
223229 } )
224230 . collect :: < Vec < _ > > ( ) ;
225- for ( sql, col_id) in histogram_sqls. into_iter ( ) {
226- info ! ( "Analyze histogram via sql: {sql}" ) ;
227- let ( mut histogram_plan, bind_context) = self . plan_sql ( sql) . await ?;
228- if !self . ctx . get_cluster ( ) . is_empty ( ) {
229- histogram_plan = remove_exchange ( histogram_plan) ;
230- }
231- let mut histogram_build_res = build_query_pipeline (
232- & QueryContext :: create_from ( self . ctx . as_ref ( ) ) ,
233- & bind_context. columns ,
234- & histogram_plan,
235- false ,
236- )
237- . await ?;
238- let ( tx, rx) = async_channel:: unbounded ( ) ;
239- histogram_build_res. main_pipeline . add_sink ( |input_port| {
240- Ok ( ProcessorPtr :: create ( HistogramInfoSink :: create (
241- Some ( tx. clone ( ) ) ,
242- input_port. clone ( ) ,
243- ) ) )
244- } ) ?;
245-
246- build_res
247- . sources_pipelines
248- . push ( histogram_build_res. main_pipeline . finalize ( None ) ) ;
249- build_res
250- . sources_pipelines
251- . extend ( histogram_build_res. sources_pipelines ) ;
252- histogram_info_receivers. insert ( col_id, rx) ;
231+ for ( sql, col_id) in histogram_sqls. into_iter ( ) {
232+ info ! ( "Analyze histogram via sql: {sql}" ) ;
233+ let ( mut histogram_plan, bind_context) = self . plan_sql ( sql) . await ?;
234+ if !self . ctx . get_cluster ( ) . is_empty ( ) {
235+ histogram_plan = remove_exchange ( histogram_plan) ;
253236 }
237+ let mut histogram_build_res = build_query_pipeline (
238+ & QueryContext :: create_from ( self . ctx . as_ref ( ) ) ,
239+ & bind_context. columns ,
240+ & histogram_plan,
241+ false ,
242+ )
243+ . await ?;
244+ let ( tx, rx) = async_channel:: unbounded ( ) ;
245+ histogram_build_res. main_pipeline . add_sink ( |input_port| {
246+ Ok ( ProcessorPtr :: create ( HistogramInfoSink :: create (
247+ Some ( tx. clone ( ) ) ,
248+ input_port. clone ( ) ,
249+ ) ) )
250+ } ) ?;
251+
252+ build_res
253+ . sources_pipelines
254+ . push ( histogram_build_res. main_pipeline . finalize ( None ) ) ;
255+ build_res
256+ . sources_pipelines
257+ . extend ( histogram_build_res. sources_pipelines ) ;
258+ histogram_info_receivers. insert ( col_id, rx) ;
254259 }
255- FuseTable :: do_analyze (
256- self . ctx . clone ( ) ,
257- bind_context. output_schema ( ) ,
258- & self . plan . catalog ,
259- & self . plan . database ,
260- & self . plan . table ,
261- snapshot. snapshot_id ,
262- & mut build_res. main_pipeline ,
263- histogram_info_receivers,
264- ) ?;
265- return Ok ( build_res) ;
266260 }
267-
268- return Ok ( PipelineBuildResult :: create ( ) ) ;
261+ FuseTable :: do_analyze (
262+ self . ctx . clone ( ) ,
263+ bind_context. output_schema ( ) ,
264+ & self . plan . catalog ,
265+ & self . plan . database ,
266+ & self . plan . table ,
267+ snapshot. snapshot_id ,
268+ & mut build_res. main_pipeline ,
269+ histogram_info_receivers,
270+ ) ?;
271+ Ok ( build_res)
269272 }
270273}
271274
0 commit comments