// SafePq.h
//
// Created by pengwang5 on 2022/12/11.
//
#ifndef FADAS_SAFEPQ_H
#define FADAS_SAFEPQ_H
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <type_traits>
#include <unordered_set>
#include <utility>
/// Traits that map the generic condition-variable operations used by
/// ConditionVariable onto a concrete condition-variable/lock pair.
/// Only specializations are defined.
template <typename ConditionVarType, typename LockType>
struct ConditionVarTraits;

/// Specialization for the standard library pair std::condition_variable /
/// std::mutex.
template <>
struct ConditionVarTraits<std::condition_variable, std::mutex> {
    /// std::condition_variable needs no explicit setup, so this is a no-op.
    static void initialize(std::condition_variable& cond_var) {
    }

    /// Wakes one thread waiting on @p cond_var.
    static void signal(std::condition_variable& cond_var) {
        cond_var.notify_one();
    }

    /// Wakes every thread waiting on @p cond_var.
    static void broadcast(std::condition_variable& cond_var) {
        cond_var.notify_all();
    }

    /// Blocks on @p cond_var until it is notified.
    /// @param lock A lock on the associated mutex, passed through to
    /// std::condition_variable::wait.
    static void wait(std::condition_variable& cond_var,
                     std::unique_lock<std::mutex>& lock) {
        cond_var.wait(lock);
    }

