Skip to content

Commit

Permalink
Add possibility to wait for current job to stop.
Browse files Browse the repository at this point in the history
  • Loading branch information
tamasmeszaros committed Jan 11, 2022
1 parent 4d0088e commit 43f5e61
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 37 deletions.
75 changes: 58 additions & 17 deletions src/slic3r/GUI/Jobs/BoostThreadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,25 @@ void BoostThreadWorker::WorkerMessage::deliver(BoostThreadWorker &runner)
void BoostThreadWorker::run()
{
bool stop = false;
while(!stop) {
m_input_queue.consume_one_blk([this, &stop](JobEntry &e) {
if (!e.job)
stop = true;
else {
m_canceled.store(false);

try {
e.job->process(*this);
} catch (...) {
e.eptr = std::current_exception();
while (!stop) {
m_input_queue
.consume_one(BlockingWait{0, &m_running}, [this, &stop](JobEntry &e) {
if (!e.job)
stop = true;
else {
m_canceled.store(false);

try {
e.job->process(*this);
} catch (...) {
e.eptr = std::current_exception();
}

e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message
}

e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message
}
m_running.store(false);
}, &m_running);
m_running.store(false);
});
};
}

Expand Down Expand Up @@ -96,6 +97,7 @@ BoostThreadWorker::~BoostThreadWorker()
bool joined = false;
try {
cancel_all();
wait_for_idle(ABORT_WAIT_MAX_MS);
m_input_queue.push(JobEntry{nullptr});
joined = join(ABORT_WAIT_MAX_MS);
} catch(...) {}
Expand Down Expand Up @@ -129,6 +131,45 @@ void BoostThreadWorker::process_events()
}));
}

bool BoostThreadWorker::wait_for_current_job(unsigned timeout_ms)
{
bool ret = true;

if (!is_idle()) {
bool was_finish = false;
bool timeout_reached = false;
while (!timeout_reached && !was_finish) {
timeout_reached =
!m_output_queue.consume_one(BlockingWait{timeout_ms},
[this, &was_finish](
WorkerMessage &msg) {
msg.deliver(*this);
if (msg.get_type() ==
WorkerMessage::Finalize)
was_finish = true;
});
}

ret = !timeout_reached;
}

return ret;
}

bool BoostThreadWorker::wait_for_idle(unsigned timeout_ms)
{
bool timeout_reached = false;
while (!timeout_reached && !is_idle()) {
timeout_reached = !m_output_queue
.consume_one(BlockingWait{timeout_ms},
[this](WorkerMessage &msg) {
msg.deliver(*this);
});
}

return !timeout_reached;
}

bool BoostThreadWorker::push(std::unique_ptr<Job> job)
{
if (job)
Expand Down
6 changes: 6 additions & 0 deletions src/slic3r/GUI/Jobs/BoostThreadWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class BoostThreadWorker : public Worker, private Job::Ctl

class WorkerMessage
{
public:
enum MsgType { Empty, Status, Finalize, MainThreadCall };

private:
boost::variant<EmptyMessage, StatusInfo, JobEntry, MainThreadCallData> m_data;

public:
Expand Down Expand Up @@ -127,6 +130,9 @@ class BoostThreadWorker : public Worker, private Job::Ctl
const ProgressIndicator * get_pri() const { return m_progress.get(); }

void process_events() override;
bool wait_for_current_job(unsigned timeout_ms = 0) override;
bool wait_for_idle(unsigned timeout_ms = 0) override;

};

}} // namespace Slic3r::GUI
Expand Down
8 changes: 8 additions & 0 deletions src/slic3r/GUI/Jobs/PlaterWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ class PlaterWorker: public Worker {
void cancel() override { m_w.cancel(); }
void cancel_all() override { m_w.cancel_all(); }
void process_events() override { m_w.process_events(); }
bool wait_for_current_job(unsigned timeout_ms = 0) override
{
return m_w.wait_for_current_job(timeout_ms);
}
bool wait_for_idle(unsigned timeout_ms = 0) override
{
return m_w.wait_for_idle(timeout_ms);
}
};

}} // namespace Slic3r::GUI
Expand Down
23 changes: 19 additions & 4 deletions src/slic3r/GUI/Jobs/ThreadSafeQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

