Skip to content

Commit

Permalink
G-API: Desync -- fix the queue saturation problem
Browse files Browse the repository at this point in the history
Set queue size = 1 to Copy island right after the desync.
In this case, Copy won't read more data from a "last_written"
container than required, while feeding the desynchronized path.

Sometimes Copy don't get fused into an island and behaves
on its own -- in this case, it reads more data in advance
so the slow (desync) part actually processes some data in-sync
(more than actually required)
  • Loading branch information
dmatveev committed Nov 3, 2020
1 parent 691c3d1 commit 099ad1a
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion modules/gapi/src/executor/gstreamingexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

#include <opencv2/gapi/opencv_includes.hpp>

#if !defined(GAPI_STANDALONE)
#include <opencv2/gapi/core.hpp> // GCopy -- FIXME - to be removed!
#endif // GAPI_STANDALONE

#include "api/gproto_priv.hpp" // ptr(GRunArgP)
#include "compiler/passes/passes.hpp"
#include "backends/common/gbackend.hpp" // createMat
Expand Down Expand Up @@ -80,6 +84,10 @@ struct DataQueue {
std::shared_ptr<cv::gimpl::stream::Q> q;
};

struct DesyncSpecialCase {
static const char *name() { return "DesyncSpecialCase"; }
};

std::vector<cv::gimpl::stream::Q*> reader_queues( ade::Graph &g,
const ade::NodeHandle &obj)
{
Expand Down Expand Up @@ -936,19 +944,53 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr<ade::Graph> &&
, isl_exec
});
// Initialize queues for every operation's input
ade::TypedGraph<DataQueue> qgr(*m_island_graph);
ade::TypedGraph<DataQueue, DesyncSpecialCase> qgr(*m_island_graph);
bool is_desync_start = false;
for (auto eh : nh->inEdges())
{
// ...only if the data is not compile-const
if (const_ins.count(eh->srcNode()) == 0) {
if (m_gim.metadata(eh).contains<DesyncIslEdge>()) {
qgr.metadata(eh).set(DataQueue(DataQueue::DESYNC));
is_desync_start = true;
} else if (qgr.metadata(eh).contains<DesyncSpecialCase>()) {
// See comment below
// Limit queue size to 1 in this case
qgr.metadata(eh).set(DataQueue(1u));
} else {
qgr.metadata(eh).set(DataQueue(queue_capacity));
}
m_internal_queues.insert(qgr.metadata(eh).get<DataQueue>().q.get());
}
}
// WORKAROUND:
// Since now we always know desync() is followed by copy(),
// copy is always the island with DesyncIslEdge.
// Mark the node's outputs a special way so then its following
// queue sizes will be limited to 1 (to avoid copy reading more
// data in advance - as there's no other way for the underlying
// "slow" part to control it)
if (is_desync_start) {
auto isl = m_gim.metadata(nh).get<FusedIsland>().object;
// In the current implementation, such islands
// _must_ start with copy
GAPI_Assert(isl->in_ops().size() == 1u);
#if !defined(GAPI_STANDALONE)
GAPI_Assert(GModel::Graph(*m_orig_graph)
.metadata(*isl->in_ops().begin())
.get<cv::gimpl::Op>()
.k.name == cv::gapi::core::GCopy::id());
#endif // GAPI_STANDALONE
for (auto out_nh : nh->outNodes()) {
for (auto out_eh : out_nh->outEdges()) {
qgr.metadata(out_eh).set(DesyncSpecialCase{});
}
}
}
// It is ok to do it here since the graph is visited in
// a topologic order and its consumers (those checking
// their input edges & initializing queues) are yet to be
// visited
}
break;
case NodeKind::SLOT:
Expand Down

0 comments on commit 099ad1a

Please sign in to comment.