Skip to content

Commit ccbf1cf

Browse files
committed
Measure replication time in weak consistency mode, separate ACK and replication time metrics
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
1 parent 2ede3b5 commit ccbf1cf

13 files changed

+129
-127
lines changed

src/groups/mqb/mqba/mqba_clientsession.cpp

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1673,14 +1673,6 @@ void ClientSession::onAckEvent(const mqbi::DispatcherAckEvent& event)
16731673
queue_p->stats()
16741674
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
16751675
timeDelta);
1676-
if (queue_p->partitionId() !=
1677-
mqbs::DataStore::k_INVALID_PARTITION_ID) {
1678-
queue_p->domain()->cluster()->stats().onPartitionEvent(
1679-
mqbstat::ClusterStats::PartitionEventType::
1680-
e_PARTITION_REPLICATION,
1681-
queue_p->partitionId(),
1682-
timeDelta);
1683-
}
16841676

16851677
if (!d_isClientGeneratingGUIDs) {
16861678
// Legacy client.

src/groups/mqb/mqbblp/mqbblp_localqueue.cpp

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <mqbblp_queuehandlecatalog.h>
2222
#include <mqbblp_rootqueueengine.h>
2323
#include <mqbblp_storagemanager.h>
24+
#include <mqbcfg_brokerconfig.h>
2425
#include <mqbcmd_messages.h>
2526
#include <mqbi_domain.h>
2627
#include <mqbi_queueengine.h>
@@ -471,7 +472,8 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
471472
static_cast<unsigned int>(appData->length()),
472473
translation,
473474
putHeader.compressionAlgorithmType(),
474-
!d_haveStrongConsistency,
475+
false,
476+
d_haveStrongConsistency,
475477
doAck ? source : 0,
476478
putHeader.crc32c(),
477479
timePoint); // Arrival Timepoint
@@ -488,28 +490,17 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader,
488490

489491
// Send acknowledgement if post failed or if ack was requested (both could
490492
// be true as well).
491-
if (res != mqbi::StorageResult::e_SUCCESS || haveReceipt) {
492-
// Calculate time delta between PUT and ACK
493-
const bsls::Types::Int64 timeDelta =
494-
bmqsys::Time::highResolutionTimer() - timePoint;
495-
d_state_p->stats()
496-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(
497-
timeDelta);
498-
d_state_p->queue()->domain()->cluster()->stats().onPartitionEvent(
499-
mqbstat::ClusterStats::PartitionEventType::e_PARTITION_REPLICATION,
500-
d_state_p->partitionId(),
501-
timeDelta);
502-
if (res != mqbi::StorageResult::e_SUCCESS || doAck) {
503-
bmqp::AckMessage ackMessage;
504-
ackMessage
505-
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
506-
mqbi::StorageResult::toAckResult(res)))
507-
.setMessageGUID(putHeader.messageGUID());
508-
// CorrelationId & QueueId are left unset as those fields will
509-
// be filled downstream.
510-
511-
source->onAckMessage(ackMessage);
512-
}
493+
if (res != mqbi::StorageResult::e_SUCCESS || haveReceipt ||
494+
!d_haveStrongConsistency) {
495+
bmqp::AckMessage ackMessage;
496+
ackMessage
497+
.setStatus(bmqp::ProtocolUtil::ackResultToCode(
498+
mqbi::StorageResult::toAckResult(res)))
499+
.setMessageGUID(putHeader.messageGUID());
500+
// CorrelationId & QueueId are left unset as those fields will
501+
// be filled downstream.
502+
503+
source->onAckMessage(ackMessage);
513504
}
514505

515506
if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(res ==
@@ -555,21 +546,9 @@ void LocalQueue::onPushMessage(
555546
"onPushMessage should not be called on LocalQueue");
556547
}
557548

