Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Framework/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"tasks": {
"QcTask": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"detectorName": "TST",
Expand Down Expand Up @@ -105,8 +106,7 @@
"fraction": "0.1",
"seed": "1234"
}
],
"blocking": "false"
]
}
]
}
10 changes: 7 additions & 3 deletions Framework/include/QualityControl/InfrastructureGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ class InfrastructureGenerator
static void generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName,
size_t numberOfLocalMachines,
std::vector<std::pair<size_t, size_t>> cycleDurationSeconds,
const std::string& mergingMode, size_t resetAfterCycles,
std::string monitoringUrl, const std::string& detectorName,
std::vector<size_t> mergersPerLayer, bool enableMovingWindows);
const std::string& mergingMode,
size_t resetAfterCycles,
std::string monitoringUrl,
const std::string& detectorName,
std::vector<size_t> mergersPerLayer,
bool enableMovingWindows,
bool critical);
static void generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/PostProcessingConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct PostProcessingConfig {
std::string consulUrl;
core::Activity activity;
bool matchAnyRunNumber = false;
bool critical;
core::CustomParameters customParameters;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct PostProcessingTaskSpec {
std::string id = "Invalid";
std::string taskName = "Invalid";
bool active = true;
bool critical = true;
std::string detectorName = "Invalid";
boost::property_tree::ptree tree = {};
core::CustomParameters customParameters;
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TaskRunnerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct TaskRunnerConfig {
std::string className;
std::vector<std::pair<size_t, size_t>> cycleDurations = {};
int maxNumberCycles;
bool critical;
std::string consulUrl{};
std::string conditionUrl{};
std::string monitoringUrl{};
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TaskSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct TaskSpec {
DataSourceSpec dataSource;
// advanced
bool active = true;
bool critical = true;
int maxNumberCycles = -1;
size_t resetAfterCycles = 0;
std::string saveObjectsToFile;
Expand Down
1 change: 1 addition & 0 deletions Framework/postprocessing.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"postprocessing": {
"ExamplePostprocessing": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
"className": "o2::quality_control_modules::skeleton::SkeletonPostProcessing",
"moduleName": "QcSkeleton",
"detectorName": "TST",
Expand Down
2 changes: 2 additions & 0 deletions Framework/src/AggregatorRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ DataProcessorSpec AggregatorRunnerFactory::create(const core::CommonSpec& common
};
newAggregatorRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable);
newAggregatorRunner.labels.emplace_back(AggregatorRunner::getLabel());
framework::DataProcessorLabel resilientLabel = { "resilient" };
newAggregatorRunner.labels.emplace_back(resilientLabel);
newAggregatorRunner.algorithm = adaptFromTask<AggregatorRunner>(std::move(aggregatorRunner));
return newAggregatorRunner;
}
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ QualityObjectsType CheckRunner::check()
QualityObjectsType allQOs;
for (auto& [checkName, check] : mChecks) {
if (updatePolicyManager.isReady(check.getName())) {
ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are ready --> check()" << ENDM;
auto newQOs = check.check(mMonitorObjects);
mTotalNumberCheckExecuted += newQOs.size();

Expand All @@ -366,7 +367,7 @@ QualityObjectsType CheckRunner::check()
// Was checked, update latest revision
updatePolicyManager.updateActorRevision(checkName);
} else {
ILOG(Info, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM;
ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM;
}
}
return allQOs;
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/CheckRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
options };
newCheckRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable);
newCheckRunner.labels.emplace_back(CheckRunner::getCheckRunnerLabel());
newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" });
newCheckRunner.algorithm = adaptFromTask<CheckRunner>(std::move(qcCheckRunner));
return newCheckRunner;
}
Expand All @@ -61,7 +62,7 @@ DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig&
checkRunnerConfig.options,
{},
{ o2::framework::ecs::qcReconfigurable } };

newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" });
return newCheckRunner;
}

