@@ -64,6 +64,7 @@ fn add_drive_progress(multi_progress: &MultiProgress, drive: char) -> ProgressBar {
 struct StreamingWriter<W: Write> {
     writer: Mutex<W>,
     format: StreamingFormat,
+    output_config: OutputConfig,
     header_written: AtomicBool,
     rows_written: AtomicUsize,
     limit: u32,
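
The field mix above is what lets a single writer be shared across drive tasks: the `Mutex` serializes access to the sink while the atomics track cross-batch state through `&self`, so the whole struct can live behind an `Arc`. A minimal sketch of that pattern using only std types (the names `SharedWriter` and `write_line` are mine, not from this codebase):

use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// Reduced stand-in for StreamingWriter: interior mutability means every
// method can take &self, so the struct can sit behind an Arc and be used
// from several threads at once.
struct SharedWriter<W: Write> {
    writer: Mutex<W>,
    header_written: AtomicBool,
    rows_written: AtomicUsize,
}

impl<W: Write> SharedWriter<W> {
    fn write_line(&self, line: &str) -> std::io::Result<()> {
        let mut w = self.writer.lock().expect("writer mutex poisoned");
        // swap returns the previous value, so exactly one caller observes
        // `false` and emits the header, even under concurrency.
        if !self.header_written.swap(true, Ordering::SeqCst) {
            writeln!(w, "name")?;
        }
        writeln!(w, "{line}")?;
        self.rows_written.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}

fn main() {
    let shared = Arc::new(SharedWriter {
        writer: Mutex::new(Vec::<u8>::new()), // Vec<u8> implements Write
        header_written: AtomicBool::new(false),
        rows_written: AtomicUsize::new(0),
    });
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let s = Arc::clone(&shared);
            std::thread::spawn(move || s.write_line(&format!("row {i}")).unwrap())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(shared.rows_written.load(Ordering::Relaxed), 4);
}
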
@@ -79,14 +80,15 @@ enum StreamingFormat {
 
 #[cfg(windows)]
 impl<W: Write> StreamingWriter<W> {
-    fn new(writer: W, format: &str, limit: u32) -> Self {
+    fn new(writer: W, format: &str, limit: u32, output_config: OutputConfig) -> Self {
         let fmt = match format.to_lowercase().as_str() {
             "json" => StreamingFormat::Json,
             _ => StreamingFormat::Csv,
         };
         Self {
             writer: Mutex::new(writer),
             format: fmt,
+            output_config,
             header_written: AtomicBool::new(false),
             rows_written: AtomicUsize::new(0),
             limit,
@@ -119,56 +121,49 @@ impl<W: Write> StreamingWriter<W> {
     }
 
     fn write_csv_batch(&self, writer: &mut W, df: &uffs_mft::DataFrame) -> Result<usize> {
-        let col_names: Vec<_> = df.get_column_names();
+        let height = df.height();
+        if height == 0 {
+            return Ok(0);
+        }
 
-        // Write header only once
-        if !self.header_written.swap(true, Ordering::SeqCst) {
-            for (i, name) in col_names.iter().enumerate() {
-                if i > 0 {
-                    write!(writer, ",")?;
-                }
-                write!(writer, "\"{name}\"")?;
+        // Determine whether to write the header (first batch only)
+        let write_header = !self.header_written.swap(true, Ordering::SeqCst);
+
+        // Apply the row limit, if one is set
+        let rows_to_write = if self.limit > 0 {
+            let current = self.rows_written.load(Ordering::Relaxed);
+            let remaining = (self.limit as usize).saturating_sub(current);
+            if remaining == 0 {
+                return Ok(0);
             }
-            // C++ outputs header followed by empty line
-            writeln!(writer)?;
-            writeln!(writer)?;
-        }
+            remaining.min(height)
+        } else {
+            height
+        };
 
-        // Cache column references to avoid repeated lookups
-        let columns: Vec<_> = col_names
-            .iter()
-            .filter_map(|name| df.column(name).ok())
-            .collect();
+        // Slice the DataFrame if we need fewer rows than the batch holds
+        let df_slice = if rows_to_write < height {
+            df.slice(0, rows_to_write)
+        } else {
+            df.clone()
+        };
 
-        let mut rows_written = 0;
-        let height = df.height();
-        let mut line_buf = String::with_capacity(256);
+        // Use OutputConfig for proper formatting with C++ column names
+        let mut config = self.output_config.clone();
+        config.header = write_header;
 
-        for row_idx in 0..height {
-            // Check limit
-            if self.limit > 0 {
-                let current = self.rows_written.fetch_add(1, Ordering::Relaxed);
-                if current >= self.limit as usize {
-                    break;
-                }
-            } else {
-                self.rows_written.fetch_add(1, Ordering::Relaxed);
-            }
+        // Write using OutputConfig (handles column names, formatting, etc.)
+        // Use &mut *writer to create a fresh reborrow so we can still use writer for flush()
+        config
+            .write(&df_slice, &mut *writer)
+            .map_err(|e| anyhow::anyhow!("Write error: {e}"))?;
 
-            // Reuse buffer for each line
-            line_buf.clear();
-            for (i, col) in columns.iter().enumerate() {
-                if i > 0 {
-                    line_buf.push(',');
-                }
-                line_buf.push_str(&format_cell_value(col, row_idx));
-            }
-            writeln!(writer, "{line_buf}")?;
-            rows_written += 1;
-        }
+        // Update the running count of rows written
+        self.rows_written
+            .fetch_add(rows_to_write, Ordering::Relaxed);
 
         writer.flush()?;
-        Ok(rows_written)
+        Ok(rows_to_write)
     }
 
     fn write_json_batch(&self, writer: &mut W, df: &uffs_mft::DataFrame) -> Result<usize> {
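
The limit handling above replaces the per-row `fetch_add` with one batch-level computation. The arithmetic is easier to check in isolation; here is a sketch under the same convention that `limit == 0` means unlimited (the function name and signature are hypothetical):

/// How many rows of a batch to emit, given a global row limit
/// (0 = unlimited), the rows already written, and the batch height.
fn rows_to_write(limit: u32, already_written: usize, height: usize) -> usize {
    if limit == 0 {
        return height; // unlimited: take the whole batch
    }
    // saturating_sub keeps this at 0 instead of underflowing if earlier
    // batches already pushed the count past the limit.
    let remaining = (limit as usize).saturating_sub(already_written);
    remaining.min(height)
}

fn main() {
    assert_eq!(rows_to_write(0, 1_000, 500), 500); // unlimited
    assert_eq!(rows_to_write(100, 80, 500), 20);   // clip the batch
    assert_eq!(rows_to_write(100, 100, 500), 0);   // limit reached: skip
    assert_eq!(rows_to_write(100, 80, 10), 10);    // batch fits entirely
}

A result of 0 corresponds to the early `return Ok(0)` above, and a result smaller than `height` corresponds to the `df.slice(0, rows_to_write)` branch.
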
@@ -230,45 +225,6 @@ impl<W: Write> StreamingWriter<W> {
     }
 }
 
-/// Format a cell value for CSV output.
-#[cfg(windows)]
-fn format_cell_value(col: &uffs_polars::Column, row_idx: usize) -> String {
-    use uffs_polars::{AnyValue, DataType, TimeUnit};
-
-    let val = col.get(row_idx);
-    match val {
-        Ok(AnyValue::Null) => {
-            // C++ outputs "0" for null numeric/boolean values, empty for strings
-            match col.dtype() {
-                DataType::UInt8
-                | DataType::UInt16
-                | DataType::UInt32
-                | DataType::UInt64
-                | DataType::Int8
-                | DataType::Int16
-                | DataType::Int32
-                | DataType::Int64
-                | DataType::Boolean => "0".to_owned(),
-                _ => String::new(),
-            }
-        }
-        Ok(AnyValue::String(s)) => format!("\"{}\"", s.replace('"', "\"\"")),
-        Ok(AnyValue::Boolean(b)) => if b { "1" } else { "0" }.to_owned(),
-        Ok(AnyValue::Datetime(ts, TimeUnit::Microseconds, _)) => {
-            // Convert microseconds to datetime string
-            let secs = ts / 1_000_000;
-            let micros = (ts % 1_000_000) as u32;
-            if let Some(dt) = chrono::DateTime::from_timestamp(secs, micros * 1000) {
-                format!("\"{}\"", dt.format("%Y-%m-%d %H:%M:%S"))
-            } else {
-                String::new()
-            }
-        }
-        Ok(v) => v.to_string(),
-        Err(_) => String::new(),
-    }
-}
-
 /// Format a cell value for JSON output.
 #[cfg(windows)]
 fn format_json_value(col: &uffs_polars::Column, row_idx: usize) -> String {
@@ -461,7 +417,7 @@ async fn search_streaming(
     filters: &QueryFilters<'_>,
     format: &str,
     out: &str,
-    _output_config: &OutputConfig,
+    output_config: &OutputConfig,
 ) -> Result<()> {
     // Determine drives to search
    let drives: Vec<char> = if let Some(drives) = multi_drives {
@@ -495,12 +451,12 @@
 
     if is_console {
         let stdout = std::io::stdout();
-        search_multi_drive_streaming(&drives, filters, format, stdout).await
+        search_multi_drive_streaming(&drives, filters, format, stdout, output_config).await
     } else {
         let file =
             File::create(out).with_context(|| format!("Failed to create output file: {out}"))?;
         let writer = BufWriter::new(file);
-        search_multi_drive_streaming(&drives, filters, format, writer).await?;
+        search_multi_drive_streaming(&drives, filters, format, writer, output_config).await?;
         info!(file = out, "Results written to file");
         Ok(())
     }
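
Both branches funnel into the same generic `search_multi_drive_streaming`, so console and file output share one code path without boxing. A reduced sketch of that dispatch shape (`emit` and the `results.csv` path are hypothetical):

use std::fs::File;
use std::io::{BufWriter, Write, stdout};

// Generic over the sink, so each call site monomorphizes and no
// Box<dyn Write> is needed.
fn emit<W: Write>(mut w: W) -> std::io::Result<()> {
    writeln!(w, "path,size")?;
    w.flush()
}

fn main() -> std::io::Result<()> {
    let to_console = std::env::args().any(|a| a == "--console");
    if to_console {
        emit(stdout())
    } else {
        // Buffer file output; unbuffered File writes would hit the OS per line.
        emit(BufWriter::new(File::create("results.csv")?))
    }
}
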
@@ -1166,6 +1122,7 @@ async fn search_multi_drive_streaming<W: Write + Send + 'static>(
     filters: &QueryFilters<'_>,
     format: &str,
     writer: W,
+    output_config: &OutputConfig,
 ) -> Result<()> {
     use tokio::sync::mpsc;
     use uffs_mft::{IntoLazy, col, lit};
@@ -1183,7 +1140,12 @@
     let owned_filters = Arc::new(OwnedQueryFilters::from_borrowed(filters));
 
     // Create streaming writer (shared across all results)
-    let streaming_writer = Arc::new(StreamingWriter::new(writer, format, filters.limit));
+    let streaming_writer = Arc::new(StreamingWriter::new(
+        writer,
+        format,
+        filters.limit,
+        output_config.clone(),
+    ));
 
     // Channel for receiving results from drive tasks
     let (tx, mut rx) = mpsc::channel::<DriveResult>(drives.len());
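
One detail worth pulling out of `write_csv_batch`: the `&mut *writer` reborrow its comment mentions. If the config's write call takes its sink by value, passing `writer` would move the `&mut W` into the call and make the later `flush()` a use-after-move; the explicit reborrow hands over a short-lived `&mut` instead. A self-contained sketch, with `write_rows` standing in for the sink-consuming call:

use std::io::Write;

// Stand-in for a call like `config.write(&df_slice, ...)` whose sink
// parameter is taken by value.
fn write_rows<W: Write>(mut sink: W) -> std::io::Result<()> {
    writeln!(sink, "a,b,c")
}

fn write_and_flush<W: Write>(writer: &mut W) -> std::io::Result<()> {
    // `write_rows(writer)` would move the `&mut W` into the call, so any
    // later use of `writer` would fail to compile; `&mut *writer` passes a
    // short-lived reborrow instead.
    write_rows(&mut *writer)?;
    writer.flush() // still usable once the reborrow ends
}

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    write_and_flush(&mut buf)?;
    assert_eq!(buf, b"a,b,c\n");
    Ok(())
}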