Skip to content

Commit c46d951

Browse files
authored
Merge pull request #382 from microsoft/user/jasbray/update_task_cancel
Fix most remaining task cancellation race conditions
2 parents 664cbfb + f507942 commit c46d951

File tree

6 files changed

+61
-32
lines changed

6 files changed

+61
-32
lines changed

lib/offline/OfflineStorageHandler.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ namespace ARIASDK_NS_BEGIN {
6161
if (!m_flushPending)
6262
return;
6363
}
64-
LOG_INFO("Waiting for pending Flush (%p) to complete...", m_flushHandle.m_task.load());
64+
LOG_INFO("Waiting for pending Flush (%p) to complete...", m_flushHandle.m_task);
6565
m_flushComplete.wait();
6666
}
6767

@@ -259,7 +259,7 @@ namespace ARIASDK_NS_BEGIN {
259259
m_flushPending = true;
260260
m_flushComplete.Reset();
261261
m_flushHandle = PAL::scheduleTask(&m_taskDispatcher, 0, this, &OfflineStorageHandler::Flush);
262-
LOG_INFO("Requested Flush (%p)", m_flushHandle.m_task.load());
262+
LOG_INFO("Requested Flush (%p)", m_flushHandle.m_task);
263263
}
264264
m_flushLock.unlock();
265265
}

lib/pal/TaskDispatcher.hpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <climits>
1313
#include <algorithm>
1414
#include <atomic>
15+
#include <utility>
1516

