@@ -11,6 +11,7 @@ use vortex_buffer::ByteBuffer;
1111use vortex_error:: VortexExpect ;
1212use vortex_error:: VortexResult ;
1313use vortex_error:: vortex_bail;
14+ use vortex_metrics:: Counter ;
1415use vortex_metrics:: Histogram ;
1516use vortex_metrics:: Timer ;
1617use vortex_metrics:: VortexMetrics ;
@@ -161,6 +162,7 @@ impl VortexReadAt for ByteBuffer {
161162pub struct InstrumentedReadAt < T : VortexReadAt > {
162163 read : Arc < T > ,
163164 sizes : Arc < Histogram > ,
165+ total_size : Arc < Counter > ,
164166 durations : Arc < Timer > ,
165167}
166168
@@ -169,6 +171,7 @@ impl<T: VortexReadAt> InstrumentedReadAt<T> {
169171 Self {
170172 read,
171173 sizes : metrics. histogram ( "vortex.io.read.size" ) ,
174+ total_size : metrics. counter ( "vortex.io.read.total_size" ) ,
172175 durations : metrics. timer ( "vortex.io.read.duration" ) ,
173176 }
174177 }
@@ -188,6 +191,10 @@ where
188191 sizes. value( 0.99 ) ,
189192 sizes. value( 0.999 ) ,
190193 ) ;
194+
195+ let total_size = self . total_size . count ( ) ;
196+ log:: debug!( "Total read size: {total_size}" ) ;
197+
191198 let durations = self . durations . snapshot ( ) ;
192199 log:: debug!(
193200 "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms" ,
@@ -209,11 +216,13 @@ impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
209216 ) -> BoxFuture < ' static , VortexResult < ByteBuffer > > {
210217 let durations = self . durations . clone ( ) ;
211218 let sizes = self . sizes . clone ( ) ;
219+ let total_size = self . total_size . clone ( ) ;
212220 let read_fut = self . read . read_at ( offset, length, alignment) ;
213221 async move {
214222 let _timer = durations. time ( ) ;
215223 let buf = read_fut. await ;
216224 sizes. update ( length as i64 ) ;
225+ total_size. add ( length as i64 ) ;
217226 buf
218227 }
219228 . boxed ( )
0 commit comments