    /// Blocks on @p cond_var for at most @p rel_time.
    /// @return True when the wait ended because the time ran out; false when
    /// it ended for any other reason.
    template <class Rep, class Period>
    static bool wait_for(std::condition_variable& cond_var,
                         std::unique_lock<std::mutex>& lock,
                         const std::chrono::duration<Rep, Period>& rel_time) {
        const std::cv_status outcome = cond_var.wait_for(lock, rel_time);
        return outcome == std::cv_status::timeout;
    }
};
/// @class ConditionVariable
/// The ConditionVariable class wraps a operating system ConditionVariable.
///
/// In addition, it implements support for attaching and detaching workers
/// to the condition variable.
/// @tparam ThreadContainerType The type of thread Container.
/// @tparam SignalStrategyType The type of signal policy.
/// @see NotEmptySignalStrategy
/// @see NotFullSignalStrategy
template<typename ThreadContainerType, typename SignalStrategyType,
typename ConditionVarType, typename LockType>
class ConditionVariable {
public:
/// Initializes a new instance of the ConditionVariable class without an
/// upper-bound.
ConditionVariable()
: total_workers_(0), active_workers_(0), bounded_capacity_(SIZE_MAX),
item_count_(0) {
ConditionVarTraits<ConditionVarType, LockType>::initialize(
condition_var_);
}
~ConditionVariable() {
}
// "ConditionVariable" objects cannot be copied or assigned
ConditionVariable(const ConditionVariable&) = delete;
ConditionVariable& operator=(const ConditionVariable&) = delete;
/// Gets the number of workers attached to this condition variable.
/// @return The number of workers attached to this condition variable.
/// @see Attach
size_t total() const {
return total_workers_;
}
/// Gets the number of active workers for this condition variable.
/// active workers are workers that are currently NOT waiting on this
/// condition variable.
/// @return The number of active workers.
size_t active() const {
return active_workers_;
}
/// Gets the bounded capacity of this condition variable instance.
/// @return The bounded capacity of this condition variable.
size_t bounded_capacity() const {
return bounded_capacity_;
}
/// Sets the bounded capacity of this condition variable instance.
void bounded_capacity(size_t capacity) {
bounded_capacity_ = capacity;
}
/// Gets the number of items contained in this condition variable
/// instance.
/// @return The number of items
size_t size() const {
return item_count_;
}
/// Set the number of items contained in this condition variable
/// instance.
void size(size_t count) {
item_count_ = count;
}
/// Registers the a worker with this condition variable.
/// If the worker is already registered then this method has no effect.
/// @see Detach
void attach() {
if (container_.add()) {
increment_total();
increment_active();
}
}
/// Unregisters the worker from this condition variable.
/// If the worker was not previously registered then this method has
/// no effect.
/// @see Attach
void detach() {
if (container_.remove()) {
decrement_total();
decrement_active();
}
if (total_workers_ > 0 && active_workers_ == 0) {
increment_active();
ConditionVarTraits<ConditionVarType, LockType>::signal(
condition_var_);
}
}
/// Wakes up a worker waiting on this condition variable.
void signal() {
// if no workers attached always signal!
if (total_workers_ == 0) {
ConditionVarTraits<ConditionVarType, LockType>::signal(
condition_var_);
return;
}
// issue a signal only when there are no active workers, or when
// the count starts to grow beyond a threshold level
if (signal_.should_signal(active_workers_, total_workers_,
item_count_, bounded_capacity_)) {
increment_active();
ConditionVarTraits<ConditionVarType, LockType>::signal(
condition_var_);
}
}
/// Wakes up all workers waiting on this condition variable.
void broadcast() {
if (total_workers_ != 0) {
// set active only if workers attached
active(total_workers_);
}
ConditionVarTraits<ConditionVarType, LockType>::broadcast(
condition_var_);
}
/// Waits indefinitely for this condition variable to become signaled.
/// @param lock An object of type std::unique_lock which must be locked
/// by the current thread.
void wait(std::unique_lock<LockType>& lock) {
decrement_active();
ConditionVarTraits<ConditionVarType, LockType>::wait(
condition_var_, lock);
}
/// Waits up to specified duration for this condition variable to become
/// signaled.
/// @param lock An object of type std::unique_lock which must be locked
/// by the current thread.
/// @param rel_time An object of type std::chrono::duration representing
/// the maximum time to spend waiting.
template<class Rep, class Period> bool wait_for(
std::unique_lock<LockType>& lock,
const std::chrono::duration<Rep, Period>& rel_time) {
decrement_active();
bool timed_out =
ConditionVarTraits<ConditionVarType, LockType>::wait_for(
condition_var_, lock, rel_time);
if (timed_out) {
increment_active();
}
return timed_out;
}
private:
/// Sets the number of active workers for this condition variable.
/// @param active The number of active workers.
void active(size_t active) {
active_workers_ = active > total_workers_ ? total_workers_ : active;
}
/// Increments the total worker count for this condition variable by 1.
void increment_total() {
total_workers_ += 1;
}
/// Decrements the total worker count for this condition variable by 1.
void decrement_total() {
total_workers_ = total_workers_ > 0 ? total_workers_ - 1 : 0;
}
/// Increments the active worker count for this condition variable by 1.
void increment_active() {
if (++active_workers_ > total_workers_)
active_workers_ = total_workers_;
}
/// Decrements the active worker count for this condition variable by 1.
void decrement_active() {
active_workers_ = active_workers_ > 0 ? active_workers_ - 1 : 0;
}
size_t total_workers_;
size_t active_workers_;
size_t bounded_capacity_;
size_t item_count_;
ConditionVarType condition_var_;
ThreadContainerType container_;
SignalStrategyType signal_;
};
/// @class NotEmptySignalStrategy
///
/// Decides whether a "not empty" condition variable should issue a signal.
///
/// A signal is requested when no worker is active (all of them are waiting
/// on an empty BlockingCollection), or when some worker is still asleep and
/// the backlog per active worker has grown beyond ItemsPerThread.
///
/// Signalling this sparingly keeps condition variable sleeps, wakes and lock
/// contention low, which improves performance and makes it more predictable.
/// @tparam ItemsPerThread The number of items to allow per thread.
/// @see ConditionVariable
/// @see NotFullSignalStrategy
template<size_t ItemsPerThread = 16> struct NotEmptySignalStrategy {
    /// @param active_workers Workers currently not waiting.
    /// @param total_workers Workers attached to the condition variable.
    /// @param item_count Items currently in the collection.
    /// @return True if a waiter should be woken.
    bool should_signal(size_t active_workers, size_t total_workers,
                       size_t item_count, size_t /*capacity*/) const {
        // Nobody is running: someone has to be woken.
        if (active_workers == 0)
            return true;
        // Everybody is already running: there is no one left to wake.
        if (active_workers >= total_workers)
            return false;
        const size_t backlog_per_worker = item_count / active_workers;
        return backlog_per_worker > ItemsPerThread;
    }
};
/// @class NotFullSignalStrategy
///
/// Decides whether a "not full" condition variable should issue a signal.
///
/// A signal is requested when no worker is active (all of them are waiting
/// on a full BlockingCollection), or when some worker is still asleep and
/// the free capacity per active worker has grown beyond ItemsPerThread.
///
/// Signalling this sparingly keeps condition variable sleeps, wakes and lock
/// contention low, which improves performance and makes it more predictable.
/// @tparam ItemsPerThread The number of items to allow per thread.
/// @see ConditionVariable
/// @see NotEmptySignalStrategy
template<size_t ItemsPerThread = 16> struct NotFullSignalStrategy {
    /// @param active_workers Workers currently not waiting.
    /// @param total_workers Workers attached to the condition variable.
    /// @param item_count Items currently in the collection.
    /// @param capacity The bounded capacity of the collection.
    /// @return True if a waiter should be woken.
    bool should_signal(size_t active_workers, size_t total_workers,
                       size_t item_count, size_t capacity) const {
        // Nobody is running: someone has to be woken.
        if (active_workers == 0)
            return true;
        // Everybody is already running: there is no one left to wake.
        if (active_workers >= total_workers)
            return false;
        // Unsigned subtraction would wrap to a huge value if the item count
        // ever exceeded the capacity; treat that case as "no free slots".
        const size_t free_slots =
            capacity > item_count ? capacity - item_count : 0;
        return free_slots / active_workers > ItemsPerThread;
    }
};
/// @class ConditionVariableGenerator
///
/// Bundles the two ConditionVariable instantiations ("not full" and
/// "not empty") together with the lock type that goes with them.
///
/// @tparam ThreadContainerType The thread Container policy shared by both
/// condition variables.
/// @tparam NotFullStrategyType Signal strategy for the "not full" variable.
/// @tparam NotEmptyStrategyType Signal strategy for the "not empty" variable.
/// @tparam ConditionVarType The underlying condition variable type.
/// @tparam LockType The mutex type.
template<typename ThreadContainerType, typename NotFullStrategyType,
         typename NotEmptyStrategyType, typename ConditionVarType,
         typename LockType>