namespace Slic3r { namespace GUI {

struct BlockingWait
{
unsigned timeout_ms = 0;
std::atomic<bool> *pop_flag = nullptr;
};

// A thread safe queue for one producer and one consumer. Use consume_one_blk
// to block on an empty queue.
template<class T,
Expand All @@ -22,7 +28,7 @@ class ThreadSafeQueueSPSC
public:

// Consume one element, block if the queue is empty.
template<class Fn> void consume_one_blk(Fn &&fn, std::atomic<bool> *pop_flag = nullptr)
template<class Fn> bool consume_one(const BlockingWait &blkw, Fn &&fn)
{
static_assert(!std::is_reference_v<T>, "");
static_assert(std::is_default_constructible_v<T>, "");
Expand All @@ -31,7 +37,15 @@ class ThreadSafeQueueSPSC
T el;
{
std::unique_lock lk{m_mutex};
m_cond_var.wait(lk, [this]{ return !m_queue.empty(); });

auto pred = [this]{ return !m_queue.empty(); };
if (blkw.timeout_ms > 0) {
auto timeout = std::chrono::milliseconds(blkw.timeout_ms);
if (!m_cond_var.wait_for(lk, timeout, pred))
return false;
}
else
m_cond_var.wait(lk, pred);

if constexpr (std::is_move_assignable_v<T>)
el = std::move(m_queue.front());
Expand All @@ -40,11 +54,12 @@ class ThreadSafeQueueSPSC

m_queue.pop();

if (pop_flag) // The optional atomic is set before the lock us unlocked
pop_flag->store(true);
if (blkw.pop_flag) // The optional atomic is set before the lock us unlocked
blkw.pop_flag->store(true);
}

fn(el);
return true;
}

// Consume one element, return true if consumed, false if queue was empty.
Expand Down
36 changes: 31 additions & 5 deletions src/slic3r/GUI/Jobs/Worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ class Worker {
// Returns false if the job gets discarded.
virtual bool push(std::unique_ptr<Job> job) = 0;

// Returns true if no job is running and no job message is left to be processed.
// This means that nothing is left to finalize or take care of in the main thread.
// Returns true if no job is running, the job queue is empty and no job
// message is left to be processed. This means that nothing is left to
// finalize or take care of in the main thread.
virtual bool is_idle() const = 0;

// Ask the current job gracefully to cancel. This call is not blocking and
Expand All @@ -29,11 +30,21 @@ class Worker {
// This method will delete the queued jobs and cancel the current one.
virtual void cancel_all() = 0;

// Needs to be called continuously to process events (like status update or
// finalizing of jobs) in the UI thread. This can be done e.g. in a wxIdle
// handler.
// Needs to be called continuously to process events (like status update
// or finalizing of jobs) in the main thread. This can be done e.g. in a
// wxIdle handler.
virtual void process_events() = 0;

// Wait until the current job finishes. Timeout will only be considered
// if not zero. Returns false if timeout is reached but the job has not
// finished.
virtual bool wait_for_current_job(unsigned timeout_ms = 0) = 0;

// Wait until the whole job queue finishes. Timeout will only be considered
// if not zero. Returns false only if timeout is reached but the worker has
// not reached the idle state.
virtual bool wait_for_idle(unsigned timeout_ms = 0) = 0;

// The destructor shall properly close the worker thread.
virtual ~Worker() = default;
};
Expand Down Expand Up @@ -88,6 +99,21 @@ template<class...Args> bool replace_job(Worker &w, Args&& ...args)
return queue_job(w, std::forward<Args>(args)...);
}

// Cancel the current job and wait for it to actually be stopped.
inline void stop_current_job(Worker &w, unsigned timeout_ms = 0)
{
w.cancel();
w.wait_for_current_job(timeout_ms);
}

// Cancel all pending jobs including current one and wait until the worker
// becomes idle.
inline void stop_queue(Worker &w, unsigned timeout_ms = 0)
{
w.cancel_all();
w.wait_for_idle(timeout_ms);
}

}} // namespace Slic3r::GUI

#endif // WORKER_HPP
11 changes: 8 additions & 3 deletions src/slic3r/GUI/Plater.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5057,6 +5057,7 @@ void Plater::import_sl1_archive()
{
auto &w = get_ui_job_worker();
if (w.is_idle() && p->m_sla_import_dlg->ShowModal() == wxID_OK) {
p->take_snapshot(_L("Import SLA archive"));
replace_job(w, std::make_unique<SLAImportJob>(p->m_sla_import_dlg));
}
}
Expand Down Expand Up @@ -5493,8 +5494,10 @@ void Plater::set_number_of_copies(/*size_t num*/)
void Plater::fill_bed_with_instances()
{
auto &w = get_ui_job_worker();
if (w.is_idle())
if (w.is_idle()) {
p->take_snapshot(_L("Fill bed"));
replace_job(w, std::make_unique<FillBedJob>());
}
}

bool Plater::is_selection_empty() const
Expand Down Expand Up @@ -5896,7 +5899,7 @@ void Plater::reslice()
return;

// Stop arrange and (or) optimize rotation tasks.
this->get_ui_job_worker().cancel_all();
stop_queue(this->get_ui_job_worker());

if (printer_technology() == ptSLA) {
for (auto& object : model().objects)
Expand Down Expand Up @@ -6361,8 +6364,10 @@ GLCanvas3D* Plater::get_current_canvas3D()
void Plater::arrange()
{
auto &w = get_ui_job_worker();
if (w.is_idle())
if (w.is_idle()) {
p->take_snapshot(_L("Arrange"));
replace_job(w, std::make_unique<ArrangeJob>());
}
}

void Plater::set_current_canvas_as_dirty()
Expand Down
13 changes: 5 additions & 8 deletions tests/slic3rutils/slic3r_jobs_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ TEST_CASE("State should not be idle while running a job", "[Jobs]") {
}).wait();
});

while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();

REQUIRE(worker.is_idle());
}
Expand All @@ -55,8 +54,7 @@ TEST_CASE("Status messages should be received by the main thread during job exec
}
});

while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();

REQUIRE(pri->pr == 100);
REQUIRE(pri->statustxt == "Running");
Expand Down Expand Up @@ -85,8 +83,7 @@ TEST_CASE("Cancellation should be recognized be the worker", "[Jobs]") {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
worker.cancel();

while (!worker.is_idle())
worker.process_events();
worker.wait_for_current_job();

REQUIRE(pri->pr != 100);
}
Expand Down Expand Up @@ -146,6 +143,6 @@ TEST_CASE("Exception should be properly forwarded to finalize()", "[Jobs]") {
eptr = nullptr;
});

while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();
REQUIRE(worker.is_idle());
}

0 comments on commit 43f5e61

Please sign in to comment.