Expand Down
28 changes: 18 additions & 10 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateFullChainInfrastructure
bool enableMovingWindows = !taskSpec.movingWindows.empty();
generateMergers(workflow, taskSpec.taskName, 1, cycleDurationsMultiplied,
taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl,
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows);
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical);
} else { // TaskLocationSpec::Remote
auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles);
workflow.emplace_back(TaskRunnerFactory::create(taskConfig));
Expand Down Expand Up @@ -277,9 +277,8 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
std::for_each(cycleDurationsMultiplied.begin(), cycleDurationsMultiplied.end(),
[taskSpec](std::pair<size_t, size_t>& p) { p.first *= taskSpec.mergerCycleMultiplier; });
bool enableMovingWindows = !taskSpec.movingWindows.empty();
generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied,
taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl,
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows);
generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied, taskSpec.mergingMode,
resetAfterCycles, infrastructureSpec.common.monitoringUrl, taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical);

} else if (taskSpec.location == TaskLocationSpec::Remote) {

Expand Down Expand Up @@ -550,6 +549,9 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe
{ proxyInput },
channelConfig.c_str()));
workflow.back().labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
if (!taskSpec.critical) {
workflow.back().labels.emplace_back(framework::DataProcessorLabel{ "expendable" });
}
if (getenv("O2_QC_KILL_PROXIES") != nullptr) {
workflow.back().metadata.push_back(DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
}
Expand Down Expand Up @@ -577,6 +579,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
channelConfig.c_str(),
dplModelAdaptor());
proxy.labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
// Label the proxy that is being built here. It is appended to `workflow` only
// further down, so `workflow.back()` would tag an unrelated, previously added
// device (or be undefined behaviour if the workflow is still empty).
if (!taskSpec.critical) {
  proxy.labels.emplace_back(framework::DataProcessorLabel{ "expendable" });
}
// if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
enableDraining(proxy.options);
if (getenv("O2_QC_KILL_PROXIES") != nullptr) {
Expand All @@ -585,11 +590,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
workflow.emplace_back(std::move(proxy));
}
void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName,
size_t numberOfLocalMachines,
std::vector<std::pair<size_t, size_t>> cycleDurations,
const std::string& mergingMode, size_t resetAfterCycles,
std::string monitoringUrl, const std::string& detectorName,
std::vector<size_t> mergersPerLayer, bool enableMovingWindows)
size_t numberOfLocalMachines, std::vector<std::pair<size_t, size_t>> cycleDurations,
const std::string& mergingMode, size_t resetAfterCycles, std::string monitoringUrl,
const std::string& detectorName, std::vector<size_t> mergersPerLayer, bool enableMovingWindows, bool critical)
{
Inputs mergerInputs;
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
Expand Down Expand Up @@ -617,8 +620,9 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
mergerConfig.topologySize = { TopologySize::MergersPerLayer, mergersPerLayer };
mergerConfig.monitoringUrl = std::move(monitoringUrl);
mergerConfig.detectorName = detectorName;
mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs };
mergerConfig.labels.push_back({ "resilient" });
mergerConfig.publishMovingWindow = { enableMovingWindows ? PublishMovingWindow::Yes : PublishMovingWindow::No };
mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs };
mergersBuilder.setConfig(mergerConfig);

mergersBuilder.generateInfrastructure(workflow);
Expand Down Expand Up @@ -786,6 +790,10 @@ void InfrastructureGenerator::generatePostProcessing(WorkflowSpec& workflow, con
ppTask.getOptions()
};
dataProcessorSpec.labels.emplace_back(PostProcessingDevice::getLabel());
if (!ppTaskSpec.critical) {
framework::DataProcessorLabel expendableLabel = { "expendable" };
dataProcessorSpec.labels.emplace_back(expendableLabel);
}
dataProcessorSpec.algorithm = adaptFromTask<PostProcessingDevice>(std::move(ppTask));

workflow.emplace_back(std::move(dataProcessorSpec));
Expand Down
2 changes: 2 additions & 0 deletions Framework/src/InfrastructureSpecReader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ TaskSpec InfrastructureSpecReader::readSpecEntry<TaskSpec>(const std::string& ta
}
ts.dataSource = readSpecEntry<DataSourceSpec>(taskID, taskTree.get_child("dataSource"), wholeTree);
ts.active = taskTree.get<bool>("active", ts.active);
ts.critical = taskTree.get<bool>("critical", ts.critical);
ts.maxNumberCycles = taskTree.get<int>("maxNumberCycles", ts.maxNumberCycles);
ts.resetAfterCycles = taskTree.get<size_t>("resetAfterCycles", ts.resetAfterCycles);
ts.saveObjectsToFile = taskTree.get<std::string>("saveObjectsToFile", ts.saveObjectsToFile);
Expand Down Expand Up @@ -363,6 +364,7 @@ PostProcessingTaskSpec
ppts.id = ppTaskId;
ppts.taskName = ppTaskTree.get<std::string>("taskName", ppts.id);
ppts.active = ppTaskTree.get<bool>("active", ppts.active);
ppts.critical = ppTaskTree.get<bool>("critical", ppts.critical);
ppts.detectorName = ppTaskTree.get<std::string>("detectorName", ppts.detectorName);
ppts.tree = wholeTree;

