Skip to content

Commit

Permalink
feat: Track return type of FairMQ msg factories
Browse files Browse the repository at this point in the history
  • Loading branch information
dennisklein committed Apr 13, 2022
1 parent a4d8466 commit 7dd9d87
Show file tree
Hide file tree
Showing 32 changed files with 101 additions and 110 deletions.
2 changes: 1 addition & 1 deletion base/MQ/devices/BaseMQFileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BaseMQFileSink
{
int receivedMsg = 0;
while (!NewStatePending()) {
std::unique_ptr<FairMQMessage> msg(NewMessage());
auto msg(NewMessage());
if (Receive(msg, fInputChanName) > 0) {
FairMQDevice::Deserialize<typename InputPolicy::DeserializerType>(
*msg, InputPolicy::fInput); // get data from message.
Expand Down
2 changes: 1 addition & 1 deletion base/MQ/devices/FairMQLmdSampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class FairMQLmdSampler : public FairMQDevice
FairMQParts parts;

// send header
// std::unique_ptr<FairMQMessage> header(fTransportFactory->CreateMessage(fSubEvent, sizeof(fSubEvent),
// auto header(fTransportFactory->CreateMessage(fSubEvent, sizeof(fSubEvent),
// free_buffer, nullptr)); fChannels.at(chanName).at(0).SendPart(header);

int* arraySize = new int(sebuflength);
Expand Down
2 changes: 1 addition & 1 deletion base/MQ/devices/FairMQSampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class FairMQSampler : public FairMQDevice
{
uint64_t numAcks = 0;
for (Long64_t eventNr = 0; eventNr < fNumEvents; ++eventNr) {
FairMQMessagePtr ack(NewMessage());
auto ack(NewMessage());
if (Receive(ack, fAckChannelName) >= 0) {
++numAcks;
}
Expand Down
6 changes: 3 additions & 3 deletions base/MQ/devices/FairMQUnpacker.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -108,8 +108,8 @@ class FairMQUnpacker : public FairMQDevice
FairMQChannel& inputChannel = fChannels.at(fInputChannelName).at(0);

while (!NewStatePending()) {
FairMQMessagePtr msgSize(NewMessage());
FairMQMessagePtr msg(NewMessage());
auto msgSize(NewMessage());
auto msg(NewMessage());

if (inputChannel.Receive(msgSize) >= 0) {
if (inputChannel.Receive(msg) >= 0) {
Expand Down
5 changes: 3 additions & 2 deletions examples/MQ/histogramServer/FairMQExHistoDevice.cxx
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/

#include "FairMQExHistoDevice.h"

#include "RootSerializer.h"
Expand Down Expand Up @@ -51,7 +52,7 @@ bool FairMQExHistoDevice::ConditionalRun()
y = r * TMath::Sin(phi);
fh_histo4.Fill(x, y);

FairMQMessagePtr message(NewMessage());
auto message(NewMessage());
RootSerializer().Serialize(*message, &fArrayHisto);

for (auto& channel : fChannels) {
Expand Down
4 changes: 2 additions & 2 deletions examples/MQ/parameters/FairMQExParamsClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ bool FairMQExParamsClient::ConditionalRun()

// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr req(NewSimpleMessage(fParameterName + "," + to_string(fRunId)));
FairMQMessagePtr rep(NewMessage());
auto req(NewSimpleMessage(fParameterName + "," + to_string(fRunId)));
auto rep(NewMessage());

if (Send(req, "data") > 0) {
if (Receive(rep, "data") >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ bool FairMQPixAltFileSinkBin::StoreData(FairMQParts& parts, int /*index*/)
}

if (fAckChannelName != "") {
unique_ptr<FairMQMessage> msg(NewMessage());
auto msg(NewMessage());
Send(msg, fAckChannelName);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,14 @@ bool FairMQPixAltSamplerBin::ReadBinFile()
header->fRunId = head[0];
header->fMCEntryNo = head[1];
header->fPartNo = head[2];
FairMQMessagePtr msgHeader(
NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
auto msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
parts.AddPart(std::move(msgHeader));

size_t digisSize = head[3] * sizeof(PixelPayload::Digi);

FairMQMessagePtr msgDigis(NewMessage(digisSize));
auto msgDigis(NewMessage(digisSize));

PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgDigis->GetData());

Expand Down Expand Up @@ -203,14 +202,14 @@ bool FairMQPixAltSamplerBin::ReadRootFile()
header->fRunId = fEventHeader->fRunId;
header->fMCEntryNo = fEventHeader->fMCEntryNo;
header->fPartNo = fEventHeader->fPartNo;
FairMQMessagePtr msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
auto msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
parts.AddPart(std::move(msgHeader));

size_t digisSize = sizeof(PixelPayload::Digi) * fDigiArray->size();

FairMQMessagePtr msgDigis(NewMessage(digisSize));
auto msgDigis(NewMessage(digisSize));
PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgDigis->GetData());

for (int idigi = 0; idigi < (int)fDigiArray->size(); idigi++) {
Expand Down Expand Up @@ -247,7 +246,7 @@ void FairMQPixAltSamplerBin::ListenForAcks()
{
if (fAckChannelName != "") {
do {
FairMQMessagePtr ack(NewMessage());
auto ack(NewMessage());
if (Receive(ack, fAckChannelName) >= 0) {
fNofRecAcks++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,15 @@ class FairMQPixAltTaskProcessorBin : public FairMQDevice
header->fRunId = payloadE->fRunId;
header->fMCEntryNo = payloadE->fMCEntryNo;
header->fPartNo = payloadE->fPartNo;
FairMQMessagePtr msgHeader(
NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
auto msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
partsOut.AddPart(std::move(msgHeader));

// create part with hits
int hitsSize = nofDigis * sizeof(PixelPayload::Hit);

FairMQMessagePtr msgTCA = NewMessage(hitsSize);
auto msgTCA = NewMessage(hitsSize);

PixelPayload::Hit* hitPayload = static_cast<PixelPayload::Hit*>(msgTCA->GetData());

Expand Down Expand Up @@ -172,12 +171,11 @@ class FairMQPixAltTaskProcessorBin : public FairMQDevice
std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
LOG(warn) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
<< ")";
FairMQMessagePtr req(NewMessage(
const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /*data*/, void* obj) { delete static_cast<std::string*>(obj); },
reqStr));
FairMQMessagePtr rep(NewMessage());
auto req(NewMessage(const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /*data*/, void* obj) { delete static_cast<std::string*>(obj); },
reqStr));
auto rep(NewMessage());

if (Send(req, fParamChannelName) > 0) {
if (Receive(rep, fParamChannelName) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bool FairMQPixelFileSink::StoreData(FairMQParts& parts, int /*index*/)
tempObjects.clear();

if (fAckChannelName != "") {
unique_ptr<FairMQMessage> msg(NewMessage());
auto msg(NewMessage());
Send(msg, fAckChannelName);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ bool FairMQPixelFileSinkBin::StoreData(FairMQParts& parts, int /*index*/)
}

if (fAckChannelName != "") {
FairMQMessagePtr msg(NewMessage());
auto msg(NewMessage());
Send(msg, fAckChannelName);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ bool FairMQPixelMerger::MergeData(FairMQParts& parts, int /*index*/)
FairMQMessagePtr messageTCA[10];
FairMQParts partsOut;

FairMQMessagePtr messFEH(NewMessage());
auto messFEH(NewMessage());
RootSerializer().Serialize(*messFEH, fEventHeader);
partsOut.AddPart(std::move(messFEH));
for (int iarray = 0; iarray < nofArrays; iarray++) {
Expand Down
4 changes: 2 additions & 2 deletions examples/MQ/pixelDetector/src/devices/FairMQPixelSampler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ bool FairMQPixelSampler::ConditionalRun()
FairMQParts parts;

for (int iobj = 0; iobj < fNObjects; iobj++) {
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, fInputObjects[iobj]);
parts.AddPart(std::move(mess));
}
Expand All @@ -130,7 +130,7 @@ void FairMQPixelSampler::ListenForAcks()
if (fAckChannelName != "") {
Long64_t numAcks = 0;
do {
unique_ptr<FairMQMessage> ack(NewMessage());
auto ack(NewMessage());
if (Receive(ack, fAckChannelName) >= 0) {
numAcks++;
}
Expand Down
11 changes: 5 additions & 6 deletions examples/MQ/pixelDetector/src/devices/FairMQPixelSamplerBin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,17 @@ bool FairMQPixelSamplerBin::ConditionalRun()
header->fRunId = ((FairEventHeader*)fInputObjects[iobj])->GetRunId();
header->fMCEntryNo = ((FairEventHeader*)fInputObjects[iobj])->GetMCEntryNumber();
header->fPartNo = 0;
FairMQMessagePtr msgHeader(
NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
auto msgHeader(NewMessage(header, sizeof(PixelPayload::EventHeader), [](void* data, void* /*hint*/) {
delete static_cast<PixelPayload::EventHeader*>(data);
}));
parts.AddPart(std::move(msgHeader));
// LOG(debug) << "-----------------------------";
// LOG(debug) << "first part has size = " << sizeof(PixelPayload::EventHeader);
} else {
Int_t nofEntries = ((TClonesArray*)fInputObjects[iobj])->GetEntries();
size_t digisSize = nofEntries * sizeof(PixelPayload::Digi);

FairMQMessagePtr msgTCA(NewMessage(digisSize));
auto msgTCA(NewMessage(digisSize));

PixelPayload::Digi* digiPayload = static_cast<PixelPayload::Digi*>(msgTCA->GetData());

Expand Down Expand Up @@ -155,7 +154,7 @@ void FairMQPixelSamplerBin::ListenForAcks()
if (fAckChannelName != "") {
Long64_t numAcks = 0;
do {
FairMQMessagePtr ack(NewMessage());
auto ack(NewMessage());
if (Receive(ack, fAckChannelName) >= 0) {
numAcks++;
}
Expand Down
19 changes: 9 additions & 10 deletions examples/MQ/pixelDetector/src/devices/FairMQPixelTaskProcessor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -120,17 +120,17 @@ class FairMQPixelTaskProcessor : public FairMQDevice
FairMQParts partsOut;

if (fEventHeader) {
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, fEventHeader);
partsOut.AddPart(std::move(mess));
} else if (fMCEventHeader) {
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, fMCEventHeader);
partsOut.AddPart(std::move(mess));
}

for (int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, fOutput->At(iobj));
partsOut.AddPart(std::move(mess));
}
Expand Down Expand Up @@ -200,12 +200,11 @@ class FairMQPixelTaskProcessor : public FairMQDevice
LOG(debug) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
<< ")";

FairMQMessagePtr req(NewMessage(
const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
reqStr));
FairMQMessagePtr rep(NewMessage());
auto req(NewMessage(const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
reqStr));
auto rep(NewMessage());

if (Send(req, fParamChannelName) > 0) {
if (Receive(rep, fParamChannelName) > 0) {
Expand Down
25 changes: 12 additions & 13 deletions examples/MQ/pixelDetector/src/devices/FairMQPixelTaskProcessorBin.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
Expand Down Expand Up @@ -124,19 +124,19 @@ class FairMQPixelTaskProcessorBin : public FairMQDevice
header->fMCEntryNo = payloadE->fMCEntryNo;
header->fPartNo = payloadE->fPartNo;

FairMQMessagePtr msgHeader(NewMessage(
header,
sizeof(PixelPayload::EventHeader),
[](void* data, void* /*hint*/) { delete static_cast<PixelPayload::EventHeader*>(data); },
nullptr));
auto msgHeader(
NewMessage(header,
sizeof(PixelPayload::EventHeader),
[](void* data, void* /*hint*/) { delete static_cast<PixelPayload::EventHeader*>(data); },
nullptr));
partsOut.AddPart(std::move(msgHeader));

for (int iobj = 0; iobj < fOutput->GetEntries(); iobj++) {
if (strcmp(fOutput->At(iobj)->GetName(), "PixelHits") == 0) {
Int_t nofEntries = ((TClonesArray*)fOutput->At(iobj))->GetEntries();
size_t hitsSize = nofEntries * sizeof(PixelPayload::Hit);

FairMQMessagePtr msgTCA(NewMessage(hitsSize));
auto msgTCA(NewMessage(hitsSize));

PixelPayload::Hit* hitPayload = static_cast<PixelPayload::Hit*>(msgTCA->GetData());

Expand Down Expand Up @@ -222,12 +222,11 @@ class FairMQPixelTaskProcessorBin : public FairMQDevice
std::string* reqStr = new std::string(paramName + "," + std::to_string(fCurrentRunId));
LOG(warn) << "Requesting parameter \"" << paramName << "\" for Run ID " << fCurrentRunId << " (" << thisPar
<< ")";
FairMQMessagePtr req(NewMessage(
const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
reqStr));
FairMQMessagePtr rep(NewMessage());
auto req(NewMessage(const_cast<char*>(reqStr->c_str()),
reqStr->length(),
[](void* /* data */, void* hint) { delete static_cast<std::string*>(hint); },
reqStr));
auto rep(NewMessage());

if (Send(req, fParamChannelName) > 0) {
if (Receive(rep, fParamChannelName) > 0) {
Expand Down
8 changes: 4 additions & 4 deletions examples/MQ/pixelDetector/src/devices/FairMQRunDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ std::mutex mtx; // mutex for critical section

void FairMQRunDevice::SendObject(TObject* obj, const std::string& chan)
{
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, obj);

FairMQMessagePtr rep(NewMessage());
auto rep(NewMessage());

printf("sending %s", obj->GetName());
if (Send(mess, chan) > 0) {
Expand Down Expand Up @@ -79,7 +79,7 @@ void FairMQRunDevice::SendBranches()
TObject* objClone = (*mcTrackArray)->Clone();
LOG(debug) << "FairMQRunDevice::SendBranches() the track array has "
<< ((TClonesArray*)(objClone))->GetEntries() << " entries.";
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, objClone);
parts.AddPart(std::move(mess));
LOG(debug) << "channel >" << mi.first.data() << "< --> >" << ObjStr->GetString().Data()
Expand All @@ -94,7 +94,7 @@ void FairMQRunDevice::SendBranches()
TObject* object = FairRootManager::Instance()->GetObject(ObjStr->GetString());
if (object) {
TObject* objClone = object->Clone();
FairMQMessagePtr mess(NewMessage());
auto mess(NewMessage());
RootSerializer().Serialize(*mess, objClone);
parts.AddPart(std::move(mess));
LOG(debug) << "channel >" << mi.first.data() << "< --> >" << ObjStr->GetString().Data() << "<";
Expand Down
11 changes: 5 additions & 6 deletions examples/MQ/pixelDetector/src/devices/FairMQSimDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ void FairMQSimDevice::InitializeRun()
// ----- ask the fParamMQServer ------------------------------------
// ----- receive the run number and sampler id ---------------------
std::string* askForRunNumber = new string("ReportSimDevice");
FairMQMessagePtr req(NewMessage(
const_cast<char*>(askForRunNumber->c_str()),
askForRunNumber->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
askForRunNumber));
FairMQMessagePtr rep(NewMessage());
auto req(NewMessage(const_cast<char*>(askForRunNumber->c_str()),
askForRunNumber->length(),
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
askForRunNumber));
auto rep(NewMessage());

unsigned int runId = 0;
if (Send(req, fUpdateChannelName) > 0) {
Expand Down
Loading

0 comments on commit 7dd9d87

Please sign in to comment.