Skip to content

Commit

Permalink
Export methods to set/get the ambient pplx scheduler (#148)
Browse files Browse the repository at this point in the history
* Export methods to set/get the ambient pplx scheduler

* also add set_wastorage_ambient_delayed_scheduler

* keep the default implementation of delay_task

* put the static variable under ifdef

* add unit test verify_retry_after_delay

* set delayed scheduler in the test
  • Loading branch information
chakrab-msft authored and vinjiang committed Feb 7, 2018
1 parent 9d39de1 commit eae40c9
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 2 deletions.
30 changes: 30 additions & 0 deletions Microsoft.WindowsAzure.Storage/includes/was/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,36 @@ namespace azure { namespace storage {
std::shared_ptr<basic_retry_policy> m_policy;
};

#ifdef _WIN32
/// <summary>
/// Interface for scheduling tasks that start after a provided delay in milliseconds
/// </summary>
struct __declspec(novtable) delayed_scheduler_interface
{
virtual void schedule_after(pplx::TaskProc_t function, void* context, long long delayInMs) = 0;
};

/// <summary>
/// Sets the ambient scheduler to be used by the PPL constructs. Note this is not thread safe.
/// </summary>
WASTORAGE_API void __cdecl set_wastorage_ambient_scheduler(const std::shared_ptr<pplx::scheduler_interface>& scheduler);

/// <summary>
/// Gets the ambient scheduler to be used by the PPL constructs. Note this is not thread safe.
/// </summary>
WASTORAGE_API const std::shared_ptr<pplx::scheduler_interface>& __cdecl get_wastorage_ambient_scheduler();

/// <summary>
/// Sets the ambient scheduler to be used for scheduling delayed tasks. Note this is not thread safe.
/// </summary>
WASTORAGE_API void __cdecl set_wastorage_ambient_delayed_scheduler(const std::shared_ptr<delayed_scheduler_interface>& scheduler);

/// <summary>
/// Gets the ambient scheduler to be used for scheduling delayed tasks. Note this is not thread safe.
/// </summary>
WASTORAGE_API const std::shared_ptr<delayed_scheduler_interface>& __cdecl get_wastorage_ambient_delayed_scheduler();
#endif

}} // namespace azure::storage

#ifndef _WIN32
Expand Down
26 changes: 26 additions & 0 deletions Microsoft.WindowsAzure.Storage/src/cloud_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

namespace azure { namespace storage {

#ifdef _WIN32
static std::shared_ptr<delayed_scheduler_interface> s_delayedScheduler;
#endif

storage_uri::storage_uri(web::http::uri primary_uri)
: m_primary_uri(std::move(primary_uri))
{
Expand Down Expand Up @@ -58,4 +62,26 @@ namespace azure { namespace storage {
}
}

#ifdef _WIN32
void __cdecl set_wastorage_ambient_scheduler(const std::shared_ptr<pplx::scheduler_interface>& scheduler)
{
pplx::set_ambient_scheduler(scheduler);
}

const std::shared_ptr<pplx::scheduler_interface>& __cdecl get_wastorage_ambient_scheduler()
{
return pplx::get_ambient_scheduler();
}

void __cdecl set_wastorage_ambient_delayed_scheduler(const std::shared_ptr<delayed_scheduler_interface>& scheduler)
{
s_delayedScheduler = scheduler;
}

const std::shared_ptr<delayed_scheduler_interface>& __cdecl get_wastorage_ambient_delayed_scheduler()
{
return s_delayedScheduler;
}
#endif

}} // namespace azure::storage
17 changes: 15 additions & 2 deletions Microsoft.WindowsAzure.Storage/src/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ namespace azure { namespace storage { namespace core {
public:
#ifdef _WIN32
delay_event(std::chrono::milliseconds timeout)
: m_callback(new concurrency::call<int>(std::function<void(int)>(std::bind(&delay_event::timer_fired, this, std::placeholders::_1)))), m_timer(static_cast<unsigned int>(timeout.count()), 0, m_callback, false)
: m_callback(new concurrency::call<int>(std::function<void(int)>(std::bind(&delay_event::timer_fired, this, std::placeholders::_1)))), m_timer(static_cast<unsigned int>(timeout.count()), 0, m_callback, false),
m_timeout(timeout)
{
}

Expand All @@ -388,7 +389,18 @@ namespace azure { namespace storage { namespace core {

void start()
{
m_timer.start();
const auto& ambient_delayed_scheduler = get_wastorage_ambient_delayed_scheduler();
if (ambient_delayed_scheduler)
{
ambient_delayed_scheduler->schedule_after(
[](void* event) { reinterpret_cast<delay_event*>(event)->timer_fired(0); },
this,
m_timeout.count());
}
else
{
m_timer.start();
}
}
#else
delay_event(std::chrono::milliseconds timeout)
Expand All @@ -411,6 +423,7 @@ namespace azure { namespace storage { namespace core {
#ifdef _WIN32
concurrency::call<int>* m_callback;
concurrency::timer<int> m_timer;
std::chrono::milliseconds m_timeout;
#else
boost::asio::deadline_timer m_timer;
#endif
Expand Down
48 changes: 48 additions & 0 deletions Microsoft.WindowsAzure.Storage/tests/executor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,52 @@ SUITE(Core)
CHECK_EQUAL(true, caught_storage_exception);
CHECK_EQUAL(true, caught_http_exception);
}

#ifdef _WIN32
class delayed_scheduler : public azure::storage::delayed_scheduler_interface
{
public:
virtual void schedule_after(pplx::TaskProc_t function, void* context, long long delayInMs) override
{
std::this_thread::sleep_for(std::chrono::milliseconds(delayInMs));
function(context);
}
};

TEST_FIXTURE(block_blob_test_base, verify_retry_after_delay)
{
azure::storage::set_wastorage_ambient_delayed_scheduler(std::make_shared<delayed_scheduler>());

const size_t buffer_size = 1024;
std::vector<uint8_t> buffer;
buffer.resize(buffer_size);
auto md5 = fill_buffer_and_get_md5(buffer);
auto stream = concurrency::streams::bytestream::open_istream(buffer);

azure::storage::operation_context context;
static bool throwException = true;
context.set_response_received([](web::http::http_request&, const web::http::http_response&, azure::storage::operation_context context)
{
if (throwException)
{
throwException = false;
throw azure::storage::storage_exception("retry");
}
});

bool failed = false;
try
{
m_blob.upload_block(get_block_id(0), stream, md5, azure::storage::access_condition(), azure::storage::blob_request_options(), context);
}
catch (azure::storage::storage_exception&)
{
failed = true;
}

azure::storage::set_wastorage_ambient_delayed_scheduler(nullptr);
CHECK_EQUAL(false, failed);
CHECK_EQUAL(false, throwException);
}
#endif
}

0 comments on commit eae40c9

Please sign in to comment.