struct ConditionVariableGenerator {
    /// Mutex type to be used with both condition variables.
    using lock_type = LockType;
    /// Condition variable signalled when items can be taken.
    using NotEmptyType = ConditionVariable<ThreadContainerType,
        NotEmptyStrategyType, ConditionVarType, LockType>;
    /// Condition variable signalled when items can be added.
    using NotFullType = ConditionVariable<ThreadContainerType,
        NotFullStrategyType, ConditionVarType, LockType>;
};
/// Traits that obtain the identity of the calling thread for a given thread
/// id type. Only specializations are defined.
template <typename T>
struct ThreadContainerTraits;

/// Specialization for std::thread::id.
template <>
struct ThreadContainerTraits<std::thread::id> {
    /// @return The id of the calling thread.
    static std::thread::id get_thread_id() {
        const std::thread::id caller = std::this_thread::get_id();
        return caller;
    }
};
/// @class ThreadContainer
/// Keeps the set of thread ids that have registered themselves.
/// The id of the calling thread is obtained through
/// ThreadContainerTraits<T>::get_thread_id().
/// @tparam T The thread id type.
template<typename T> class ThreadContainer {
public:
    ThreadContainer() = default;

    /// Adds the calling thread to the Container.
    /// @returns True if the calling thread was added to Container.
    /// False if it was already present.
    bool add() {
        // insert() reports whether the id was new, so no separate lookup
        // is needed.
        return thread_id_.insert(
            ThreadContainerTraits<T>::get_thread_id()).second;
    }

    /// Removes the calling thread from the Container.
    /// @returns True if the calling thread was removed from Container.
    /// False if it was not present.
    bool remove() {
        return thread_id_.erase(
            ThreadContainerTraits<T>::get_thread_id()) > 0;
    }

private:
    std::unordered_set<T> thread_id_;
};
namespace detail {
/// Tag selecting first in-first out (queue) behaviour.
struct QueueType {};
/// Tag selecting last in-first out (stack) behaviour.
struct StackType {};

/// Trait that is true only for the QueueType tag.
template <typename T>
struct is_queue : std::false_type {};
template <>
struct is_queue<QueueType> : std::true_type {};

/// @class Container
///
/// Represents a first in-first out (FIFO) or a last in-first out (LIFO)
/// collection depending on the ContainerType template parameter value.
///
/// Every insertion (try_add and try_emplace) appends at the back of the
/// underlying deque; the two flavours differ only in try_take, which removes
/// from the front for a queue and from the back for a stack.
///
/// Implements the implicitly defined IProducerConsumerCollection<T> policy.
/// @tparam T The type of items in the Container.
/// @tparam ContainerType The type of Container (i.e. Queue or Stack).
template <typename T, typename ContainerType>
class Container {
public:
    using container_type = std::deque<T>;
    using value_type = typename container_type::value_type;
    using size_type = typename container_type::size_type;

    /// Initializes a new instance of the Container<T> class without an
    /// upper bound (capacity SIZE_MAX).
    Container()
        : bounded_capacity_(SIZE_MAX) {
    }

    /// Sets the max number of elements this container can hold.
    /// @param bounded_capacity The max number of elements this
    /// container can hold.
    void bounded_capacity(size_t bounded_capacity) {
        bounded_capacity_ = bounded_capacity;
    }

    /// Gets the max number of elements this container can hold.
    /// @returns The max number of elements this container can hold.
    size_t bounded_capacity() const {
        return bounded_capacity_;
    }

