@@ -18,7 +18,7 @@ use uffs_core::tree::add_tree_columns;
 use uffs_core::{MftQuery, export_csv, export_json, export_table};
 use uffs_mft::{MftProgress, MftReader, load_raw_mft_header};
 #[cfg(windows)]
-use uffs_mft::{MultiDriveMftReader, SaveRawOptions};
+use uffs_mft::SaveRawOptions;
 
 /// Search for files matching a pattern.
 ///
@@ -58,10 +58,7 @@ pub async fn search(
         .with_context(|| format!("Invalid pattern: {pattern}"))?
         .with_case_sensitive(case_sensitive);
 
-    // Load data from index, multi-drive, single drive, or all drives (default)
-    let df = load_search_data(index, multi_drives, single_drive, &parsed).await?;
-
-    // Build and execute query
+    // Build the filter struct up front so it can be reused for per-drive filtering
     let filters = QueryFilters {
         parsed: &parsed,
         ext_filter,
@@ -71,7 +68,9 @@ pub async fn search(
         max_size,
         limit,
     };
-    let mut results = execute_query(df, &filters)?;
+
+    // Load and filter data; for multi-drive searches, filter per drive to reduce memory
+    let mut results = load_and_filter_data(index, multi_drives, single_drive, &filters).await?;
 
     // Build output configuration
     let output_config = OutputConfig::new()
@@ -97,34 +96,39 @@ pub async fn search(
     Ok(())
 }
 
-/// Load search data from index file, multiple drives, single drive, or all NTFS
-/// drives.
+/// Load and filter search data from an index file, multiple drives, a single
+/// drive, or all NTFS drives.
+///
+/// For multi-drive searches, filters are applied per drive to reduce memory
+/// usage. This prevents OOM errors when searching many drives with millions of files.
 #[allow(clippy::single_call_fn)] // Extracted to reduce search() line count below clippy::too_many_lines limit
-async fn load_search_data(
+async fn load_and_filter_data(
     index: Option<std::path::PathBuf>,
     multi_drives: Option<Vec<char>>,
     single_drive: Option<char>,
-    parsed: &ParsedPattern,
+    filters: &QueryFilters<'_>,
 ) -> Result<uffs_mft::DataFrame> {
     if let Some(index_path) = index {
-        // Load from pre-built index
-        return MftReader::load_parquet(&index_path)
-            .with_context(|| format!("Failed to load index: {}", index_path.display()));
+        // Load from pre-built index and filter
+        let df = MftReader::load_parquet(&index_path)
+            .with_context(|| format!("Failed to load index: {}", index_path.display()))?;
+        return execute_query(df, filters);
     }
 
     if let Some(drives) = multi_drives {
-        // Multi-drive concurrent search (explicit)
-        return search_multi_drive(&drives).await;
+        // Multi-drive search with per-drive filtering (memory-efficient)
+        return search_multi_drive_filtered(&drives, filters).await;
     }
 
     // Check for single drive: CLI flag overrides pattern-embedded drive
-    let effective_drive = single_drive.or_else(|| parsed.drive());
+    let effective_drive = single_drive.or_else(|| filters.parsed.drive());
     if let Some(drive_letter) = effective_drive {
         // Single drive search
         let reader = MftReader::open(drive_letter)
             .await
             .with_context(|| format!("Failed to open drive {drive_letter}:"))?;
-        return Ok(reader.read_all().await?);
+        let df = reader.read_all().await?;
+        return execute_query(df, filters);
     }
 
     // No drive specified - search ALL available NTFS drives
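
The doc comment above carries the key claim of this change: filtering per drive instead of concatenating first bounds peak memory. The shape of the pattern, reduced to a minimal sketch — `Record`, `read_drive`, and `apply_filters` are hypothetical stand-ins for the real `MftReader::read_all` and `execute_query` calls, not uffs APIs:

    // Minimal sketch of per-drive filtering; none of these names are uffs APIs.
    struct Record {
        size: u64,
    }

    fn read_drive(_drive: char) -> Vec<Record> {
        Vec::new() // stand-in for MftReader::open(..).read_all(): every record on one drive
    }

    fn apply_filters(records: Vec<Record>) -> Vec<Record> {
        // stand-in for execute_query(): keeps only matching rows
        records.into_iter().filter(|r| r.size > 1024).collect()
    }

    fn search_all(drives: &[char]) -> Vec<Record> {
        let mut matches = Vec::new();
        for &d in drives {
            // Peak memory: one drive's records plus the accumulated matches,
            // never every drive's records at once.
            matches.extend(apply_filters(read_drive(d)));
        }
        matches
    }

With, say, five drives of ten million records each, the old path held all fifty million rows before filtering; this path never holds more than one drive's rows plus the (typically tiny) match set.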
@@ -144,7 +148,7 @@ async fn load_search_data(
         bail!("No NTFS drives found on this system");
     }
     info!(drives = ?all_drives, count = all_drives.len(), "No drive specified - searching all NTFS drives");
-    search_multi_drive(&all_drives).await
+    search_multi_drive_filtered(&all_drives, filters).await
 }
 #[cfg(not(windows))]
 {
@@ -246,66 +250,143 @@ fn write_results(
     Ok(())
 }
 
-/// Search multiple drives concurrently.
+/// Search multiple drives sequentially with per-drive filtering.
+///
+/// This approach processes one drive at a time and applies filters immediately,
+/// keeping only matching results in memory. This prevents OOM errors when
+/// searching many drives with millions of files.
 #[cfg(windows)]
-async fn search_multi_drive(drives: &[char]) -> Result<uffs_mft::DataFrame> {
+async fn search_multi_drive_filtered(
+    drives: &[char],
+    filters: &QueryFilters<'_>,
+) -> Result<uffs_mft::DataFrame> {
     use indicatif::MultiProgress;
+    use uffs_mft::{IntoLazy, col, lit};
 
     if drives.is_empty() {
         bail!("No drives specified for multi-drive search");
     }
 
-    info!(count = drives.len(), "Searching drives concurrently");
+    info!(
+        count = drives.len(),
+        "Searching drives sequentially (memory-efficient mode)"
+    );
 
-    let reader = MultiDriveMftReader::new(drives.to_vec());
     let multi_progress = MultiProgress::new();
+    let mut filtered_results: Vec<uffs_mft::DataFrame> = Vec::new();
+    let mut total_matches = 0usize;
 
-    // Create progress bars for each drive
-    let mut progress_bars = std::collections::HashMap::new();
-    for &drive_char in reader.drives() {
-        let pb = multi_progress.add(ProgressBar::new(0));
+    // Process drives sequentially to limit memory usage
+    for &drive_char in drives {
+        // Create a progress bar for this drive (wrapped in Arc for closure sharing)
+        let pb = std::sync::Arc::new(multi_progress.add(ProgressBar::new(0)));
         let style = ProgressStyle::default_bar()
             .template(&format!(
                 "{{spinner:.green}} [{drive_char}:] [{{elapsed_precise}}] [{{bar:30.cyan/blue}}] {{pos}}/{{len}} ({{eta}})"
             ))
             .context("Invalid progress bar template")?
             .progress_chars("#>-");
         pb.set_style(style);
-        progress_bars.insert(drive_char, pb);
-    }
 
-    let progress_bars = std::sync::Arc::new(progress_bars);
-    let pbs = progress_bars.clone();
+        // Read this drive
+        let reader = match MftReader::open(drive_char).await {
+            Ok(r) => r,
+            Err(e) => {
+                pb.finish_with_message(format!("Error: {e}"));
+                info!(drive = %drive_char, error = %e, "Skipping drive due to error");
+                continue;
+            }
+        };
 
-    let df = reader
-        .read_with_progress(move |drive_char, progress| {
-            if let Some(pb) = pbs.get(&drive_char) {
+        let pb_clone = pb.clone();
+        let df = match reader
+            .read_with_progress(move |progress| {
                 if let Some(total) = progress.total_records {
-                    pb.set_length(total);
+                    pb_clone.set_length(total);
                 }
-                pb.set_position(progress.records_read);
+                pb_clone.set_position(progress.records_read);
+            })
+            .await
+        {
+            Ok(df) => df,
+            Err(e) => {
+                pb.finish_with_message(format!("Error: {e}"));
+                info!(drive = %drive_char, error = %e, "Skipping drive due to read error");
+                continue;
             }
-        })
-        .await
-        .context("Failed to read MFTs from drives")?;
+        };
 
-    for pb in progress_bars.values() {
+        let records_read = df.height();
         pb.finish();
+
+        // Apply filters immediately to reduce memory
+        let filtered = execute_query(df, filters)?;
+        let matches = filtered.height();
+        total_matches += matches;
+
+        info!(
+            drive = %drive_char,
+            records = records_read,
+            matches = matches,
+            "Drive processed"
+        );
+
+        if matches > 0 {
+            // Add drive column and store filtered results
+            let df_with_drive = filtered
+                .lazy()
+                .with_column(lit(format!("{drive_char}:")).alias("drive"))
+                .collect()
+                .context("Failed to add drive column")?;
+            filtered_results.push(df_with_drive);
+        }
+    }
+
+    if filtered_results.is_empty() {
+        // No drive produced a match, so bail with a clear message
+        bail!("No matching files found across {} drives", drives.len());
+    }
+
+    // Concatenate filtered results (much smaller than the full data)
+    let mut result = filtered_results.remove(0);
+    for df in filtered_results {
+        result = result.vstack(&df).context("Failed to merge results")?;
     }
 
+    // Reorder columns to put "drive" first
+    let column_names: Vec<String> = result
+        .get_column_names()
+        .into_iter()
+        .filter(|c| c.as_str() != "drive")
+        .map(|c| c.to_string())
+        .collect();
+    let columns: Vec<_> = std::iter::once("drive".to_string())
+        .chain(column_names)
+        .map(|s| col(&s))
+        .collect();
+
+    let result = result
+        .lazy()
+        .select(columns)
+        .collect()
+        .context("Failed to reorder columns")?;
+
     info!(
-        records = df.height(),
+        total_matches = total_matches,
         drives = drives.len(),
-        "Read records from drives"
+        "Multi-drive search complete"
     );
-    Ok(df)
+
+    Ok(result)
302381}
303382
304383/// Stub for non-Windows platforms.
305384#[ cfg( not( windows) ) ]
306- // Platform-specific stub must match Windows signature; called once per platform is expected.
307385#[ allow( clippy:: unused_async, clippy:: single_call_fn) ]
308- async fn search_multi_drive ( _drives : & [ char ] ) -> Result < uffs_mft:: DataFrame > {
386+ async fn search_multi_drive_filtered (
387+ _drives : & [ char ] ,
388+ _filters : & QueryFilters < ' _ > ,
389+ ) -> Result < uffs_mft:: DataFrame > {
309390 bail ! ( "Multi-drive search is only supported on Windows" )
310391}
311392
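
The vstack-and-reorder sequence at the end of `search_multi_drive_filtered` is worth reading in isolation. A self-contained sketch of the same steps, assuming `uffs_mft::DataFrame` and the `IntoLazy`/`col`/`lit` imports are re-exports of polars (with the `lazy` feature), which the call patterns above suggest:

    use polars::prelude::*;

    /// Merge per-drive result frames and move the "drive" column to the front.
    fn merge_with_drive_first(mut frames: Vec<DataFrame>) -> PolarsResult<DataFrame> {
        if frames.is_empty() {
            return Err(PolarsError::NoData("no frames to merge".into()));
        }
        let mut merged = frames.remove(0);
        for df in frames {
            merged = merged.vstack(&df)?; // row-wise concat; schemas must match
        }
        // Select "drive" first, then every other column in its original order.
        let rest: Vec<Expr> = merged
            .get_column_names()
            .into_iter()
            .filter(|c| c.as_str() != "drive")
            .map(|c| col(c.as_str()))
            .collect();
        let exprs: Vec<Expr> = std::iter::once(col("drive")).chain(rest).collect();
        merged.lazy().select(exprs).collect()
    }

Repeated `vstack` is fine for a handful of drives; for many frames, polars' `concat` over lazy frames does the same merge in one call.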
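One smaller pattern in the loop: each `ProgressBar` is wrapped in an `Arc` so the `move` closure handed to `read_with_progress` and the surrounding loop body can both drive the same bar. A minimal sketch of that sharing with indicatif (note that `ProgressBar` is itself cheaply cloneable and internally shared, so cloning the bar directly would also work; the `Arc` just makes the sharing explicit):

    use std::sync::Arc;

    use indicatif::ProgressBar;

    fn main() {
        let pb = Arc::new(ProgressBar::new(100));
        let pb_for_callback = Arc::clone(&pb);

        // Stands in for reader.read_with_progress(move |progress| ...):
        // the callback owns one handle, the caller keeps the other.
        let on_progress = move |records_read: u64| pb_for_callback.set_position(records_read);

        for i in 0..=100 {
            on_progress(i);
        }
        pb.finish(); // the original handle is still usable after the move
    }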