@@ -2620,6 +2620,260 @@ impl ParallelMftReader {
26202620 }
26212621 }
26222622
2623+ /// Reads all MFT records using bulk I/O (C++ style: read all, then parse).
2624+ ///
2625+ /// This method pre-allocates a single buffer for the entire MFT and reads
2626+ /// each extent directly into it, eliminating per-chunk allocations and
2627+ /// copies. This matches the C++ "tsunami" pattern for maximum I/O
2628+ /// throughput.
2629+ ///
2630+ /// # Performance
2631+ ///
2632+ /// - Single allocation for entire MFT (~11GB for large drives)
2633+ /// - Zero intermediate copies during I/O phase
2634+ /// - Continuous sequential reads without CPU interruption
2635+ /// - Parallel parsing after all I/O completes
2636+ ///
2637+ /// # Arguments
2638+ ///
2639+ /// * `handle` - Windows file handle to the MFT
2640+ /// * `merge_extensions` - If true, merge extension records
2641+ /// * `progress_callback` - Optional callback for progress reporting
2642+ ///
2643+ /// # Returns
2644+ ///
2645+ /// Vector of parsed records.
2646+ #[ allow( unsafe_code) ]
2647+ pub fn read_all_bulk < F > (
2648+ & self ,
2649+ handle : HANDLE ,
2650+ merge_extensions : bool ,
2651+ progress_callback : Option < F > ,
2652+ ) -> Result < Vec < ParsedRecord > >
2653+ where
2654+ F : Fn ( u64 , u64 ) ,
2655+ {
2656+ use rayon:: prelude:: * ;
2657+
2658+ let record_size = self . extent_map . bytes_per_record as usize ;
2659+ let total_records = self . extent_map . total_records ( ) as usize ;
2660+ let total_bytes = total_records * record_size;
2661+
2662+ info ! (
2663+ total_records,
2664+ total_bytes_mb = total_bytes / ( 1024 * 1024 ) ,
2665+ "🚀 Starting bulk MFT read (C++ style: read all, then parse)"
2666+ ) ;
2667+
2668+ // Phase 1: Allocate single buffer for entire MFT
2669+ let alloc_start = std:: time:: Instant :: now ( ) ;
2670+ let mut mft_buffer = AlignedBuffer :: new ( total_bytes) ;
2671+ info ! (
2672+ alloc_ms = alloc_start. elapsed( ) . as_millis( ) ,
2673+ "📦 Allocated MFT buffer"
2674+ ) ;
2675+
2676+ // Phase 2: Read all extents directly into buffer (pure I/O, no parsing)
2677+ let read_start = std:: time:: Instant :: now ( ) ;
2678+ let mut bytes_read_total: u64 = 0 ;
2679+
2680+ for extent in self . extent_map . extents ( ) {
2681+ if extent. lcn < 0 {
2682+ // Sparse extent - leave as zeros
2683+ continue ;
2684+ }
2685+
2686+ // Calculate where this extent goes in the buffer
2687+ let records_per_cluster =
2688+ self . extent_map . bytes_per_cluster / self . extent_map . bytes_per_record ;
2689+ let extent_start_frs = extent. vcn as u64 * records_per_cluster as u64 ;
2690+ let extent_records = extent. cluster_count * records_per_cluster as u64 ;
2691+ let buffer_offset = extent_start_frs as usize * record_size;
2692+ let extent_bytes = extent_records as usize * record_size;
2693+
2694+ // Calculate disk offset
2695+ let disk_offset = extent. lcn as u64 * self . extent_map . bytes_per_cluster as u64 ;
2696+
2697+ // Seek to extent
2698+ let mut new_pos: i64 = 0 ;
2699+ unsafe {
2700+ SetFilePointerEx ( handle, disk_offset as i64 , Some ( & mut new_pos) , FILE_BEGIN ) ?;
2701+ }
2702+
2703+ // Read directly into the correct position in mft_buffer
2704+ let target_slice =
2705+ & mut mft_buffer. as_mut_slice ( ) [ buffer_offset..buffer_offset + extent_bytes] ;
2706+ let mut bytes_read: u32 = 0 ;
2707+ unsafe {
2708+ ReadFile ( handle, Some ( target_slice) , Some ( & mut bytes_read) , None ) ?;
2709+ }
2710+
2711+ bytes_read_total += bytes_read as u64 ;
2712+
2713+ // Report progress
2714+ if let Some ( ref cb) = progress_callback {
2715+ cb ( bytes_read_total, total_bytes as u64 ) ;
2716+ }
2717+ }
2718+
2719+ info ! (
2720+ read_ms = read_start. elapsed( ) . as_millis( ) ,
2721+ bytes_mb = bytes_read_total / ( 1024 * 1024 ) ,
2722+ "✅ Bulk read complete (pure I/O phase)"
2723+ ) ;
2724+
2725+ // Phase 3: Parse all records in parallel using par_chunks_mut
2726+ let parse_start = std:: time:: Instant :: now ( ) ;
2727+ let buffer_slice = mft_buffer. as_mut_slice ( ) ;
2728+
2729+ // Extract bitmap reference before parallel section (avoids capturing self)
2730+ let bitmap_ref = self . bitmap . as_ref ( ) ;
2731+
2732+ // Estimate capacity
2733+ let estimated_records = if let Some ( ref bm) = bitmap_ref {
2734+ bm. count_in_use ( )
2735+ } else {
2736+ total_records
2737+ } ;
2738+
2739+ // Use par_chunks_mut to give each thread its own mutable slice
2740+ let records_per_chunk = 4096usize ;
2741+ let bytes_per_chunk = records_per_chunk * record_size;
2742+
2743+ if merge_extensions {
2744+ // Full parsing with extension merging
2745+ let results: Vec < ( Vec < ParseResult > , u64 , u64 ) > = buffer_slice
2746+ . par_chunks_mut ( bytes_per_chunk)
2747+ . enumerate ( )
2748+ . map ( |( chunk_idx, chunk) | {
2749+ let mut results = Vec :: new ( ) ;
2750+ let mut skipped = 0u64 ;
2751+ let mut processed = 0u64 ;
2752+
2753+ let start_frs = chunk_idx * records_per_chunk;
2754+ let records_in_chunk = chunk. len ( ) / record_size;
2755+
2756+ for i in 0 ..records_in_chunk {
2757+ let frs = start_frs + i;
2758+
2759+ // Check bitmap if available
2760+ if let Some ( bm) = bitmap_ref {
2761+ if !bm. is_record_in_use ( frs as u64 ) {
2762+ skipped += 1 ;
2763+ processed += 1 ;
2764+ continue ;
2765+ }
2766+ }
2767+
2768+ let offset = i * record_size;
2769+ let record_slice = & mut chunk[ offset..offset + record_size] ;
2770+
2771+ // Apply fixup in-place
2772+ if !apply_fixup ( record_slice) {
2773+ skipped += 1 ;
2774+ processed += 1 ;
2775+ continue ;
2776+ }
2777+
2778+ // Parse record
2779+ let result = parse_record_full ( record_slice, frs as u64 ) ;
2780+ if matches ! ( result, ParseResult :: Skip ) {
2781+ skipped += 1 ;
2782+ } else {
2783+ results. push ( result) ;
2784+ }
2785+ processed += 1 ;
2786+ }
2787+ ( results, skipped, processed)
2788+ } )
2789+ . collect ( ) ;
2790+
2791+ // Combine results
2792+ let mut total_skipped = 0u64 ;
2793+ let mut total_processed = 0u64 ;
2794+ let mut all_results = Vec :: with_capacity ( estimated_records) ;
2795+ for ( chunk_results, skipped, processed) in results {
2796+ all_results. extend ( chunk_results) ;
2797+ total_skipped += skipped;
2798+ total_processed += processed;
2799+ }
2800+
2801+ info ! (
2802+ parse_ms = parse_start. elapsed( ) . as_millis( ) ,
2803+ records = total_processed,
2804+ skipped = total_skipped,
2805+ "✅ Parallel parse complete"
2806+ ) ;
2807+
2808+ // Merge extensions
2809+ let mut merger = MftRecordMerger :: with_capacity ( estimated_records) ;
2810+ for result in all_results {
2811+ merger. add_result ( result) ;
2812+ }
2813+ Ok ( merger. merge ( ) )
2814+ } else {
2815+ // Fast path: skip extension merging using par_chunks_mut
2816+ let results: Vec < ( Vec < ParsedRecord > , u64 , u64 ) > = buffer_slice
2817+ . par_chunks_mut ( bytes_per_chunk)
2818+ . enumerate ( )
2819+ . map ( |( chunk_idx, chunk) | {
2820+ let mut records = Vec :: new ( ) ;
2821+ let mut skipped = 0u64 ;
2822+ let mut processed = 0u64 ;
2823+
2824+ let start_frs = chunk_idx * records_per_chunk;
2825+ let records_in_chunk = chunk. len ( ) / record_size;
2826+
2827+ for i in 0 ..records_in_chunk {
2828+ let frs = start_frs + i;
2829+
2830+ if let Some ( bm) = bitmap_ref {
2831+ if !bm. is_record_in_use ( frs as u64 ) {
2832+ skipped += 1 ;
2833+ processed += 1 ;
2834+ continue ;
2835+ }
2836+ }
2837+
2838+ let offset = i * record_size;
2839+ let record_slice = & mut chunk[ offset..offset + record_size] ;
2840+
2841+ if !apply_fixup ( record_slice) {
2842+ skipped += 1 ;
2843+ processed += 1 ;
2844+ continue ;
2845+ }
2846+
2847+ if let Some ( record) = parse_record ( record_slice, frs as u64 ) {
2848+ records. push ( record) ;
2849+ } else {
2850+ skipped += 1 ;
2851+ }
2852+ processed += 1 ;
2853+ }
2854+ ( records, skipped, processed)
2855+ } )
2856+ . collect ( ) ;
2857+
2858+ // Combine results
2859+ let mut total_skipped = 0u64 ;
2860+ let mut all_records = Vec :: with_capacity ( estimated_records) ;
2861+ for ( chunk_records, skipped, _processed) in results {
2862+ all_records. extend ( chunk_records) ;
2863+ total_skipped += skipped;
2864+ }
2865+
2866+ info ! (
2867+ parse_ms = parse_start. elapsed( ) . as_millis( ) ,
2868+ records = all_records. len( ) ,
2869+ skipped = total_skipped,
2870+ "✅ Parallel parse complete (fast path)"
2871+ ) ;
2872+
2873+ Ok ( all_records)
2874+ }
2875+ }
2876+
26232877 /// Reads all MFT records and returns them as `ParsedColumns` (SoA layout).
26242878 ///
26252879 /// This is the optimized path that avoids the AoS→SoA transpose by:
0 commit comments