Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert partially constructed segments on-error in segment init function #434

Open
wants to merge 31 commits into
base: branch-24.10
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
75db6d7
WIP: Shutdown fixes from PR 407, without the name changes
dagardner-nv Jan 19, 2024
7c58f7e
WIP: Add in missing methods
dagardner-nv Jan 19, 2024
1bd0eaf
Add test for connecting and releasing connections
dagardner-nv Jan 19, 2024
4021c0b
Tests for bug 360
dagardner-nv Jan 19, 2024
6cbe155
Parametarize on engine type
dagardner-nv Jan 19, 2024
2b80e67
Add test replicating a segment init error
dagardner-nv Jan 19, 2024
df2dc2e
Update CR year
dagardner-nv Jan 19, 2024
c9eccac
formatting
dagardner-nv Jan 19, 2024
66d3b07
Add new test for a variation on #360 when a module has been loaded an…
dagardner-nv Jan 22, 2024
63de5c0
Add C++ unittest to reproduce Morpheus issue #953 which is a variatio…
dagardner-nv Jan 22, 2024
44dd6c9
Slim back test to minimum [no ci]
dagardner-nv Jan 22, 2024
defa442
These appear to be duplicates of cpp/mrc/include/mrc/modules/sample_m…
dagardner-nv Jan 22, 2024
9d83330
Cleanup test
dagardner-nv Jan 22, 2024
2de362f
Add new test replicating issue observed using modules without modules…
dagardner-nv Jan 22, 2024
13f1371
Only log a fatal error in debug mode, aborting on this error is reall…
dagardner-nv Jan 23, 2024
f6b2395
Use EXPECT_DEBUG_DEATH rather than EXPECT_DEATH since we only log fat…
dagardner-nv Jan 23, 2024
e93e7a4
Add EXPECT_DEATH_OR_THROW macro which checks for an abort in debug mo…
dagardner-nv Jan 23, 2024
615e98c
Remove test triggering an abort in debug mode
dagardner-nv Jan 23, 2024
34a99ff
IWYU fixes
dagardner-nv Jan 23, 2024
c7d6587
Pin libxml2 to avoid serialization bug in hwloc, add rapids-dependenc…
dagardner-nv Jan 23, 2024
c2a485a
regen
dagardner-nv Jan 24, 2024
697df42
Remove trailing blank line
dagardner-nv Jan 24, 2024
4b72060
Remove eronious EXPECT_THROW
dagardner-nv Jan 24, 2024
c896eeb
Merge branch 'branch-24.03' into david-inconsistent-segments-and-edge…
dagardner-nv Jan 26, 2024
f57e922
Merge branch 'branch-24.03' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Feb 9, 2024
fd5ab48
Merge branch 'branch-24.03' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Feb 9, 2024
b92f1f9
Rename shutdown to kill per PR feedback
dagardner-nv Feb 13, 2024
20ea08c
Add separate kill_segment method
dagardner-nv Feb 13, 2024
8986784
Merge branch 'branch-24.06' into david-inconsistent-segments-and-edge…
dagardner-nv Apr 11, 2024
3143daf
Merge branch 'branch-24.06' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Apr 16, 2024
6040afb
Merge branch 'branch-24.10' of github.com:nv-morpheus/MRC into david-…
dagardner-nv Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions cpp/mrc/include/mrc/edge/edge_holder.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,7 +23,6 @@
#include "mrc/channel/egress.hpp"
#include "mrc/channel/ingress.hpp"
#include "mrc/edge/forward.hpp"
#include "mrc/exceptions/runtime_error.hpp"
#include "mrc/type_traits.hpp"
#include "mrc/utils/string_utils.hpp"

