@@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
162162 " entries" , entries_, " std::vector<std::unique_ptr<Entry>>" );
163163 }
164164
165+ void addBackpressureListener (BackpressureListener* listener) override {
166+ if (idempotent_) return ;
167+ DCHECK_NOT_NULL (listener);
168+ backpressure_listeners_.insert (listener);
169+ }
170+
171+ void removeBackpressureListener (BackpressureListener* listener) override {
172+ if (idempotent_) return ;
173+ DCHECK_NOT_NULL (listener);
174+ backpressure_listeners_.erase (listener);
175+ }
176+
177+ void NotifyBackpressure (size_t amount) {
178+ if (idempotent_) return ;
179+ for (auto & listener : backpressure_listeners_) listener->EntryRead (amount);
180+ }
181+
182+ bool HasBackpressureListeners () const noexcept {
183+ return !backpressure_listeners_.empty ();
184+ }
185+
165186 std::shared_ptr<Reader> get_reader () override ;
166187 SET_MEMORY_INFO_NAME (DataQueue)
167188 SET_SELF_SIZE(DataQueueImpl)
@@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
173194 std::optional<uint64_t > capped_size_ = std::nullopt ;
174195 bool locked_to_reader_ = false ;
175196
197+ std::unordered_set<BackpressureListener*> backpressure_listeners_;
198+
176199 friend class DataQueue ;
177200 friend class IdempotentDataQueueReader ;
178201 friend class NonIdempotentDataQueueReader ;
@@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
433456 return ;
434457 }
435458
459+ // If there is a backpressure listener, lets report on how much data
460+ // was actually read.
461+ if (data_queue_->HasBackpressureListeners ()) {
462+ // How much did we actually read?
463+ size_t read = 0 ;
464+ for (uint64_t n = 0 ; n < count; n++) {
465+ read += vecs[n].len ;
466+ }
467+ data_queue_->NotifyBackpressure (read);
468+ }
469+
436470 // Now that we have updated this readers state, we can forward
437471 // everything on to the outer next.
438472 std::move (next)(status, vecs, count, std::move (done));
0 commit comments