    /// Gets the number of elements contained in the collection.
    /// @returns The number of elements contained in the collection.
    size_type size() const {
        return container_.size();
    }

    /// Attempts to add a copy of an element to the collection.
    /// @param item The element to add to the collection.
    /// @returns True if the element was added successfully; false if the
    /// collection is at capacity.
    bool try_add(const value_type& item) {
        if (at_capacity())
            return false;
        container_.push_back(item);
        return true;
    }

    /// Attempts to move an element into the collection.
    /// @param item The element to add to the collection.
    /// @returns True if the element was added successfully; false if the
    /// collection is at capacity.
    bool try_add(value_type&& item) {
        if (at_capacity())
            return false;
        container_.push_back(std::move(item));
        return true;
    }

    /// Attempts to remove and return an element from the collection.
    /// @param [out] item When this method returns true, item holds the
    /// removed element (moved out of the collection). If no element was
    /// available, item is left untouched.
    /// @returns True if an element was removed and returned
    /// successfully; otherwise, false.
    bool try_take(value_type& item) {
        if (container_.empty())
            return false;
        return try_take_i(item, is_queue<ContainerType>());
    }

    /// Attempts to add an element to the collection.
    /// This new element is constructed in place using args as the
    /// arguments for its construction.
    /// @param args Arguments forwarded to construct the new element.
    /// @returns True if the element was added successfully; false if the
    /// collection is at capacity.
    template <typename... Args> bool try_emplace(Args&&... args) {
        if (at_capacity())
            return false;
        // Always append, exactly like try_add. The stack flavour takes from
        // the back, so emplacing at the front would hand emplaced items out
        // in FIFO order.
        container_.emplace_back(std::forward<Args>(args)...);
        return true;
    }

private:
    size_t bounded_capacity_;
    container_type container_;

    /// True when no further element may be added. Uses >= so that a
    /// capacity lowered below the current size still blocks additions.
    bool at_capacity() const {
        return container_.size() >= bounded_capacity_;
    }

    /// Stack flavour: removes the most recently added element.
    bool try_take_i(value_type& item, std::false_type) {
        item = std::move(container_.back());
        container_.pop_back();
        return true;
    }

