66#include " bls/bls_worker.h"
77#include " hash.h"
88#include " serialize.h"
9+ #include " util/system.h"
910#include " util/threadnames.h"
1011
1112
@@ -61,7 +62,7 @@ CBLSWorker::~CBLSWorker()
6162
6263void CBLSWorker::Start ()
6364{
64- int workerCount = std::thread::hardware_concurrency () / 2 ;
65+ int workerCount = GetNumCores () / 2 ;
6566 workerCount = std::max (std::min (1 , workerCount), 4 );
6667 workerPool.resize (workerCount);
6768
@@ -86,7 +87,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i
8687 std::list<std::future<bool > > futures;
8788 size_t batchSize = 8 ;
8889
89- for (size_t i = 0 ; i < quorumThreshold; i += batchSize) {
90+ for (size_t i = 0 ; i < ( size_t ) quorumThreshold; i += batchSize) {
9091 size_t start = i;
9192 size_t count = std::min (batchSize, quorumThreshold - start);
9293 auto f = [&, start, count](int threadId) {
@@ -133,23 +134,23 @@ struct Aggregator {
133134 typedef T ElementType;
134135
135136 size_t batchSize{16 };
136- std::shared_ptr<std::vector<const T*> > inputVec;
137-
138- bool parallel;
139137 ctpl::thread_pool& workerPool;
138+ bool parallel;
139+
140+ std::shared_ptr<std::vector<const T*> > inputVec;
140141
141142 std::mutex m;
142143 // items in the queue are all intermediate aggregation results of finished batches.
143144 // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
144145 boost::lockfree::queue<T*> aggQueue;
145146 std::atomic<size_t > aggQueueSize{0 };
146147
147- // keeps track of currently queued/in-progress batches. If it reaches 0, we are done
148- std::atomic<size_t > waitCount{0 };
149-
150148 typedef std::function<void (const T& agg)> DoneCallback;
151149 DoneCallback doneCallback;
152150
151+ // keeps track of currently queued/in-progress batches. If it reaches 0, we are done
152+ std::atomic<size_t > waitCount{0 };
153+
153154 // TP can either be a pointer or a reference
154155 template <typename TP>
155156 Aggregator (const std::vector<TP>& _inputVec,
@@ -338,14 +339,15 @@ struct VectorAggregator {
338339 typedef std::shared_ptr<VectorType> VectorPtrType;
339340 typedef std::vector<VectorPtrType> VectorVectorType;
340341 typedef std::function<void (const VectorPtrType& agg)> DoneCallback;
341- DoneCallback doneCallback;
342342
343343 const VectorVectorType& vecs;
344+ bool parallel;
344345 size_t start;
345346 size_t count;
346- bool parallel;
347+
347348 ctpl::thread_pool& workerPool;
348349
350+ DoneCallback doneCallback;
349351 std::atomic<size_t > doneCount;
350352
351353 VectorPtrType result;
@@ -764,13 +766,7 @@ std::future<bool> CBLSWorker::AsyncVerifyContributionShare(const CBLSId& forId,
764766 }
765767
766768 auto f = [this , &forId, &vvec, &skContribution](int threadId) {
767- CBLSPublicKey pk1;
768- if (!pk1.PublicKeyShare (*vvec, forId)) {
769- return false ;
770- }
771-
772- CBLSPublicKey pk2 = skContribution.GetPublicKey ();
773- return pk1 == pk2;
769+ return VerifyContributionShare (forId, vvec, skContribution);
774770 };
775771 return workerPool.push (f);
776772}
0 commit comments