558-
void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
559-
mqbi::QueueHandle* qH,
560-
const bsls::Types::Int64& arrivalTimepoint)
549+
void LocalQueue::onReceipt(const bmqt::MessageGUID& msgGUID,
550+
mqbi::QueueHandle* qH)
561551
{
562-
// Calculate time delta between PUT and ACK
563-
const bsls::Types::Int64 timeDelta = bmqsys::Time::highResolutionTimer() -
564-
arrivalTimepoint;
565-
566-
d_state_p->stats()
567-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ACK_TIME>(timeDelta);
568-
d_state_p->queue()->domain()->cluster()->stats().onPartitionEvent(
569-
mqbstat::ClusterStats::PartitionEventType::e_PARTITION_REPLICATION,
570-
d_state_p->partitionId(),
571-
timeDelta);
572-
573552
if (d_state_p->handleCatalog().hasHandle(qH)) {
574553
// Send acknowledgement
575554
bmqp::AckMessage ackMessage;

src/groups/mqb/mqbblp/mqbblp_localqueue.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,10 @@ class LocalQueue BSLS_CPP11_FINAL {
180180

181181
/// Invoked by the Data Store when it receives quorum Receipts for the
182182
/// specified `msgGUID`. Send ACK to the specified `qH` if it is
183-
/// present in the queue handle catalog. Update ACK time stats using
184-
/// the specified `arrivalTimepoint`.
183+
/// present in the queue handle catalog.
185184
///
186185
/// THREAD: This method is called from the Storage dispatcher thread.
187-
void onReceipt(const bmqt::MessageGUID& msgGUID,
188-
mqbi::QueueHandle* qH,
189-
const bsls::Types::Int64& arrivalTimepoint);
186+
void onReceipt(const bmqt::MessageGUID& msgGUID, mqbi::QueueHandle* qH);
190187

191188
/// Invoked by the Data Store when it removes (times out waiting for
192189
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbblp/mqbblp_queue.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -565,13 +565,12 @@ void Queue::onOpenUpstream(bsls::Types::Uint64 genCount,
565565
}
566566
}
567567

568-
void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
569-
mqbi::QueueHandle* queueHandle,
570-
const bsls::Types::Int64& arrivalTimepoint)
568+
void Queue::onReceipt(const bmqt::MessageGUID& msgGUID,
569+
mqbi::QueueHandle* queueHandle)
571570
{
572571
BSLS_ASSERT_SAFE(d_localQueue_mp);
573572

574-
d_localQueue_mp->onReceipt(msgGUID, queueHandle, arrivalTimepoint);
573+
d_localQueue_mp->onReceipt(msgGUID, queueHandle);
575574
}
576575

577576
void Queue::onRemoval(const bmqt::MessageGUID& msgGUID,

src/groups/mqb/mqbblp/mqbblp_queue.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,8 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
337337
/// the specified `arrivalTimepoint`.
338338
///
339339
/// THREAD: This method is called from the Storage dispatcher thread.
340-
void onReceipt(const bmqt::MessageGUID& msgGUID,
341-
mqbi::QueueHandle* queueHandle,
342-
const bsls::Types::Int64& arrivalTimepoint)
343-
BSLS_KEYWORD_OVERRIDE;
340+
void onReceipt(const bmqt::MessageGUID& msgGUID,
341+
mqbi::QueueHandle* queueHandle) BSLS_KEYWORD_OVERRIDE;
344342

345343
/// Invoked by the Data Store when it removes (times out waiting for
346344
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbi/mqbi_queue.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -869,13 +869,11 @@ class Queue : public DispatcherClient {
869869

870870
/// Invoked by the Data Store when it receives quorum Receipts for the
871871
/// specified `msgGUID`. Send ACK to the specified `queueHandle` if it
872-
/// is present in the queue handle catalog. Update AVK time stats using
873-
/// the specified `arrivalTimepoint`.
872+
/// is present in the queue handle catalog..
874873
///
875874
/// THREAD: This method is called from the Queue's dispatcher thread.
876-
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
877-
mqbi::QueueHandle* queueHandle,
878-
const bsls::Types::Int64& arrivalTimepoint) = 0;
875+
virtual void onReceipt(const bmqt::MessageGUID& msgGUID,
876+
mqbi::QueueHandle* queueHandle) = 0;
879877

880878
/// Invoked by the Data Store when it removes (times out waiting for
881879
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbi/mqbi_storage.h

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ class StorageMessageAttributes {
175175

176176
bool d_hasReceipt;
177177

178+
bool d_strongConsistency;
179+
178180
mqbi::QueueHandle* d_queueHandle;
179181

180182
unsigned int d_crc32c;
@@ -214,10 +216,11 @@ class StorageMessageAttributes {
214216
unsigned int refCount,
215217
const bmqp::MessagePropertiesInfo& messagePropertiesInfo,
216218
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
217-
bool hasReceipt = true,
218-
mqbi::QueueHandle* queueHandle = 0,
219-
unsigned int crc32c = 0,
220-
bsls::Types::Int64 arrivalTimepoint = 0);
219+
bool hasReceipt = true,
220+
bool strongConsistency = true,
221+
mqbi::QueueHandle* queueHandle = 0,
222+
unsigned int crc32c = 0,
223+
bsls::Types::Int64 arrivalTimepoint = 0);
221224

222225
// MANIPULATORS
223226
StorageMessageAttributes& setArrivalTimestamp(bsls::Types::Uint64 value);
@@ -243,6 +246,7 @@ class StorageMessageAttributes {
243246
unsigned int appDataLen() const;
244247
const bmqp::MessagePropertiesInfo& messagePropertiesInfo() const;
245248
bool hasReceipt() const;
249+
bool strongConsistency() const;
246250
mqbi::QueueHandle* queueHandle() const;
247251

248252
/// Return the CRC32-C associated with this object.
@@ -788,6 +792,7 @@ inline StorageMessageAttributes::StorageMessageAttributes()
788792
, d_appDataLen(0)
789793
, d_messagePropertiesInfo()
790794
, d_hasReceipt(true)
795+
, d_strongConsistency(true)
791796
, d_queueHandle(0)
792797
, d_crc32c(0)
793798
, d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE)
@@ -801,6 +806,7 @@ inline StorageMessageAttributes::StorageMessageAttributes(
801806
const bmqp::MessagePropertiesInfo& messagePropertiesInfo,
802807
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
803808
bool hasReceipt,
809+
bool strongConsistency,
804810
mqbi::QueueHandle* queueHandle,
805811
unsigned int crc32c,
806812
bsls::Types::Int64 arrivalTimepoint)
@@ -810,6 +816,7 @@ inline StorageMessageAttributes::StorageMessageAttributes(
810816
, d_appDataLen(appDataLen)
811817
, d_messagePropertiesInfo(messagePropertiesInfo)
812818
, d_hasReceipt(hasReceipt)
819+
, d_strongConsistency(strongConsistency)
813820
, d_queueHandle(queueHandle)
814821
, d_crc32c(crc32c)
815822
, d_compressionAlgorithmType(compressionAlgorithmType)
@@ -921,6 +928,11 @@ inline bool StorageMessageAttributes::hasReceipt() const
921928
return d_hasReceipt;
922929
}
923930

931+
inline bool StorageMessageAttributes::strongConsistency() const
932+
{
933+
return d_strongConsistency;
934+
}
935+
924936
inline mqbi::QueueHandle* StorageMessageAttributes::queueHandle() const
925937
{
926938
return d_queueHandle;

src/groups/mqb/mqbmock/mqbmock_queue.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,7 @@ void Queue::onOpenFailure(BSLA_UNUSED unsigned int subQueueId)
363363
}
364364

365365
void Queue::onReceipt(BSLA_UNUSED const bmqt::MessageGUID& msgGUID,
366-
BSLA_UNUSED mqbi::QueueHandle* qH,
367-
BSLA_UNUSED const bsls::Types::Int64& arrivalTimepoint)
366+
BSLA_UNUSED mqbi::QueueHandle* qH)
368367
{
369368
// NOTHING
370369
}

src/groups/mqb/mqbmock/mqbmock_queue.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,10 +324,8 @@ class Queue : public mqbi::Queue {
324324
/// Invoked by the Data Store when it receives quorum Receipts.
325325
///
326326
/// THREAD: This method is called from the Queue's dispatcher thread.
327-
void onReceipt(const bmqt::MessageGUID& msgGUID,
328-
mqbi::QueueHandle* qH,
329-
const bsls::Types::Int64& arrivalTimepoint)
330-
BSLS_KEYWORD_OVERRIDE;
327+
void onReceipt(const bmqt::MessageGUID& msgGUID,
328+
mqbi::QueueHandle* qH) BSLS_KEYWORD_OVERRIDE;
331329

332330
/// Invoked by the Data Store when it removes (times out waiting for
333331
/// quorum Receipts for) a message with the specified `msgGUID`. Send

src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,6 @@ FileBackedStorage::put(mqbi::StorageMessageAttributes* attributes,
374374
queue()
375375
->stats()
376376
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
377-
378377
msgSize);
379378

380379
d_isEmpty.storeRelaxed(0);

0 commit comments

Comments
 (0)