Expand All @@ -37,6 +36,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <sstream> // for std::stringstream
#include <stdexcept>
#include <type_traits>
#include <typeindex>
Expand All @@ -51,19 +51,45 @@
{
public:
EdgeHolder() = default;

virtual ~EdgeHolder()
{
// Drop any edge connections before this object goes out of scope. This should execute any disconnectors
m_connected_edge.reset();

if (this->check_active_connection(false))
{
LOG(FATAL) << "A node was destructed which still had dependent connections. Nodes must be kept alive while "
"dependent connections are still active";
std::stringstream msg;
msg << "EdgeHolder(" << this << ") "

Check warning on line 63 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L62-L63

Added lines #L62 - L63 were not covered by tests
<< "A node was destructed which still had dependent connections. Nodes must be kept alive while "
"dependent connections are still active\n"
<< this->connection_info();

Check warning on line 66 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L66

Added line #L66 was not covered by tests

#if defined(NDEBUG)
LOG(ERROR) << msg.str();
#else
LOG(FATAL) << msg.str();

Check warning on line 71 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L71

Added line #L71 was not covered by tests
#endif
}
}

protected:
std::string connection_info() const

Check warning on line 77 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L77

Added line #L77 was not covered by tests
{
std::stringstream ss;

Check warning on line 79 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L79

Added line #L79 was not covered by tests
ss << "m_owned_edge=" << m_owned_edge.lock() << "\tm_owned_edge_lifetime=" << m_owned_edge_lifetime
<< "\tm_connected_edge=" << m_connected_edge;

Check warning on line 81 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L81

Added line #L81 was not covered by tests

bool is_connected = false;
if (m_connected_edge)

Check warning on line 84 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L83-L84

Added lines #L83 - L84 were not covered by tests
{
is_connected = m_connected_edge->is_connected();

Check warning on line 86 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L86

Added line #L86 was not covered by tests
}

ss << "\tis_connected=" << is_connected << "\tcheck_active_connection=" << this->check_active_connection(false);
return ss.str();

Check warning on line 90 in cpp/mrc/include/mrc/edge/edge_holder.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/edge/edge_holder.hpp#L89-L90

Added lines #L89 - L90 were not covered by tests
}

bool check_active_connection(bool do_throw = true) const
{
// Alive connection exists when the lock is true, lifetime is false or a connction object has been set
Expand Down Expand Up @@ -155,6 +181,13 @@
m_connected_edge.reset();
}

void disconnect()
{
m_connected_edge.reset();
m_owned_edge_lifetime.reset();
m_owned_edge.reset();
}

const std::shared_ptr<Edge<T>>& get_connected_edge() const
{
return m_connected_edge;
Expand Down Expand Up @@ -204,6 +237,17 @@
virtual ~MultiEdgeHolder() = default;

protected:
std::string connection_info() const
{
std::stringstream ss;
ss << "m_edges.size()=" << m_edges.size();
for (const auto& [key, edge_pair] : m_edges)
{
ss << "\n\tkey=" << key << "\t" << edge_pair.connection_info();
}
return ss.str();
}

void init_owned_edge(KeyT key, std::shared_ptr<Edge<T>> edge)
{
auto& edge_pair = this->get_edge_pair(key, true);
Expand Down
13 changes: 12 additions & 1 deletion cpp/mrc/include/mrc/manifold/composite_manifold.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -59,6 +59,17 @@ class CompositeManifold : public Manifold
mrc::make_edge(*m_ingress, *m_egress);
}

~CompositeManifold() override
{
shutdown();
};

void shutdown() final
{
m_ingress->shutdown();
m_egress->shutdown();
}

protected:
IngressT& ingress()
{
Expand Down
10 changes: 9 additions & 1 deletion cpp/mrc/include/mrc/manifold/egress.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -35,6 +35,7 @@
{
virtual ~EgressDelegate() = default;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
virtual void shutdown(){};

Check warning on line 38 in cpp/mrc/include/mrc/manifold/egress.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/manifold/egress.hpp#L38

Added line #L38 was not covered by tests
};

template <typename T>
Expand All @@ -55,6 +56,13 @@
template <typename T>
class RoundRobinEgress : public node::Router<SegmentAddress, T>, public TypedEgress<T>
{
public:
void shutdown() final
{
DVLOG(10) << "Releasing edges from manifold egress";

Check warning on line 62 in cpp/mrc/include/mrc/manifold/egress.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/manifold/egress.hpp#L62

Added line #L62 was not covered by tests
node::Router<SegmentAddress, T>::release_edge_connections();
}

protected:
SegmentAddress determine_key_for_value(const T& t) override
{
Expand Down
12 changes: 11 additions & 1 deletion cpp/mrc/include/mrc/manifold/ingress.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,6 +23,8 @@
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"

#include <glog/logging.h>

#include <memory>

namespace mrc::manifold {
Expand All @@ -31,6 +33,7 @@
{
virtual ~IngressDelegate() = default;
virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void shutdown(){};

Check warning on line 36 in cpp/mrc/include/mrc/manifold/ingress.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/manifold/ingress.hpp#L36

Added line #L36 was not covered by tests
};

template <typename T>
Expand All @@ -51,6 +54,13 @@
template <typename T>
class MuxedIngress : public node::Muxer<T>, public TypedIngress<T>
{
public:
void shutdown() final
{
DVLOG(10) << "Releasing edges from manifold ingress";

Check warning on line 60 in cpp/mrc/include/mrc/manifold/ingress.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/manifold/ingress.hpp#L60

Added line #L60 was not covered by tests
node::SourceProperties<T>::release_edge_connection();
}

protected:
void do_add_input(const SegmentAddress& address, edge::IWritableAcceptor<T>* source) final
{
Expand Down
7 changes: 4 additions & 3 deletions cpp/mrc/include/mrc/manifold/interface.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -28,8 +28,9 @@ struct Interface

virtual const PortName& port_name() const = 0;

virtual void start() = 0;
virtual void join() = 0;
virtual void start() = 0;
virtual void join() = 0;
virtual void shutdown() = 0;

virtual void add_input(const SegmentAddress& address, edge::IWritableAcceptorBase* input_source) = 0;
virtual void add_output(const SegmentAddress& address, edge::IWritableProviderBase* output_sink) = 0;
Expand Down
3 changes: 2 additions & 1 deletion cpp/mrc/include/mrc/manifold/manifold.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -39,6 +39,7 @@ class Manifold : public Interface
~Manifold() override;

const PortName& port_name() const final;
void shutdown() override;

protected:
runnable::IRunnableResources& resources();
Expand Down
8 changes: 6 additions & 2 deletions cpp/mrc/include/mrc/node/queue.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -33,7 +33,11 @@ class Queue : public WritableProvider<T>, public ReadableProvider<T>
{
this->set_channel(std::make_unique<mrc::channel::BufferedChannel<T>>());
}
~Queue() override = default;
~Queue() override
{
SinkProperties<T>::release_edge_connection();
SourceProperties<T>::release_edge_connection();
};

void set_channel(std::unique_ptr<mrc::channel::Channel<T>> channel)
{
Expand Down
5 changes: 3 additions & 2 deletions cpp/mrc/include/mrc/node/rx_node.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -68,13 +68,14 @@ class RxNode : public RxSinkBase<InputT>,

void make_stream(stream_fn_t fn);

void on_shutdown_critical_section() final;

private:
// the following method(s) are moved to private from their original scopes to prevent access from deriving classes
using RxSinkBase<InputT>::observable;
using RxSourceBase<OutputT>::observer;

void do_subscribe(rxcpp::composite_subscription& subscription) final;
void on_shutdown_critical_section() final;

void on_stop(const rxcpp::subscription& subscription) override;
void on_kill(const rxcpp::subscription& subscription) final;
Expand Down
48 changes: 40 additions & 8 deletions cpp/mrc/include/mrc/node/source_properties.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,6 +23,7 @@
#include "mrc/edge/edge_writable.hpp"
#include "mrc/node/forward.hpp"
#include "mrc/type_traits.hpp"
#include "mrc/types.hpp" // for Mutex
#include "mrc/utils/type_utils.hpp"

#include <memory>
Expand Down Expand Up @@ -208,37 +209,68 @@
class ForwardingEgressProvider : public ReadableProvider<T>
{
protected:
struct State
{
Mutex m_mutex;
bool m_is_destroyed{false};
};

class ForwardingEdge : public edge::IEdgeReadable<T>
{
public:
ForwardingEdge(ForwardingEgressProvider<T>& parent) : m_parent(parent) {}
ForwardingEdge(ForwardingEgressProvider<T>& parent, std::shared_ptr<State> state) :
m_parent(parent),
m_state(std::move(state))
{}

~ForwardingEdge() = default;

channel::Status await_read(T& t) override
{
return m_parent.get_next(t);
std::lock_guard<decltype(m_state->m_mutex)> lock(m_state->m_mutex);
if (!(m_state->m_is_destroyed))
{
return m_parent.get_next(t);
}

return channel::Status::closed;

Check warning on line 236 in cpp/mrc/include/mrc/node/source_properties.hpp

View check run for this annotation

Codecov / codecov/patch

cpp/mrc/include/mrc/node/source_properties.hpp#L236

Added line #L236 was not covered by tests
}

private:
ForwardingEgressProvider<T>& m_parent;
std::shared_ptr<State> m_state;
};

ForwardingEgressProvider()
ForwardingEgressProvider() : m_state(std::make_shared<State>())
{
auto inner_edge = std::make_shared<ForwardingEdge>(*this);
auto inner_edge = std::make_shared<ForwardingEdge>(*this, m_state);

inner_edge->add_disconnector([this]() {
// Only call the on_complete if we have been connected
this->on_complete();
inner_edge->add_disconnector([this, state = m_state]() {
std::lock_guard<decltype(state->m_mutex)> lock(state->m_mutex);
if (!(state->m_is_destroyed))
{
// Only call the on_complete if we have been connected and `this` is still alive
this->on_complete();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to see if there are any implementations of on_complete. If there arent, then we should just scrap the disconnector and remove the state and mutex changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked, the classes which implement on_complete are:

  • mrc::node::NodeComponent Calls SourceProperties<OutputT>::release_edge_connection(); and do_on_complete which is unimpl in the base
  • mrc::node::RouterBase calls MultiSourceProperties<KeyT, output_data_t>::release_edge_connections();

do_on_complete in turn is used by LambdaNodeComponent to call m_on_complete_fn if defined.

}
});

ReadableProvider<T>::init_owned_edge(inner_edge);
}

~ForwardingEgressProvider()
{
SourceProperties<T>::disconnect();
{
std::lock_guard<decltype(m_state->m_mutex)> lock(m_state->m_mutex);
m_state->m_is_destroyed = true;
}
}

virtual channel::Status get_next(T& t) = 0;

virtual void on_complete() {}

std::shared_ptr<State> m_state;
};

} // namespace mrc::node
7 changes: 6 additions & 1 deletion cpp/mrc/include/mrc/segment/component.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -34,6 +34,11 @@ class Component final : public Object<ResourceT>
Component(std::unique_ptr<ResourceT> resource) : m_resource(std::move(resource)) {}
~Component() final = default;

void destroy() final
{
m_resource.reset();
}

private:
ResourceT* get_object() const final
{
Expand Down
Loading
Loading