    /// Queue flavour: removes the oldest element.
    bool try_take_i(value_type& item, std::true_type) {
        item = std::move(container_.front());
        container_.pop_front();
        return true;
    }
};
} // namespace detail
/// FIFO container: detail::Container instantiated with the QueueType tag.
template<typename T>
using QueueContainer = detail::Container<T, detail::QueueType>;
/// LIFO container: detail::Container instantiated with the StackType tag.
template<typename T>
using StackContainer = detail::Container<T, detail::StackType>;
/// Condition variable generator built on the standard library:
/// std::thread::id worker tracking, std::condition_variable with std::mutex,
/// and both signal strategies using 16 items per thread.
using StdConditionVariableGenerator = ConditionVariableGenerator<
ThreadContainer<std::thread::id>, NotFullSignalStrategy<16>,
NotEmptySignalStrategy<16>, std::condition_variable, std::mutex>;
/// @enum BlockingCollectionState
/// The states a BlockingCollection can be in.
enum class BlockingCollectionState {
/// BlockingCollection is active and processing normally.
Activated = 1,
/// BlockingCollection is deactivated; no add or take operations allowed.
Deactivated = 2,
/// BlockingCollection was pulsed; add and take may proceed normally.
Pulsed = 3
};
/// @enum BlockingCollectionStatus
/// The BlockingCollection status codes.
/// These are the status codes returned by all of BlockingCollection's add
/// and take operations. Ok is zero; every failure code is negative.
enum class BlockingCollectionStatus {
/// Operation succeeded
Ok = 0,
/// Operation failed due to complete_adding() having been invoked
AddingCompleted = -1,
/// Operation failed due to time out
TimedOut = -2,
/// Operation failed due to BlockingCollection not being activated
NotActivated = -3,
/// Operation failed due to BlockingCollection being completed
Completed = -4,
/// Operation failed due to invalid iterators
InvalidIterators = -5,
/// Operation failed due to concurrent add and complete_adding
CompleteAddingConcurrent = -6,
/// Operation failed due to BlockingCollection Container error
InternalError = -8
};
template <typename T, typename ContainerType = QueueContainer<T>,
typename ConditionVariableGenerator = StdConditionVariableGenerator>
class BlockingCollection {
public:
using LockType = typename ConditionVariableGenerator::lock_type;
/// Creates a BlockingCollection<T> without an upper-bound by delegating to
/// the capacity constructor with SIZE_MAX.
BlockingCollection() : BlockingCollection(SIZE_MAX) {}
/// Creates a BlockingCollection<T> with the specified upper-bound.
/// The collection starts in the Activated state with adding not completed;
/// the same bound is handed to the underlying container and to both
/// condition variables.
/// @param capacity The bounded size of the collection.
explicit BlockingCollection(size_t capacity)
    : state_(BlockingCollectionState::Activated),
      bounded_capacity_(capacity),
      is_adding_completed_(false) {
    container_.bounded_capacity(capacity);
    not_full_condition_var_.bounded_capacity(capacity);
    not_empty_condition_var_.bounded_capacity(capacity);
}
// "BlockingCollection" objects cannot be copied or assigned
BlockingCollection(const BlockingCollection&) = delete;
BlockingCollection& operator=(const BlockingCollection&) = delete;

/// Nothing is released by hand; the destructor body is empty.
~BlockingCollection() = default;
/// Reads, under the collection lock, the bounded capacity of this
/// BlockingCollection<T> instance.
/// @return The bounded capacity of the collection.
size_t bounded_capacity() {
    const std::lock_guard<LockType> hold(lock_);
    return bounded_capacity_;
}
/// Reads, under the collection lock, the current state of this
/// BlockingCollection<T> instance.
/// @return The current state of the collection.
/// @see BlockingCollectionState
BlockingCollectionState state() {
    const std::lock_guard<LockType> hold(lock_);
    return state_;
}
/// Reports whether this BlockingCollection<T> instance is full.
/// Takes the collection lock and defers to is_full_i().
/// @return True if the collection is full; otherwise false.
bool is_full() {
    const std::lock_guard<LockType> hold(lock_);
    return is_full_i();
}
/// Reports whether this BlockingCollection<T> instance is empty.
/// Takes the collection lock and defers to is_empty_i().
/// @return True if the collection is empty; otherwise false.
bool is_empty() {
    const std::lock_guard<LockType> hold(lock_);
    return is_empty_i();
}
/// Gets the number of items contained in the BlockingCollection<T>
/// instance.
/// The count is read while holding the collection lock, but other threads
/// may add or take items as soon as this call returns, so the caller should
/// treat the value as a snapshot that may already be stale.
/// @return The number of items in the collection.
size_t size() {
    const std::lock_guard<LockType> hold(lock_);
    return container_.size();
}
/// Reports whether this BlockingCollection<T> instance has been
/// deactivated.
/// @return True if the current state is Deactivated. Otherwise false.
bool is_deactivated() {
    const std::lock_guard<LockType> hold(lock_);
    return BlockingCollectionState::Deactivated == state_;
}
/// Reports whether this BlockingCollection<T> instance has been marked
/// as complete for adding and is empty.
/// Takes the collection lock and defers to is_completed_i().
/// @return True if this collection has been marked as complete for
/// adding and is empty. Otherwise false.
bool is_completed() {
    const std::lock_guard<LockType> hold(lock_);
    return is_completed_i();
}
/// Reports whether this BlockingCollection<T> instance has been marked
/// as complete for adding.
/// Takes the collection lock and defers to is_adding_completed_i().
/// @return True if this collection has been marked as complete for
/// adding. Otherwise false.
bool is_adding_completed() {
    const std::lock_guard<LockType> hold(lock_);
    return is_adding_completed_i();
}
/// Pulses this BlockingCollection<T> instance to wake up any waiting
/// threads.
/// Changes the collection's state to Pulsed. Future Add and Take
/// operations proceed as in the Activated state.
/// Implemented as deactivate_i(true) under the collection lock.
/// @return The BlockingCollection's state before this call.
/// @see BlockingCollectionState
BlockingCollectionState pulse() {
    const std::lock_guard<LockType> hold(lock_);
    const bool pulse_only = true;
    return deactivate_i(pulse_only);
}
/// Deactivates this BlockingCollection<T> instance and wakes up all threads
/// waiting on the collection so they can continue. No items are removed
/// from the collection, however. Any other operations called until the
/// collection is activated again will immediately return
/// BlockingCollectionStatus::NotActivated.
/// Implemented as deactivate_i(false) under the collection lock.
/// @return The BlockingCollection's state before this call.
/// @see BlockingCollectionState
/// @see BlockingCollectionStatus
BlockingCollectionState deactivate() {
    const std::lock_guard<LockType> hold(lock_);
    const bool pulse_only = false;
    return deactivate_i(pulse_only);
}
/// Reactivates this BlockingCollection<T> instance so that threads can Add
/// and Take items again.
/// Takes the collection lock and defers to activate_i().
/// @return The BlockingCollection's state before this call.
/// @see BlockingCollectionState
BlockingCollectionState activate() {
    const std::lock_guard<LockType> hold(lock_);
    return activate_i();
}
/// Releases all items from this BlockingCollection<T> instance
/// but does not mark it deactivated.
/// @return The number of items flushed.
size_t flush() {
std::lock_guard<LockType> guard(lock_);
auto itemsFlushed = container_.size();
T item;
while (container_.size() > 0) {
container_.try_take(item);
}
not_empty_condition_var_.size(0);
not_full_condition_var_.size(0);
return itemsFlushed;
}
/// Marks the BlockingCollection<T> instance as not accepting any more
/// additions.
/// After a collection has been marked as complete for adding, adding to the
/// collection is not permitted and attempts to remove from the collection
/// will not wait when the collection is empty.
/// Calling this again once adding is already completed does nothing.
void complete_adding() {
    const std::lock_guard<LockType> hold(lock_);
    if (!is_adding_completed_) {
        is_adding_completed_ = true;
        // Wake every waiter on both sides so each can observe the new flag.
        not_empty_condition_var_.broadcast();
        not_full_condition_var_.broadcast();
    }
}
/// Gets the number of consumer threads that are actively taking items
/// from this BlockingCollection<T> instance, i.e. the active worker count
/// of the "not empty" condition variable.
/// @return The number of active consumer threads.
/// @see attach_consumer
size_t active_consumers() {
    const std::lock_guard<LockType> hold(lock_);
    return not_empty_condition_var_.active();
}
/// Gets the number of producer threads that are actively adding items
/// to this BlockingCollection<T> instance, i.e. the active worker count
/// of the "not full" condition variable.
/// @return The number of active producer threads.
/// @see attach_producer
size_t active_producers() {
    const std::lock_guard<LockType> hold(lock_);
    return not_full_condition_var_.active();
}
/// Gets the total number of consumer threads that can take items
/// from this BlockingCollection<T> instance, i.e. the attached worker count
/// of the "not empty" condition variable.
/// @return The total number of consumer threads.
/// @see attach_consumer
size_t total_consumers() {
    const std::lock_guard<LockType> hold(lock_);
    return not_empty_condition_var_.total();
}
/// Gets the total number of producer threads that can add items
/// to this BlockingCollection<T> instance, i.e. the attached worker count
/// of the "not full" condition variable.
/// @return The total number of producer threads.
/// @see attach_producer
size_t total_producers() {
    const std::lock_guard<LockType> hold(lock_);
    return not_full_condition_var_.total();
}
/// Registers the calling thread as a consumer of this BlockingCollection<T>
/// instance by attaching it to the "not empty" condition variable.
/// @see total_consumers
void attach_consumer() {
    const std::lock_guard<LockType> hold(lock_);
    not_empty_condition_var_.attach();
}
/// Unregisters the calling thread as a consumer of this
/// BlockingCollection<T> instance by detaching it from the "not empty"
/// condition variable.
/// @see total_consumers
void detach_consumer() {
    const std::lock_guard<LockType> hold(lock_);
    not_empty_condition_var_.detach();
}
/// Registers the calling thread as a producer of this BlockingCollection<T>
/// instance by attaching it to the "not full" condition variable.
/// @see total_producers
void attach_producer() {
    const std::lock_guard<LockType> hold(lock_);
    not_full_condition_var_.attach();
}
/// Unregisters the calling thread as a producer of this
/// BlockingCollection<T> instance by detaching it from the "not full"
/// condition variable.
/// @see total_producers
void detach_producer() {
    const std::lock_guard<LockType> hold(lock_);
    not_full_condition_var_.detach();
}
/// Adds the given element value to the BlockingCollection<T>.
/// The new element is initialized as a copy of value.
/// If a bounded capacity was specified when this instance of
/// BlockingCollection<T> was initialized, a call to add may block until
/// space is available to store the provided item.
/// Forwards to try_emplace_timed with a duration of -1 ms.
/// @param value the value of the element to add
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus add(const T& value) {
    const std::chrono::milliseconds no_deadline(-1);
    return try_emplace_timed(no_deadline, value);
}
/// Adds the given element value to the BlockingCollection<T>.
/// Value is moved into the new element.
/// If a bounded capacity was specified when this instance of
/// BlockingCollection<T> was initialized, a call to add may block until
/// space is available to store the provided item.
/// Forwards to try_emplace_timed with a duration of -1 ms.
/// @param value the value of the element to add
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus add(T&& value) {
    const std::chrono::milliseconds no_deadline(-1);
    return try_emplace_timed(no_deadline, std::move(value));
}
/// Tries to add the given element value to the BlockingCollection<T>.
/// The new element is initialized as a copy of value.
/// If the collection is a bounded collection, and is full, this method
/// immediately returns without adding the item.
/// Forwards to try_emplace_timed with a zero duration.
/// @param value the value of the element to try to add
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus try_add(const T& value) {
    const auto no_wait = std::chrono::milliseconds::zero();
    return try_emplace_timed(no_wait, value);
}
/// Tries to add the given element value to the BlockingCollection<T>.
/// Value is moved into the new element.
/// If the collection is a bounded collection, and is full, this
/// method immediately returns without adding the item.
/// Forwards to try_emplace_timed with a zero duration.
/// @param value the value of the element to try to add
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus try_add(T&& value) {
    const auto no_wait = std::chrono::milliseconds::zero();
    return try_emplace_timed(no_wait, std::move(value));
}
/// Tries to add the given element value to the BlockingCollection<T>
/// within the specified time period.
/// The value is perfectly forwarded: lvalues are copied, rvalues are moved
/// into the new element.
/// @param value the value of the element to try to add
/// @param rel_time An object of type std::chrono::duration
/// representing the maximum time to spend waiting.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
/// @see http://en.cppreference.com/w/cpp/chrono/duration
template<typename U, class Rep, class Period>
BlockingCollectionStatus try_add_timed(
        U&& value,
        const std::chrono::duration<Rep, Period>& rel_time) {
    // try_emplace_timed expects the duration first, then the constructor
    // arguments.
    return try_emplace_timed(rel_time, std::forward<U>(value));
}
/// Adds a new element to the BlockingCollection<T>, constructed in place.
/// The arguments args... are forwarded to the constructor as
/// std::forward<Args>(args)....
/// If a bounded capacity was specified when this instance of
/// BlockingCollection<T> was initialized, a call to emplace may block until
/// space is available to store the provided item.
/// Forwards to try_emplace_timed with a duration of -1 ms.
/// @param args arguments to forward to the constructor of the element
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
template<typename... Args>
BlockingCollectionStatus emplace(Args&&... args) {
    const std::chrono::milliseconds no_deadline(-1);
    return try_emplace_timed(no_deadline, std::forward<Args>(args)...);
}
/// Tries to add a new element to the BlockingCollection<T>, constructed in
/// place.
/// The arguments args... are forwarded to the constructor as
/// std::forward<Args>(args)....
/// If the collection is a bounded collection, and is full, this method
/// immediately returns without adding the item.
/// Forwards to try_emplace_timed with a zero duration.
/// @param args arguments to forward to the constructor of the element
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
template<typename... Args>
BlockingCollectionStatus try_emplace(Args&&... args) {
    const auto no_wait = std::chrono::milliseconds::zero();
    return try_emplace_timed(no_wait, std::forward<Args>(args)...);
}
/// Tries to add a new element to the BlockingCollection<T> within the
/// specified time period, constructing it in place.
/// The arguments args... are forwarded to the constructor as
/// std::forward<Args>(args)....
/// The call first waits, via wait_not_full_condition, for the collection to
/// accept an element; any status other than Ok from that wait is returned
/// unchanged.
/// @param rel_time An object of type std::chrono::duration
/// representing the maximum time to spend waiting.
/// @param args arguments to forward to the constructor of the element
/// @return A BlockingCollectionStatus code; InternalError if the underlying
/// container rejects the element after a successful wait.
/// @see BlockingCollectionStatus
/// @see http://en.cppreference.com/w/cpp/chrono/duration
template<class Rep, class Period, typename... Args>
BlockingCollectionStatus try_emplace_timed(
        const std::chrono::duration<Rep, Period>& rel_time,
        Args&&... args) {
    std::unique_lock<LockType> guard(lock_);

    const auto wait_result = wait_not_full_condition(guard, rel_time);
    if (wait_result != BlockingCollectionStatus::Ok)
        return wait_result;

    const bool stored = container_.try_emplace(std::forward<Args>(args)...);
    if (!stored)
        return BlockingCollectionStatus::InternalError;

    // Publish the new item count; `false` is the flag this add path passes
    // to signal() (the take path passes true).
    signal(container_.size(), false);
    return BlockingCollectionStatus::Ok;
}
/// Removes an item from the BlockingCollection<T>.
/// A call to take may block until an item is available to be removed.
/// Forwards to the timed try_take with a duration of -1 ms.
/// @param[out] item The item removed from the collection.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus take(T& item) {
    const std::chrono::milliseconds no_deadline(-1);
    return try_take(item, no_deadline);
}
/// Attempts to remove an item from the BlockingCollection<T> without
/// waiting.
/// If the collection is empty, the call returns immediately and no item
/// is taken.
/// Implemented as the timed try_take overload with a zero duration.
/// @param[out] item Receives the item removed from the collection.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
BlockingCollectionStatus try_take(T& item) {
    // Zero timeout: one attempt, no waiting.
    const auto no_wait = std::chrono::milliseconds::zero();
    return try_take(item, no_wait);
}
/// Attempts to remove an item from the BlockingCollection<T> within the
/// specified time period.
/// The collection lock is held for the whole operation. rel_time is
/// passed to wait_not_empty_condition, which bounds how long the call
/// waits for an item; any status other than BlockingCollectionStatus::Ok
/// coming back from that wait is returned unchanged.
/// @param[out] item Receives the item removed from the collection.
/// @param rel_time An object of type std::chrono::duration
/// representing the maximum time to spend waiting.
/// @return BlockingCollectionStatus::Ok when an item was removed,
/// the status reported by the wait when it did not succeed, or
/// BlockingCollectionStatus::InternalError when the underlying container
/// fails to hand out an item.
/// @see BlockingCollectionStatus
/// @see http://en.cppreference.com/w/cpp/chrono/duration
template<class Rep, class Period> BlockingCollectionStatus
try_take(T& item, const std::chrono::duration<Rep, Period>& rel_time) {
    std::unique_lock<LockType> lk(lock_);

    // Step 1: wait (bounded by rel_time) until the "not empty" condition
    // holds; bail out with whatever the wait reported otherwise.
    auto wait_result = wait_not_empty_condition(lk, rel_time);
    if (wait_result != BlockingCollectionStatus::Ok) {
        return wait_result;
    }

    // Step 2: pull one element out of the underlying container.
    if (!container_.try_take(item)) {
        return BlockingCollectionStatus::InternalError;
    }

    // Step 3: report the new size while still holding the lock. The second
    // argument is true on the take paths and false on the add paths.
    signal(container_.size(), true);
    return BlockingCollectionStatus::Ok;
}
/// Adds the items from range [first, last] to the
/// BlockingCollection<T>.
/// If a bounded capacity was specified when this instance of
/// BlockingCollection<T> was initialized, the call may block until space
/// is available to store the provided items.
/// Wrap the iterators with std::make_move_iterator if the elements
/// should be moved instead of copied.
/// Implemented as the timed try_add_bulk overload with a duration of
/// -1 ms.
/// @param first The start range of elements to insert.
/// @param last The end range of elements to insert.
/// @param [out] added The actual number of elements added.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
template <typename Iterator> BlockingCollectionStatus
add_bulk(Iterator first, Iterator last, size_t& added) {
    // A negative duration is the value this class uses for its blocking
    // calls; how it is interpreted is decided by the wait helper that the
    // timed overload invokes.
    const std::chrono::milliseconds no_deadline(-1);
    return try_add_bulk(first, last, added, no_deadline);
}
/// Attempts to add the items from range [first, last] to the
/// BlockingCollection<T> without waiting.
/// If the collection is bounded and currently full, the call returns
/// immediately and no items are added.
/// Wrap the iterators with std::make_move_iterator if the elements
/// should be moved instead of copied.
/// Implemented as the timed try_add_bulk overload with a zero duration.
/// @param first The start range of elements to insert.
/// @param last The end range of elements to insert.
/// @param [out] added The actual number of elements added.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
template <typename Iterator> BlockingCollectionStatus
try_add_bulk(Iterator first, Iterator last, size_t& added) {
    // Zero timeout: one attempt, no waiting.
    const auto no_wait = std::chrono::milliseconds::zero();
    return try_add_bulk(first, last, added, no_wait);
}
/// Tries to add the specified items from the range [first, last] to
/// the BlockingCollection<T> within
/// the specified time period.
/// Use std::make_move_iterator if the elements should be moved
/// instead of copied.
/// @param first The start range of elements to insert.
/// @param last The end range of elements to insert.
/// @param [out] added The actual number of elements added.
/// @param rel_time An object of type std::chrono::duration representing
/// the maximum time to spend waiting.
/// @return A BlockingCollectionStatus code.
/// @see BlockingCollectionStatus
/// @see http://en.cppreference.com/w/cpp/chrono/duration
template<typename Iterator, class Rep, class Period>
BlockingCollectionStatus try_add_bulk(Iterator first, Iterator last,
size_t& added, const std::chrono::duration<Rep, Period>& rel_time) {
{
added = 0;
std::unique_lock<LockType> guard(lock_);
auto status = wait_not_full_condition(guard, rel_time);
if (BlockingCollectionStatus::Ok != status)
return status;