@@ -446,58 +446,150 @@ async fn chunk_stream_io_loop(
     chunk_infos: Vec<RecordBatch>,
     output_channel: Sender<ApiResult<ChunksWithSegment>>,
 ) -> Result<(), DataFusionError> {
-    let chunk_infos: Vec<_> = chunk_infos.into_iter().map(Into::into).collect();
-
-    // TODO(zehiko) same as previously with get_chunks, we keep sending 1 request per segment.
-    // As these batches are sorted per segment (see docs above), this ensures that ordering by
-    // segment id is preserved regardless of how server might order responses (in the case of having
-    // batches with different segments in the same request). However, quick testing shows that this
-    // is at least 2x slower than sending all segments in one request. Consider providing ordering
-    // guarantees server side in the future.
-
-    // Convert to concurrent processing using buffered streams
-    const CONCURRENT_REQUESTS: usize = 16; // Adjust based on your needs
-
-    futures_util::stream::iter(chunk_infos)
-        .map(|chunk_info| {
-            let mut client = client.clone();
-            let output_channel = output_channel.clone();
-            async move {
-                let fetch_chunks_request = FetchChunksRequest {
-                    chunk_infos: vec![chunk_info],
-                };
-
-                let fetch_chunks_response_stream = client
-                    .inner()
-                    .fetch_chunks(fetch_chunks_request)
-                    .instrument(tracing::trace_span!("chunk_stream_io_loop"))
-                    .await
-                    .map_err(|err| exec_datafusion_err!("{err}"))?
-                    .into_inner();
-
-                // Then we need to fully decode these chunks, i.e. both the transport layer (Protobuf)
-                // and the app layer (Arrow).
-                let mut chunk_stream =
-                    re_redap_client::fetch_chunks_response_to_chunk_and_segment_id(
-                        fetch_chunks_response_stream,
-                    );
-
-                while let Some(chunk_and_segment_id) = chunk_stream.next().await {
-                    if output_channel.send(chunk_and_segment_id).await.is_err() {
-                        break;
+    // Pipeline configuration
+    const CONCURRENT_REQUESTS: usize = 16;
+    const BUFFER_SIZE_MB: usize = 512;
+    const BUFFER_SIZE_BYTES: usize = BUFFER_SIZE_MB * 1024 * 1024;
+
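+    // The pipeline below has two stages: a pool of concurrent fetchers that pull chunks from the
+    // server, and a single buffering task that re-orders results by input index before forwarding
+    // them to `output_channel`.
+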
+    // Create intermediate channel for ordered buffering
+    let (intermediate_tx, mut intermediate_rx) =
+        tokio::sync::mpsc::channel::<(usize, ApiResult<ChunksWithSegment>, u64)>(1024);
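+    // Each message is (input index, fetched chunk result, chunk byte length): the index drives
+    // the re-ordering, the byte length the flush threshold.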
+
+    // We need to pair each original RecordBatch with its converted DataframePart so that the
+    // chunk byte length can be extracted later.
+    use re_protos::common::v1alpha1::DataframePart;
+    let chunk_info_pairs: Vec<_> = chunk_infos
+        .into_iter()
+        .enumerate()
+        .map(|(index, batch)| {
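+            // `into()` consumes the batch, so convert a clone and keep the original around for
+            // byte-length extraction.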
+            let chunk_info: DataframePart = batch.clone().into();
+            (index, batch, chunk_info)
+        })
+        .collect();
+
+    // Spawn concurrent fetchers
+    let fetcher_handle = tokio::spawn(async move {
+        futures_util::stream::iter(chunk_info_pairs)
+            .map(|(index, original_batch, chunk_info)| {
+                let mut client = client.clone();
+                let intermediate_tx = intermediate_tx.clone();
+                async move {
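+                    // Still one chunk info per request (see the removed TODO above): responses
+                    // for a given input batch arrive on their own dedicated stream.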
+                    let fetch_chunks_request = FetchChunksRequest {
+                        chunk_infos: vec![chunk_info],
+                    };
+
+                    let fetch_chunks_response_stream = client
+                        .inner()
+                        .fetch_chunks(fetch_chunks_request)
+                        .instrument(tracing::trace_span!("chunk_stream_io_loop"))
+                        .await
+                        .map_err(|err| exec_datafusion_err!("{err}"))?
+                        .into_inner();
+
+                    let mut chunk_stream =
+                        re_redap_client::fetch_chunks_response_to_chunk_and_segment_id(
+                            fetch_chunks_response_stream,
+                        );
+
+                    // Extract the byte length from the original RecordBatch; it is the same for
+                    // every chunk in this stream, so compute it once up front.
+                    let byte_len = extract_chunk_byte_len(&original_batch)?;
+
+                    while let Some(chunk_and_segment_id) = chunk_stream.next().await {
+                        if intermediate_tx
+                            .send((index, chunk_and_segment_id, byte_len))
+                            .await
+                            .is_err()
+                        {
+                            break;
+                        }
+                    }
+
+                    Ok::<(), DataFusionError>(())
+                }
+            })
+            .buffered(CONCURRENT_REQUESTS)
+            .try_collect::<Vec<_>>()
+            .await
+    });
+
+    // Spawn ordered buffer manager
+    let buffer_handle = tokio::spawn(async move {
+        let mut buffer = Vec::new();
+        let mut total_bytes = 0u64;
+
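+        // `recv()` yields `None` once the fetcher task has finished and dropped all senders,
+        // which ends this loop.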
+        while let Some((index, chunk_result, byte_len)) = intermediate_rx.recv().await {
+            buffer.push((index, chunk_result, byte_len));
+            total_bytes += byte_len;
+
+            // Check if we should flush (either the buffer size threshold was reached or no more
+            // data is expected)
+            if total_bytes >= BUFFER_SIZE_BYTES as u64 || intermediate_rx.is_closed() {
+                // Sort the buffer by original index to restore the input ordering within this
+                // buffered window
+                buffer.sort_by_key(|(index, _, _)| *index);
+
+                // Flush the ordered chunks to the output
+                for (_, chunk_result, _) in buffer.drain(..) {
+                    if output_channel.send(chunk_result).await.is_err() {
+                        return Ok(());
                     }
                 }
 
-                Ok::<(), DataFusionError>(())
+                total_bytes = 0;
             }
-        })
-        .buffered(CONCURRENT_REQUESTS)
-        .try_collect::<Vec<_>>()
-        .await?;
+        }
+
+        // Flush any remaining chunks
+        if !buffer.is_empty() {
+            buffer.sort_by_key(|(index, _, _)| *index);
+            for (_, chunk_result, _) in buffer.drain(..) {
+                if output_channel.send(chunk_result).await.is_err() {
+                    break;
+                }
+            }
+        }
+
+        Ok::<(), DataFusionError>(())
+    });
+
+    // Wait for both tasks to complete
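+    // (`try_join!` surfaces panics and cancellations from either `JoinHandle` as a `JoinError`;
+    // the two `?`s below then propagate errors raised inside the tasks.)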
+    let (fetcher_result, buffer_result) = tokio::try_join!(fetcher_handle, buffer_handle)
+        .map_err(|err| exec_datafusion_err!("Task join error: {err}"))?;
+
+    fetcher_result?;
+    buffer_result?;
 
     Ok(())
 }
 
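+/// Reads the `chunk_byte_len` value from a single-row chunk-info `RecordBatch`.
+///
+/// The buffer task uses this to meter how many bytes are currently buffered.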
+fn extract_chunk_byte_len(chunk_info_batch: &RecordBatch) -> Result<u64, DataFusionError> {
+    use arrow::array::AsArray as _;
+    use re_protos::cloud::v1alpha1::FetchChunksRequest;
+
+    // Find the chunk_byte_len column in the batch
+    let schema = chunk_info_batch.schema();
+    let chunk_byte_len_col_idx = schema
+        .column_with_name(FetchChunksRequest::FIELD_CHUNK_BYTE_LEN)
+        .ok_or_else(|| {
+            exec_datafusion_err!(
+                "Missing {} column in chunk info",
+                FetchChunksRequest::FIELD_CHUNK_BYTE_LEN
+            )
+        })?
+        .0;
+
+    let chunk_byte_len_array = chunk_info_batch.column(chunk_byte_len_col_idx);
+
+    // Assuming it's a UInt64 array with a single value (since we're processing one chunk at a
+    // time). Note that `as_primitive` panics if the column has a different type.
+    let uint64_array = chunk_byte_len_array.as_primitive::<arrow::datatypes::UInt64Type>();
+
+    if uint64_array.len() != 1 {
+        return Err(exec_datafusion_err!(
+            "Expected exactly one chunk_byte_len value, got {}",
+            uint64_array.len()
+        ));
+    }
+
+    Ok(uint64_array.value(0))
+}
+
 impl ExecutionPlan for SegmentStreamExec {
     fn name(&self) -> &'static str {
         "SegmentStreamExec"