Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ namespace NYdb::inline Dev {
namespace NYdb::inline Dev::NQuery {

struct TCreateSessionSettings : public TSimpleRequestSettings<TCreateSessionSettings> {
TCreateSessionSettings();
TCreateSessionSettings() {
ClientTimeout(TDuration::Seconds(5));
}
};

using TAsyncCreateSessionResult = NThreading::TFuture<TCreateSessionResult>;
Expand Down
4 changes: 2 additions & 2 deletions ydb/public/sdk/cpp/src/client/common_client/impl/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ class TClientImplCommon
return DbDriverState_->DiscoveryCompleted();
}

void ScheduleTask(const std::function<void()>& fn, TDeadline::Duration timeout) override {
void ScheduleTask(const std::function<void()>& fn, TDeadline::Duration delay) override {
std::weak_ptr<IClientImplCommon> weak = this->shared_from_this();
auto cbGuard = [weak, fn]() {
auto strongClient = weak.lock();
if (strongClient) {
fn();
}
};
Connections_->ScheduleOneTimeTask(std::move(cbGuard), timeout);
Connections_->ScheduleDelayedTask(std::move(cbGuard), delay);
}

protected:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ namespace NYdb::inline Dev {

constexpr TDeadline::Duration MAX_DEFERRED_CALL_DELAY = 10s; // The max delay between GetOperation calls for one operation

TSimpleCbResult::TSimpleCbResult(
TSimpleCb&& cb,
TGRpcConnectionsImpl* connections,
std::shared_ptr<IQueueClientContext> context)
: TGenericCbHolder<TSimpleCb>(std::move(cb), connections, std::move(context))
TSimpleCbResult::TSimpleCbResult(TSimpleCb&& cb)
: UserResponseCb_(std::move(cb))
{ }

void TSimpleCbResult::Process(void*) {
Expand Down Expand Up @@ -111,4 +108,22 @@ void TPeriodicAction::OnError() {
UserResponseCb_(std::move(issues), EStatus::CLIENT_INTERNAL_ERROR);
}

TDelayedAction::TDelayedAction(
TDelayedCb&& userCb,
TGRpcConnectionsImpl* connection,
std::shared_ptr<IQueueClientContext> context,
TDeadline deadline)
: TAlarmActionBase(std::move(userCb), connection, std::move(context))
{
Deadline_ = deadline;
}

void TDelayedAction::OnAlarm() {
UserResponseCb_(true);
}

void TDelayedAction::OnError() {
UserResponseCb_(false);
}

} // namespace NYdb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct TPlainStatus;
template<typename TResponse>
using TResponseCb = std::function<void(TResponse*, TPlainStatus status)>;
using TDeferredOperationCb = std::function<void(Ydb::Operations::Operation*, TPlainStatus status)>;
using TDelayedCb = std::function<void(bool ok)>;

template<typename TCb>
class TGenericCbHolder {
Expand Down Expand Up @@ -175,16 +176,14 @@ class TResult
std::multimap<std::string, std::string> Metadata_;
};

class TSimpleCbResult
: public TGenericCbHolder<TSimpleCb>
, public IObjectInQueue
class TSimpleCbResult : public IObjectInQueue
{
public:
TSimpleCbResult(
TSimpleCb&& cb,
TGRpcConnectionsImpl* connections,
std::shared_ptr<IQueueClientContext> context);
TSimpleCbResult(TSimpleCb&& cb);
void Process(void*) override;

private:
TSimpleCb UserResponseCb_;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -233,4 +232,18 @@ class TPeriodicAction
TDeadline::Duration Period_;
};

class TDelayedAction
: public TAlarmActionBase<TDelayedCb>
{
public:
TDelayedAction(
TDelayedCb&& userCb,
TGRpcConnectionsImpl* connection,
std::shared_ptr<IQueueClientContext> context,
TDeadline deadline);

void OnAlarm() override;
void OnError() override;
};

} // namespace NYdb
Original file line number Diff line number Diff line change
Expand Up @@ -219,34 +219,38 @@ void TGRpcConnectionsImpl::AddPeriodicTask(TPeriodicCb&& cb, TDeadline::Duration
}
}

void TGRpcConnectionsImpl::ScheduleOneTimeTask(TSimpleCb&& fn, TDeadline::Duration timeout) {
auto cbLow = [this, fn = std::move(fn)](NYdb::NIssue::TIssues&&, EStatus status) mutable {
if (status != EStatus::SUCCESS) {
return false;
}

std::shared_ptr<IQueueClientContext> context;

if (!TryCreateContext(context)) {
// Shutting down, fn must handle it
fn();
} else {
// Enqueue to user pool
auto resp = new TSimpleCbResult(
std::move(fn),
this,
std::move(context));
EnqueueResponse(resp);
void TGRpcConnectionsImpl::ScheduleDelayedTask(TSimpleCb&& fn, TDeadline deadline) {
auto cbLow = [this, fn = std::move(fn)](bool ok) mutable {
if (!ok) {
return;
}

return false;
// Enqueue to user pool
auto resp = new TSimpleCbResult(std::move(fn));
EnqueueResponse(resp);
};

if (timeout > TDeadline::Duration::zero()) {
AddPeriodicTask(std::move(cbLow), timeout);
} else {
cbLow(NYdb::NIssue::TIssues(), EStatus::SUCCESS);
std::shared_ptr<IQueueClientContext> context;
if (!TryCreateContext(context)) {
cbLow(false);
return;
}

if (deadline <= TDeadline::Now()) {
cbLow(true);
return;
}

auto action = MakeIntrusive<TDelayedAction>(
std::move(cbLow),
this,
std::move(context),
deadline);
action->Start();
}

void TGRpcConnectionsImpl::ScheduleDelayedTask(TSimpleCb&& fn, TDeadline::Duration delay) {
ScheduleDelayedTask(std::move(fn), TDeadline::AfterDuration(delay));
}

NThreading::TFuture<bool> TGRpcConnectionsImpl::ScheduleFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ class TGRpcConnectionsImpl
~TGRpcConnectionsImpl();

void AddPeriodicTask(TPeriodicCb&& cb, TDeadline::Duration period) override;
void ScheduleOneTimeTask(TSimpleCb&& fn, TDeadline::Duration timeout);

void ScheduleDelayedTask(TSimpleCb&& fn, TDeadline deadline);
void ScheduleDelayedTask(TSimpleCb&& fn, TDeadline::Duration delay);

NThreading::TFuture<bool> ScheduleFuture(
TDuration timeout,
IQueueClientContextPtr token = nullptr
Expand Down
61 changes: 40 additions & 21 deletions ydb/public/sdk/cpp/src/client/impl/session/session_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace NSessionPool {

using namespace NThreading;

constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
constexpr std::uint64_t KEEP_ALIVE_RANDOM_FRACTION = 4;
static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus(
TPlainStatus(
EStatus::CLIENT_RESOURCE_EXHAUSTED,
Expand All @@ -33,10 +33,10 @@ TStatus GetStatus(const TStatus& status) {
TDuration RandomizeThreshold(TDuration duration) {
TDuration::TValue value = duration.GetValue();
if (KEEP_ALIVE_RANDOM_FRACTION) {
const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
const std::int64_t randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
if (randomLimit < 2)
return duration;
value += static_cast<i64>(RandomNumber<ui64>(randomLimit));
value += static_cast<std::int64_t>(RandomNumber<std::uint64_t>(randomLimit));
}
return TDuration::FromValue(value);
}
Expand All @@ -53,15 +53,14 @@ bool IsSessionCloseRequested(const TStatus& status) {
return false;
}

TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout)
TSessionPool::TWaitersQueue::TWaitersQueue(std::uint32_t maxQueueSize)
: MaxQueueSize_(maxQueueSize)
, MaxWaitSessionTimeout_(maxWaitSessionTimeout)
{
}

bool TSessionPool::TWaitersQueue::TryPush(std::unique_ptr<IGetSessionCtx>& p) {
if (Waiters_.size() < MaxQueueSize_) {
Waiters_.insert(std::make_pair(TInstant::Now(), std::move(p)));
Waiters_.insert(std::make_pair(p->GetDeadline(), std::move(p)));
return true;
}
return false;
Expand All @@ -77,24 +76,25 @@ std::unique_ptr<IGetSessionCtx> TSessionPool::TWaitersQueue::TryGet() {
return result;
}

void TSessionPool::TWaitersQueue::GetOld(TInstant now, std::vector<std::unique_ptr<IGetSessionCtx>>& oldWaiters) {
void TSessionPool::TWaitersQueue::GetOld(TDeadline deadline, std::vector<std::unique_ptr<IGetSessionCtx>>& oldWaiters) {
auto it = Waiters_.begin();
while (it != Waiters_.end()) {
if (now < it->first + MaxWaitSessionTimeout_)
if (deadline < it->first) {
break;
}

oldWaiters.emplace_back(std::move(it->second));

Waiters_.erase(it++);
}
}

ui32 TSessionPool::TWaitersQueue::Size() const {
std::uint32_t TSessionPool::TWaitersQueue::Size() const {
return Waiters_.size();
}


TSessionPool::TSessionPool(ui32 maxActiveSessions)
TSessionPool::TSessionPool(std::uint32_t maxActiveSessions)
: Closed_(false)
, WaitersQueue_(maxActiveSessions * 10)
, ActiveSessions_(0)
Expand Down Expand Up @@ -150,7 +150,7 @@ void TSessionPool::GetSession(std::unique_ptr<IGetSessionCtx> ctx)
}

if (sessionSource == TSessionSource::Waiter) {
// Nothing to do here
ctx->ScheduleOnDeadlineWaiterCleanup();
} else if (sessionSource == TSessionSource::Error) {
FakeSessionsCounter_.Inc();
ctx->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT);
Expand Down Expand Up @@ -194,6 +194,22 @@ bool TSessionPool::CheckAndFeedWaiterNewSession(bool active) {
return true;
}

void TSessionPool::ClearOldWaiters() {
std::lock_guard guard(Mtx_);

std::vector<std::unique_ptr<IGetSessionCtx>> oldWaiters;
WaitersQueue_.GetOld(TDeadline::Now(), oldWaiters);

for (auto& waiter : oldWaiters) {
FakeSessionsCounter_.Inc();
waiter->ReplyError(CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT);
}

if (!oldWaiters.empty()) {
UpdateStats();
}
}

bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active) {
// Do not call ReplySessionToUser under the session pool lock
std::unique_ptr<IGetSessionCtx> getSessionCtx;
Expand Down Expand Up @@ -267,23 +283,26 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC
// moreover it is unsafe to touch this ptr!
return false;
} else {
auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE;
auto sessionCountToProcess = PERIODIC_ACTION_BATCH_SIZE;
std::vector<std::unique_ptr<TKqpSessionCommon>> sessionsToTouch;
sessionsToTouch.reserve(keepAliveBatchSize);
sessionsToTouch.reserve(sessionCountToProcess);
std::vector<std::unique_ptr<TKqpSessionCommon>> sessionsToDelete;
sessionsToDelete.reserve(keepAliveBatchSize);
sessionsToDelete.reserve(sessionCountToProcess);
std::vector<std::unique_ptr<IGetSessionCtx>> waitersToReplyError;
waitersToReplyError.reserve(keepAliveBatchSize);
const auto now = TInstant::Now();
waitersToReplyError.reserve(sessionCountToProcess);
const auto now = TDeadline::Now();
const std::uint64_t nowUs = std::chrono::duration_cast<std::chrono::microseconds>(
now.GetTimePoint().time_since_epoch()).count();
{
std::lock_guard guard(Mtx_);
{
auto& sessions = Sessions_;

auto it = sessions.begin();
while (it != sessions.end() && keepAliveBatchSize--) {
if (now < it->second->GetTimeToTouchFast())
while (it != sessions.end() && sessionCountToProcess--) {
if (nowUs < it->second->GetTimeToTouchFast().MicroSeconds()) {
break;
}

if (deletePredicate(it->second.get(), sessions.size())) {
it->second->UpdateServerCloseHandler(nullptr);
Expand Down Expand Up @@ -329,16 +348,16 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC
return periodicCb;
}

i64 TSessionPool::GetActiveSessions() const {
std::int64_t TSessionPool::GetActiveSessions() const {
std::lock_guard guard(Mtx_);
return ActiveSessions_;
}

i64 TSessionPool::GetActiveSessionsLimit() const {
std::int64_t TSessionPool::GetActiveSessionsLimit() const {
return MaxActiveSessions_;
}

i64 TSessionPool::GetCurrentPoolSize() const {
std::int64_t TSessionPool::GetCurrentPoolSize() const {
std::lock_guard guard(Mtx_);
return Sessions_.size();
}
Expand Down
Loading
Loading