|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | + |
| 12 | +/// @file TOFIntegrateClusterReaderSpec.cxx |
| 13 | + |
| 14 | +#include <vector> |
| 15 | +#include <boost/algorithm/string/predicate.hpp> |
| 16 | + |
| 17 | +#include "TOFWorkflowIO/TOFIntegrateClusterReaderSpec.h" |
| 18 | +#include "Framework/Task.h" |
| 19 | +#include "Framework/ControlService.h" |
| 20 | +#include "Framework/ConfigParamRegistry.h" |
| 21 | +#include "CommonUtils/NameConf.h" |
| 22 | +#include "CommonDataFormat/TFIDInfo.h" |
| 23 | +#include "Algorithm/RangeTokenizer.h" |
| 24 | +#include "TChain.h" |
| 25 | +#include "TGrid.h" |
| 26 | + |
| 27 | +using namespace o2::framework; |
| 28 | + |
| 29 | +namespace o2 |
| 30 | +{ |
| 31 | +namespace tof |
| 32 | +{ |
| 33 | + |
| 34 | +class IntegratedClusterReader : public Task |
| 35 | +{ |
| 36 | + public: |
| 37 | + IntegratedClusterReader() = default; |
| 38 | + ~IntegratedClusterReader() override = default; |
| 39 | + void init(InitContext& ic) final; |
| 40 | + void run(ProcessingContext& pc) final; |
| 41 | + |
| 42 | + private: |
| 43 | + void connectTrees(); |
| 44 | + |
| 45 | + int mChainEntry = 0; ///< processed entries in the chain |
| 46 | + std::unique_ptr<TChain> mChain; ///< input TChain |
| 47 | + std::vector<std::string> mFileNames; ///< input files |
| 48 | + std::vector<float> mTOFCNCl, *mTOFCNClPtr = &mTOFCNCl; ///< branch integrated number of cluster TOF currents |
| 49 | + std::vector<float> mTOFCqTot, *mTOFCqTotPtr = &mTOFCqTot; ///< branch integrated q TOF currents |
| 50 | + o2::dataformats::TFIDInfo mTFinfo, *mTFinfoPtr = &mTFinfo; ///< branch TFIDInfo for injecting correct time |
| 51 | + std::vector<std::pair<unsigned long, int>> mIndices; ///< firstTfOrbit, file, index |
| 52 | +}; |
| 53 | + |
| 54 | +void IntegratedClusterReader::init(InitContext& ic) |
| 55 | +{ |
| 56 | + const auto dontCheckFileAccess = ic.options().get<bool>("dont-check-file-access"); |
| 57 | + auto fileList = o2::RangeTokenizer::tokenize<std::string>(ic.options().get<std::string>("tofcurrents-infiles")); |
| 58 | + |
| 59 | + // check if only one input file (a txt file contaning a list of files is provided) |
| 60 | + if (fileList.size() == 1) { |
| 61 | + if (boost::algorithm::ends_with(fileList.front(), "txt")) { |
| 62 | + LOGP(info, "Reading files from input file {}", fileList.front()); |
| 63 | + std::ifstream is(fileList.front()); |
| 64 | + std::istream_iterator<std::string> start(is); |
| 65 | + std::istream_iterator<std::string> end; |
| 66 | + std::vector<std::string> fileNamesTmp(start, end); |
| 67 | + fileList = fileNamesTmp; |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + const std::string inpDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("input-dir")); |
| 72 | + for (const auto& file : fileList) { |
| 73 | + if ((file.find("alien://") == 0) && !gGrid && !TGrid::Connect("alien://")) { |
| 74 | + LOG(fatal) << "Failed to open alien connection"; |
| 75 | + } |
| 76 | + const auto fileDir = o2::utils::Str::concat_string(inpDir, file); |
| 77 | + if (!dontCheckFileAccess) { |
| 78 | + std::unique_ptr<TFile> filePtr(TFile::Open(fileDir.data())); |
| 79 | + if (!filePtr || !filePtr->IsOpen() || filePtr->IsZombie()) { |
| 80 | + LOGP(warning, "Could not open file {}", fileDir); |
| 81 | + continue; |
| 82 | + } |
| 83 | + } |
| 84 | + mFileNames.emplace_back(fileDir); |
| 85 | + } |
| 86 | + |
| 87 | + if (mFileNames.size() == 0) { |
| 88 | + LOGP(error, "No input files to process"); |
| 89 | + } |
| 90 | + connectTrees(); |
| 91 | +} |
| 92 | + |
| 93 | +void IntegratedClusterReader::run(ProcessingContext& pc) |
| 94 | +{ |
| 95 | + // check time order inside the TChain |
| 96 | + if (mChainEntry == 0) { |
| 97 | + mIndices.clear(); |
| 98 | + mIndices.reserve(mChain->GetEntries()); |
| 99 | + for (unsigned long i = 0; i < mChain->GetEntries(); i++) { |
| 100 | + mChain->GetEntry(i); |
| 101 | + mIndices.emplace_back(std::make_pair(mTFinfo.firstTForbit, i)); |
| 102 | + } |
| 103 | + std::sort(mIndices.begin(), mIndices.end()); |
| 104 | + } |
| 105 | + |
| 106 | + LOGP(debug, "Processing entry {}", mIndices[mChainEntry].second); |
| 107 | + mChain->GetEntry(mIndices[mChainEntry++].second); |
| 108 | + |
| 109 | + // inject correct timing informations |
| 110 | + auto& timingInfo = pc.services().get<o2::framework::TimingInfo>(); |
| 111 | + timingInfo.firstTForbit = mTFinfo.firstTForbit; |
| 112 | + timingInfo.tfCounter = mTFinfo.tfCounter; |
| 113 | + timingInfo.runNumber = mTFinfo.runNumber; |
| 114 | + timingInfo.creation = mTFinfo.creation; |
| 115 | + |
| 116 | + pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCN"}, mTOFCNCl); |
| 117 | + pc.outputs().snapshot(Output{header::gDataOriginTOF, "ITOFCQ"}, mTOFCqTot); |
| 118 | + usleep(100); |
| 119 | + |
| 120 | + if (mChainEntry >= mChain->GetEntries()) { |
| 121 | + pc.services().get<ControlService>().endOfStream(); |
| 122 | + pc.services().get<ControlService>().readyToQuit(QuitRequest::Me); |
| 123 | + } |
| 124 | +} |
| 125 | + |
| 126 | +void IntegratedClusterReader::connectTrees() |
| 127 | +{ |
| 128 | + mChain.reset(new TChain("itofc")); |
| 129 | + for (const auto& file : mFileNames) { |
| 130 | + LOGP(info, "Adding file to chain: {}", file); |
| 131 | + mChain->AddFile(file.data()); |
| 132 | + } |
| 133 | + assert(mChain->GetEntries()); |
| 134 | + mChain->SetBranchAddress("ITOFCN", &mTOFCNClPtr); |
| 135 | + mChain->SetBranchAddress("ITOFCQ", &mTOFCqTotPtr); |
| 136 | + mChain->SetBranchAddress("tfID", &mTFinfoPtr); |
| 137 | +} |
| 138 | + |
| 139 | +DataProcessorSpec getTOFIntegrateClusterReaderSpec() |
| 140 | +{ |
| 141 | + std::vector<OutputSpec> outputs; |
| 142 | + outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCN", 0, Lifetime::Sporadic); |
| 143 | + outputs.emplace_back(o2::header::gDataOriginTOF, "ITOFCQ", 0, Lifetime::Sporadic); |
| 144 | + |
| 145 | + return DataProcessorSpec{ |
| 146 | + "tof-integrated-cluster-reader", |
| 147 | + Inputs{}, |
| 148 | + outputs, |
| 149 | + AlgorithmSpec{adaptFromTask<IntegratedClusterReader>()}, |
| 150 | + Options{ |
| 151 | + {"tofcurrents-infiles", VariantType::String, "o2currents_tof.root", {"comma-separated list of input files or .txt file containing list of input files"}}, |
| 152 | + {"input-dir", VariantType::String, "none", {"Input directory"}}, |
| 153 | + {"dont-check-file-access", VariantType::Bool, false, {"Deactivate check if all files are accessible before adding them to the list of files"}}, |
| 154 | + }}; |
| 155 | +} |
| 156 | + |
| 157 | +} // namespace tof |
| 158 | +} // namespace o2 |
0 commit comments