@@ -35,6 +35,7 @@ use databend_common_sql::Planner;
 use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS;
 use databend_common_storages_factory::NavigationPoint;
 use databend_common_storages_factory::Table;
+use databend_common_storages_fuse::operations::AnalyzeLightMutator;
 use databend_common_storages_fuse::operations::HistogramInfoSink;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_index::Index;
@@ -109,66 +110,77 @@ impl Interpreter for AnalyzeTableInterpreter {
             Err(_) => return Ok(PipelineBuildResult::create()),
         };
 
-        let r = table.read_table_snapshot().await;
-        let snapshot_opt = match r {
-            Err(e) => return Err(e),
-            Ok(v) => v,
+        let Some(snapshot) = table.read_table_snapshot().await? else {
+            return Ok(PipelineBuildResult::create());
         };
 
-        if let Some(snapshot) = snapshot_opt {
-            // plan sql
-            let _table_info = table.get_table_info();
+        if self.plan.no_scan {
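+            // With no_scan, no table data is read: AnalyzeLightMutator derives the statistics from the existing snapshot metadata.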
+            let operator = table.get_operator();
+            let cluster_key_id = table.cluster_key_id();
+            let table_meta_timestamps = self
+                .ctx
+                .get_table_meta_timestamps(table, Some(snapshot.clone()))?;
+            let mut mutator = AnalyzeLightMutator::create(
+                self.ctx.clone(),
+                operator,
+                snapshot,
+                cluster_key_id,
+                table_meta_timestamps,
+            );
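+            // Presumably: target_select gathers the refreshed statistics, then try_commit persists them to the table meta.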
+            mutator.target_select().await?;
+            mutator.try_commit(table).await?;
+            return Ok(PipelineBuildResult::create());
+        }
 
-            let table_statistics = table
-                .read_table_snapshot_statistics(Some(&snapshot))
-                .await?;
+        let table_statistics = table
+            .read_table_snapshot_statistics(Some(&snapshot))
+            .await?;
 
-            let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
-                let is_full = match table
-                    .navigate_to_point(
-                        &NavigationPoint::SnapshotID(
-                            table_statistics.snapshot_id.simple().to_string(),
-                        ),
-                        self.ctx.clone().get_abort_checker(),
-                    )
+        // plan sql
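+        // Fall back to a full analyze when the previous stats snapshot can no longer be navigated to (e.g. already purged) or it predates prev_table_seq tracking.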
+        let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
+            let is_full = match table
+                .navigate_to_point(
+                    &NavigationPoint::SnapshotID(table_statistics.snapshot_id.simple().to_string()),
+                    self.ctx.clone().get_abort_checker(),
+                )
+                .await
+            {
+                Ok(t) => !t
+                    .read_table_snapshot()
                     .await
-                {
-                    Ok(t) => !t
-                        .read_table_snapshot()
-                        .await
-                        .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
-                    Err(_) => true,
-                };
+                    .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
+                Err(_) => true,
+            };
 
-                let temporal_str = if is_full {
-                    format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
-                } else {
-                    // analyze only needs to collect the added blocks.
-                    let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
-                    format!(
+            let temporal_str = if is_full {
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
+            } else {
+                // analyze only needs to collect the added blocks.
+                let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
+                format!(
                     "CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}",
                     table_statistics.snapshot_id.simple(),
                     snapshot.snapshot_id.simple(),
                 )
-                };
-                (is_full, temporal_str)
-            } else {
-                (
-                    true,
-                    format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
-                )
             };
+            (is_full, temporal_str)
+        } else {
+            (
+                true,
+                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
+            )
+        };
 
-            let quote = self
-                .ctx
-                .get_settings()
-                .get_sql_dialect()?
-                .default_ident_quote();
+        let quote = self
+            .ctx
+            .get_settings()
+            .get_sql_dialect()?
+            .default_ident_quote();
 
-            // 0.01625 --> 12 buckets --> 4K size per column
-            // 1.04 / math.sqrt(1<<12) --> 0.01625
-            const DISTINCT_ERROR_RATE: f64 = 0.01625;
-            let ndv_select_expr = snapshot
+        // 0.01625 --> 12 buckets --> 4K size per column
+        // 1.04 / math.sqrt(1<<12) --> 0.01625
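+        // (this is the standard HyperLogLog error bound 1.04 / sqrt(m) with m = 2^12 registers)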
+        const DISTINCT_ERROR_RATE: f64 = 0.01625;
+        let ndv_select_expr = snapshot
             .schema
             .fields()
             .iter()
@@ -182,22 +194,22 @@ impl Interpreter for AnalyzeTableInterpreter {
             })
             .join(", ");
 
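+        // A single scan computes every per-column NDV estimate, with the is_full flag carried along in the result row.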
-            let sql = format!(
-                "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
-                plan.database, plan.table,
-            );
+        let sql = format!(
+            "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
+            plan.database, plan.table,
+        );
 
-            info!("Analyze via sql: {sql}");
+        info!("Analyze via sql: {sql}");
 
-            let (physical_plan, bind_context) = self.plan_sql(sql).await?;
-            let mut build_res =
-                build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
-            // After profiling, computing the histogram is heavy and the bottleneck is the window function (90%).
-            // It's possible to OOM if the table is too large and spilling isn't enabled.
-            // We add a setting `enable_analyze_histogram` to control whether to compute the histogram (disabled by default).
-            let mut histogram_info_receivers = HashMap::new();
-            if self.ctx.get_settings().get_enable_analyze_histogram()? {
-                let histogram_sqls = table
+        let (physical_plan, bind_context) = self.plan_sql(sql).await?;
+        let mut build_res =
+            build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
+        // After profiling, computing the histogram is heavy and the bottleneck is the window function (90%).
+        // It's possible to OOM if the table is too large and spilling isn't enabled.
+        // We add a setting `enable_analyze_histogram` to control whether to compute the histogram (disabled by default).
+        let mut histogram_info_receivers = HashMap::new();
+        if self.ctx.get_settings().get_enable_analyze_histogram()? {
+            let histogram_sqls = table
                 .schema()
                 .fields()
                 .iter()
@@ -222,50 +234,47 @@ impl Interpreter for AnalyzeTableInterpreter {
                     )
                 })
                 .collect::<Vec<_>>();
-                for (sql, col_id) in histogram_sqls.into_iter() {
-                    info!("Analyze histogram via sql: {sql}");
-                    let (mut histogram_plan, bind_context) = self.plan_sql(sql).await?;
-                    if !self.ctx.get_cluster().is_empty() {
-                        histogram_plan = remove_exchange(histogram_plan);
-                    }
-                    let mut histogram_build_res = build_query_pipeline(
-                        &QueryContext::create_from(self.ctx.as_ref()),
-                        &bind_context.columns,
-                        &histogram_plan,
-                        false,
-                    )
-                    .await?;
-                    let (tx, rx) = async_channel::unbounded();
-                    histogram_build_res.main_pipeline.add_sink(|input_port| {
-                        Ok(ProcessorPtr::create(HistogramInfoSink::create(
-                            Some(tx.clone()),
-                            input_port.clone(),
-                        )))
-                    })?;
-
-                    build_res
-                        .sources_pipelines
-                        .push(histogram_build_res.main_pipeline.finalize(None));
-                    build_res
-                        .sources_pipelines
-                        .extend(histogram_build_res.sources_pipelines);
-                    histogram_info_receivers.insert(col_id, rx);
+            for (sql, col_id) in histogram_sqls.into_iter() {
+                info!("Analyze histogram via sql: {sql}");
+                let (mut histogram_plan, bind_context) = self.plan_sql(sql).await?;
+                if !self.ctx.get_cluster().is_empty() {
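+                    // In cluster mode, drop the Exchange operators so each histogram query runs on a single node (presumably to keep the window-heavy plan local).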
+                    histogram_plan = remove_exchange(histogram_plan);
                 }
+                let mut histogram_build_res = build_query_pipeline(
+                    &QueryContext::create_from(self.ctx.as_ref()),
+                    &bind_context.columns,
+                    &histogram_plan,
+                    false,
+                )
+                .await?;
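+                // Each histogram pipeline streams its result through an unbounded channel; the receiver is registered per column id for do_analyze below.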
+                let (tx, rx) = async_channel::unbounded();
+                histogram_build_res.main_pipeline.add_sink(|input_port| {
+                    Ok(ProcessorPtr::create(HistogramInfoSink::create(
+                        Some(tx.clone()),
+                        input_port.clone(),
+                    )))
+                })?;
+
+                build_res
+                    .sources_pipelines
+                    .push(histogram_build_res.main_pipeline.finalize(None));
+                build_res
+                    .sources_pipelines
+                    .extend(histogram_build_res.sources_pipelines);
+                histogram_info_receivers.insert(col_id, rx);
             }
-            FuseTable::do_analyze(
-                self.ctx.clone(),
-                bind_context.output_schema(),
-                &self.plan.catalog,
-                &self.plan.database,
-                &self.plan.table,
-                snapshot.snapshot_id,
-                &mut build_res.main_pipeline,
-                histogram_info_receivers,
-            )?;
-            return Ok(build_res);
         }
-
-        return Ok(PipelineBuildResult::create());
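+        // do_analyze sinks the NDV query output (plus any histogram receivers) into the table's statistics.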
+        FuseTable::do_analyze(
+            self.ctx.clone(),
+            bind_context.output_schema(),
+            &self.plan.catalog,
+            &self.plan.database,
+            &self.plan.table,
+            snapshot.snapshot_id,
+            &mut build_res.main_pipeline,
+            histogram_info_receivers,
+        )?;
+        Ok(build_res)
     }
 }
 