Expand Down
3 changes: 2 additions & 1 deletion Framework/src/PostProcessingConfig.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ PostProcessingConfig::PostProcessingConfig(const std::string& id, const boost::p
config.get<std::string>("qc.config.Activity.provenance", "qc"),
{ config.get<uint64_t>("qc.config.Activity.start", 0),
config.get<uint64_t>("qc.config.Activity.end", -1) }),
matchAnyRunNumber(config.get<bool>("qc.config.postprocessing.matchAnyRunNumber", false))
matchAnyRunNumber(config.get<bool>("qc.config.postprocessing.matchAnyRunNumber", false)),
critical(true)
{
for (const auto& initTrigger : config.get_child("qc.postprocessing." + id + ".initTrigger")) {
initTriggers.push_back(initTrigger.second.get_value<std::string>());
Expand Down
1 change: 1 addition & 0 deletions Framework/src/TaskRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ void TaskRunner::printTaskConfig() const
<< " / Module name : " << mTaskConfig.moduleName //
<< " / Detector name : " << mTaskConfig.detectorName //
<< " / Max number cycles : " << mTaskConfig.maxNumberCycles //
<< " / critical : " << mTaskConfig.critical //
<< " / Save to file : " << mTaskConfig.saveToFile
<< " / Cycle duration seconds : ";
for (auto& [cycleDuration, period] : mTaskConfig.cycleDurations) {
Expand Down
5 changes: 5 additions & 0 deletions Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ o2::framework::DataProcessorSpec TaskRunnerFactory::create(const TaskRunnerConfi
};
newTask.labels.emplace_back(o2::framework::ecs::qcReconfigurable);
newTask.labels.emplace_back(TaskRunner::getTaskRunnerLabel());
if (!taskConfig.critical) {
framework::DataProcessorLabel expendableLabel = { "expendable" };
newTask.labels.emplace_back(expendableLabel);
}

return newTask;
}
Expand Down Expand Up @@ -144,6 +148,7 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
taskSpec.className,
multipleCycleDurations,
taskSpec.maxNumberCycles,
taskSpec.critical,
globalConfig.consulUrl,
globalConfig.conditionDBUrl,
globalConfig.monitoringUrl,
Expand Down
1 change: 1 addition & 0 deletions Framework/test/testTaskInterface.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ TEST_CASE("test_task_factory")
"o2::quality_control_modules::skeleton::SkeletonTask",
{ { 10, 1 } },
-1,
true,
""
};

Expand Down
51 changes: 51 additions & 0 deletions doc/Advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Advanced topics
<!--TOC generated with https://github.com/ekalinin/github-markdown-toc-->
<!--./gh-md-toc --insert --no-backup --hide-footer --indent 3 QualityControl/doc/Advanced.md -->
<!--ts-->
* [Advanced topics](#advanced-topics)
* [Framework](#framework)
* [Plugging the QC to an existing DPL workflow](#plugging-the-qc-to-an-existing-dpl-workflow)
* [Production of QC objects outside this framework](#production-of-qc-objects-outside-this-framework)
Expand All @@ -18,6 +19,7 @@ Advanced topics
* [Monitor cycles](#monitor-cycles)
* [Writing a DPL data producer](#writing-a-dpl-data-producer)
* [Custom merging](#custom-merging)
* [Critical, resilient and non-critical tasks](#critical-resilient-and-non-critical-tasks)
* [QC with DPL Analysis](#qc-with-dpl-analysis)
* [Uploading objects to QCDB](#uploading-objects-to-qcdb)
* [Getting AODs in QC Tasks](#getting-aods-in-qc-tasks)
Expand Down Expand Up @@ -502,6 +504,54 @@ Feel free to consult the existing usage examples among other modules in the QC r

Once a custom class is implemented, one should let QCG know how to display it correctly, which is explained in the subsection [Display a non-standard ROOT object in QCG](#display-a-non-standard-root-object-in-qcg).

## Critical, resilient and non-critical tasks

DPL devices can be marked as expendable, resilient or critical. Expendable tasks can die without affecting the run.
Resilient tasks can survive having one or all their inputs coming from an expendable task but they will stop the system if they themselves die.
Critical tasks (default) will stop the system if they die and will not accept input from expendable tasks.

In QC, we attach these `labels` to the devices we generate, as described in the following subsections.

### QC tasks

In QC, one can mark a task as critical or non-critical:
```json
"tasks": {
"QcTask": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
```
By default, QC tasks are `critical`, meaning that their failure will stop the run.
If they are not critical, they will be `expendable` and will not stop the run if they die.

### Auto-generated proxies

Auto-generated proxies adopt the criticality of the task they are proxying: the proxies of a non-critical task are marked as `expendable`.

### QC mergers

Mergers are `resilient`.

### QC check runners

CheckRunners are `resilient`.

### QC aggregators

Aggregators are `resilient`.

### QC post-processing tasks

Post-processing tasks can be marked as critical or non-critical:
```json
"postprocessing": {
"ExamplePostprocessing": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
```
By default, they are `critical`, meaning that their failure will stop the run.
If they are not critical, they will be `expendable` and will not stop the run if they die.

## QC with DPL Analysis

QC offers several ways to interact with the DPL Analysis framework.
Expand Down Expand Up @@ -1308,6 +1358,7 @@ the "tasks" path.
"className": "namespace::of::Task", "": "Class name of the QC Task with full namespace.",
"moduleName": "QcSkeleton", "": "Library name. It can be found in CMakeLists of the detector module.",
"detectorName": "TST", "": "3-letter code of the detector.",
"critical": "true", "": "if false the task is allowed to die without stopping the workflow, default: true",
"cycleDurationSeconds": "10", "": "Cycle duration (how often objects are published), 10 seconds minimum.",
"": "The first cycle will be randomly shorter. ",
"": "Alternatively, one can specify different cycle durations for different periods. The last item in cycleDurations will be used for the rest of the duration whatever the period. The first cycle will be randomly shorter.",
Expand Down