1617
#include "ITaskDispatcher.hpp"
1718
#include "Version.hpp"
@@ -59,33 +60,42 @@ namespace PAL_NS_BEGIN {
5960
class DeferredCallbackHandle
6061
{
6162
public:
62-
std::atomic<MAT::Task*> m_task;
63-
MAT::ITaskDispatcher* m_taskDispatcher;
63+
std::mutex m_mutex;
64+
MAT::Task* m_task = nullptr;
65+
MAT::ITaskDispatcher* m_taskDispatcher = nullptr;
6466

6567
DeferredCallbackHandle(MAT::Task* task, MAT::ITaskDispatcher* taskDispatcher) :
6668
m_task(task),
6769
m_taskDispatcher(taskDispatcher) { };
68-
DeferredCallbackHandle() : m_task(nullptr), m_taskDispatcher(nullptr) {};
69-
DeferredCallbackHandle(const DeferredCallbackHandle& h) :
70-
m_task(h.m_task.load()),
71-
m_taskDispatcher(h.m_taskDispatcher) { };
70+
DeferredCallbackHandle() {};
71+
DeferredCallbackHandle(DeferredCallbackHandle&& h)
72+
{
73+
*this = std::move(h);
74+
};
7275

73-
DeferredCallbackHandle& operator=(DeferredCallbackHandle other)
76+
DeferredCallbackHandle& operator=(DeferredCallbackHandle&& other)
7477
{
75-
m_task = other.m_task.load();
78+
std::lock_guard<std::mutex> lock(m_mutex);
79+
std::lock_guard<std::mutex> otherLock(other.m_mutex);
80+
m_task = other.m_task;
81+
other.m_task = nullptr;
7682
m_taskDispatcher = other.m_taskDispatcher;
83+
7784
return *this;
7885
}
7986

8087
bool Cancel(uint64_t waitTime = 0)
8188
{
82-
MAT::Task* m_current_task = m_task.exchange(nullptr);
83-
if (m_current_task)
89+
std::lock_guard<std::mutex> lock(m_mutex);
90+
if (m_task)
8491
{
85-
bool result = (m_taskDispatcher != nullptr) && (m_taskDispatcher->Cancel(m_current_task, waitTime));
92+
bool result = (m_taskDispatcher != nullptr) && (m_taskDispatcher->Cancel(m_task, waitTime));
8693
return result;
8794
}
88-
return false;
95+
else {
96+
// Canceled nothing successfully
97+
return true;
98+
}
8999
}
90100
};
91101

lib/pal/WorkerThread.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@
77
/* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */
88
#define MAX_FUTURE_DELTA_MS (60 * 60 * 1000)
99

10-
// Polling interval for task cancellation can be customized at compile-time
11-
#ifndef TASK_CANCEL_WAIT_MS
12-
#define TASK_CANCEL_WAIT_MS 50
13-
#endif
14-
1510
namespace PAL_NS_BEGIN {
1611

1712
class WorkerThreadShutdownItem : public Task
@@ -31,6 +26,7 @@ namespace PAL_NS_BEGIN {
3126

3227
// TODO: [MG] - investigate all the cases why we need recursive here
3328
std::recursive_mutex m_lock;
29+
std::timed_mutex m_execution_mutex;
3430

3531
std::list<MAT::Task*> m_queue;
3632
std::list<MAT::Task*> m_timerQueue;
@@ -100,7 +96,7 @@ namespace PAL_NS_BEGIN {
10096
//
10197
// - acquire the m_lock to prevent a new task from getting scheduled.
10298
// This may block the scheduling of a new task in queue for up to
103-
// TASK_CANCEL_WAIT_MS=50 ms in case if the task being canceled
99+
// waitTime in case if the task being canceled
104100
// is the one being executed right now.
105101
//
106102
// - if currently executing task is the one we are trying to cancel,
@@ -134,12 +130,19 @@ namespace PAL_NS_BEGIN {
134130
/* Can't recursively wait on completion of our own thread */
135131
if (m_hThread.get_id() != std::this_thread::get_id())
136132
{
137-
while ((waitTime > TASK_CANCEL_WAIT_MS) && (m_itemInProgress == item))
133+
if (waitTime > 0 && m_execution_mutex.try_lock_for(std::chrono::milliseconds(waitTime)))
138134
{
139-
PAL::sleep(TASK_CANCEL_WAIT_MS);
140-
waitTime -= TASK_CANCEL_WAIT_MS;
135+
m_itemInProgress = nullptr;
136+
m_execution_mutex.unlock();
141137
}
142138
}
139+
else
140+
{
141+
// The SDK may attempt to cancel itself from within its own task.
142+
// Return true and assume that the current task will finish, and therefore be cancelled.
143+
return true;
144+
}
145+
143146
/* Either waited long enough or the task is still executing. Return:
144147
* true - if item in progress is different than item (other task)
145148
* false - if item in progress is still the same (didn't wait long enough)
@@ -153,7 +156,6 @@ namespace PAL_NS_BEGIN {
153156
// Still in the queue
154157
m_timerQueue.erase(it);
155158
delete item;
156-
return true;
157159
}
158160
}
159161
#if 0
@@ -167,7 +169,7 @@ namespace PAL_NS_BEGIN {
167169
Sleep(10);
168170
}
169171
#endif
170-
return false;
172+
return true;
171173
}
172174

173175
protected:
@@ -229,13 +231,20 @@ namespace PAL_NS_BEGIN {
229231
break;
230232
}
231233

232-
LOG_TRACE("%10llu Execute item=%p type=%s\n", wakeupCount, item.get(), item.get()->TypeName.c_str() );
233-
(*item)();
234-
self->m_itemInProgress = nullptr;
234+
{
235+
std::lock_guard<std::timed_mutex> lock(self->m_execution_mutex);
235236

236-
if (item.get()) {
237-
item->Type = MAT::Task::Done;
238-
item.reset();
237+
// Item wasn't cancelled before it could be executed
238+
if (self->m_itemInProgress != nullptr) {
239+
LOG_TRACE("%10llu Execute item=%p type=%s\n", wakeupCount, item.get(), item.get()->TypeName.c_str() );
240+
(*item)();
241+
self->m_itemInProgress = nullptr;
242+
}
243+
244+
if (item) {
245+
item->Type = MAT::Task::Done;
246+
item = nullptr;
247+
}
239248
}
240249
}
241250
}

lib/system/TelemetrySystem.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ namespace ARIASDK_NS_BEGIN {
8181

8282
// cancel all pending and force-finish all uploads
8383
stopTimes[1] = GetUptimeMs();
84+
// TODO: Should this still pause, since the TPM now has abort logic in addition to pause logic?
85+
// hcm.cancelAllRequests is also part of pause, so the logic is definitely redundant. Issue 387
8486
onPause();
8587
hcm.cancelAllRequests();
8688
tpm.finishAllUploads();

lib/tpm/TransmissionPolicyManager.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ namespace ARIASDK_NS_BEGIN {
255255
// Called from finishAllUploads
256256
void TransmissionPolicyManager::handleFinishAllUploads()
257257
{
258+
// TODO: This pause appears to server no practical purpose? Issue 387
258259
pauseAllUploads();
259260
allUploadsFinished(); // calls stats.onStop >> this->flushTaskDispatcher;
260261
}

lib/tpm/TransmissionPolicyManager.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,14 @@ namespace ARIASDK_NS_BEGIN {
130130
{
131131
uint64_t cancelWaitTimeMs = (m_scheduledUploadAborted) ? UPLOAD_TASK_CANCEL_TIME_MS : 0;
132132
bool result = m_scheduledUpload.Cancel(cancelWaitTimeMs);
133-
m_isUploadScheduled.exchange(false);
133+
134+
// TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for.
135+
// We either need a stronger guarantee here (could impact SDK performance), or a mechanism to
136+
// ensure those tasks are canceled when the log manager is destroyed. Issue 388
137+
if (result)
138+
{
139+
m_isUploadScheduled.exchange(false);
140+
}
134141
return result;
135142
}
136143

0 commit comments

Comments
 (0)