diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index 82b719ad4e39..acfbd1d70ead 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -74,6 +74,7 @@ set(gapi_srcs src/api/kernels_imgproc.cpp src/api/kernels_video.cpp src/api/kernels_nnparsers.cpp + src/api/kernels_streaming.cpp src/api/render.cpp src/api/render_ocv.cpp src/api/ginfer.cpp @@ -97,6 +98,7 @@ set(gapi_srcs src/compiler/passes/pattern_matching.cpp src/compiler/passes/perform_substitution.cpp src/compiler/passes/streaming.cpp + src/compiler/passes/intrin.cpp # Executor src/executor/gexecutor.cpp diff --git a/modules/gapi/include/opencv2/gapi.hpp b/modules/gapi/include/opencv2/gapi.hpp index c6ab3f13fdf2..844574671031 100644 --- a/modules/gapi/include/opencv2/gapi.hpp +++ b/modules/gapi/include/opencv2/gapi.hpp @@ -33,4 +33,8 @@ #include #include +// Include this file here to avoid cyclic dependency between +// Desync & GKernel & GComputation & GStreamingCompiled. +#include + #endif // OPENCV_GAPI_HPP diff --git a/modules/gapi/include/opencv2/gapi/garray.hpp b/modules/gapi/include/opencv2/gapi/garray.hpp index 9118f4de9841..07986556663f 100644 --- a/modules/gapi/include/opencv2/gapi/garray.hpp +++ b/modules/gapi/include/opencv2/gapi/garray.hpp @@ -284,6 +284,14 @@ namespace detail return static_cast&>(*m_ref).rref(); } + // Check if was created for/from std::vector + template bool holds() const + { + if (!m_ref) return false; + using U = typename std::decay::type; + return dynamic_cast*>(m_ref.get()) != nullptr; + } + void mov(VectorRef &v) { m_ref->mov(*v.m_ref); @@ -341,15 +349,18 @@ template class GArray explicit GArray(detail::GArrayU &&ref) // GArrayU-based constructor : m_ref(ref) { putDetails(); } // (used by GCall, not for users) - detail::GArrayU strip() const { return m_ref; } - -private: - static void VCTor(detail::VectorRef& vref) { + /// @private + detail::GArrayU strip() const { + return m_ref; + } + /// @private + static void VCtor(detail::VectorRef& vref) { vref.reset(); - vref.storeKind(); } + +private: void putDetails() { - m_ref.setConstructFcn(&VCTor); + m_ref.setConstructFcn(&VCtor); m_ref.specifyType(); // FIXME: to unify those 2 to avoid excessive dynamic_cast m_ref.storeKind(); // } diff --git a/modules/gapi/include/opencv2/gapi/gkernel.hpp b/modules/gapi/include/opencv2/gapi/gkernel.hpp index b04cedecadbe..d4c3e6c634f8 100644 --- a/modules/gapi/include/opencv2/gapi/gkernel.hpp +++ b/modules/gapi/include/opencv2/gapi/gkernel.hpp @@ -28,6 +28,7 @@ namespace cv { using GShapes = std::vector; using GKinds = std::vector; +using GCtors = std::vector; // GKernel describes kernel API to the system // FIXME: add attributes of a kernel, (e.g. number and types @@ -41,6 +42,7 @@ struct GAPI_EXPORTS GKernel M outMeta; // generic adaptor to API::outMeta(...) GShapes outShapes; // types (shapes) kernel's outputs GKinds inKinds; // kinds of kernel's inputs (fixme: below) + GCtors outCtors; // captured constructors for template output types }; // TODO: It's questionable if inKinds should really be here. Instead, // this information could come from meta. @@ -60,30 +62,27 @@ namespace detail // yield() is used in graph construction time as a generic method to obtain // lazy "return value" of G-API operations // - namespace + template struct Yield; + template<> struct Yield { - template struct Yield; - template<> struct Yield - { - static inline cv::GMat yield(cv::GCall &call, int i) { return call.yield(i); } - }; - template<> struct Yield - { - static inline cv::GMatP yield(cv::GCall &call, int i) { return call.yieldP(i); } - }; - template<> struct Yield - { - static inline cv::GScalar yield(cv::GCall &call, int i) { return call.yieldScalar(i); } - }; - template struct Yield > - { - static inline cv::GArray yield(cv::GCall &call, int i) { return call.yieldArray(i); } - }; - template struct Yield > - { - static inline cv::GOpaque yield(cv::GCall &call, int i) { return call.yieldOpaque(i); } - }; - } // anonymous namespace + static inline cv::GMat yield(cv::GCall &call, int i) { return call.yield(i); } + }; + template<> struct Yield + { + static inline cv::GMatP yield(cv::GCall &call, int i) { return call.yieldP(i); } + }; + template<> struct Yield + { + static inline cv::GScalar yield(cv::GCall &call, int i) { return call.yieldScalar(i); } + }; + template struct Yield > + { + static inline cv::GArray yield(cv::GCall &call, int i) { return call.yieldArray(i); } + }; + template struct Yield > + { + static inline cv::GOpaque yield(cv::GCall &call, int i) { return call.yieldOpaque(i); } + }; //////////////////////////////////////////////////////////////////////////// // Helper classes which brings outputMeta() marshalling to kernel @@ -215,7 +214,8 @@ class GKernelTypeM(Args...)> > , K::tag() , &K::getOutMeta , {detail::GTypeTraits::shape...} - , {detail::GTypeTraits::op_kind...}}); + , {detail::GTypeTraits::op_kind...} + , {detail::GObtainCtor::get()...}}); call.pass(args...); // TODO: std::forward() here? return yield(call, typename detail::MkSeq::type()); } @@ -240,7 +240,8 @@ class GKernelType > , K::tag() , &K::getOutMeta , {detail::GTypeTraits::shape} - , {detail::GTypeTraits::op_kind...}}); + , {detail::GTypeTraits::op_kind...} + , {detail::GObtainCtor::get()}}); call.pass(args...); return detail::Yield::yield(call, 0); } @@ -459,11 +460,6 @@ namespace gapi { std::vector m_transformations; protected: - /// @private - // Check if package contains ANY implementation of a kernel API - // by API textual id. - bool includesAPI(const std::string &id) const; - /// @private // Remove ALL implementations of the given API (identified by ID) void removeAPI(const std::string &id); @@ -566,6 +562,9 @@ namespace gapi { return includesAPI(KAPI::id()); } + /// @private + bool includesAPI(const std::string &id) const; + // FIXME: The below comment is wrong, and who needs this function? /** * @brief Find a kernel (by its API) diff --git a/modules/gapi/include/opencv2/gapi/gopaque.hpp b/modules/gapi/include/opencv2/gapi/gopaque.hpp index 3d1394473b3b..6ab28910d6b3 100644 --- a/modules/gapi/include/opencv2/gapi/gopaque.hpp +++ b/modules/gapi/include/opencv2/gapi/gopaque.hpp @@ -295,25 +295,27 @@ namespace detail template class GOpaque { public: - GOpaque() { putDetails(); } // Empty constructor - explicit GOpaque(detail::GOpaqueU &&ref) // GOpaqueU-based constructor - : m_ref(ref) { putDetails(); } // (used by GCall, not for users) - - detail::GOpaqueU strip() const { return m_ref; } - -private: // Host type (or Flat type) - the type this GOpaque is actually // specified to. using HT = typename detail::flatten_g>::type; - static void CTor(detail::OpaqueRef& ref) { + GOpaque() { putDetails(); } // Empty constructor + explicit GOpaque(detail::GOpaqueU &&ref) // GOpaqueU-based constructor + : m_ref(ref) { putDetails(); } // (used by GCall, not for users) + + /// @private + detail::GOpaqueU strip() const { + return m_ref; + } + /// @private + static void Ctor(detail::OpaqueRef& ref) { ref.reset(); - ref.storeKind(); } +private: void putDetails() { - m_ref.setConstructFcn(&CTor); - m_ref.specifyType(); // FIXME: to unify those 2 to avoid excessive dynamic_cast - m_ref.storeKind(); // + m_ref.setConstructFcn(&Ctor); + m_ref.specifyType(); + m_ref.storeKind(); } detail::GOpaqueU m_ref; diff --git a/modules/gapi/include/opencv2/gapi/gstreaming.hpp b/modules/gapi/include/opencv2/gapi/gstreaming.hpp index 037fa9445215..e09cf8d0f78f 100644 --- a/modules/gapi/include/opencv2/gapi/gstreaming.hpp +++ b/modules/gapi/include/opencv2/gapi/gstreaming.hpp @@ -8,15 +8,99 @@ #ifndef OPENCV_GAPI_GSTREAMING_COMPILED_HPP #define OPENCV_GAPI_GSTREAMING_COMPILED_HPP +#include #include #include #include +#include #include #include namespace cv { +template using optional = cv::util::optional; + +namespace detail { +template struct wref_spec { + using type = T; +}; +template struct wref_spec > { + using type = T; +}; + +template +struct OptRef { + struct OptHolder { + virtual void mov(RefHolder &h) = 0; + virtual void reset() = 0; + virtual ~OptHolder() = default; + using Ptr = std::shared_ptr; + }; + template struct Holder final: OptHolder { + std::reference_wrapper > m_opt_ref; + + explicit Holder(cv::optional& opt) : m_opt_ref(std::ref(opt)) { + } + virtual void mov(RefHolder &h) override { + using U = typename wref_spec::type; + m_opt_ref.get() = cv::util::make_optional(std::move(h.template wref())); + } + virtual void reset() override { + m_opt_ref.get().reset(); + } + }; + template + explicit OptRef(cv::optional& t) : m_opt{new Holder(t)} {} + void mov(RefHolder &h) { m_opt->mov(h); } + void reset() { m_opt->reset();} +private: + typename OptHolder::Ptr m_opt; +}; +using OptionalVectorRef = OptRef; +using OptionalOpaqueRef = OptRef; +} // namespace detail + +// TODO: Keep it in sync with GRunArgP (derive the type automatically?) +using GOptRunArgP = util::variant< + optional*, + optional*, + optional*, + cv::detail::OptionalVectorRef, + cv::detail::OptionalOpaqueRef +>; +using GOptRunArgsP = std::vector; + +namespace detail { + +template inline GOptRunArgP wrap_opt_arg(optional& arg) { + // By default, T goes to an OpaqueRef. All other types are specialized + return GOptRunArgP{OptionalOpaqueRef(arg)}; +} + +template inline GOptRunArgP wrap_opt_arg(optional >& arg) { + return GOptRunArgP{OptionalVectorRef(arg)}; +} + +template<> inline GOptRunArgP wrap_opt_arg(optional &m) { + return GOptRunArgP{&m}; +} + +template<> inline GOptRunArgP wrap_opt_arg(optional &s) { + return GOptRunArgP{&s}; +} + +} // namespace detail + +// Now cv::gout() may produce an empty vector (see "dynamic graphs"), so +// there may be a conflict between these two. State here that Opt version +// _must_ have at least one input for this overload +template +inline GOptRunArgsP gout(optional&arg, optional&... args) +{ + return GOptRunArgsP{ detail::wrap_opt_arg(arg), detail::wrap_opt_arg(args)... }; +} + /** * \addtogroup gapi_main_classes * @{ @@ -169,6 +253,44 @@ class GAPI_EXPORTS_W_SIMPLE GStreamingCompiled // NB: Used from python GAPI_WRAP std::tuple pull(); + /** + * @brief Get some next available data from the pipeline. + * + * This method takes a vector of cv::optional object. An object is + * assigned to some value if this value is available (ready) at + * the time of the call, and resets the object to empty() if it is + * not. + * + * This is a blocking method which guarantees that some data has + * been written to the output vector on return. + * + * Using this method only makes sense if the graph has + * desynchronized parts (see cv::gapi::desync). If there is no + * desynchronized parts in the graph, the behavior of this + * method is identical to the regular pull() (all data objects are + * produced synchronously in the output vector). + * + * Use gout() to create an output parameter vector. + * + * Output vectors must have the same number of elements as defined + * in the cv::GComputation protocol (at the moment of its + * construction). Shapes of elements also must conform to protocol + * (e.g. cv::optional needs to be passed where cv::GMat + * has been declared as output, and so on). Run-time exception is + * generated on type mismatch. + * + * This method writes new data into objects passed via output + * vector. If there is no data ready yet, this method blocks. Use + * try_pull() if you need a non-blocking version. + * + * @param outs vector of output parameters to obtain. + * @return true if next result has been obtained, + * false marks end of the stream. + * + * @sa cv::gapi::desync + */ + bool pull(cv::GOptRunArgsP &&outs); + /** * @brief Try to get the next processed frame from the pipeline. * diff --git a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp index c9800b2b16ed..2e8dcb1aec7d 100644 --- a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp +++ b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp @@ -191,6 +191,29 @@ namespace detail template using wrap_gapi_helper = WrapValue::type>; template using wrap_host_helper = WrapValue >::type>; + +// Union type for various user-defined type constructors (GArray, +// GOpaque, etc) +// +// TODO: Replace construct-only API with a more generic one (probably +// with bits of introspection) +// +// Not required for non-user-defined types (GMat, GScalar, etc) +using HostCtor = util::variant + < util::monostate + , detail::ConstructVec + , detail::ConstructOpaque + >; + +template struct GObtainCtor { + static HostCtor get() { return HostCtor{}; } +}; +template struct GObtainCtor > { + static HostCtor get() { return HostCtor{ConstructVec{&GArray::VCtor}}; }; +}; +template struct GObtainCtor > { + static HostCtor get() { return HostCtor{ConstructOpaque{&GOpaque::Ctor}}; }; +}; } // namespace detail } // namespace cv diff --git a/modules/gapi/include/opencv2/gapi/infer.hpp b/modules/gapi/include/opencv2/gapi/infer.hpp index 9b4580ec6bc8..b850775a62c4 100644 --- a/modules/gapi/include/opencv2/gapi/infer.hpp +++ b/modules/gapi/include/opencv2/gapi/infer.hpp @@ -2,7 +2,7 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2019 Intel Corporation +// Copyright (C) 2019-2020 Intel Corporation #ifndef OPENCV_GAPI_INFER_HPP @@ -77,6 +77,9 @@ class GNetworkType(Args...)> > using ResultL = std::tuple< cv::GArray... >; using APIList = std::function, Args...)>; + + // FIXME: Args... must be limited to a single GMat + using APIRoi = std::function, Args...)>; }; // Single-return-value network definition (specialized base class) @@ -92,6 +95,9 @@ class GNetworkType > using ResultL = cv::GArray; using APIList = std::function, Args...)>; + + // FIXME: Args... must be limited to a single GMat + using APIRoi = std::function, Args...)>; }; // APIList2 is also template to allow different calling options @@ -114,10 +120,10 @@ struct InferAPIList2 { // a particular backend, not by a network itself. struct GInferBase { static constexpr const char * id() { - return "org.opencv.dnn.infer"; // Universal stub + return "org.opencv.dnn.infer"; // Universal stub } static GMetaArgs getOutMeta(const GMetaArgs &, const GArgs &) { - return GMetaArgs{}; // One more universal stub + return GMetaArgs{}; // One more universal stub } }; @@ -164,15 +170,25 @@ struct GAPI_EXPORTS_W_SIMPLE GInferOutputs std::shared_ptr m_priv; }; /** @} */ +// Base "InferROI" kernel. +// All notes from "Infer" kernel apply here as well. +struct GInferROIBase { + static constexpr const char * id() { + return "org.opencv.dnn.infer-roi"; // Universal stub + } + static GMetaArgs getOutMeta(const GMetaArgs &, const GArgs &) { + return GMetaArgs{}; // One more universal stub + } +}; // Base "Infer list" kernel. // All notes from "Infer" kernel apply here as well. struct GInferListBase { static constexpr const char * id() { - return "org.opencv.dnn.infer-roi"; // Universal stub + return "org.opencv.dnn.infer-roi-list-1"; // Universal stub } static GMetaArgs getOutMeta(const GMetaArgs &, const GArgs &) { - return GMetaArgs{}; // One more universal stub + return GMetaArgs{}; // One more universal stub } }; @@ -180,10 +196,10 @@ struct GInferListBase { // All notes from "Infer" kernel apply here as well. struct GInferList2Base { static constexpr const char * id() { - return "org.opencv.dnn.infer-roi-list"; // Universal stub + return "org.opencv.dnn.infer-roi-list-2"; // Universal stub } static GMetaArgs getOutMeta(const GMetaArgs &, const GArgs &) { - return GMetaArgs{}; // One more universal stub + return GMetaArgs{}; // One more universal stub } }; @@ -200,6 +216,19 @@ struct GInfer final static constexpr const char* tag() { return Net::tag(); } }; +// A specific roi-inference kernel. API (::on()) is fixed here and +// verified against Net. +template +struct GInferROI final + : public GInferROIBase + , public detail::KernelTypeMedium< GInferROI + , typename Net::APIRoi > { + using GInferROIBase::getOutMeta; // FIXME: name lookup conflict workaround? + + static constexpr const char* tag() { return Net::tag(); } +}; + + // A generic roi-list inference kernel. API (::on()) is derived from // the Net template parameter (see more in infer<> overload). template @@ -238,6 +267,23 @@ struct GInferList2 final namespace cv { namespace gapi { +/** @brief Calculates response for the specified network (template + * parameter) for the specified region in the source image. + * Currently expects a single-input network only. + * + * @tparam A network type defined with G_API_NET() macro. + * @param in input image where to take ROI from. + * @param roi an object describing the region of interest + * in the source image. May be calculated in the same graph dynamically. + * @return an object of return type as defined in G_API_NET(). + * If a network has multiple return values (defined with a tuple), a tuple of + * objects of appropriate type is returned. + * @sa G_API_NET() + */ +template +typename Net::Result infer(cv::GOpaque roi, cv::GMat in) { + return GInferROI::on(roi, in); +} /** @brief Calculates responses for the specified network (template * parameter) for every region in the source image. @@ -328,7 +374,8 @@ infer(const std::string& tag, const GInferInputs& inputs) tag, GInferBase::getOutMeta, {}, // outShape will be filled later - std::move(kinds) + std::move(kinds), + {}, // outCtors will be filled later }); call->setArgs(std::move(input_args)); diff --git a/modules/gapi/include/opencv2/gapi/streaming/desync.hpp b/modules/gapi/include/opencv2/gapi/streaming/desync.hpp new file mode 100644 index 000000000000..86de279fe941 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/desync.hpp @@ -0,0 +1,84 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2020 Intel Corporation + + +#ifndef OPENCV_GAPI_GSTREAMING_DESYNC_HPP +#define OPENCV_GAPI_GSTREAMING_DESYNC_HPP + +#include + +#include +#include +#include +#include +#include + +namespace cv { +namespace gapi { +namespace streaming { + +namespace detail { +struct GDesync { + static const char *id() { + return "org.opencv.streaming.desync"; + } + + // An universal yield for desync. + // Yields output objects according to the input Types... + // Reuses gkernel machinery. + // FIXME: This function can be generic and declared in gkernel.hpp + // (it is there already, but a part of GKernelType[M] + template + static std::tuple yield(cv::GCall &call, cv::detail::Seq) { + return std::make_tuple(cv::detail::Yield::yield(call, IIs)...); + } +}; + +template +G desync(const G &g) { + cv::GKernel k{ + GDesync::id() // kernel id + , "" // kernel tag + , [](const GMetaArgs &a, const GArgs &) {return a;} // outMeta callback + , {cv::detail::GTypeTraits::shape} // output Shape + , {cv::detail::GTypeTraits::op_kind} // input data kinds + , {cv::detail::GObtainCtor::get()} // output template ctors + }; + cv::GCall call(std::move(k)); + call.pass(g); + return std::get<0>(GDesync::yield(call, cv::detail::MkSeq<1>::type())); +} +} // namespace detail + +/** + * @brief Starts a desynchronized branch in the graph. + * + * This operation takes a single G-API data object and returns a + * graph-level "duplicate" of this object. + * + * Operations which use this data object can be desynchronized + * from the rest of the graph. + * + * This operation has no effect when a GComputation is compiled with + * regular cv::GComputation::compile(), since cv::GCompiled objects + * always produce their full output vectors. + * + * This operation only makes sense when a GComputation is compiled in + * straming mode with cv::GComputation::compileStreaming(). If this + * operation is used and there are desynchronized outputs, the user + * should use a special version of cv::GStreamingCompiled::pull() + * which produces an array of cv::util::optional<> objects. + * + * @note This feature is highly experimental now and is currently + * limited to a single GMat argument only. + */ +GAPI_EXPORTS GMat desync(const GMat &g); + +} // namespace streaming +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_GSTREAMING_DESYNC_HPP diff --git a/modules/gapi/samples/infer_single_roi.cpp b/modules/gapi/samples/infer_single_roi.cpp new file mode 100644 index 000000000000..6054a3f4a629 --- /dev/null +++ b/modules/gapi/samples/infer_single_roi.cpp @@ -0,0 +1,264 @@ +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const std::string keys = + "{ h help | | Print this help message }" + "{ input | | Path to the input video file }" + "{ facem | face-detection-adas-0001.xml | Path to OpenVINO IE face detection model (.xml) }" + "{ faced | CPU | Target device for face detection model (e.g. CPU, GPU, VPU, ...) }" + "{ r roi | -1,-1,-1,-1 | Region of interest (ROI) to use for inference. Identified automatically when not set }"; + +namespace { + +std::string weights_path(const std::string &model_path) { + const auto EXT_LEN = 4u; + const auto sz = model_path.size(); + CV_Assert(sz > EXT_LEN); + + auto ext = model_path.substr(sz - EXT_LEN); + std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char c){ + return static_cast(std::tolower(c)); + }); + CV_Assert(ext == ".xml"); + return model_path.substr(0u, sz - EXT_LEN) + ".bin"; +} + +cv::util::optional parse_roi(const std::string &rc) { + cv::Rect rv; + char delim[3]; + + std::stringstream is(rc); + is >> rv.x >> delim[0] >> rv.y >> delim[1] >> rv.width >> delim[2] >> rv.height; + if (is.bad()) { + return cv::util::optional(); // empty value + } + const auto is_delim = [](char c) { + return c == ','; + }; + if (!std::all_of(std::begin(delim), std::end(delim), is_delim)) { + return cv::util::optional(); // empty value + + } + if (rv.x < 0 || rv.y < 0 || rv.width <= 0 || rv.height <= 0) { + return cv::util::optional(); // empty value + } + return cv::util::make_optional(std::move(rv)); +} + +} // namespace + +namespace custom { + +G_API_NET(FaceDetector, , "face-detector"); + +using GDetections = cv::GArray; +using GRect = cv::GOpaque; +using GSize = cv::GOpaque; +using GPrims = cv::GArray; + +G_API_OP(GetSize, , "sample.custom.get-size") { + static cv::GOpaqueDesc outMeta(const cv::GMatDesc &) { + return cv::empty_gopaque_desc(); + } +}; + +G_API_OP(LocateROI, , "sample.custom.locate-roi") { + static cv::GOpaqueDesc outMeta(const cv::GMatDesc &) { + return cv::empty_gopaque_desc(); + } +}; + +G_API_OP(ParseSSD, , "sample.custom.parse-ssd") { + static cv::GArrayDesc outMeta(const cv::GMatDesc &, const cv::GOpaqueDesc &, const cv::GOpaqueDesc &) { + return cv::empty_array_desc(); + } +}; + +G_API_OP(BBoxes, , "sample.custom.b-boxes") { + static cv::GArrayDesc outMeta(const cv::GArrayDesc &, const cv::GOpaqueDesc &) { + return cv::empty_array_desc(); + } +}; + +GAPI_OCV_KERNEL(OCVGetSize, GetSize) { + static void run(const cv::Mat &in, cv::Size &out) { + out = {in.cols, in.rows}; + } +}; + +GAPI_OCV_KERNEL(OCVLocateROI, LocateROI) { + // This is the place where we can run extra analytics + // on the input image frame and select the ROI (region + // of interest) where we want to detect our objects (or + // run any other inference). + // + // Currently it doesn't do anything intelligent, + // but only crops the input image to square (this is + // the most convenient aspect ratio for detectors to use) + + static void run(const cv::Mat &in_mat, cv::Rect &out_rect) { + + // Identify the central point & square size (- some padding) + const auto center = cv::Point{in_mat.cols/2, in_mat.rows/2}; + auto sqside = std::min(in_mat.cols, in_mat.rows); + + // Now build the central square ROI + out_rect = cv::Rect{ center.x - sqside/2 + , center.y - sqside/2 + , sqside + , sqside + }; + } +}; + +GAPI_OCV_KERNEL(OCVParseSSD, ParseSSD) { + static void run(const cv::Mat &in_ssd_result, + const cv::Rect &in_roi, + const cv::Size &in_parent_size, + std::vector &out_objects) { + const auto &in_ssd_dims = in_ssd_result.size; + CV_Assert(in_ssd_dims.dims() == 4u); + + const int MAX_PROPOSALS = in_ssd_dims[2]; + const int OBJECT_SIZE = in_ssd_dims[3]; + CV_Assert(OBJECT_SIZE == 7); // fixed SSD object size + + const cv::Size up_roi = in_roi.size(); + const cv::Rect surface({0,0}, in_parent_size); + + out_objects.clear(); + + const float *data = in_ssd_result.ptr(); + for (int i = 0; i < MAX_PROPOSALS; i++) { + const float image_id = data[i * OBJECT_SIZE + 0]; + const float label = data[i * OBJECT_SIZE + 1]; + const float confidence = data[i * OBJECT_SIZE + 2]; + const float rc_left = data[i * OBJECT_SIZE + 3]; + const float rc_top = data[i * OBJECT_SIZE + 4]; + const float rc_right = data[i * OBJECT_SIZE + 5]; + const float rc_bottom = data[i * OBJECT_SIZE + 6]; + (void) label; // unused + + if (image_id < 0.f) { + break; // marks end-of-detections + } + if (confidence < 0.5f) { + continue; // skip objects with low confidence + } + + // map relative coordinates to the original image scale + // taking the ROI into account + cv::Rect rc; + rc.x = static_cast(rc_left * up_roi.width); + rc.y = static_cast(rc_top * up_roi.height); + rc.width = static_cast(rc_right * up_roi.width) - rc.x; + rc.height = static_cast(rc_bottom * up_roi.height) - rc.y; + rc.x += in_roi.x; + rc.y += in_roi.y; + out_objects.emplace_back(rc & surface); + } + } +}; + +GAPI_OCV_KERNEL(OCVBBoxes, BBoxes) { + // This kernel converts the rectangles into G-API's + // rendering primitives + static void run(const std::vector &in_face_rcs, + const cv::Rect &in_roi, + std::vector &out_prims) { + out_prims.clear(); + const auto cvt = [](const cv::Rect &rc, const cv::Scalar &clr) { + return cv::gapi::wip::draw::Rect(rc, clr, 2); + }; + out_prims.emplace_back(cvt(in_roi, CV_RGB(0,255,255))); // cyan + for (auto &&rc : in_face_rcs) { + out_prims.emplace_back(cvt(rc, CV_RGB(0,255,0))); // green + } + } +}; + +} // namespace custom + +int main(int argc, char *argv[]) +{ + cv::CommandLineParser cmd(argc, argv, keys); + if (cmd.has("help")) { + cmd.printMessage(); + return 0; + } + + // Prepare parameters first + const std::string input = cmd.get("input"); + const auto opt_roi = parse_roi(cmd.get("roi")); + + const auto face_model_path = cmd.get("facem"); + auto face_net = cv::gapi::ie::Params { + face_model_path, // path to topology IR + weights_path(face_model_path), // path to weights + cmd.get("faced"), // device specifier + }; + auto kernels = cv::gapi::kernels + < custom::OCVGetSize + , custom::OCVLocateROI + , custom::OCVParseSSD + , custom::OCVBBoxes>(); + auto networks = cv::gapi::networks(face_net); + + // Now build the graph. The graph structure may vary + // pased on the input parameters + cv::GStreamingCompiled pipeline; + auto inputs = cv::gin(cv::gapi::wip::make_src(input)); + + if (opt_roi.has_value()) { + // Use the value provided by user + std::cout << "Will run inference for static region " + << opt_roi.value() + << " only" + << std::endl; + cv::GMat in; + cv::GOpaque in_roi; + auto blob = cv::gapi::infer(in_roi, in); + auto rcs = custom::ParseSSD::on(blob, in_roi, custom::GetSize::on(in)); + auto out = cv::gapi::wip::draw::render3ch(in, custom::BBoxes::on(rcs, in_roi)); + pipeline = cv::GComputation(cv::GIn(in, in_roi), cv::GOut(out)) + .compileStreaming(cv::compile_args(kernels, networks)); + + // Since the ROI to detect is manual, make it part of the input vector + inputs.push_back(cv::gin(opt_roi.value())[0]); + } else { + // Automatically detect ROI to infer. Make it output parameter + std::cout << "ROI is not set or invalid. Locating it automatically" + << std::endl; + cv::GMat in; + cv::GOpaque roi = custom::LocateROI::on(in); + auto blob = cv::gapi::infer(roi, in); + auto rcs = custom::ParseSSD::on(blob, roi, custom::GetSize::on(in)); + auto out = cv::gapi::wip::draw::render3ch(in, custom::BBoxes::on(rcs, roi)); + pipeline = cv::GComputation(cv::GIn(in), cv::GOut(out)) + .compileStreaming(cv::compile_args(kernels, networks)); + } + + // The execution part + pipeline.setSource(std::move(inputs)); + pipeline.start(); + + cv::Mat out; + while (pipeline.pull(cv::gout(out))) { + cv::imshow("Out", out); + cv::waitKey(1); + } + return 0; +} diff --git a/modules/gapi/src/api/gbackend.cpp b/modules/gapi/src/api/gbackend.cpp index 600e5cc84db3..6b8d0fcbee8d 100644 --- a/modules/gapi/src/api/gbackend.cpp +++ b/modules/gapi/src/api/gbackend.cpp @@ -67,6 +67,21 @@ cv::gapi::GKernelPackage cv::gapi::GBackend::Priv::auxiliaryKernels() const return {}; } +bool cv::gapi::GBackend::Priv::controlsMerge() const +{ + return false; +} + +bool cv::gapi::GBackend::Priv::allowsMerge(const cv::gimpl::GIslandModel::Graph &, + const ade::NodeHandle &, + const ade::NodeHandle &, + const ade::NodeHandle &) const +{ + GAPI_Assert(controlsMerge()); + return true; +} + + // GBackend public implementation ////////////////////////////////////////////// cv::gapi::GBackend::GBackend() { diff --git a/modules/gapi/src/api/gbackend_priv.hpp b/modules/gapi/src/api/gbackend_priv.hpp index 13f39acc86c9..45237514a53e 100644 --- a/modules/gapi/src/api/gbackend_priv.hpp +++ b/modules/gapi/src/api/gbackend_priv.hpp @@ -19,7 +19,7 @@ #include "opencv2/gapi/gkernel.hpp" #include "compiler/gmodel.hpp" - +#include "compiler/gislandmodel.hpp" namespace cv { @@ -68,6 +68,22 @@ class GAPI_EXPORTS cv::gapi::GBackend::Priv virtual cv::gapi::GKernelPackage auxiliaryKernels() const; + // Ask backend if it has a custom control over island fusion process + // This method is quite redundant but there's nothing better fits + // the current fusion process. By default, [existing] backends don't + // control the merge. + // FIXME: Refactor to a single entity? + virtual bool controlsMerge() const; + + // Ask backend if it is ok to merge these two islands connected + // via a data slot. By default, [existing] backends allow to merge everything. + // FIXME: Refactor to a single entity? + // FIXME: Strip down the type details form graph? (make it ade::Graph?) + virtual bool allowsMerge(const cv::gimpl::GIslandModel::Graph &g, + const ade::NodeHandle &a_nh, + const ade::NodeHandle &slot_nh, + const ade::NodeHandle &b_nh) const; + virtual ~Priv() = default; }; diff --git a/modules/gapi/src/api/ginfer.cpp b/modules/gapi/src/api/ginfer.cpp index 20511a4aafd0..156f8938c4fd 100644 --- a/modules/gapi/src/api/ginfer.cpp +++ b/modules/gapi/src/api/ginfer.cpp @@ -70,7 +70,10 @@ cv::GMat cv::GInferOutputs::at(const std::string& name) auto it = m_priv->out_blobs.find(name); if (it == m_priv->out_blobs.end()) { // FIXME: Avoid modifying GKernel + // Expect output to be always GMat m_priv->call->kernel().outShapes.push_back(cv::GShape::GMAT); + // ...so _empty_ constructor is passed here. + m_priv->call->kernel().outCtors.emplace_back(cv::util::monostate{}); int out_idx = static_cast(m_priv->out_blobs.size()); it = m_priv->out_blobs.emplace(name, m_priv->call->yield(out_idx)).first; m_priv->info->out_names.push_back(name); diff --git a/modules/gapi/src/api/kernels_streaming.cpp b/modules/gapi/src/api/kernels_streaming.cpp new file mode 100644 index 000000000000..af7bd19dd182 --- /dev/null +++ b/modules/gapi/src/api/kernels_streaming.cpp @@ -0,0 +1,74 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2020 Intel Corporation + +#include "precomp.hpp" + +#include +#include + +cv::GMat cv::gapi::streaming::desync(const cv::GMat &g) { + // FIXME: this is a limited implementation of desync + // The real implementation must be generic (template) and + // reside in desync.hpp (and it is detail::desync<>()) + + // FIXME: Put a copy here to solve the below problem + // FIXME: Because of the copy, the desync functionality is limited + // to GMat only (we don't have generic copy kernel for other + // object types) + return cv::gapi::copy(detail::desync(g)); + + // FIXME + // + // If consumed by multiple different islands (OCV and Fluid by + // example, an object needs to be desynchronized individually + // for every path. + // + // This is a limitation of the current implementation. It works + // this way: every "desync" link from the main path to a new + // desync path gets its "DesyncQueue" object which stores only the + // last value written before of the desync object (DO) it consumes + // (the container of type "last written value" or LWV. + // + // LWV + // [Sync path] -> desync() - - > DO -> [ISL0 @ Desync path #1] + // + // At the same time, generally, every island in the streaming + // graph gets its individual input as a queue (so normally, a + // writer pushes the same output MULTIPLE TIMES if it has mutliple + // readers): + // + // LWV + // [Sync path] -> desync() - - > DO1 -> [ISL0 @ Desync path #1] + // : LWV + // ' - - > DO2 -> [ISL1 @ Desync path #1] + // + // For users, it may seem legit to use desync here only once, and + // it MUST BE legit once the problem is fixed. + // But the problem with the current implementation is that islands + // on the same desync path get different desync queues and in fact + // stay desynchronized between each other. One shouldn't consider + // this as a single desync path anymore. + // If these two ISLs are then merged e.g. with add(a,b), the + // results will be inconsistent, given that the latency of ISL0 + // and ISL1 may be different. This is not the same frame anymore + // coming as `a` and `b` to add(a,b) because of it. + // + // To make things clear, we forbid this now and ask to call + // desync one more time to allow that. It is bad since the graph + // structure and island layout depends on kernel packages used, + // not on the sole GComputation structure. This needs to be fixed! + // Here's the working configuration: + // + // LWV + // [Sync path] -> desync() - - > DO1 -> [ISL0 @ Desync path #1] + // : LWV + // '-> desync() - - > DO2 -> [ISL1 @ Desync path #2] <-(!) + // + // Put an operation right after desync() is a quick workaround to + // this synchronization problem. There will be one "last_written_value" + // connected to a desynchronized data object, and this sole last_written_value + // object will feed both branches of the streaming executable. +} diff --git a/modules/gapi/src/backends/ie/giebackend.cpp b/modules/gapi/src/backends/ie/giebackend.cpp index 08836163a7cc..c66fa4436124 100644 --- a/modules/gapi/src/backends/ie/giebackend.cpp +++ b/modules/gapi/src/backends/ie/giebackend.cpp @@ -519,6 +519,65 @@ struct Infer: public cv::detail::KernelTag { } }; +struct InferROI: public cv::detail::KernelTag { + using API = cv::GInferROIBase; + static cv::gapi::GBackend backend() { return cv::gapi::ie::backend(); } + static KImpl kernel() { return KImpl{outMeta, run}; } + + static cv::GMetaArgs outMeta(const ade::Graph &gr, + const ade::NodeHandle &nh, + const cv::GMetaArgs &in_metas, + const cv::GArgs &/*in_args*/) { + cv::GMetaArgs result; + + GConstGIEModel gm(gr); + const auto &uu = gm.metadata(nh).get(); + + // Initialize input information + // FIXME: So far it is pretty limited + GAPI_Assert(1u == uu.params.input_names.size()); + GAPI_Assert(2u == in_metas.size()); + + // 0th is ROI, 1st is in0put image + auto &&ii = uu.inputs.at(uu.params.input_names.at(0)); + const auto &meta = util::get(in_metas.at(1)); + ii->setPrecision(toIE(meta.depth)); + ii->getPreProcess().setResizeAlgorithm(IE::RESIZE_BILINEAR); + + // FIXME: It would be nice here to have an exact number of network's + // input/output parameters. Probably GCall should store it here for us. + // It doesn't, as far as I know.. + for (const auto &out_name : uu.params.output_names) { + // NOTE: our output_names vector follows the API order + // of this operation's outputs + const IE::DataPtr& ie_out = uu.outputs.at(out_name); + const IE::SizeVector dims = ie_out->getTensorDesc().getDims(); + + cv::GMatDesc outm(toCV(ie_out->getPrecision()), + toCV(ie_out->getTensorDesc().getDims())); + result.emplace_back(outm); + } + return result; + } + + static void run(IECompiled &iec, const IEUnit &uu, IECallContext &ctx) { + // non-generic version for now, per the InferROI's definition + GAPI_Assert(uu.params.num_in == 1); + const auto& this_roi = ctx.inArg(0).rref(); + const auto this_mat = ctx.inMat(1); + IE::Blob::Ptr this_blob = wrapIE(this_mat, cv::gapi::ie::TraitAs::IMAGE); + IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(this_roi)); + iec.this_request.SetBlob(*uu.params.input_names.begin(), roi_blob); + iec.this_request.Infer(); + for (auto i : ade::util::iota(uu.params.num_out)) { + cv::Mat& out_mat = ctx.outMatR(i); + IE::Blob::Ptr out_blob = iec.this_request.GetBlob(uu.params.output_names[i]); + copyFromIE(out_blob, out_mat); + } + } +}; + + struct InferList: public cv::detail::KernelTag { using API = cv::GInferListBase; static cv::gapi::GBackend backend() { return cv::gapi::ie::backend(); } @@ -780,6 +839,7 @@ namespace { virtual cv::gapi::GKernelPackage auxiliaryKernels() const override { return cv::gapi::kernels< cv::gimpl::ie::Infer + , cv::gimpl::ie::InferROI , cv::gimpl::ie::InferList , cv::gimpl::ie::InferList2 >(); diff --git a/modules/gapi/src/backends/ocl/goclbackend.cpp b/modules/gapi/src/backends/ocl/goclbackend.cpp index 34dba01afe7a..847b802fd29a 100644 --- a/modules/gapi/src/backends/ocl/goclbackend.cpp +++ b/modules/gapi/src/backends/ocl/goclbackend.cpp @@ -272,4 +272,8 @@ void cv::gimpl::GOCLExecutable::run(std::vector &&input_objs, GAPI_Assert((out_arg_data == (mag_mat.getMat(ACCESS_RW).data)) && " data for output parameters was reallocated ?"); } } + + // In/Out args clean-up is mandatory now with RMat + for (auto &it : input_objs) magazine::unbind(m_res, it.first); + for (auto &it : output_objs) magazine::unbind(m_res, it.first); } diff --git a/modules/gapi/src/compiler/gcompiler.cpp b/modules/gapi/src/compiler/gcompiler.cpp index 76c40ddca0f3..eb75f44e0e58 100644 --- a/modules/gapi/src/compiler/gcompiler.cpp +++ b/modules/gapi/src/compiler/gcompiler.cpp @@ -238,6 +238,11 @@ cv::gimpl::GCompiler::GCompiler(const cv::GComputation &c, // (no compound backend present here) m_e.addPass("kernels", "check_islands_content", passes::checkIslandsContent); + // Special stage for intrinsics handling + m_e.addPassStage("intrin"); + m_e.addPass("intrin", "desync", passes::intrinDesync); + m_e.addPass("intrin", "finalizeIntrin", passes::intrinFinalize); + //Input metas may be empty when a graph is compiled for streaming m_e.addPassStage("meta"); if (!m_metas.empty()) @@ -384,6 +389,9 @@ cv::gimpl::GCompiler::GPtr cv::gimpl::GCompiler::generateGraph() { GModel::Graph(*g).metadata().set(OriginalInputMeta{m_metas}); } + // FIXME: remove m_args, remove GCompileArgs from backends' method signatures, + // rework backends to access GCompileArgs from graph metadata + GModel::Graph(*g).metadata().set(CompileArgs{m_args}); return g; } diff --git a/modules/gapi/src/compiler/gislandmodel.cpp b/modules/gapi/src/compiler/gislandmodel.cpp index aee0477e0864..9ffc60537291 100644 --- a/modules/gapi/src/compiler/gislandmodel.cpp +++ b/modules/gapi/src/compiler/gislandmodel.cpp @@ -175,13 +175,26 @@ void GIslandModel::generateInitial(GIslandModel::Graph &g, { auto src_data_nh = in_edge->srcNode(); auto isl_slot_nh = data_to_slot.at(src_data_nh); - g.link(isl_slot_nh, nh); // no other data stored yet + auto isl_new_eh = g.link(isl_slot_nh, nh); // no other data stored yet + // Propagate some special metadata from the GModel to GIslandModel + // TODO: Make it a single place (a function) for both inputs/outputs? + // (since it is duplicated in the below code block) + if (src_g.metadata(in_edge).contains()) + { + const auto idx = src_g.metadata(in_edge).get().index; + g.metadata(isl_new_eh).set(DesyncIslEdge{idx}); + } } for (auto out_edge : src_op_nh->outEdges()) { auto dst_data_nh = out_edge->dstNode(); auto isl_slot_nh = data_to_slot.at(dst_data_nh); - g.link(nh, isl_slot_nh); + auto isl_new_eh = g.link(nh, isl_slot_nh); + if (src_g.metadata(out_edge).contains()) + { + const auto idx = src_g.metadata(out_edge).get().index; + g.metadata(isl_new_eh).set(DesyncIslEdge{idx}); + } } } // for(all_operations) } @@ -254,6 +267,9 @@ void GIslandModel::syncIslandTags(Graph &g, ade::Graph &orig_g) void GIslandModel::compileIslands(Graph &g, const ade::Graph &orig_g, const GCompileArgs &args) { GModel::ConstGraph gm(orig_g); + if (gm.metadata().contains()) { + util::throw_error(std::logic_error("FATAL: The graph has unresolved intrinsics")); + } auto original_sorted = gm.metadata().get(); for (auto nh : g.nodes()) diff --git a/modules/gapi/src/compiler/gislandmodel.hpp b/modules/gapi/src/compiler/gislandmodel.hpp index 6cf8f986673d..c2e7b96d450c 100644 --- a/modules/gapi/src/compiler/gislandmodel.hpp +++ b/modules/gapi/src/compiler/gislandmodel.hpp @@ -142,6 +142,14 @@ class GAPI_EXPORTS GIslandExecutable // at that stage. virtual void handleNewStream() {}; // do nothing here by default + // This method is called for every IslandExecutable when + // the stream-based execution is stopped. + // All processing is guaranteed to be stopped by this moment, + // with no pending or running 'run()' processes ran in background. + // FIXME: This method is tightly bound to the GStreamingExecutor + // now. + virtual void handleStopStream() {} // do nothing here by default + virtual ~GIslandExecutable() = default; }; @@ -222,8 +230,19 @@ struct IslandsCompiled static const char *name() { return "IslandsCompiled"; } }; +// This flag marks an edge in an GIslandModel as "desynchronized" +// i.e. it starts a new desynchronized subgraph +struct DesyncIslEdge +{ + static const char *name() { return "DesynchronizedIslandEdge"; } + + // Projection from GModel/DesyncEdge.index + int index; +}; + namespace GIslandModel { + using Graph = ade::TypedGraph < NodeKind , FusedIsland @@ -232,6 +251,7 @@ namespace GIslandModel , Emitter , Sink , IslandsCompiled + , DesyncIslEdge , ade::passes::TopologicalSortData >; @@ -244,6 +264,7 @@ namespace GIslandModel , Emitter , Sink , IslandsCompiled + , DesyncIslEdge , ade::passes::TopologicalSortData >; diff --git a/modules/gapi/src/compiler/gmodel.cpp b/modules/gapi/src/compiler/gmodel.cpp index b5b76fd1c988..ea4eb880a435 100644 --- a/modules/gapi/src/compiler/gmodel.cpp +++ b/modules/gapi/src/compiler/gmodel.cpp @@ -77,7 +77,7 @@ ade::NodeHandle GModel::mkDataNode(GModel::Graph &g, const GShape shape) return data_h; } -void GModel::linkIn(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::size_t in_port) +ade::EdgeHandle GModel::linkIn(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::size_t in_port) { // Check if input is already connected for (const auto& in_e : opH->inEdges()) @@ -96,9 +96,11 @@ void GModel::linkIn(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::si // Replace an API object with a REF (G* -> GOBJREF) op.args[in_port] = cv::GArg(RcDesc{gm.rc, gm.shape, {}}); + + return eh; } -void GModel::linkOut(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::size_t out_port) +ade::EdgeHandle GModel::linkOut(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::size_t out_port) { // FIXME: check validity using kernel prototype @@ -121,6 +123,8 @@ void GModel::linkOut(Graph &g, ade::NodeHandle opH, ade::NodeHandle objH, std::s const auto min_out_size = std::max(op.outs.size(), storage_with_port); op.outs.resize(min_out_size, RcDesc{-1,GShape::GMAT,{}}); // FIXME: Invalid shape instead? op.outs[out_port] = RcDesc{gm.rc, gm.shape, {}}; + + return eh; } std::vector GModel::orderedInputs(const ConstGraph &g, ade::NodeHandle nh) @@ -210,26 +214,29 @@ ade::NodeHandle GModel::detail::dataNodeOf(const ConstLayoutGraph &g, const GOri return g.metadata().get().object_nodes.at(origin); } -void GModel::redirectReaders(Graph &g, ade::NodeHandle from, ade::NodeHandle to) +std::vector GModel::redirectReaders(Graph &g, ade::NodeHandle from, ade::NodeHandle to) { std::vector ehh(from->outEdges().begin(), from->outEdges().end()); + std::vector ohh; + ohh.reserve(ehh.size()); for (auto e : ehh) { auto dst = e->dstNode(); auto input = g.metadata(e).get(); g.erase(e); - linkIn(g, dst, to, input.port); + ohh.push_back(linkIn(g, dst, to, input.port)); } + return ohh; } -void GModel::redirectWriter(Graph &g, ade::NodeHandle from, ade::NodeHandle to) +ade::EdgeHandle GModel::redirectWriter(Graph &g, ade::NodeHandle from, ade::NodeHandle to) { GAPI_Assert(from->inEdges().size() == 1); auto e = from->inEdges().front(); auto op = e->srcNode(); auto output = g.metadata(e).get(); g.erase(e); - linkOut(g, op, to, output.port); + return linkOut(g, op, to, output.port); } GMetaArgs GModel::collectInputMeta(const GModel::ConstGraph &cg, ade::NodeHandle node) diff --git a/modules/gapi/src/compiler/gmodel.hpp b/modules/gapi/src/compiler/gmodel.hpp index 5f02e583549a..d016766fb507 100644 --- a/modules/gapi/src/compiler/gmodel.hpp +++ b/modules/gapi/src/compiler/gmodel.hpp @@ -211,6 +211,58 @@ struct CustomMetaFunction CM customOutMeta; }; +// This is a general flag indicating that this GModel has intrinsics. +// In the beginning of the compilation, it is a quick check to +// indicate there are intrinsics. +// +// In the end of the compilation, having this flag is fatal -- all +// intrinsics must be resolved. +struct HasIntrinsics +{ + static const char *name() { return "HasIntrinsicsFlag"; } +}; + +// This is a special tag for both DATA and OP nodes indicating +// which desynchronized path this node belongs to. +// This tag is set by a special complex pass intrinDesync/accept. +struct DesyncPath +{ + static const char *name() { return "DesynchronizedPath"; } + + // A zero-based index of the desynchronized path in the graph. + // Set by intrinDesync() compiler pass + int index; +}; + +// This is a special tag for graph Edges indicating that this +// particular edge starts a desynchronized path in the graph. +// At the execution stage, the data coming "through" these edges +// (virtually, of course, since our GModel edges never transfer the +// actual data, they just represent these transfers) is desynchronized +// from the rest of the pipeline, i.e. may be "lost" (stay unconsumed +// and then overwritten with some new data when streaming). +struct DesyncEdge +{ + static const char *name() { return "DesynchronizedEdge"; } + + // A zero-based index of the desynchronized path in the graph. + // Set by intrinDesync/apply() compiler pass + int index; +}; + +// This flag marks the island graph as "desynchronized" +struct Desynchronized +{ + static const char *name() { return "Desynchronized"; } +}; + +// Reference to compile args of the computation +struct CompileArgs +{ + static const char *name() { return "CompileArgs"; } + GCompileArgs args; +}; + namespace GModel { using Graph = ade::TypedGraph @@ -232,6 +284,11 @@ namespace GModel , CustomMetaFunction , Streaming , Deserialized + , HasIntrinsics + , DesyncPath + , DesyncEdge + , Desynchronized + , CompileArgs >; // FIXME: How to define it based on GModel??? @@ -254,6 +311,11 @@ namespace GModel , CustomMetaFunction , Streaming , Deserialized + , HasIntrinsics + , DesyncPath + , DesyncEdge + , Desynchronized + , CompileArgs >; // FIXME: @@ -278,11 +340,11 @@ namespace GModel // Clears logged messages of a node. GAPI_EXPORTS void log_clear(Graph &g, ade::NodeHandle node); - GAPI_EXPORTS void linkIn (Graph &g, ade::NodeHandle op, ade::NodeHandle obj, std::size_t in_port); - GAPI_EXPORTS void linkOut (Graph &g, ade::NodeHandle op, ade::NodeHandle obj, std::size_t out_port); + GAPI_EXPORTS ade::EdgeHandle linkIn (Graph &g, ade::NodeHandle op, ade::NodeHandle obj, std::size_t in_port); + GAPI_EXPORTS ade::EdgeHandle linkOut (Graph &g, ade::NodeHandle op, ade::NodeHandle obj, std::size_t out_port); - GAPI_EXPORTS void redirectReaders(Graph &g, ade::NodeHandle from, ade::NodeHandle to); - GAPI_EXPORTS void redirectWriter (Graph &g, ade::NodeHandle from, ade::NodeHandle to); + GAPI_EXPORTS std::vector redirectReaders(Graph &g, ade::NodeHandle from, ade::NodeHandle to); + GAPI_EXPORTS ade::EdgeHandle redirectWriter (Graph &g, ade::NodeHandle from, ade::NodeHandle to); GAPI_EXPORTS std::vector orderedInputs (const ConstGraph &g, ade::NodeHandle nh); GAPI_EXPORTS std::vector orderedOutputs(const ConstGraph &g, ade::NodeHandle nh); diff --git a/modules/gapi/src/compiler/gmodelbuilder.cpp b/modules/gapi/src/compiler/gmodelbuilder.cpp index 80abadd9c6cf..5f8f3518fce1 100644 --- a/modules/gapi/src/compiler/gmodelbuilder.cpp +++ b/modules/gapi/src/compiler/gmodelbuilder.cpp @@ -134,12 +134,19 @@ cv::gimpl::Unrolled cv::gimpl::unrollExpr(const GProtoArgs &ins, // Put the outputs object description of the node // so that they are not lost if they are not consumed by other operations + GAPI_Assert(call_p.m_k.outCtors.size() == call_p.m_k.outShapes.size()); for (const auto &it : ade::util::indexed(call_p.m_k.outShapes)) { std::size_t port = ade::util::index(it); GShape shape = ade::util::value(it); - GOrigin org { shape, node, port, {}, origin.kind }; + // FIXME: then use ZIP + HostCtor ctor = call_p.m_k.outCtors[port]; + + // NB: Probably this fixes all other "missing host ctor" + // problems. + // TODO: Clean-up the old workarounds if it really is. + GOrigin org {shape, node, port, std::move(ctor), origin.kind}; origins.insert(org); } diff --git a/modules/gapi/src/compiler/gobjref.hpp b/modules/gapi/src/compiler/gobjref.hpp index dd0939c439ce..bca6fa525e47 100644 --- a/modules/gapi/src/compiler/gobjref.hpp +++ b/modules/gapi/src/compiler/gobjref.hpp @@ -16,15 +16,9 @@ namespace cv namespace gimpl { - // Union type for various user-defined type constructors (GArray, GOpaque, etc) - // FIXME: Replace construct-only API with a more generic one - // (probably with bits of introspection) - // Not required for non-user-defined types (GMat, GScalar, etc) - using HostCtor = util::variant - < util::monostate - , detail::ConstructVec - , detail::ConstructOpaque - >; + // HostCtor was there, but then moved to public + // Redeclare here to avoid changing tons of code + using HostCtor = cv::detail::HostCtor; using ConstVal = util::variant < util::monostate diff --git a/modules/gapi/src/compiler/gstreaming.cpp b/modules/gapi/src/compiler/gstreaming.cpp index 29c98ddfd440..eb06f3f6f29b 100644 --- a/modules/gapi/src/compiler/gstreaming.cpp +++ b/modules/gapi/src/compiler/gstreaming.cpp @@ -69,6 +69,11 @@ bool cv::GStreamingCompiled::Priv::pull(cv::GRunArgsP &&outs) return m_exec->pull(std::move(outs)); } +bool cv::GStreamingCompiled::Priv::pull(cv::GOptRunArgsP &&outs) +{ + return m_exec->pull(std::move(outs)); +} + bool cv::GStreamingCompiled::Priv::try_pull(cv::GRunArgsP &&outs) { return m_exec->try_pull(std::move(outs)); @@ -113,6 +118,7 @@ bool cv::GStreamingCompiled::pull(cv::GRunArgsP &&outs) std::tuple cv::GStreamingCompiled::pull() { + // FIXME: Why it is not @ priv?? GRunArgs run_args; GRunArgsP outs; const auto& out_shapes = m_priv->outShapes(); @@ -144,6 +150,11 @@ std::tuple cv::GStreamingCompiled::pull() return std::make_tuple(is_over, run_args); } +bool cv::GStreamingCompiled::pull(cv::GOptRunArgsP &&outs) +{ + return m_priv->pull(std::move(outs)); +} + bool cv::GStreamingCompiled::try_pull(cv::GRunArgsP &&outs) { return m_priv->try_pull(std::move(outs)); diff --git a/modules/gapi/src/compiler/gstreaming_priv.hpp b/modules/gapi/src/compiler/gstreaming_priv.hpp index 73ca002f850a..2f195ca22671 100644 --- a/modules/gapi/src/compiler/gstreaming_priv.hpp +++ b/modules/gapi/src/compiler/gstreaming_priv.hpp @@ -42,6 +42,7 @@ class GAPI_EXPORTS GStreamingCompiled::Priv void setSource(GRunArgs &&args); void start(); bool pull(cv::GRunArgsP &&outs); + bool pull(cv::GOptRunArgsP &&outs); bool try_pull(cv::GRunArgsP &&outs); void stop(); diff --git a/modules/gapi/src/compiler/passes/exec.cpp b/modules/gapi/src/compiler/passes/exec.cpp index 0eb8352b763f..f6a73489eb39 100644 --- a/modules/gapi/src/compiler/passes/exec.cpp +++ b/modules/gapi/src/compiler/passes/exec.cpp @@ -20,6 +20,7 @@ #include // util::optional #include "logger.hpp" // GAPI_LOG +#include "api/gbackend_priv.hpp" // for canMerge() #include "compiler/gmodel.hpp" #include "compiler/gislandmodel.hpp" #include "compiler/passes/passes.hpp" @@ -54,11 +55,28 @@ namespace // Also check the cases backend can't handle // (e.x. GScalar connecting two fluid ops should split the graph) const GModel::ConstGraph g(src_graph); + if (g.metadata().contains()) { + // Fusion of a graph having a desynchronized path is + // definitely non-trivial + return false; + } const auto& active_backends = g.metadata().get().backends; - return active_backends.size() == 1 && - ade::util::all_of(g.nodes(), [&](ade::NodeHandle nh) { - return !g.metadata(nh).contains(); - }); + if (active_backends.size() != 1u) { + // More than 1 backend involved - non-trivial + return false; + } + const auto& has_island_tags = [&](ade::NodeHandle nh) { + return g.metadata(nh).contains(); + }; + if (ade::util::any_of(g.nodes(), has_island_tags)) { + // There are user-defined islands - non-trivial + return false; + } + if (active_backends.begin()->priv().controlsMerge()) { + // If the only backend controls Island Fusion on its own - non-trivial + return false; + } + return true; } void fuseTrivial(GIslandModel::Graph &g, const ade::Graph &src_graph) @@ -125,9 +143,9 @@ namespace }; bool canMerge(const GIslandModel::Graph &g, - const ade::NodeHandle a_nh, - const ade::NodeHandle /*slot_nh*/, - const ade::NodeHandle b_nh, + const ade::NodeHandle &a_nh, + const ade::NodeHandle &slot_nh, + const ade::NodeHandle &b_nh, const MergeContext &ctx = MergeContext()) { auto a_ptr = g.metadata(a_nh).get().object; @@ -142,8 +160,8 @@ namespace // Islands which cause a cycle can't be merged as well // (since the flag is set, the procedure already tried to // merge these islands in the past) - if (ade::util::contains(ctx.cycle_causers, std::make_pair(a_ptr, b_ptr))|| - ade::util::contains(ctx.cycle_causers, std::make_pair(b_ptr, a_ptr))) + if ( ade::util::contains(ctx.cycle_causers, std::make_pair(a_ptr, b_ptr)) + || ade::util::contains(ctx.cycle_causers, std::make_pair(b_ptr, a_ptr))) return false; // There may be user-defined islands. Initially user-defined @@ -163,7 +181,13 @@ namespace return false; } - // FIXME: add a backend-specified merge checker + // If available, run the backend-specified merge checker + const auto &this_backend_p = a_ptr->backend().priv(); + if ( this_backend_p.controlsMerge() + && !this_backend_p.allowsMerge(g, a_nh, slot_nh, b_nh)) + { + return false; + } return true; } @@ -205,10 +229,31 @@ namespace { using namespace std::placeholders; + // Before checking for candidates, find and ban neighbor nodes + // (input or outputs) which are connected via desynchronized + // edges. + GIsland::node_set nodes_with_desync_edges; + for (const auto& in_eh : nh->inEdges()) { + if (g.metadata(in_eh).contains()) { + nodes_with_desync_edges.insert(in_eh->srcNode()); + } + } + for (const auto& output_data_nh : nh->outNodes()) { + for (const auto &out_reader_eh : output_data_nh->outEdges()) { + if (g.metadata(out_reader_eh).contains()) { + nodes_with_desync_edges.insert(out_reader_eh->dstNode()); + } + } + } + // Find a first matching candidate GIsland for merge // among inputs - for (const auto& input_data_nh : nh->inNodes()) + for (const auto& in_eh : nh->inEdges()) { + if (ade::util::contains(nodes_with_desync_edges, in_eh->srcNode())) { + continue; // desync edges can never be fused + } + const auto& input_data_nh = in_eh->srcNode(); if (input_data_nh->inNodes().size() != 0) { // Data node must have a single producer only @@ -224,14 +269,17 @@ namespace // Ok, now try to find it among the outputs for (const auto& output_data_nh : nh->outNodes()) { - auto mergeTest = [&](ade::NodeHandle cons_nh) -> bool { - return canMerge(g, nh, output_data_nh, cons_nh, ctx); + auto mergeTest = [&](ade::EdgeHandle cons_eh) -> bool { + if (ade::util::contains(nodes_with_desync_edges, cons_eh->dstNode())) { + return false; // desync edges can never be fused + } + return canMerge(g, nh, output_data_nh, cons_eh->dstNode(), ctx); }; - auto cand_it = std::find_if(output_data_nh->outNodes().begin(), - output_data_nh->outNodes().end(), + auto cand_it = std::find_if(output_data_nh->outEdges().begin(), + output_data_nh->outEdges().end(), mergeTest); - if (cand_it != output_data_nh->outNodes().end()) - return std::make_tuple(*cand_it, + if (cand_it != output_data_nh->outEdges().end()) + return std::make_tuple((*cand_it)->dstNode(), output_data_nh, Direction::Out); } // for(outNodes) @@ -251,6 +299,7 @@ namespace ade::NodeHandle m_slot; ade::NodeHandle m_cons; + using Change = ChangeT; Change::List m_changes; struct MergeObjects @@ -423,10 +472,10 @@ namespace auto backend = m_gim.metadata(m_prod).get() .object->backend(); auto merged = std::make_shared(backend, - std::move(mo.all), - std::move(mo.in_ops), - std::move(mo.out_ops), - std::move(maybe_user_tag)); + std::move(mo.all), + std::move(mo.in_ops), + std::move(mo.out_ops), + std::move(maybe_user_tag)); // FIXME: move this debugging to some user-controllable log-level #ifdef DEBUG_MERGE merged->debug(); @@ -440,7 +489,9 @@ namespace m_prod->inEdges().end()); for (auto in_edge : input_edges) { - m_changes.enqueue(m_g, in_edge->srcNode(), new_nh); + // FIXME: Introduce a Relink primitive instead? + // (combining the both actions into one?) + m_changes.enqueue(m_g, in_edge->srcNode(), new_nh, in_edge); m_changes.enqueue(m_g, m_prod, in_edge); } @@ -450,7 +501,7 @@ namespace m_cons->outEdges().end()); for (auto out_edge : output_edges) { - m_changes.enqueue(m_g, new_nh, out_edge->dstNode()); + m_changes.enqueue(m_g, new_nh, out_edge->dstNode(), out_edge); m_changes.enqueue(m_g, m_cons, out_edge); } @@ -491,6 +542,10 @@ namespace m_changes.enqueue(m_g, non_opt_slot_nh, eh); } } + // FIXME: No metadata copied here (from where??) + // For DesyncIslEdges it still works, as these tags are + // placed to Data->Op edges and this one is an Op->Data + // edge. m_changes.enqueue(m_g, new_nh, non_opt_slot_nh); } @@ -502,7 +557,7 @@ namespace m_prod->outEdges().end()); for (auto extra_out : prod_extra_out_edges) { - m_changes.enqueue(m_g, new_nh, extra_out->dstNode()); + m_changes.enqueue(m_g, new_nh, extra_out->dstNode(), extra_out); m_changes.enqueue(m_g, m_prod, extra_out); } @@ -514,7 +569,7 @@ namespace m_cons->inEdges().end()); for (auto extra_in : cons_extra_in_edges) { - m_changes.enqueue(m_g, extra_in->srcNode(), new_nh); + m_changes.enqueue(m_g, extra_in->srcNode(), new_nh, extra_in); m_changes.enqueue(m_g, m_cons, extra_in); } @@ -557,10 +612,10 @@ namespace there_was_a_merge = false; // FIXME: move this debugging to some user-controllable log level - #ifdef DEBUG_MERGE +#ifdef DEBUG_MERGE GAPI_LOG_INFO(NULL, "Before next merge attempt " << iteration << "..."); merge_debug(g, iteration); - #endif +#endif iteration++; auto sorted = pass_helpers::topoSort(im); for (auto nh : sorted) @@ -600,9 +655,9 @@ namespace "merge(" << l_obj->name() << "," << r_obj->name() << ") was successful!"); action.commit(); - #ifdef DEBUG_MERGE +#ifdef DEBUG_MERGE GIslandModel::syncIslandTags(gim, g); - #endif +#endif there_was_a_merge = true; break; // start do{}while from the beginning } diff --git a/modules/gapi/src/compiler/passes/intrin.cpp b/modules/gapi/src/compiler/passes/intrin.cpp new file mode 100644 index 000000000000..5d2707570a40 --- /dev/null +++ b/modules/gapi/src/compiler/passes/intrin.cpp @@ -0,0 +1,305 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2020 Intel Corporation + + +#include "precomp.hpp" + +#include +#include +#include // GDesync intrinsic + +#include "compiler/gmodel.hpp" +#include "compiler/passes/passes.hpp" + +namespace desync { +namespace { + +// Drop the desynchronized node `nh` from the graph, reconnect the +// graph structure properly. This is a helper function which is used +// in both drop(g) and apply(g) passes. +// +// @return a vector of new edge handles connecting the "main" graph +// with its desynchronized part. +std::vector drop(cv::gimpl::GModel::Graph &g, + ade::NodeHandle nh) { + using namespace cv::gimpl; + + // What we need to do here: + // 1. Connect the readers of its produced data objects + // to the input data objects of desync; + // 2. Drop the data object it produces. + // 3. Drop the desync operation itself; + std::vector in_data_objs = GModel::orderedInputs(g, nh); + std::vector out_data_objs = GModel::orderedOutputs(g, nh); + std::vector new_links; + GAPI_Assert(in_data_objs.size() == out_data_objs.size()); + GAPI_DbgAssert(ade::util::all_of + (out_data_objs, + [&](const ade::NodeHandle &oh) { + return g.metadata(oh).contains(); + })); + // (1) + for (auto &&it: ade::util::zip(ade::util::toRange(in_data_objs), + ade::util::toRange(out_data_objs))) { + auto these_new_links = GModel::redirectReaders(g, + std::get<1>(it), + std::get<0>(it)); + new_links.insert(new_links.end(), + these_new_links.begin(), + these_new_links.end()); + } + // (2) + for (auto &&old_out_nh : out_data_objs) { + g.erase(old_out_nh); + } + // (3) + g.erase(nh); + + return new_links; +} + +// Tracing a desynchronizing subgraph is somewhat tricky and happens +// in both directions: downwards and upwards. +// +// The downward process is the basic one: we start with a "desync" +// OP node and go down to the graph using the "output" edges. We check +// if all nodes on this path [can] belong to this desynchronized path +// and don't overlap with others. +// +// An important contract to maintain is that the desynchronized part +// can't have any input references from the "main" graph part or any +// other desynchronized part in the graph. This contract is validated +// by checking every node's input which must belong to the same +// desynchronized part. +// +// Here is the pitfall of this check: +// +// v +// GMat_0 +// v +// +----------+ +// | desync() | <- This point originates the traceDown process +// +----------+ +// v +// GMat_0' <- This node will be tagged for this desync at +// :--------. step 0/1 +// v : <- The order how output nodes are visited is not +// +----------+ : specified, we can visit Op2() first (as there +// | Op1() | : is a direct link) bypassing visiting and tagging +// +----------+ : Op1() and GMat_1 +// v : +// GMat_1 : +// : .---' +// v v <- When we visit Op2() via the 2nd edge on this +// +----------+ graph, we check if all inputs belong to the same +// | Op2() | desynchronized graph and GMat_1 fails this check +// +----------+ (since the traceDown() process haven't visited +// it yet). +// +// Cases like this originate the traceUp() process: if we find an +// input node in our desynchronized path which doesn't belong to this +// path YET, it is not 100% a problem, and we need to trace it back +// (upwards) to see if it is really a case. + +// This recursive function checks the desync_id in the graph upwards. +// The process doesn't continue for nodes which have a valid +// desync_id already. +// The process only continues for nodes which have no desync_id +// assigned. If there's no such nodes anymore, the procedure is +// considered complete and a list of nodes to tag is returned to the +// caller. +// +// If NO inputs of this node have a valid desync_id, the desync +// invariant is broken and the function throws. +void traceUp(cv::gimpl::GModel::Graph &g, + const ade::NodeHandle &nh, + int desync_id, + std::vector &path) { + using namespace cv::gimpl; + + GAPI_Assert(!nh->inNodes().empty() + && "traceUp: a desynchronized part of the graph is not isolated?"); + + if (g.metadata(nh).contains()) { + // We may face nodes which have DesyncPath already visited during + // this recursive process (e.g. via some other output or branch in the + // subgraph) + if (g.metadata(nh).get().index != desync_id) { + GAPI_Assert(false && "Desynchronization can't be nested!"); + } + return; // This object belongs to the desync path - exit early. + } + + // Regardless of the result, put this nh to the path + path.push_back(nh); + + // Check if the input nodes are OK + std::vector nodes_to_trace; + nodes_to_trace.reserve(nh->inNodes().size()); + for (auto &&in_nh : nh->inNodes()) { + if (g.metadata(in_nh).contains()) { + // We may face nodes which have DesyncPath already visited during + // this recursive process (e.g. via some other output or branch in the + // subgraph) + GAPI_Assert(g.metadata(in_nh).get().index == desync_id + && "Desynchronization can't be nested!"); + } else { + nodes_to_trace.push_back(in_nh); + } + } + + // If there are nodes to trace, continue the recursion + for (auto &&up_nh : nodes_to_trace) { + traceUp(g, up_nh, desync_id, path); + } +} + +// This recursive function propagates the desync_id down to the graph +// starting at nh, and also checks: +// - if this desync path is isolated; +// - if this desync path is not overlapped. +// It also originates the traceUp() process at the points of +// uncertainty (as described in the comment above). +void traceDown(cv::gimpl::GModel::Graph &g, + const ade::NodeHandle &nh, + int desync_id) { + using namespace cv::gimpl; + + if (g.metadata(nh).contains()) { + // We may face nodes which have DesyncPath already visited during + // this recursive process (e.g. via some other output or branch in the + // subgraph) + GAPI_Assert(g.metadata(nh).get().index == desync_id + && "Desynchronization can't be nested!"); + } else { + g.metadata(nh).set(DesyncPath{desync_id}); + } + + // All inputs of this data object must belong to the same + // desync path. + for (auto &&in_nh : nh->inNodes()) { + // If an input object is not assigned to this desync path, + // it does not means that the object doesn't belong to + // this path. Check it. + std::vector path_up; + traceUp(g, in_nh, desync_id, path_up); + // We get here on success. Just set the proper tags for + // the identified input path. + for (auto &&up_nh : path_up) { + g.metadata(up_nh).set(DesyncPath{desync_id}); + } + } + + // Propagate the tag & check down + for (auto &&out_nh : nh->outNodes()) { + traceDown(g, out_nh, desync_id); + } +} + +// Streaming case: ensure the graph has proper isolation of the +// desynchronized parts, set proper Edge metadata hints for +// GStreamingExecutable +void apply(cv::gimpl::GModel::Graph &g) { + using namespace cv::gimpl; + + // Stage 0. Trace down the desync operations in the graph. + // Tag them with their unique (per graph) identifiers. + int total_desync = 0; + for (auto &&nh : g.nodes()) { + if (g.metadata(nh).get().t == NodeType::OP) { + const auto &op = g.metadata(nh).get(); + if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { + GAPI_Assert(!g.metadata(nh).contains() + && "Desynchronization can't be nested!"); + const int this_desync_id = total_desync++; + g.metadata(nh).set(DesyncPath{this_desync_id}); + for (auto &&out_nh: nh->outNodes()) { + traceDown(g, out_nh, this_desync_id); + } + } // if (desync) + } // if(OP) + } // for(nodes) + + // Tracing is done for all desync ops in the graph now. + // Stage 1. Drop the desync operations from the graph, but mark + // the desynchronized edges a special way. + // The desynchronized edge is the edge which connects a main + // subgraph data with a desynchronized subgraph data. + std::vector nodes(g.nodes().begin(), g.nodes().end()); + for (auto &&nh : nodes) { + if (nh == nullptr) { + // Some nodes could be dropped already during the procedure + // thanks ADE their NodeHandles updated automatically + continue; + } + if (g.metadata(nh).get().t == NodeType::OP) { + const auto &op = g.metadata(nh).get(); + if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { + auto index = g.metadata(nh).get().index; + auto new_links = drop(g, nh); + for (auto &&eh : new_links) { + g.metadata(eh).set(DesyncEdge{index}); + } + } // if (desync) + } // if (Op) + } // for(nodes) + + // Stage 2. Put a synchronized tag if there were changes applied + if (total_desync > 0) { + g.metadata().set(Desynchronized{}); + } +} + +// Probably the simplest case: desync makes no sense in the regular +// compilation process, so just drop all its occurences in the graph, +// reconnecting nodes properly. +void drop(cv::gimpl::GModel::Graph &g) { + // FIXME: LOG here that we're dropping the desync operations as + // they have no sense when compiling in the regular mode. + using namespace cv::gimpl; + std::vector nodes(g.nodes().begin(), g.nodes().end()); + for (auto &&nh : nodes) { + if (nh == nullptr) { + // Some nodes could be dropped already during the procedure + // thanks ADE their NodeHandles updated automatically + continue; + } + if (g.metadata(nh).get().t == NodeType::OP) { + const auto &op = g.metadata(nh).get(); + if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { + drop(g, nh); + } // if (desync) + } // if (Op) + } // for(nodes) +} + +} // anonymous namespace +} // namespace desync + +void cv::gimpl::passes::intrinDesync(ade::passes::PassContext &ctx) { + GModel::Graph gr(ctx.graph); + if (!gr.metadata().contains()) + return; + + gr.metadata().contains() + ? desync::apply(gr) // Streaming compilation + : desync::drop(gr); // Regular compilation +} + +// Clears the HasIntrinsics flag if all intrinsics have been handled. +void cv::gimpl::passes::intrinFinalize(ade::passes::PassContext &ctx) { + GModel::Graph gr(ctx.graph); + for (auto &&nh : gr.nodes()) { + if (gr.metadata(nh).get().t == NodeType::OP) { + const auto &op = gr.metadata(nh).get(); + if (is_intrinsic(op.k.name)) { + return; + } + } + } + // If reached here, really clear the flag + gr.metadata().erase(); +} diff --git a/modules/gapi/src/compiler/passes/kernels.cpp b/modules/gapi/src/compiler/passes/kernels.cpp index 69b339fb1eda..100a32ec57c1 100644 --- a/modules/gapi/src/compiler/passes/kernels.cpp +++ b/modules/gapi/src/compiler/passes/kernels.cpp @@ -14,6 +14,7 @@ #include // compound::backend() #include // GKernelPackage #include // GNetPackage +#include // GDesync intrinsic #include "compiler/gmodel.hpp" #include "compiler/passes/passes.hpp" @@ -24,6 +25,20 @@ #include "logger.hpp" // GAPI_LOG #include "api/gproto_priv.hpp" // is_dynamic, rewrap +namespace +{ + // FIXME: This may be not the right design choice, but so far it works + const std::vector known_intrinsics = { + cv::gapi::streaming::detail::GDesync::id() + }; +} +bool cv::gimpl::is_intrinsic(const std::string &s) { + // FIXME: This search might be better in time once we start using string + return std::find(known_intrinsics.begin(), + known_intrinsics.end(), + s) != known_intrinsics.end(); +} + namespace { struct ImplInfo @@ -130,8 +145,13 @@ void cv::gimpl::passes::bindNetParams(ade::passes::PassContext &ctx, } } -// This pass, given the kernel package, selects a kernel implementation -// for every operation in the graph +// This pass, given the kernel package, selects a kernel +// implementation for every operation in the graph +// +// Starting OpenCV 4.3, G-API may have some special "intrinsic" +// operations. Those can be implemented by backends as regular +// kernels, but if not, they are handled by the framework itself in +// its optimization/execution passes. void cv::gimpl::passes::resolveKernels(ade::passes::PassContext &ctx, const gapi::GKernelPackage &kernels) { @@ -142,7 +162,25 @@ void cv::gimpl::passes::resolveKernels(ade::passes::PassContext &ctx, { if (gr.metadata(nh).get().t == NodeType::OP) { + // If the operation is known to be intrinsic and is NOT + // implemented in the package, just skip it - there should + // be some pass which handles it. auto &op = gr.metadata(nh).get(); + if (is_intrinsic(op.k.name) && !kernels.includesAPI(op.k.name)) { + gr.metadata().set(HasIntrinsics{}); + continue; + } + // FIXME: And this logic is terribly wrong. The right + // thing is to assign an intrinsic to a particular island + // if and only if it is: + // (a) surrounded by nodes of backend X, AND + // (b) is supported by backend X. + // Here we may have multiple backends supporting an + // intrinsic but only one of those gets selected. And + // this is exactly a situation we need multiple versions + // of the same kernel to be presented in the kernel + // package (as it was designed originally). + cv::gapi::GBackend selected_backend; cv::GKernelImpl selected_impl; std::tie(selected_backend, selected_impl) = kernels.lookup(op.k.name); @@ -181,6 +219,12 @@ void cv::gimpl::passes::expandKernels(ade::passes::PassContext &ctx, const gapi: if (gr.metadata(nh).get().t == NodeType::OP) { const auto& op = gr.metadata(nh).get(); + // FIXME: Essentially the same problem as in the above resolveKernels + if (is_intrinsic(op.k.name) && !kernels.includesAPI(op.k.name)) { + // Note: There's no need to set HasIntrinsics flag here + // since resolveKernels would do it later. + continue; + } cv::gapi::GBackend selected_backend; cv::GKernelImpl selected_impl; diff --git a/modules/gapi/src/compiler/passes/passes.hpp b/modules/gapi/src/compiler/passes/passes.hpp index 84142fc0554a..8f187f6bb75d 100644 --- a/modules/gapi/src/compiler/passes/passes.hpp +++ b/modules/gapi/src/compiler/passes/passes.hpp @@ -31,7 +31,11 @@ namespace gapi { struct GNetPackage; } // namespace gapi -namespace gimpl { namespace passes { +namespace gimpl { + +bool is_intrinsic(const std::string &op_name); + +namespace passes { void dumpDot(const ade::Graph &g, std::ostream& os); void dumpDot(ade::passes::PassContext &ctx, std::ostream& os); @@ -66,6 +70,9 @@ void applyTransformations(ade::passes::PassContext &ctx, void addStreaming(ade::passes::PassContext &ctx); +void intrinDesync(ade::passes::PassContext &ctx); +void intrinFinalize(ade::passes::PassContext &ctx); + }} // namespace gimpl::passes } // namespace cv diff --git a/modules/gapi/src/compiler/transactions.hpp b/modules/gapi/src/compiler/transactions.hpp index 54af8a6e69aa..bdc1723e197d 100644 --- a/modules/gapi/src/compiler/transactions.hpp +++ b/modules/gapi/src/compiler/transactions.hpp @@ -14,6 +14,7 @@ #include +#include "opencv2/gapi/util/util.hpp" // Seq #include "opencv2/gapi/own/assert.hpp" enum class Direction: int {Invalid, In, Out}; @@ -21,8 +22,50 @@ enum class Direction: int {Invalid, In, Out}; //////////////////////////////////////////////////////////////////////////// //// // TODO: Probably it can be moved to ADE - -namespace Change +template +class Preserved +{ + using S = typename cv::detail::MkSeq::type; + std::tuple...> m_data; + + template + cv::util::optional get(ade::ConstTypedGraph g, H h) { + return g.metadata(h).template contains() + ? cv::util::make_optional(g.metadata(h).template get()) + : cv::util::optional{}; + } + template + int set(ade::TypedGraph &g, H &h) { + const auto &opt = std::get(m_data); + if (opt.has_value()) + g.metadata(h).set(opt.value()); + return 0; + } + template + void copyTo_impl(ade::TypedGraph &g, H h, cv::detail::Seq) { + int unused[] = {0, set(g, h)...}; + (void) unused; + } +public: + Preserved(const ade::Graph &g, H h) { + ade::ConstTypedGraph tg(g); + m_data = std::make_tuple(get(tg, h)...); + } + void copyTo(ade::Graph &g, H h) { + ade::TypedGraph tg(g); + copyTo_impl(tg, h, S{}); + } +}; +// Do nothing if there's no metadata +template +class Preserved { +public: + Preserved(const ade::Graph &, H) {} + void copyTo(ade::Graph &, H) {} +}; + +template +struct ChangeT { struct Base { @@ -31,6 +74,8 @@ namespace Change virtual ~Base() = default; }; + template using Preserved = ::Preserved; + class NodeCreated final: public Base { ade::NodeHandle m_node; @@ -39,11 +84,7 @@ namespace Change virtual void rollback(ade::Graph &g) override { g.erase(m_node); } }; - // NB: Drops all metadata stored in the EdgeHandle, - // which is not restored even in the rollback - - // FIXME: either add a way for users to preserve meta manually - // or extend ADE to manipulate with meta such way + // FIXME: maybe extend ADE to clone/copy the whole metadata? class DropLink final: public Base { ade::NodeHandle m_node; @@ -51,13 +92,15 @@ namespace Change ade::NodeHandle m_sibling; + Preserved m_meta; + public: DropLink(ade::Graph &g, const ade::NodeHandle &node, const ade::EdgeHandle &edge) - : m_node(node), m_dir(node == edge->srcNode() - ? Direction::Out - : Direction::In) + : m_node(node) + , m_dir(node == edge->srcNode() ? Direction::Out : Direction::In) + , m_meta(g, edge) { m_sibling = (m_dir == Direction::In ? edge->srcNode() @@ -67,12 +110,17 @@ namespace Change virtual void rollback(ade::Graph &g) override { + // FIXME: Need to preserve metadata here! + // GIslandModel edges now have metadata + ade::EdgeHandle eh; switch(m_dir) { - case Direction::In: g.link(m_sibling, m_node); break; - case Direction::Out: g.link(m_node, m_sibling); break; + case Direction::In: eh = g.link(m_sibling, m_node); break; + case Direction::Out: eh = g.link(m_node, m_sibling); break; default: GAPI_Assert(false); } + GAPI_Assert(eh != nullptr); + m_meta.copyTo(g, eh); } }; @@ -82,10 +130,15 @@ namespace Change public: NewLink(ade::Graph &g, - const ade::NodeHandle &prod, - const ade::NodeHandle &cons) + const ade::NodeHandle &prod, + const ade::NodeHandle &cons, + const ade::EdgeHandle ©_from = ade::EdgeHandle()) : m_edge(g.link(prod, cons)) { + if (copy_from != nullptr) + { + Preserved(g, copy_from).copyTo(g, m_edge); + } } virtual void rollback(ade::Graph &g) override @@ -141,7 +194,7 @@ namespace Change } } }; -} // namespace Change +}; // struct Change //////////////////////////////////////////////////////////////////////////// #endif // OPENCV_GAPI_COMPILER_TRANSACTIONS_HPP diff --git a/modules/gapi/src/executor/conc_queue.hpp b/modules/gapi/src/executor/conc_queue.hpp index 5de50ef34bb3..9875e8245a82 100644 --- a/modules/gapi/src/executor/conc_queue.hpp +++ b/modules/gapi/src/executor/conc_queue.hpp @@ -119,8 +119,7 @@ void concurrent_bounded_queue::set_capacity(std::size_t capacity) { // Clear the queue. Similar to the TBB version, this method is not // thread-safe. template -void concurrent_bounded_queue::clear() -{ +void concurrent_bounded_queue::clear() { m_data = std::queue{}; } diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index afdebee0200a..41cb83f7100d 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -6,6 +6,7 @@ #include "precomp.hpp" +#include // make_shared #include #include @@ -60,14 +61,23 @@ class ConstEmitter final: public cv::gimpl::GIslandEmitter { struct DataQueue { static const char *name() { return "StreamingDataQueue"; } + enum tag { DESYNC }; // Enum of 1 element: purely a syntax sugar explicit DataQueue(std::size_t capacity) { - if (capacity) { - q.set_capacity(capacity); + // Note: `ptr` is shared, while the `q` is a shared + auto ptr = std::make_shared(); + if (capacity != 0) { + ptr->set_capacity(capacity); } + q = std::move(ptr); + } + explicit DataQueue(tag t) + : q(new cv::gimpl::stream::DesyncQueue()) { + GAPI_Assert(t == DESYNC); } - cv::gimpl::stream::Q q; + // FIXME: ADE metadata requires types to be copiable + std::shared_ptr q; }; std::vector reader_queues( ade::Graph &g, @@ -77,7 +87,7 @@ std::vector reader_queues( ade::Graph &g, std::vector result; for (auto &&out_eh : obj->outEdges()) { - result.push_back(&qgr.metadata(out_eh).get().q); + result.push_back(qgr.metadata(out_eh).get().q.get()); } return result; } @@ -90,7 +100,7 @@ std::vector input_queues( ade::Graph &g, for (auto &&in_eh : obj->inEdges()) { result.push_back(qgr.metadata(in_eh).contains() - ? &qgr.metadata(in_eh).get().q + ? qgr.metadata(in_eh).get().q.get() : nullptr); } return result; @@ -133,6 +143,77 @@ void sync_data(cv::GRunArgs &results, cv::GRunArgsP &outputs) } } +// FIXME: Is there a way to derive function from its GRunArgsP version? +template using O = cv::util::optional; +void sync_data(cv::gimpl::stream::Result &r, cv::GOptRunArgsP &outputs) +{ + namespace own = cv::gapi::own; + + for (auto && it : ade::util::zip(ade::util::toRange(outputs), + ade::util::toRange(r.args), + ade::util::toRange(r.flags))) + { + auto &out_obj = std::get<0>(it); + auto &res_obj = std::get<1>(it); + bool available = std::get<2>(it); + + using T = cv::GOptRunArgP; +#define HANDLE_CASE(Type) \ + case T::index_of*>(): \ + if (available) { \ + *cv::util::get*>(out_obj) \ + = cv::util::make_optional(std::move(cv::util::get(res_obj))); \ + } else { \ + cv::util::get*>(out_obj)->reset(); \ + } + + // FIXME: this conversion should be unified + switch (out_obj.index()) + { + HANDLE_CASE(cv::Scalar); break; + HANDLE_CASE(cv::RMat); break; + + case T::index_of*>(): { + // Mat: special handling. + auto &mat_opt = *cv::util::get*>(out_obj); + if (available) { + auto q_map = cv::util::get(res_obj).access(cv::RMat::Access::R); + // FIXME: Copy! Maybe we could do some optimization for this case! + // e.g. don't handle RMat for last ilsand in the graph. + // It is not always possible though. + mat_opt = cv::util::make_optional(cv::gimpl::asMat(q_map).clone()); + } else { + mat_opt.reset(); + } + } break; + case T::index_of(): { + // std::vector<>: special handling + auto &vec_opt = cv::util::get(out_obj); + if (available) { + vec_opt.mov(cv::util::get(res_obj)); + } else { + vec_opt.reset(); + } + } break; + case T::index_of(): { + // std::vector<>: special handling + auto &opq_opt = cv::util::get(out_obj); + if (available) { + opq_opt.mov(cv::util::get(res_obj)); + } else { + opq_opt.reset(); + } + } break; + default: + // ...maybe because of STANDALONE mode. + GAPI_Assert(false && "This value type is not supported!"); + break; + } + } +#undef HANDLE_CASE +} + + // Pops an item from every input queue and combine it to the final // result. Blocks the current thread. Returns true if the vector has // been obtained successfully and false if a Stop message has been @@ -206,12 +287,39 @@ class QueueReader bool m_finishing = false; // Set to true once a "soft" stop is received std::vector m_cmd; + void rewindToStop(std::vector &in_queues, + const std::size_t this_id); + public: - bool getInputVector(std::vector &in_queues, - cv::GRunArgs &in_constants, - cv::GRunArgs &isl_inputs); + bool getInputVector (std::vector &in_queues, + cv::GRunArgs &in_constants, + cv::GRunArgs &isl_inputs); + + bool getResultsVector(std::vector &in_queues, + const std::vector &in_mapping, + const std::size_t out_size, + cv::GRunArgs &out_results); }; +// This method handles a stop sign got from some input +// island. Reiterate through all _remaining valid_ queues (some of +// them can be set to nullptr already -- see handling in +// getInputVector) and rewind data to every Stop sign per queue. +void QueueReader::rewindToStop(std::vector &in_queues, + const std::size_t this_id) +{ + for (auto &&qit : ade::util::indexed(in_queues)) + { + auto id2 = ade::util::index(qit); + auto &q2 = ade::util::value(qit); + if (this_id == id2) continue; + + Cmd cmd; + while (q2 && !cv::util::holds_alternative(cmd)) + q2->pop(cmd); + } +} + bool QueueReader::getInputVector(std::vector &in_queues, cv::GRunArgs &in_constants, cv::GRunArgs &isl_inputs) @@ -271,20 +379,7 @@ bool QueueReader::getInputVector(std::vector &in_queues, else { GAPI_Assert(stop.kind == Stop::Kind::HARD); - // Just got a stop sign. Reiterate through all - // _remaining valid_ queues (some of them can be - // set to nullptr already -- see above) and rewind - // data to every Stop sign per queue - for (auto &&qit : ade::util::indexed(in_queues)) - { - auto id2 = ade::util::index(qit); - auto &q2 = ade::util::value(qit); - if (id == id2) continue; - - Cmd cmd2; - while (q2 && !cv::util::holds_alternative(cmd2)) - q2->pop(cmd2); - } + rewindToStop(in_queues, id); // After queues are read to the proper indicator, // indicate end-of-stream return false; @@ -303,6 +398,60 @@ bool QueueReader::getInputVector(std::vector &in_queues, return true; // A regular case - there is data to process. } +// This is a special method to obtain a result vector +// for the entire pipeline's outputs. +// +// After introducing desync(), the pipeline output's vector +// can be produced just partially. Also, if a desynchronized +// path has multiple outputs for the pipeline, _these_ outputs +// should still come synchronized to the end user (via pull()) +// +// +// This method handles all this. +// It takes a number of input queues, which may or may not be +// equal to the number of pipeline outputs (<=). +// It also takes indexes saying which queue produces which +// output in the resulting pipeline. +// +// `out_results` is always produced with the size of full output +// vector. In the desync case, the number of in_queues will +// be less than this size and some of the items won't be produced. +// In the sync case, there will be a 1-1 mapping. +// +// In the desync case, there _will be_ multiple collector threads +// calling this method, and pushing their whole-pipeline outputs +// (_may be_ partially filled) to the same final output queue. +// The receiver part at the GStreamingExecutor level won't change +// because of that. +bool QueueReader::getResultsVector(std::vector &in_queues, + const std::vector &in_mapping, + const std::size_t out_size, + cv::GRunArgs &out_results) +{ + m_cmd.resize(out_size); + for (auto &&it : ade::util::indexed(in_queues)) + { + auto ii = ade::util::index(it); + auto oi = in_mapping[ii]; + auto &q = ade::util::value(it); + q->pop(m_cmd[oi]); + if (!cv::util::holds_alternative(m_cmd[oi])) + { + out_results[oi] = std::move(cv::util::get(m_cmd[oi])); + } + else // A Stop sign + { + // In theory, the CNST should never reach here. + // Collector thread never handles the inputs directly + // (collector's input queues are always produced by + // islands in the graph). + rewindToStop(in_queues, ii); + return false; + } // if(Stop) + } // for(in_queues) + return true; +} + // This thread is a plain dump source actor. What it do is just: // - Check input queue (the only one) for a control command @@ -603,22 +752,78 @@ void islandActorThread(std::vector in_rcs, // // and then put the resulting vector into one single queue. While it // looks redundant, it simplifies dramatically the way how try_pull() // is implemented - we need to check one queue instead of many. -void collectorThread(std::vector in_queues, - Q& out_queue) +// +// After desync() is added, there may be multiple collector threads +// running, every thread producing its own part of the partial +// pipeline output (optional...). All partial outputs are pushed +// to the same output queue and then picked by GStreamingExecutor +// in the end. +void collectorThread(std::vector in_queues, + std::vector in_mapping, + const std::size_t out_size, + Q& out_queue) { + // These flags are static now: regardless if the sync or + // desync branch is collected by this thread, all in_queue + // data should come in sync. + std::vector flags(out_size, false); + for (auto idx : in_mapping) { + flags[idx] = true; + } + QueueReader qr; while (true) { - cv::GRunArgs this_result(in_queues.size()); - cv::GRunArgs this_const(in_queues.size()); - if (!qr.getInputVector(in_queues, this_const, this_result)) + cv::GRunArgs this_result(out_size); + if (!qr.getResultsVector(in_queues, in_mapping, out_size, this_result)) { out_queue.push(Cmd{Stop{}}); return; } - out_queue.push(Cmd{this_result}); + out_queue.push(Cmd{Result{std::move(this_result), flags}}); } } + +void check_DesyncObjectConsumedByMultipleIslands(const cv::gimpl::GIslandModel::Graph &gim) { + using namespace cv::gimpl; + + // Since the limitation exists only in this particular + // implementation, the check is also done only here but not at the + // graph compiler level. + // + // See comment in desync(GMat) src/api/kernels_streaming.cpp for details. + for (auto &&nh : gim.nodes()) { + if (gim.metadata(nh).get().k == NodeKind::SLOT) { + // SLOTs are read by ISLANDs, so look for the metadata + // of the outbound edges + std::unordered_map out_desync_islands; + for (auto &&out_eh : nh->outEdges()) { + if (gim.metadata(out_eh).contains()) { + // This is a desynchronized edge + // Look what Island it leads to + const auto out_desync_idx = gim.metadata(out_eh) + .get().index; + const auto out_island = gim.metadata(out_eh->dstNode()) + .get().object; + + auto it = out_desync_islands.find(out_desync_idx); + if (it != out_desync_islands.end()) { + // If there's already an edge with this desync + // id, it must point to the same island object + GAPI_Assert(it->second == out_island.get() + && "A single desync object may only be used by a single island!"); + } else { + // Store the island pointer for the further check + out_desync_islands[out_desync_idx] = out_island.get(); + } + } // if(desync) + } // for(out_eh) + // There must be only one backend in the end of the day + // (under this desync path) + } // if(SLOT) + } // for(nodes) +} + } // anonymous namespace // GStreamingExecutor expects compile arguments as input to have possibility to do @@ -630,20 +835,28 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && .get().model) , m_comp_args(comp_args) , m_gim(*m_island_graph) + , m_desync(GModel::Graph(*m_orig_graph).metadata() + .contains()) { GModel::Graph gm(*m_orig_graph); // NB: Right now GIslandModel is acyclic, and all the below code assumes that. - // NB: This naive execution code is taken from GExecutor nearly "as-is" + // NB: This naive execution code is taken from GExecutor nearly + // "as-is" + + if (m_desync) { + check_DesyncObjectConsumedByMultipleIslands(m_gim); + } const auto proto = gm.metadata().get(); m_emitters .resize(proto.in_nhs.size()); m_emitter_queues.resize(proto.in_nhs.size()); m_sinks .resize(proto.out_nhs.size()); - m_sink_queues .resize(proto.out_nhs.size()); + m_sink_queues .resize(proto.out_nhs.size(), nullptr); + m_sink_sync .resize(proto.out_nhs.size(), -1); // Very rough estimation to limit internal queue sizes. // Pipeline depth is equal to number of its (pipeline) steps. - const auto queue_capacity = std::count_if + const auto queue_capacity = 3*std::count_if (m_gim.nodes().begin(), m_gim.nodes().end(), [&](ade::NodeHandle nh) { @@ -728,8 +941,12 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && { // ...only if the data is not compile-const if (const_ins.count(eh->srcNode()) == 0) { - qgr.metadata(eh).set(DataQueue(queue_capacity)); - m_internal_queues.insert(&qgr.metadata(eh).get().q); + if (m_gim.metadata(eh).contains()) { + qgr.metadata(eh).set(DataQueue(DataQueue::DESYNC)); + } else { + qgr.metadata(eh).set(DataQueue(queue_capacity)); + } + m_internal_queues.insert(qgr.metadata(eh).get().q.get()); } } } @@ -760,7 +977,14 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && ade::TypedGraph qgr(*m_island_graph); GAPI_Assert(nh->inEdges().size() == 1u); qgr.metadata(nh->inEdges().front()).set(DataQueue(queue_capacity)); - m_sink_queues[sink_idx] = &qgr.metadata(nh->inEdges().front()).get().q; + m_sink_queues[sink_idx] = qgr.metadata(nh->inEdges().front()).get().q.get(); + + // Assign a desync tag + const auto sink_out_nh = gm.metadata().get().out_nhs[sink_idx]; + if (gm.metadata(sink_out_nh).contains()) { + // metadata().get_or<> could make this thing better + m_sink_sync[sink_idx] = gm.metadata(sink_out_nh).get().index; + } } break; default: @@ -768,7 +992,23 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && break; } // switch(kind) } // for(gim nodes) - m_out_queue.set_capacity(queue_capacity); + + // If there are desynchronized parts in the graph, there may be + // multiple theads polling every separate (desynchronized) + // branch in the graph individually. Prepare a mapping information + // for any such thread + for (auto &&idx : ade::util::iota(m_sink_queues.size())) { + auto path_id = m_sink_sync[idx]; + auto &info = m_collector_map[path_id]; + info.queues.push_back(m_sink_queues[idx]); + info.mapping.push_back(static_cast(idx)); + } + + // Reserve space in the final queue based on the number + // of desync parts (they can generate output individually + // per the same input frame, so the output traffic multiplies) + GAPI_Assert(m_collector_map.size() > 0u); + m_out_queue.set_capacity(queue_capacity * m_collector_map.size()); } cv::gimpl::GStreamingExecutor::~GStreamingExecutor() @@ -938,6 +1178,9 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) real_video_completion_cb); } + for (auto &&op : m_ops) { + op.isl_exec->handleNewStream(); + } // Now do this for every island (in a topological order) for (auto &&op : m_ops) @@ -974,10 +1217,17 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins) out_queues); } - // Finally, start a collector thread. - m_threads.emplace_back(collectorThread, - m_sink_queues, - std::ref(m_out_queue)); + // Finally, start collector thread(s). + // If there are desynchronized parts in the graph, there may be + // multiple theads polling every separate (desynchronized) + // branch in the graph individually. + for (auto &&info : m_collector_map) { + m_threads.emplace_back(collectorThread, + info.second.queues, + info.second.mapping, + m_sink_queues.size(), + std::ref(m_out_queue)); + } state = State::READY; } @@ -1018,15 +1268,25 @@ void cv::gimpl::GStreamingExecutor::wait_shutdown() for (auto &q : m_internal_queues) q->clear(); m_out_queue.clear(); + for (auto &&op : m_ops) { + op.isl_exec->handleStopStream(); + } + state = State::STOPPED; } bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs) { + // This pull() can only be called when there's no desynchronized + // parts in the graph. + GAPI_Assert(!m_desync && + "This graph has desynchronized parts! Please use another pull()"); + if (state == State::STOPPED) return false; GAPI_Assert(state == State::RUNNING); - GAPI_Assert(m_sink_queues.size() == outs.size()); + GAPI_Assert(m_sink_queues.size() == outs.size() && + "Number of data objects in cv::gout() must match the number of graph outputs in cv::GOut()"); Cmd cmd; m_out_queue.pop(cmd); @@ -1036,12 +1296,39 @@ bool cv::gimpl::GStreamingExecutor::pull(cv::GRunArgsP &&outs) return false; } - GAPI_Assert(cv::util::holds_alternative(cmd)); - cv::GRunArgs &this_result = cv::util::get(cmd); + GAPI_Assert(cv::util::holds_alternative(cmd)); + cv::GRunArgs &this_result = cv::util::get(cmd).args; sync_data(this_result, outs); return true; } +bool cv::gimpl::GStreamingExecutor::pull(cv::GOptRunArgsP &&outs) +{ + // This pull() can only be called in both cases: if there are + // desyncrhonized parts or not. + + // FIXME: so far it is a full duplicate of standard pull except + // the sync_data version called. + if (state == State::STOPPED) + return false; + GAPI_Assert(state == State::RUNNING); + GAPI_Assert(m_sink_queues.size() == outs.size() && + "Number of data objects in cv::gout() must match the number of graph outputs in cv::GOut()"); + + Cmd cmd; + m_out_queue.pop(cmd); + if (cv::util::holds_alternative(cmd)) + { + wait_shutdown(); + return false; + } + + GAPI_Assert(cv::util::holds_alternative(cmd)); + sync_data(cv::util::get(cmd), outs); + return true; +} + + bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs) { if (state == State::STOPPED) @@ -1059,8 +1346,8 @@ bool cv::gimpl::GStreamingExecutor::try_pull(cv::GRunArgsP &&outs) return false; } - GAPI_Assert(cv::util::holds_alternative(cmd)); - cv::GRunArgs &this_result = cv::util::get(cmd); + GAPI_Assert(cv::util::holds_alternative(cmd)); + cv::GRunArgs &this_result = cv::util::get(cmd).args; sync_data(this_result, outs); return true; } diff --git a/modules/gapi/src/executor/gstreamingexecutor.hpp b/modules/gapi/src/executor/gstreamingexecutor.hpp index d10f9eddd096..b6093ac1ef38 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.hpp +++ b/modules/gapi/src/executor/gstreamingexecutor.hpp @@ -14,6 +14,8 @@ #include // unique_ptr, shared_ptr #include // thread +#include +#include #if defined(HAVE_TBB) # include // FIXME: drop it from here! @@ -22,6 +24,7 @@ template using QueueClass = tbb::concurrent_bounded_queue; # include "executor/conc_queue.hpp" template using QueueClass = cv::gapi::own::concurrent_bounded_queue; #endif // TBB +#include "executor/last_value.hpp" #include @@ -40,14 +43,61 @@ struct Stop { cv::GRunArg cdata; // const data for CNST stop }; +struct Result { + cv::GRunArgs args; // Full results vector + std::vector flags; // Availability flags (in case of desync) +}; + using Cmd = cv::util::variant < cv::util::monostate , Start // Tells emitters to start working. Not broadcasted to workers. , Stop // Tells emitters to stop working. Broadcasted to workers. , cv::GRunArg // Workers data payload to process. - , cv::GRunArgs // Full results vector + , Result // Pipeline's data for gout() >; -using Q = QueueClass; + +// Interface over a queue. The underlying queue implementation may be +// different. This class is mainly introduced to bring some +// abstraction over the real queues (bounded in-order) and a +// desynchronized data slots (see required to implement +// cv::gapi::desync) + +class Q { +public: + virtual void push(const Cmd &cmd) = 0; + virtual void pop(Cmd &cmd) = 0; + virtual bool try_pop(Cmd &cmd) = 0; + virtual void clear() = 0; + virtual ~Q() = default; +}; + +// A regular queue implementation +class SyncQueue final: public Q { + QueueClass m_q; // FIXME: OWN or WRAP?? + +public: + virtual void push(const Cmd &cmd) override { m_q.push(cmd); } + virtual void pop(Cmd &cmd) override { m_q.pop(cmd); } + virtual bool try_pop(Cmd &cmd) override { return m_q.try_pop(cmd); } + virtual void clear() override { m_q.clear(); } + + void set_capacity(std::size_t c) { m_q.set_capacity(c);} +}; + +// Desynchronized "queue" implementation +// Every push overwrites value which is not yet popped +// This container can hold 0 or 1 element +// Special handling for Stop is implemented (FIXME: not really) +class DesyncQueue final: public Q { + cv::gapi::own::last_written_value m_v; + +public: + virtual void push(const Cmd &cmd) override { m_v.push(cmd); } + virtual void pop(Cmd &cmd) override { m_v.pop(cmd); } + virtual bool try_pop(Cmd &cmd) override { return m_v.try_pop(cmd); } + virtual void clear() override { m_v.clear(); } +}; + } // namespace stream // FIXME: Currently all GExecutor comments apply also @@ -87,6 +137,7 @@ class GStreamingExecutor final util::optional m_reshapable; cv::gimpl::GIslandModel::Graph m_gim; // FIXME: make const? + const bool m_desync; // FIXME: Naive executor details are here for now // but then it should be moved to another place @@ -117,11 +168,27 @@ class GStreamingExecutor final std::vector m_sinks; std::vector m_threads; - std::vector m_emitter_queues; - std::vector m_const_emitter_queues; // a view over m_emitter_queues - std::vector m_sink_queues; - std::unordered_set m_internal_queues; - stream::Q m_out_queue; + std::vector m_emitter_queues; + + // a view over m_emitter_queues + std::vector m_const_emitter_queues; + + std::vector m_sink_queues; + + // desync path tags for outputs. -1 means that output + // doesn't belong to a desync path + std::vector m_sink_sync; + + std::unordered_set m_internal_queues; + stream::SyncQueue m_out_queue; + + // Describes mapping from desync paths to collector threads + struct CollectorThreadInfo { + std::vector queues; + std::vector mapping; + }; + std::unordered_map m_collector_map; + void wait_shutdown(); @@ -132,6 +199,7 @@ class GStreamingExecutor final void setSource(GRunArgs &&args); void start(); bool pull(cv::GRunArgsP &&outs); + bool pull(cv::GOptRunArgsP &&outs); bool try_pull(cv::GRunArgsP &&outs); void stop(); bool running() const; diff --git a/modules/gapi/src/executor/last_value.hpp b/modules/gapi/src/executor/last_value.hpp new file mode 100644 index 000000000000..152449a879ab --- /dev/null +++ b/modules/gapi/src/executor/last_value.hpp @@ -0,0 +1,105 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2020 Intel Corporation + +#ifndef OPENCV_GAPI_EXECUTOR_LAST_VALUE_HPP +#define OPENCV_GAPI_EXECUTOR_LAST_VALUE_HPP + +#include +#include + +#include +#include + +namespace cv { +namespace gapi { +namespace own { + +// This class implements a "Last Written Value" thing. Writer threads +// (in our case, it is just one) can write as many values there as it +// can. +// +// The reader thread gets only a value it gets at the time (or blocks +// if there was no value written since the last read). +// +// Again, the implementation is highly inefficient right now. +template +class last_written_value { + cv::util::optional m_data; + + std::mutex m_mutex; + std::condition_variable m_cond_empty; + + void unsafe_pop(T &t); + +public: + last_written_value() {} + last_written_value(const last_written_value &cc) + : m_data(cc.m_data) { + // FIXME: what to do with all that locks, etc? + } + last_written_value(last_written_value &&cc) + : m_data(std::move(cc.m_data)) { + // FIXME: what to do with all that locks, etc? + } + + // FIXME: && versions + void push(const T &t); + void pop(T &t); + bool try_pop(T &t); + + // Not thread-safe + void clear(); +}; + +// Internal: do shared pop things assuming the lock is already there +template +void last_written_value::unsafe_pop(T &t) { + GAPI_Assert(m_data.has_value()); + t = std::move(m_data.value()); + m_data.reset(); +} + +// Push an element to the queue. Blocking if there's no space left +template +void last_written_value::push(const T& t) { + std::unique_lock lock(m_mutex); + m_data = cv::util::make_optional(t); + lock.unlock(); + m_cond_empty.notify_one(); +} + +// Pop an element from the queue. Blocking if there's no items +template +void last_written_value::pop(T &t) { + std::unique_lock lock(m_mutex); + if (!m_data.has_value()) { + // if there is no data, wait + m_cond_empty.wait(lock, [&](){return m_data.has_value();}); + } + unsafe_pop(t); +} + +// Try pop an element from the queue. Returns false if queue is empty +template +bool last_written_value::try_pop(T &t) { + std::unique_lock lock(m_mutex); + if (!m_data.has_value()) { + // if there is no data, return + return false; + } + unsafe_pop(t); + return true; +} + +// Clear the value holder. This method is not thread-safe. +template +void last_written_value::clear() { + m_data.reset(); +} + +}}} // namespace cv::gapi::own + +#endif // OPENCV_GAPI_EXECUTOR_CONC_QUEUE_HPP diff --git a/modules/gapi/test/internal/gapi_int_gmodel_builder_test.cpp b/modules/gapi/test/internal/gapi_int_gmodel_builder_test.cpp index f6543e59f7c8..c9d992654233 100644 --- a/modules/gapi/test/internal/gapi_int_gmodel_builder_test.cpp +++ b/modules/gapi/test/internal/gapi_int_gmodel_builder_test.cpp @@ -2,7 +2,7 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2018 Intel Corporation +// Copyright (C) 2018-2020 Intel Corporation #include "../test_precomp.hpp" @@ -29,7 +29,9 @@ namespace , "" , nullptr , { GShape::GMAT } - , { D::OpaqueKind::CV_UNKNOWN } }).pass(m).yield(0); + , { D::OpaqueKind::CV_UNKNOWN } + , { cv::detail::HostCtor{cv::util::monostate{}} } + }).pass(m).yield(0); } cv::GMat binaryOp(cv::GMat m1, cv::GMat m2) @@ -38,7 +40,9 @@ namespace , "" , nullptr , { GShape::GMAT } - , { D::OpaqueKind::CV_UNKNOWN, D::OpaqueKind::CV_UNKNOWN } }).pass(m1, m2).yield(0); + , { D::OpaqueKind::CV_UNKNOWN, D::OpaqueKind::CV_UNKNOWN } + , { cv::detail::HostCtor{cv::util::monostate{}} } + }).pass(m1, m2).yield(0); } std::vector collectOperations(const cv::gimpl::GModel::Graph& gr) diff --git a/modules/gapi/test/internal/gapi_int_island_fusion_tests.cpp b/modules/gapi/test/internal/gapi_int_island_fusion_tests.cpp index c247cc7b7970..723e42a6dfc1 100644 --- a/modules/gapi/test/internal/gapi_int_island_fusion_tests.cpp +++ b/modules/gapi/test/internal/gapi_int_island_fusion_tests.cpp @@ -513,7 +513,65 @@ TEST(IslandFusion, Regression_ShouldFuseAll) EXPECT_EQ(1u, isl_nhs.size()); // 1 island } -// FIXME: add more tests on mixed (hetero) graphs +TEST(IslandFusion, Test_Desync_NoFuse) +{ + cv::GMat in; + cv::GMat tmp1 = in*0.5f; + cv::GMat tmp2 = tmp1 + in; + + cv::GMat tmp3 = cv::gapi::streaming::desync(tmp1); + cv::GMat tmp4 = tmp3*0.1f; + + const auto in_meta = cv::GMetaArg(cv::GMatDesc{CV_8U,1,cv::Size(32,32)}); + cv::GComputation comp(cv::GIn(in), cv::GOut(tmp2, tmp4)); + + ////////////////////////////////////////////////////////////////// + // Compile the graph in "regular" mode, it should produce a single island + { + using namespace cv::gimpl; + + GCompiler compiler(comp, {in_meta}, cv::compile_args()); + GCompiler::GPtr graph = compiler.generateGraph(); + compiler.runPasses(*graph); + + auto isl_model = GModel::ConstGraph(*graph).metadata() + .get().model; + GIslandModel::ConstGraph gim(*isl_model); + + const auto is_island = [&](ade::NodeHandle nh) { + return (NodeKind::ISLAND == gim.metadata(nh).get().k); + }; + const auto num_isl = std::count_if(gim.nodes().begin(), + gim.nodes().end(), + is_island); + EXPECT_EQ(1, num_isl); + } + ////////////////////////////////////////////////////////////////// + // Now compile the graph in the streaming mode. + // It has to produce two islands + { + using namespace cv::gimpl; + + GCompiler compiler(comp, {in_meta}, cv::compile_args()); + GCompiler::GPtr graph = compiler.generateGraph(); + GModel::Graph(*graph).metadata().set(Streaming{}); + compiler.runPasses(*graph); + + auto isl_model = GModel::ConstGraph(*graph).metadata() + .get().model; + GIslandModel::ConstGraph gim(*isl_model); + + const auto is_island = [&](ade::NodeHandle nh) { + return (NodeKind::ISLAND == gim.metadata(nh).get().k); + }; + const auto num_isl = std::count_if(gim.nodes().begin(), + gim.nodes().end(), + is_island); + EXPECT_EQ(2, num_isl); + } +} + +// Fixme: add more tests on mixed (hetero) graphs // ADE-222, ADE-223 // FIXME: add test on combination of user-specified island diff --git a/modules/gapi/test/internal/gapi_transactions_test.cpp b/modules/gapi/test/internal/gapi_transactions_test.cpp index ac77c33d13e1..9d36401a71b9 100644 --- a/modules/gapi/test/internal/gapi_transactions_test.cpp +++ b/modules/gapi/test/internal/gapi_transactions_test.cpp @@ -2,11 +2,14 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2018 Intel Corporation +// Copyright (C) 2018 - 2020 Intel Corporation #include "../test_precomp.hpp" + #include +#include + #include "compiler/transactions.hpp" namespace opencv_test @@ -33,10 +36,11 @@ struct SimpleGraph enum { node_nums = 5 }; ade::Graph graph; - ade::NodeHandle fused_nh; /* For check that fusion node is connected to the - inputs of the prod and the outputs of the cons */ + ade::NodeHandle fused_nh; // For check that fusion node is connected to the + // inputs of the prod and the outputs of the cons std::array nhs; std::array ehs; + using Change = ChangeT<>; Change::List changes; SimpleGraph() @@ -192,8 +196,6 @@ TEST_F(Transactions, DropNode_Commit) TEST_F(Transactions, Fusion_Commit) { - namespace C = Change; - fuse(); commit(); @@ -204,8 +206,6 @@ TEST_F(Transactions, Fusion_Commit) TEST_F(Transactions, Fusion_RollBack) { - namespace C = Change; - fuse(); rollback(); @@ -219,4 +219,151 @@ TEST_F(Transactions, Fusion_RollBack) } } +namespace +{ + struct MetaInt { + static const char *name() { return "int_meta"; } + int x; + }; + + struct MetaStr { + static const char *name() { return "string_meta"; } + std::string s; + }; +} + +TEST(PreservedMeta, TestMetaCopy_Full) +{ + ade::Graph g; + ade::TypedGraph tg(g); + + auto src_nh = tg.createNode(); + tg.metadata(src_nh).set(MetaInt{42}); + tg.metadata(src_nh).set(MetaStr{"hi"}); + + auto dst_nh = tg.createNode(); + + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + + // Here we specify all the meta types we know about the src node + // Assume Preserved copies its all for us + Preserved(g, src_nh).copyTo(g, dst_nh); + + ASSERT_TRUE(tg.metadata(dst_nh).contains()); + ASSERT_TRUE(tg.metadata(dst_nh).contains()); + + EXPECT_EQ(42, tg.metadata(dst_nh).get().x); + EXPECT_EQ("hi", tg.metadata(dst_nh).get().s); +} + + +TEST(PreservedMeta, TestMetaCopy_Partial_Dst) +{ + ade::Graph g; + ade::TypedGraph tg(g); + + auto tmp_nh1 = tg.createNode(); + auto tmp_nh2 = tg.createNode(); + auto src_eh = tg.link(tmp_nh1, tmp_nh2); + + tg.metadata(src_eh).set(MetaInt{42}); + tg.metadata(src_eh).set(MetaStr{"hi"}); + + auto tmp_nh3 = tg.createNode(); + auto tmp_nh4 = tg.createNode(); + auto dst_eh = tg.link(tmp_nh3, tmp_nh4); + + EXPECT_FALSE(tg.metadata(dst_eh).contains()); + EXPECT_FALSE(tg.metadata(dst_eh).contains()); + + // Here we specify just a single meta type for the src node + // Assume Preserved copies only this type and nothing else + Preserved(g, src_eh).copyTo(g, dst_eh); + + ASSERT_FALSE(tg.metadata(dst_eh).contains()); + ASSERT_TRUE (tg.metadata(dst_eh).contains()); + + EXPECT_EQ("hi", tg.metadata(dst_eh).get().s); +} + +TEST(PreservedMeta, TestMetaCopy_Partial_Src) +{ + ade::Graph g; + ade::TypedGraph tg(g); + + auto src_nh = tg.createNode(); + tg.metadata(src_nh).set(MetaInt{42}); + + auto dst_nh = tg.createNode(); + + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + + // Here we specify all the meta types we know about the src node + // but the src node has just one of them. + // A valid situation, only MetaInt to be copied. + Preserved(g, src_nh).copyTo(g, dst_nh); + + ASSERT_TRUE (tg.metadata(dst_nh).contains()); + ASSERT_FALSE(tg.metadata(dst_nh).contains()); + + EXPECT_EQ(42, tg.metadata(dst_nh).get().x); +} + +TEST(PreservedMeta, TestMetaCopy_Nothing) +{ + ade::Graph g; + ade::TypedGraph tg(g); + + auto src_nh = tg.createNode(); + auto dst_nh = tg.createNode(); + + EXPECT_FALSE(tg.metadata(src_nh).contains()); + EXPECT_FALSE(tg.metadata(src_nh).contains()); + + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + EXPECT_FALSE(tg.metadata(dst_nh).contains()); + + // Here we specify all the meta types we know about the src node + // but the src node has none of those. See how it works now + Preserved(g, src_nh).copyTo(g, dst_nh); + + ASSERT_FALSE(tg.metadata(dst_nh).contains()); + ASSERT_FALSE(tg.metadata(dst_nh).contains()); +} + +TEST(PreservedMeta, DropEdge) +{ + ade::Graph g; + ade::TypedGraph tg(g); + + auto nh1 = tg.createNode(); + auto nh2 = tg.createNode(); + auto eh = tg.link(nh1, nh2); + + tg.metadata(eh).set(MetaInt{42}); + tg.metadata(eh).set(MetaStr{"hi"}); + + // Drop an edge using the transaction API + using Change = ChangeT; + Change::List changes; + changes.enqueue(g, nh1, eh); + + EXPECT_EQ(0u, nh1->outNodes().size()); + EXPECT_EQ(nullptr, eh); + + // Now restore the edge and check if it's meta was restored + changes.rollback(g); + + ASSERT_EQ(1u, nh1->outNodes().size()); + eh = *nh1->outEdges().begin(); + + ASSERT_TRUE(tg.metadata(eh).contains()); + ASSERT_TRUE(tg.metadata(eh).contains()); + + EXPECT_EQ(42, tg.metadata(eh).get().x); + EXPECT_EQ("hi", tg.metadata(eh).get().s); +} + } // opencv_test diff --git a/modules/gapi/test/own/conc_queue_tests.cpp b/modules/gapi/test/own/conc_queue_tests.cpp index c3e6fd6e0864..6e268f318cb8 100644 --- a/modules/gapi/test/own/conc_queue_tests.cpp +++ b/modules/gapi/test/own/conc_queue_tests.cpp @@ -55,7 +55,7 @@ TEST(ConcQueue, Clear) EXPECT_FALSE(q.try_pop(x)); } -// In this test, every writer thread produce its own range of integer +// In this test, every writer thread produces its own range of integer // numbers, writing those to a shared queue. // // Every reader thread pops elements from the queue (until -1 is @@ -64,12 +64,12 @@ TEST(ConcQueue, Clear) // Finally, the master thread waits for completion of all other // threads and verifies that all the necessary data is // produced/obtained. +namespace +{ using StressParam = std::tuple; // Queue capacity -namespace -{ constexpr int STOP_SIGN = -1; constexpr int BASE = 1000; } diff --git a/modules/gapi/test/own/last_written_value_tests.cpp b/modules/gapi/test/own/last_written_value_tests.cpp new file mode 100644 index 000000000000..4bfb27f15f3c --- /dev/null +++ b/modules/gapi/test/own/last_written_value_tests.cpp @@ -0,0 +1,156 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2020 Intel Corporation + +#include "../test_precomp.hpp" + +#include +#include + +#include "executor/last_value.hpp" + +namespace opencv_test { +using namespace cv::gapi; + +TEST(LastValue, PushPop) { + own::last_written_value v; + for (int i = 0; i < 100; i++) { + v.push(i); + + int x = 1; + v.pop(x); + EXPECT_EQ(x, i); + } +} + +TEST(LastValue, TryPop) { + own::last_written_value v; + int x = 0; + EXPECT_FALSE(v.try_pop(x)); + + v.push(1); + EXPECT_TRUE(v.try_pop(x)); + EXPECT_EQ(1, x); +} + +TEST(LastValue, Clear) { + own::last_written_value v; + v.push(42); + v.clear(); + + int x = 0; + EXPECT_FALSE(v.try_pop(x)); +} + +TEST(LastValue, Overwrite) { + own::last_written_value v; + v.push(42); + v.push(0); + + int x = -1; + EXPECT_TRUE(v.try_pop(x)); + EXPECT_EQ(0, x); +} + +// In this test, every writer thread produces its own range of integer +// numbers, writing those to a shared queue. +// +// Every reader thread pops elements from the queue (until -1 is +// reached) and stores those in its own associated set. +// +// Finally, the master thread waits for completion of all other +// threads and verifies that all the necessary data is +// produced/obtained. +namespace { +using StressParam = std::tuple; // Num reader threads +constexpr int STOP_SIGN = -1; +constexpr int BASE = 1000; +} +struct LastValue_: public ::testing::TestWithParam { + using V = own::last_written_value; + using S = std::unordered_set; + + static void writer(int base, int writes, V& v) { + for (int i = 0; i < writes; i++) { + if (i % 2) { + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + } + v.push(base + i); + } + v.push(STOP_SIGN); + } + + static void reader(V& v, S& s) { + int x = 0; + while (true) { + v.pop(x); + if (x == STOP_SIGN) { + // If this thread was lucky enough to read this STOP_SIGN, + // push it back to v to make other possible readers able + // to read it again (note due to the last_written_value + // semantic, those STOP_SIGN could be simply lost i.e. + // overwritten. + v.push(STOP_SIGN); + return; + } + s.insert(x); + } + } +}; + +TEST_P(LastValue_, Test) +{ + int num_writers = 0; + int num_writes = 0; + int num_readers = 0; + std::tie(num_writers, num_writes, num_readers) = GetParam(); + + CV_Assert(num_writers < 20); + CV_Assert(num_writes < BASE); + + V v; + + // Start reader threads + std::vector storage(num_readers); + std::vector readers; + for (S& s : storage) { + readers.emplace_back(reader, std::ref(v), std::ref(s)); + } + + // Start writer threads, also pre-generate reference numbers + S reference; + std::vector writers; + for (int w = 0; w < num_writers; w++) { + writers.emplace_back(writer, w*BASE, num_writes, std::ref(v)); + for (int r = 0; r < num_writes; r++) { + reference.insert(w*BASE + r); + } + } + + // Wait for completions + for (auto &t : readers) t.join(); + for (auto &t : writers) t.join(); + + // Validate the result. Some values are read, and the values are + // correct (i.e. such values have been written) + std::size_t num_values_read = 0u; + for (const auto &s : storage) { + num_values_read += s.size(); + for (auto &x : s) { + EXPECT_TRUE(reference.count(x) > 0); + } + } + // NOTE: Some combinations may end-up in 0 values read + // it is normal, the main thing is that the test shouldn't hang! + EXPECT_LE(0u, num_values_read); +} + +INSTANTIATE_TEST_CASE_P(LastValueStress, LastValue_, + Combine( Values(1, 2, 4, 8, 16) // writers + , Values(32, 96, 256) // writes + , Values(1, 2, 10))); // readers +} // namespace opencv_test diff --git a/modules/gapi/test/streaming/gapi_streaming_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_tests.cpp index dfd2331bfd93..69b85c0d3498 100644 --- a/modules/gapi/test/streaming/gapi_streaming_tests.cpp +++ b/modules/gapi/test/streaming/gapi_streaming_tests.cpp @@ -2,11 +2,13 @@ // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // -// Copyright (C) 2019 Intel Corporation +// Copyright (C) 2019-2020 Intel Corporation #include "../test_precomp.hpp" +#include // sleep_for (Delay) + #include #include @@ -18,6 +20,7 @@ #include #include +#include namespace opencv_test { @@ -100,6 +103,16 @@ struct GAPI_Streaming: public ::testing::TestWithParam { } }; +G_API_OP(Delay, , "org.opencv.test.delay") { + static cv::GMatDesc outMeta(const cv::GMatDesc &in, int) { return in; } +}; +GAPI_OCV_KERNEL(OCVDelay, Delay) { + static void run(const cv::Mat &in, int ms, cv::Mat &out) { + std::this_thread::sleep_for(std::chrono::milliseconds{ms}); + in.copyTo(out); + } +}; + } // anonymous namespace TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat) @@ -794,6 +807,104 @@ TEST(GAPI_Streaming_Types, OutputVector) EXPECT_LT(0u, num_frames); } +G_API_OP(DimsChans, + , cv::GOpaque>(cv::GMat)>, + "test.streaming.dims_chans") { + static std::tuple outMeta(const cv::GMatDesc &) { + return std::make_tuple(cv::empty_array_desc(), + cv::empty_gopaque_desc()); + } +}; + +GAPI_OCV_KERNEL(OCVDimsChans, DimsChans) { + static void run(const cv::Mat &in, std::vector &ov, int &oi) { + ov = {in.cols, in.rows}; + oi = in.channels(); + } +}; + +struct GAPI_Streaming_TemplateTypes: ::testing::Test { + // There was a problem in GStreamingExecutor + // when outputs were formally not used by the graph + // but still should be in place as operation need + // to produce them, and host data type constructors + // were missing for GArray and GOpaque in this case. + // This test tests exactly this. + + GAPI_Streaming_TemplateTypes() { + // Prepare everything for the test: + // Graph itself + blur = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat blur_d = cv::gapi::streaming::desync(blur); + std::tie(vec, opq) = DimsChans::on(blur_d); + + // Kernel package + pkg = cv::gapi::kernels(); + + // Input mat + in_mat = cv::Mat::eye(cv::Size(320,240), CV_8UC3); + } + + cv::GMat in; + cv::GMat blur; + cv::GArray vec; + cv::GOpaque opq; + cv::gapi::GKernelPackage pkg; + cv::Mat in_mat; +}; + +TEST_F(GAPI_Streaming_TemplateTypes, UnusedVectorIsOK) +{ + // Declare graph without listing vec as output + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(blur, opq)) + .compileStreaming(cv::compile_args(pkg)); + sc.setSource(cv::gin(in_mat)); + sc.start(); + + cv::optional out_mat; + cv::optional out_int; + + int counter = 0; + while (sc.pull(cv::gout(out_mat, out_int))) { + if (counter++ == 10) { + // Stop the test after 10 iterations + sc.stop(); + break; + } + GAPI_Assert(out_mat || out_int); + if (out_int) { + EXPECT_EQ( 3, out_int.value()); + } + } +} + +TEST_F(GAPI_Streaming_TemplateTypes, UnusedOpaqueIsOK) +{ + // Declare graph without listing opq as output + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(blur, vec)) + .compileStreaming(cv::compile_args(pkg)); + sc.setSource(cv::gin(in_mat)); + sc.start(); + + cv::optional out_mat; + cv::optional > out_vec; + + int counter = 0; + while (sc.pull(cv::gout(out_mat, out_vec))) { + if (counter++ == 10) { + // Stop the test after 10 iterations + sc.stop(); + break; + } + GAPI_Assert(out_mat || out_vec); + if (out_vec) { + EXPECT_EQ(320, out_vec.value()[0]); + EXPECT_EQ(240, out_vec.value()[1]); + } + } +} + struct GAPI_Streaming_Unit: public ::testing::Test { cv::Mat m; @@ -882,7 +993,7 @@ TEST_F(GAPI_Streaming_Unit, StartStopStart_NoSetSource) EXPECT_NO_THROW(sc.setSource(cv::gin(m, m))); EXPECT_NO_THROW(sc.start()); EXPECT_NO_THROW(sc.stop()); - EXPECT_ANY_THROW(sc.start()); // Should fails since setSource was not called + EXPECT_ANY_THROW(sc.start()); // Should fail since setSource was not called } TEST_F(GAPI_Streaming_Unit, StartStopStress_Const) @@ -1018,4 +1129,380 @@ TEST(Streaming, Python_Pull_Overload) EXPECT_FALSE(ccomp.running()); } +TEST(GAPI_Streaming_Desync, SmokeTest_Regular) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + cv::GMat out1 = cv::gapi::Canny(tmp1, 32, 128, 3); + + // FIXME: Unary desync should not require tie! + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out2 = tmp2 / cv::gapi::Sobel(tmp2, CV_8U, 1, 1);; + + cv::Mat test_in = cv::Mat::eye(cv::Size(32,32), CV_8UC3); + cv::Mat test_out1, test_out2; + cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .apply(cv::gin(test_in), cv::gout(test_out1, test_out2)); +} + +TEST(GAPI_Streaming_Desync, SmokeTest_Streaming) +{ + initTestDataPath(); + + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + cv::GMat out1 = cv::gapi::Canny(tmp1, 32, 128, 3); + + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out2 = Delay::on(tmp2,10) / cv::gapi::Sobel(tmp2, CV_8U, 1, 1); + + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming(cv::compile_args(cv::gapi::kernels())); + auto sc_file = findDataFile("cv/video/768x576.avi"); + auto sc_src = gapi::wip::make_src(sc_file); + sc.setSource(cv::gin(sc_src)); + sc.start(); + + std::size_t out1_hits = 0u; + std::size_t out2_hits = 0u; + cv::optional test_out1, test_out2; + while (sc.pull(cv::gout(test_out1, test_out2))) { + GAPI_Assert(test_out1 || test_out2); + if (test_out1) out1_hits++; + if (test_out2) out2_hits++; + } + EXPECT_EQ(100u, out1_hits); // out1 must be available for all frames + EXPECT_LE(out2_hits, out1_hits); // out2 must appear less times than out1 + std::cout << "Got " << out1_hits << " out1's and " << out2_hits << " out2's" << std::endl; +} + +TEST(GAPI_Streaming_Desync, SmokeTest_Streaming_TwoParts) +{ + initTestDataPath(); + + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + cv::GMat out1 = cv::gapi::Canny(tmp1, 32, 128, 3); + + // Desynchronized path 1 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out2 = tmp2 / cv::gapi::Sobel(tmp2, CV_8U, 1, 1); + + // Desynchronized path 2 + cv::GMat tmp3 = cv::gapi::streaming::desync(tmp1); + cv::GMat out3 = 0.5*tmp3 + 0.5*cv::gapi::medianBlur(tmp3, 7); + + // The code should compile and execute well (desynchronized parts don't cross) + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2, out3)) + .compileStreaming(); + auto sc_file = findDataFile("cv/video/768x576.avi"); + auto sc_src = gapi::wip::make_src(sc_file); + sc.setSource(cv::gin(sc_src)); + sc.start(); + + std::size_t test_frames = 0u; + cv::optional test_out1, test_out2, test_out3; + while (sc.pull(cv::gout(test_out1, test_out2, test_out3))) { + GAPI_Assert(test_out1 || test_out2 || test_out3); + if (test_out1) { + // count frames only for synchronized output + test_frames++; + } + } + EXPECT_EQ(100u, test_frames); +} + +TEST(GAPI_Streaming_Desync, Negative_NestedDesync_Tier0) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path 1 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = cv::gapi::medianBlur(tmp2, 3); + + // Desynchronized path 2, nested from 1 (directly from desync) + cv::GMat tmp3 = cv::gapi::streaming::desync(tmp2); + cv::GMat out2 = 0.5*tmp3; + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_NestedDesync_Tier1) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path 1 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = cv::gapi::medianBlur(tmp2, 3); + + // Desynchronized path 2, nested from 1 (indirectly from desync) + cv::GMat tmp3 = cv::gapi::streaming::desync(out1); + cv::GMat out2 = 0.5*tmp3; + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_CrossMainPart_Tier0) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path: depends on both tmp1 and tmp2 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = 0.5*tmp1 + 0.5*tmp2; + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(in, out1).compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_CrossMainPart_Tier1) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path: depends on both tmp1 and tmp2 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = 0.5*tmp1 + 0.5*cv::gapi::medianBlur(tmp2, 3); + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(in, out1).compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_CrossOtherDesync_Tier0) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path 1 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = 0.5*tmp2; + + // Desynchronized path 2 (depends on 1) + cv::GMat tmp3 = cv::gapi::streaming::desync(tmp1); + cv::GMat out2 = 0.5*tmp3 + tmp2; + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_CrossOtherDesync_Tier1) +{ + cv::GMat in; + cv::GMat tmp1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + // Desynchronized path 1 + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp1); + cv::GMat out1 = 0.5*tmp2; + + // Desynchronized path 2 (depends on 1) + cv::GMat tmp3 = cv::gapi::streaming::desync(tmp1); + cv::GMat out2 = 0.5*cv::gapi::medianBlur(tmp3,3) + 1.0*tmp2; + + // This shouldn't compile + EXPECT_ANY_THROW(cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming()); +} + +TEST(GAPI_Streaming_Desync, Negative_SynchronizedPull) +{ + initTestDataPath(); + + cv::GMat in; + cv::GMat out1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat tmp1 = cv::gapi::streaming::desync(out1); + cv::GMat out2 = 0.5*tmp1; + + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming(); + + auto sc_file = findDataFile("cv/video/768x576.avi"); + auto sc_src = gapi::wip::make_src(sc_file); + sc.setSource(cv::gin(sc_src)); + sc.start(); + + cv::Mat o1, o2; + EXPECT_ANY_THROW(sc.pull(cv::gout(o1, o2))); +} + +TEST(GAPI_Streaming_Desync, UseSpecialPull) +{ + initTestDataPath(); + + cv::GMat in; + cv::GMat out1 = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat tmp1 = cv::gapi::streaming::desync(out1); + cv::GMat out2 = 0.5*tmp1; + + auto sc = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)) + .compileStreaming(); + + auto sc_file = findDataFile("cv/video/768x576.avi"); + auto sc_src = gapi::wip::make_src(sc_file); + sc.setSource(cv::gin(sc_src)); + sc.start(); + + cv::optional o1, o2; + std::size_t num_frames = 0u; + + while (sc.pull(cv::gout(o1, o2))) { + if (o1) num_frames++; + } + EXPECT_EQ(100u, num_frames); +} + +G_API_OP(ProduceVector, (cv::GMat)>, "test.desync.vector") { + static cv::GArrayDesc outMeta(const cv::GMatDesc &) { + return cv::empty_array_desc(); + } +}; + +G_API_OP(ProduceOpaque, (cv::GMat)>, "test.desync.opaque") { + static cv::GOpaqueDesc outMeta(const cv::GMatDesc &) { + return cv::empty_gopaque_desc(); + } +}; + +GAPI_OCV_KERNEL(OCVVector, ProduceVector) { + static void run(const cv::Mat& in, std::vector &out) { + out = {in.cols, in.rows}; + } +}; + +GAPI_OCV_KERNEL(OCVOpaque, ProduceOpaque) { + static void run(const cv::Mat &in, int &v) { + v = in.channels(); + } +}; + +namespace { +cv::GStreamingCompiled desyncTestObject() { + cv::GMat in; + cv::GMat blur = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat blur_d = cv::gapi::copy(cv::gapi::streaming::desync(blur)); + cv::GMat d1 = Delay::on(blur_d, 10); + cv::GMat d2 = Delay::on(blur_d, 30); + + cv::GArray vec = ProduceVector::on(d1); + cv::GOpaque opq = ProduceOpaque::on(d2); + + auto pkg = cv::gapi::kernels(); + return cv::GComputation(cv::GIn(in), cv::GOut(blur, vec, opq)) + .compileStreaming(cv::compile_args(pkg)); +} +} // anonymous namespace + +TEST(GAPI_Streaming_Desync, MultipleDesyncOutputs_1) { + auto sc = desyncTestObject(); + const cv::Mat in_mat = cv::Mat::eye(cv::Size(320,240), CV_8UC3); + + sc.setSource(cv::gin(in_mat)); + sc.start(); + + cv::optional out_mat; + cv::optional > out_vec; + cv::optional out_int; + + int counter = 0; + while (sc.pull(cv::gout(out_mat, out_vec, out_int))) { + if (counter++ == 1000) { + // Stop the test after 1000 iterations + sc.stop(); + break; + } + GAPI_Assert(out_mat || out_vec || out_int); + + // out_vec and out_int are on the same desynchronized path + // they MUST arrive together. If one is available, the other + // also must be available. + if (out_vec) { ASSERT_TRUE(out_int.has_value()); } + if (out_int) { ASSERT_TRUE(out_vec.has_value()); } + + if (out_vec || out_int) { + EXPECT_EQ(320, out_vec.value()[0]); + EXPECT_EQ(240, out_vec.value()[1]); + EXPECT_EQ( 3, out_int.value()); + } + } +} + +TEST(GAPI_Streaming_Desync, StartStop_Stress) { + auto sc = desyncTestObject(); + const cv::Mat in_mat = cv::Mat::eye(cv::Size(320,240), CV_8UC3); + + cv::optional out_mat; + cv::optional > out_vec; + cv::optional out_int; + + for (int i = 0; i < 10; i++) { + sc.setSource(cv::gin(in_mat)); + sc.start(); + int counter = 0; + while (counter++ < 100) { + sc.pull(cv::gout(out_mat, out_vec, out_int)); + GAPI_Assert(out_mat || out_vec || out_int); + if (out_vec) { ASSERT_TRUE(out_int.has_value()); } + if (out_int) { ASSERT_TRUE(out_vec.has_value()); } + } + sc.stop(); + } +} + +GAPI_FLUID_KERNEL(FluidCopy, cv::gapi::core::GCopy, false) { + static const int Window = 1; + + static void run(const cv::gapi::fluid::View &in, + cv::gapi::fluid::Buffer &out) { + const uint8_t *in_ptr = in.InLineB(0); + uint8_t *out_ptr = out.OutLineB(0); + + const auto in_type = CV_MAKETYPE(in.meta().depth, in.meta().chan); + const auto out_type = CV_MAKETYPE(out.meta().depth, out.meta().chan); + GAPI_Assert(in_type == out_type); + std::copy_n(in_ptr, in.length()*CV_ELEM_SIZE(in_type), out_ptr); + } +}; + + +TEST(GAPI_Streaming_Desync, DesyncObjectConsumedByTwoIslandsViaSeparateDesync) { + // See comment in the implementation of cv::gapi::streaming::desync (.cpp) + cv::GMat in; + cv::GMat tmp = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat tmp1 = cv::gapi::streaming::desync(tmp); + cv::GMat out1 = cv::gapi::copy(tmp1); // ran via Fluid backend + + cv::GMat tmp2 = cv::gapi::streaming::desync(tmp); + cv::GMat out2 = tmp2 * 0.5; // ran via OCV backend + + auto c = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)); + auto p = cv::gapi::kernels(); + + EXPECT_NO_THROW(c.compileStreaming(cv::compile_args(p))); +} + +TEST(GAPI_Streaming_Desync, DesyncObjectConsumedByTwoIslandsViaSameDesync) { + // See comment in the implementation of cv::gapi::streaming::desync (.cpp) + cv::GMat in; + cv::GMat tmp = cv::gapi::boxFilter(in, -1, cv::Size(3,3)); + + cv::GMat tmp1 = cv::gapi::streaming::desync(tmp); + cv::GMat out1 = cv::gapi::copy(tmp1); // ran via Fluid backend + cv::GMat out2 = out1 - 0.5*tmp1; // ran via OCV backend + + auto c = cv::GComputation(cv::GIn(in), cv::GOut(out1, out2)); + auto p = cv::gapi::kernels(); + + EXPECT_NO_THROW(c.compileStreaming(cv::compile_args(p))); +} + } // namespace opencv_test