@@ -177,7 +177,7 @@ class DataQueueImpl final : public DataQueue,
177
177
tracker->TrackField (" entries" , entries_);
178
178
}
179
179
180
- std::unique_ptr <Reader> getReader () override ;
180
+ std::shared_ptr <Reader> getReader () override ;
181
181
SET_MEMORY_INFO_NAME (DataQueue);
182
182
SET_SELF_SIZE (DataQueueImpl);
183
183
@@ -197,7 +197,9 @@ class DataQueueImpl final : public DataQueue,
197
197
// DataQueue with which it is associated, and always from the beginning.
198
198
// Reads are non-destructive, meaning that the state of the DataQueue
199
199
// will not and cannot be changed.
200
- class IdempotentDataQueueReader final : public DataQueue::Reader {
200
+ class IdempotentDataQueueReader final
201
+ : public DataQueue::Reader,
202
+ public std::enable_shared_from_this<DataQueue::Reader> {
201
203
public:
202
204
IdempotentDataQueueReader (std::shared_ptr<DataQueueImpl> data_queue)
203
205
: data_queue_(std::move(data_queue)) {
@@ -216,6 +218,8 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
216
218
DataQueue::Vec* data,
217
219
size_t count,
218
220
size_t max_count_hint = bob::kMaxCountHint ) override {
221
+ std::shared_ptr<DataQueue::Reader> self = shared_from_this ();
222
+
219
223
// If ended is true, this reader has already reached the end and cannot
220
224
// provide any more data.
221
225
if (ended_) {
@@ -360,7 +364,7 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
360
364
private:
361
365
std::shared_ptr<DataQueueImpl> data_queue_;
362
366
Maybe<uint32_t > current_index_ = Nothing<uint32_t >();
363
- std::unique_ptr <DataQueue::Reader> current_reader_ = nullptr ;
367
+ std::shared_ptr <DataQueue::Reader> current_reader_ = nullptr ;
364
368
bool ended_ = false ;
365
369
bool pull_pending_ = false ;
366
370
int last_status_ = 0 ;
@@ -370,7 +374,9 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
370
374
// and removes those entries from the queue as they are fully consumed.
371
375
// This means that reads are destructive and the state of the DataQueue
372
376
// is mutated as the read proceeds.
373
- class NonIdempotentDataQueueReader final : public DataQueue::Reader {
377
+ class NonIdempotentDataQueueReader final
378
+ : public DataQueue::Reader,
379
+ public std::enable_shared_from_this<NonIdempotentDataQueueReader> {
374
380
public:
375
381
NonIdempotentDataQueueReader (std::shared_ptr<DataQueueImpl> data_queue)
376
382
: data_queue_(std::move(data_queue)) {
@@ -390,6 +396,8 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
390
396
DataQueue::Vec* data,
391
397
size_t count,
392
398
size_t max_count_hint = bob::kMaxCountHint ) override {
399
+ std::shared_ptr<DataQueue::Reader> self = shared_from_this ();
400
+
393
401
// If ended is true, this reader has already reached the end and cannot
394
402
// provide any more data.
395
403
if (ended_) {
@@ -543,21 +551,21 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
543
551
544
552
private:
545
553
std::shared_ptr<DataQueueImpl> data_queue_;
546
- std::unique_ptr <DataQueue::Reader> current_reader_ = nullptr ;
554
+ std::shared_ptr <DataQueue::Reader> current_reader_ = nullptr ;
547
555
bool ended_ = false ;
548
556
bool pull_pending_ = false ;
549
557
int last_status_ = 0 ;
550
558
};
551
559
552
- std::unique_ptr <DataQueue::Reader> DataQueueImpl::getReader () {
560
+ std::shared_ptr <DataQueue::Reader> DataQueueImpl::getReader () {
553
561
if (isIdempotent ()) {
554
- return std::make_unique <IdempotentDataQueueReader>(shared_from_this ());
562
+ return std::make_shared <IdempotentDataQueueReader>(shared_from_this ());
555
563
}
556
564
557
565
if (lockedToReader_) return nullptr ;
558
566
lockedToReader_ = true ;
559
567
560
- return std::make_unique <NonIdempotentDataQueueReader>(shared_from_this ());
568
+ return std::make_shared <NonIdempotentDataQueueReader>(shared_from_this ());
561
569
}
562
570
563
571
// ============================================================================
@@ -755,7 +763,7 @@ class DataQueueEntry : public EntryBase {
755
763
DataQueueEntry& operator =(DataQueueEntry&&) = delete ;
756
764
757
765
std::unique_ptr<DataQueue::Reader> getReader () override {
758
- return data_queue_->getReader ();
766
+ return std::make_unique<ReaderImpl>( data_queue_->getReader () );
759
767
}
760
768
761
769
std::unique_ptr<Entry> slice (
@@ -794,6 +802,26 @@ class DataQueueEntry : public EntryBase {
794
802
795
803
private:
796
804
std::shared_ptr<DataQueue> data_queue_;
805
+
806
+ class ReaderImpl : public DataQueue ::Reader {
807
+ public:
808
+ explicit ReaderImpl (std::shared_ptr<DataQueue::Reader> inner) : inner_(std::move(inner)) {}
809
+
810
+ int Pull (DataQueue::Reader::Next next,
811
+ int options,
812
+ DataQueue::Vec* data,
813
+ size_t count,
814
+ size_t max_count_hint) override {
815
+ return inner_->Pull (std::move (next), options, data, count, max_count_hint);
816
+ }
817
+
818
+ SET_NO_MEMORY_INFO ()
819
+ SET_MEMORY_INFO_NAME (ReaderImpl)
820
+ SET_SELF_SIZE (ReaderImpl)
821
+
822
+ private:
823
+ std::shared_ptr<DataQueue::Reader> inner_;
824
+ };
797
825
};
798
826
799
827
// ============================================================================
0 commit comments