Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the HTTP server shutdown logic to respond correctly to Ctrl+C and stop commands #1517

Open
wants to merge 5 commits into
base: branch-24.06
Choose a base branch
from

Conversation

mdemoret-nv
Copy link
Contributor

Description

This PR refactors the C++ HTTP server to allow it to correctly stop processing when a user presses Ctrl+C or pipeline.stop() is called.

By Submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

@mdemoret-nv mdemoret-nv added improvement Improvement to existing functionality sherlock Issues/PRs related to Sherlock workflows and components labels Feb 13, 2024
@mdemoret-nv mdemoret-nv requested a review from a team as a code owner February 13, 2024 21:03
Comment on lines -64 to +78
class HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
class HttpServerSourceStage : public PythonRunnableSource<std::shared_ptr<ControlMessage>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we aren't completely on ControlMessage yet, can we make the output message a template choice?

@@ -68,7 +70,9 @@ using parse_status_t = std::tuple<unsigned /*http status code*/,
* Refer to https://www.boost.org/doc/libs/1_74_0/libs/system/doc/html/system.html#ref_class_error_code for more
* information regarding `boost::system::error_code`.
*/
using payload_parse_fn_t = std::function<parse_status_t(const std::string& /* post body */)>;
using payload_parse_fn_t =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the docstring to reflect the change here

@@ -121,18 +123,23 @@ class Session : public std::enable_shared_from_this<Session>

// Release ownership of the parsed message and move it into the
// handle_request method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move the comment down just above the handle_request call, otherwise it looks like its related to building up that address_str

Comment on lines +153 to +172
// HttpServerSourceStage::subscriber_fn_t HttpServerSourceStage::build()
// {
// return [this](rxcpp::subscriber<source_type_t> subscriber) -> void {
// try
// {
// m_server->start();
// this->source_generator(subscriber);
// } catch (const SourceStageStopAfter& e)
// {
// DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records";
// } catch (const std::exception& e)
// {
// LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl;
// subscriber.on_error(std::make_exception_ptr(e));
// return;
// }
// subscriber.on_completed();
// this->close();
// };
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Comment on lines +274 to +334
// void HttpServerSourceStage::source_generator(rxcpp::subscriber<HttpServerSourceStage::source_type_t> subscriber)
// {
// // only check if the server is running when the queue is empty, allowing all queued messages to be processed
// prior
// // to shutting down
// bool server_running = true;
// bool queue_closed = false;
// while (subscriber.is_subscribed() && server_running && !queue_closed)
// {
// table_t table_ptr{nullptr};

// auto queue_status = m_queue.pop_wait_for(table_ptr, std::chrono::milliseconds(100));

// if (queue_status == boost::fibers::channel_op_status::success)
// {
// // NOLINTNEXTLINE(clang-diagnostic-unused-value)
// DCHECK_NOTNULL(table_ptr);
// try
// {
// auto message = MessageMeta::create_from_cpp(std::move(*table_ptr), 0);
// auto num_records = message->count();
// subscriber.on_next(std::move(message));
// m_records_emitted += num_records;
// } catch (const std::exception& e)
// {
// LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what();
// }

// if (m_stop_after > 0 && m_records_emitted >= m_stop_after)
// {
// throw SourceStageStopAfter();
// }
// }
// else if (queue_status == boost::fibers::channel_op_status::timeout)
// {
// // Yield when we have no messages so we can check if the server is still running
// boost::this_fiber::yield();
// }
// else if (queue_status == boost::fibers::channel_op_status::empty)
// {
// // if the queue is empty, maybe it's because our server is not running
// server_running = m_server->is_running();

// if (server_running)
// {
// // Sleep when there are no messages
// std::this_thread::sleep_for(m_sleep_time);
// }
// }
// else if (queue_status == boost::fibers::channel_op_status::closed)
// {
// queue_closed = true;
// }
// else
// {
// std::string error_msg{"Unknown queue status: " + std::to_string(static_cast<int>(queue_status))};
// LOG(ERROR) << error_msg;
// throw std::runtime_error(error_msg);
// }
// }
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this out in the details as a part of the PR

Comment on lines +212 to +254
# from morpheus.common import FiberQueue
# from morpheus.common import HttpServer

# with (FiberQueue(self._max_queue_size) as self._queue,
# HttpServer(parse_fn=self._parse_payload,
# bind_address=self._bind_address,
# port=self._port,
# endpoint=self._endpoint,
# method=self._method.value,
# num_threads=self._num_server_threads,
# max_payload_size=self._max_payload_size_bytes,
# request_timeout=self._request_timeout_secs) as http_server):

# import asyncio
# q = asyncio.Queue(maxsize=self._max_queue_size)

# q.

# self._processing = True
# while self._processing:
# # Read as many messages as we can from the queue if it's empty check to see if we should be shutting
# # down. It is important that any messages we received that are in the queue are processed before we
# # shutdown since we already returned an OK response to the client.
# df = None
# try:
# df = self._queue.get(block=False)
# except queue.Empty:
# if (not http_server.is_running()):
# self._processing = False
# else:
# logger.debug("Queue empty, sleeping ...")
# time.sleep(self._sleep_time)
# except Closed:
# logger.error("Queue closed unexpectedly, shutting down")
# self._processing = False

# if df is not None:
# num_records = len(df)
# yield MessageMeta(df)
# self._records_emitted += num_records

# if self._stop_after > 0 and self._records_emitted >= self._stop_after:
# self._processing = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

num_records = len(df)
yield MessageMeta(df)

while True:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I don't like break statements I would instead do:

done = False
while not done:
....
    except Closed:
        done = True

Comment on lines -172 to -173
with (FiberQueue(self._max_queue_size) as self._queue,
HttpServer(parse_fn=self._parse_payload,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wish I had left a comment here, but I have a vague recollection having difficulty ensuring that the HttpServer was properly shutdown if and only if the queue was emptied. Which is why I went with lazily constructing these inside a context manager.

With that said, at the time there was a memory leak in Morpheus which prevented stages from being destroyed, which has since been fixed. So I'm not 100% this was needed or I was unwittingly just working-around the memory leak.



def retry_async(retry_exceptions=None):
import tenacity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to already be in our env as a transitive dep, but we should add it to dependencies.yaml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look into moving similar code from sherlock.

@mdemoret-nv mdemoret-nv changed the base branch from branch-24.03 to branch-24.06 April 6, 2024 00:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
improvement Improvement to existing functionality sherlock Issues/PRs related to Sherlock workflows and components
Projects
Status: Review - Changes Requested
Development

Successfully merging this pull request may close these issues.

2 participants