|
2 | 2 | #include <ydb/library/ydb_issue/issue_helpers.h> |
3 | 3 | #include <ydb/core/kqp/ut/common/kqp_ut_common.h> |
4 | 4 | #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> |
| 5 | +#include <ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h> |
5 | 6 | #include <ydb/core/kqp/common/kqp.h> |
| 7 | +#include <ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h> |
6 | 8 | #include <ydb/core/protos/config.pb.h> |
7 | 9 | #include <ydb/core/protos/kqp.pb.h> |
8 | 10 | #include <ydb/core/testlib/test_client.h> |
@@ -65,6 +67,63 @@ TString CreateSession(TTestActorRuntime* runtime, const TActorId& kqpProxy, cons |
65 | 67 | return sessionId; |
66 | 68 | } |
67 | 69 |
|
| 70 | +class TDatabaseCacheTestActor : public TActorBootstrapped<TDatabaseCacheTestActor> { |
| 71 | +public: |
| 72 | + TDatabaseCacheTestActor(const TString& database, const TString& expectedDatabaseId, bool fromCache, TDatabasesCache& cache, NThreading::TPromise<void> promise) |
| 73 | + : Database(database) |
| 74 | + , ExpectedDatabaseId(expectedDatabaseId) |
| 75 | + , Cache(cache) |
| 76 | + , Promise(promise) |
| 77 | + , FromCache(fromCache) |
| 78 | + {} |
| 79 | + |
| 80 | + void Bootstrap() { |
| 81 | + Become(&TDatabaseCacheTestActor::StateFunc); |
| 82 | + |
| 83 | + auto event = MakeHolder<TEvKqp::TEvQueryRequest>(); |
| 84 | + event->Record.MutableRequest()->SetDatabase(Database); |
| 85 | + Send(SelfId(), event.Release()); |
| 86 | + } |
| 87 | + |
| 88 | + void Handle(TEvKqp::TEvUpdateDatabaseInfo::TPtr& ev) { |
| 89 | + Cache.UpdateDatabaseInfo(ev, ActorContext()); |
| 90 | + } |
| 91 | + |
| 92 | + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { |
| 93 | + auto success = Cache.SetDatabaseIdOrDeffer(ev, [this](Ydb::StatusIds::StatusCode status, NYql::TIssues issues){ |
| 94 | + UNIT_ASSERT_C(false, TStringBuilder() << "Unexpected fail, " << GetErrorString() << ", status: " << status << ", reason: " << issues.ToOneLineString()); |
| 95 | + }, ActorContext()); |
| 96 | + |
| 97 | + if (FromCache) { |
| 98 | + UNIT_ASSERT_C(success, TStringBuilder() << "Expected database id from cache, " << GetErrorString()); |
| 99 | + UNIT_ASSERT_STRING_CONTAINS_C(ev->Get()->GetDatabaseId(), ExpectedDatabaseId, GetErrorString()); |
| 100 | + Promise.SetValue(); |
| 101 | + PassAway(); |
| 102 | + } else { |
| 103 | + UNIT_ASSERT_C(!success, TStringBuilder() << "Unexpected database id from cache, " << GetErrorString()); |
| 104 | + FromCache = true; |
| 105 | + } |
| 106 | + } |
| 107 | + |
| 108 | + STRICT_STFUNC(StateFunc, |
| 109 | + hFunc(TEvKqp::TEvUpdateDatabaseInfo, Handle); |
| 110 | + hFunc(TEvKqp::TEvQueryRequest, Handle); |
| 111 | + ) |
| 112 | + |
| 113 | +private: |
| 114 | + TString GetErrorString() const { |
| 115 | + return TStringBuilder() << "database: " << Database << ", from cache: " << FromCache << "\n"; |
| 116 | + } |
| 117 | + |
| 118 | +private: |
| 119 | + const TString Database; |
| 120 | + const TString ExpectedDatabaseId; |
| 121 | + TDatabasesCache& Cache; |
| 122 | + NThreading::TPromise<void> Promise; |
| 123 | + |
| 124 | + bool FromCache = false; |
| 125 | +}; |
| 126 | + |
68 | 127 | } |
69 | 128 |
|
70 | 129 | Y_UNIT_TEST_SUITE(KqpProxy) { |
@@ -542,5 +601,32 @@ Y_UNIT_TEST_SUITE(KqpProxy) { |
542 | 601 |
|
543 | 602 | UNIT_ASSERT(allDoneOk); |
544 | 603 | } |
| 604 | + |
| 605 | + Y_UNIT_TEST(DatabasesCacheForServerless) { |
| 606 | + auto ydb = NWorkload::TYdbSetupSettings() |
| 607 | + .CreateSampleTenants(true) |
| 608 | + .Create(); |
| 609 | + |
| 610 | + auto& runtime = *ydb->GetRuntime(); |
| 611 | + TDatabasesCache cache; |
| 612 | + |
| 613 | + auto checkCache = [&](const TString& database, const TString& expectedDatabaseId, bool fromCache) { |
| 614 | + auto promise = NThreading::NewPromise(); |
| 615 | + runtime.Register(new TDatabaseCacheTestActor(database, expectedDatabaseId, fromCache, cache, promise)); |
| 616 | + promise.GetFuture().GetValueSync(); |
| 617 | + }; |
| 618 | + |
| 619 | + const auto& dedicatedTennant = ydb->GetSettings().GetDedicatedTenantName(); |
| 620 | + checkCache(dedicatedTennant, dedicatedTennant, false); |
| 621 | + checkCache(dedicatedTennant, dedicatedTennant, true); |
| 622 | + |
| 623 | + const auto& sharedTennant = ydb->GetSettings().GetSharedTenantName(); |
| 624 | + checkCache(sharedTennant, sharedTennant, false); |
| 625 | + checkCache(sharedTennant, sharedTennant, true); |
| 626 | + |
| 627 | + const auto& serverlessTennant = ydb->GetSettings().GetServerlessTenantName(); |
| 628 | + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, false); |
| 629 | + checkCache(serverlessTennant, TStringBuilder() << ":4:" << serverlessTennant, true); |
| 630 | + } |
545 | 631 | } // namspace NKqp |
546 | 632 | } // namespace NKikimr |
0 commit comments