Skip to content

Commit 5021622

Browse files
authored
Add import start and end times (#5389)
1 parent dd7e775 commit 5021622

File tree

11 files changed

+219
-5
lines changed

11 files changed

+219
-5
lines changed

ydb/core/protos/import.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import "ydb/public/api/protos/ydb_issue_message.proto";
33
import "ydb/public/api/protos/ydb_operation.proto";
44
import "ydb/public/api/protos/ydb_status_codes.proto";
55

6+
import "google/protobuf/timestamp.proto";
7+
68
package NKikimrImport;
79
option java_package = "ru.yandex.kikimr.proto";
810

@@ -11,6 +13,8 @@ message TImport {
1113
optional Ydb.StatusIds.StatusCode Status = 2;
1214
repeated Ydb.Issue.IssueMessage Issues = 3;
1315
optional Ydb.Import.ImportProgress.Progress Progress = 4;
16+
optional google.protobuf.Timestamp StartTime = 7;
17+
optional google.protobuf.Timestamp EndTime = 8;
1418
repeated Ydb.Import.ImportItemProgress ItemsProgress = 5;
1519
oneof Settings {
1620
Ydb.Import.ImportFromS3Settings ImportFromS3Settings = 6;

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4285,6 +4285,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
42854285
importInfo->State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::Imports::State>());
42864286
importInfo->Issue = rowset.GetValueOrDefault<Schema::Imports::Issue>(TString());
42874287

4288+
importInfo->StartTime = TInstant::Seconds(rowset.GetValueOrDefault<Schema::Imports::StartTime>());
4289+
importInfo->EndTime = TInstant::Seconds(rowset.GetValueOrDefault<Schema::Imports::EndTime>());
4290+
42884291
Self->Imports[id] = importInfo;
42894292
if (uid) {
42904293
Self->ImportsByUid[uid] = importInfo;

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ namespace {
2121
}
2222
}
2323

24+
NProtoBuf::Timestamp SecondsToProtoTimeStamp(ui64 sec) {
25+
NProtoBuf::Timestamp timestamp;
26+
timestamp.set_seconds((i64)(sec));
27+
timestamp.set_nanos(0);
28+
return timestamp;
29+
}
30+
2431
TImportInfo::EState GetMinState(TImportInfo::TPtr importInfo) {
2532
TImportInfo::EState state = TImportInfo::EState::Invalid;
2633

@@ -41,6 +48,11 @@ void TSchemeShard::FromXxportInfo(NKikimrImport::TImport& import, const TImportI
4148
import.SetId(importInfo->Id);
4249
import.SetStatus(Ydb::StatusIds::SUCCESS);
4350

51+
*import.MutableStartTime() = SecondsToProtoTimeStamp(importInfo->StartTime.Seconds());
52+
if (importInfo->EndTime != TInstant::Zero()) {
53+
*import.MutableEndTime() = SecondsToProtoTimeStamp(importInfo->EndTime.Seconds());
54+
}
55+
4456
switch (importInfo->State) {
4557
case TImportInfo::EState::Waiting:
4658
switch (GetMinState(importInfo)) {
@@ -131,7 +143,9 @@ void TSchemeShard::PersistRemoveImport(NIceDb::TNiceDb& db, const TImportInfo::T
131143
void TSchemeShard::PersistImportState(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo) {
132144
db.Table<Schema::Imports>().Key(importInfo->Id).Update(
133145
NIceDb::TUpdate<Schema::Imports::State>(static_cast<ui8>(importInfo->State)),
134-
NIceDb::TUpdate<Schema::Imports::Issue>(importInfo->Issue)
146+
NIceDb::TUpdate<Schema::Imports::Issue>(importInfo->Issue),
147+
NIceDb::TUpdate<Schema::Imports::StartTime>(importInfo->StartTime.Seconds()),
148+
NIceDb::TUpdate<Schema::Imports::EndTime>(importInfo->EndTime.Seconds())
135149
);
136150
}
137151

ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ struct TSchemeShard::TImport::TTxCancel: public TSchemeShard::TXxport::TTxBase {
8585
}
8686
}
8787

88+
if (importInfo->State == TImportInfo::EState::Cancelled) {
89+
importInfo->EndTime = TAppData::TimeProvider->Now();
90+
}
91+
8892
Self->PersistImportState(db, importInfo);
8993
SendNotificationsIfFinished(importInfo);
9094
return respond(Ydb::StatusIds::SUCCESS);
@@ -183,6 +187,7 @@ struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas
183187
}
184188

185189
importInfo->State = TImportInfo::EState::Cancelled;
190+
importInfo->EndTime = TAppData::TimeProvider->Now();
186191
Self->PersistImportState(db, importInfo);
187192

188193
SendNotificationsIfFinished(importInfo);

ydb/core/tx/schemeshard/schemeshard_import__create.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
124124
Self->PersistCreateImport(db, importInfo);
125125

126126
importInfo->State = TImportInfo::EState::Waiting;
127+
importInfo->StartTime = TAppData::TimeProvider->Now();
127128
Self->PersistImportState(db, importInfo);
128129

129130
Self->Imports[id] = importInfo;
@@ -511,6 +512,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
511512
break;
512513
}
513514
}
515+
516+
if (importInfo->State == EState::Cancelled) {
517+
importInfo->EndTime = TAppData::TimeProvider->Now();
518+
}
514519
}
515520

516521
TMaybe<TString> GetIssues(const TPathId& dstPathId, TTxId restoreTxId) {
@@ -1004,6 +1009,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10041009

10051010
if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) {
10061011
importInfo->State = EState::Done;
1012+
importInfo->EndTime = TAppData::TimeProvider->Now();
10071013
}
10081014

10091015
Self->PersistImportItemState(db, importInfo, itemIdx);

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2743,6 +2743,9 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
27432743

27442744
TSet<TActorId> Subscribers;
27452745

2746+
TInstant StartTime = TInstant::Zero();
2747+
TInstant EndTime = TInstant::Zero();
2748+
27462749
explicit TImportInfo(
27472750
const ui64 id,
27482751
const TString& uid,

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1473,6 +1473,9 @@ struct Schema : NIceDb::Schema {
14731473
struct State : Column<8, NScheme::NTypeIds::Byte> {};
14741474
struct Issue : Column<9, NScheme::NTypeIds::Utf8> {};
14751475

1476+
struct StartTime : Column<11, NScheme::NTypeIds::Uint64> {};
1477+
struct EndTime : Column<12, NScheme::NTypeIds::Uint64> {};
1478+
14761479
using TKey = TableKey<Id>;
14771480
using TColumns = TableColumns<
14781481
Id,
@@ -1484,7 +1487,9 @@ struct Schema : NIceDb::Schema {
14841487
Items,
14851488
State,
14861489
Issue,
1487-
UserSID
1490+
UserSID,
1491+
StartTime,
1492+
EndTime
14881493
>;
14891494
};
14901495

ydb/core/tx/schemeshard/ut_export/ut_export.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,7 +1571,7 @@ partitioning_settings {
15711571
}
15721572
)", port));
15731573

1574-
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing backup
1574+
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
15751575

15761576
env.TestWaitNotification(runtime, txId);
15771577

@@ -1632,7 +1632,7 @@ partitioning_settings {
16321632
)", port));
16331633
const ui64 exportId = txId;
16341634

1635-
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing backup
1635+
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing export
16361636

16371637
if (!delayed) {
16381638
TDispatchOptions opts;

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
1313
#include <ydb/core/metering/metering.h>
1414
#include <ydb/core/ydb_convert/table_description.h>
15+
#include <ydb/public/api/protos/ydb_import.pb.h>
1516

1617
#include <contrib/libs/zstd/include/zstd.h>
1718
#include <library/cpp/string_utils/quote/quote.h>
@@ -2949,6 +2950,162 @@ Y_UNIT_TEST_SUITE(TImportTests) {
29492950
env.TestWaitNotification(runtime, importId);
29502951
}
29512952

2953+
Y_UNIT_TEST(ImportStartTime) {
2954+
TTestBasicRuntime runtime;
2955+
TTestEnv env(runtime, TTestEnvOptions());
2956+
ui64 txId = 100;
2957+
2958+
const auto data = GenerateTestData(R"(
2959+
columns {
2960+
name: "key"
2961+
type { optional_type { item { type_id: UTF8 } } }
2962+
}
2963+
columns {
2964+
name: "value"
2965+
type { optional_type { item { type_id: UTF8 } } }
2966+
}
2967+
primary_key: "key"
2968+
)", {{"a", 1}});
2969+
2970+
TPortManager portManager;
2971+
const ui16 port = portManager.GetPort();
2972+
2973+
TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
2974+
UNIT_ASSERT(s3Mock.Start());
2975+
2976+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
2977+
ImportFromS3Settings {
2978+
endpoint: "localhost:%d"
2979+
scheme: HTTP
2980+
items {
2981+
source_prefix: ""
2982+
destination_path: "/MyRoot/Table"
2983+
}
2984+
}
2985+
)", port));
2986+
2987+
const auto desc = TestGetImport(runtime, txId, "/MyRoot");
2988+
const auto& entry = desc.GetResponse().GetEntry();
2989+
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_PREPARING);
2990+
UNIT_ASSERT(entry.HasStartTime());
2991+
UNIT_ASSERT(!entry.HasEndTime());
2992+
}
2993+
2994+
Y_UNIT_TEST(CompletedImportEndTime) {
2995+
TTestBasicRuntime runtime;
2996+
TTestEnv env(runtime, TTestEnvOptions());
2997+
ui64 txId = 100;
2998+
2999+
const auto data = GenerateTestData(R"(
3000+
columns {
3001+
name: "key"
3002+
type { optional_type { item { type_id: UTF8 } } }
3003+
}
3004+
columns {
3005+
name: "value"
3006+
type { optional_type { item { type_id: UTF8 } } }
3007+
}
3008+
primary_key: "key"
3009+
)", {{"a", 1}});
3010+
3011+
TPortManager portManager;
3012+
const ui16 port = portManager.GetPort();
3013+
3014+
TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
3015+
UNIT_ASSERT(s3Mock.Start());
3016+
3017+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
3018+
ImportFromS3Settings {
3019+
endpoint: "localhost:%d"
3020+
scheme: HTTP
3021+
items {
3022+
source_prefix: ""
3023+
destination_path: "/MyRoot/Table"
3024+
}
3025+
}
3026+
)", port));
3027+
3028+
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing import
3029+
3030+
env.TestWaitNotification(runtime, txId);
3031+
3032+
const auto desc = TestGetImport(runtime, txId, "/MyRoot");
3033+
const auto& entry = desc.GetResponse().GetEntry();
3034+
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE);
3035+
UNIT_ASSERT(entry.HasStartTime());
3036+
UNIT_ASSERT(entry.HasEndTime());
3037+
UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds());
3038+
}
3039+
3040+
Y_UNIT_TEST(CancelledImportEndTime) {
3041+
TTestBasicRuntime runtime;
3042+
TTestEnv env(runtime, TTestEnvOptions());
3043+
ui64 txId = 100;
3044+
3045+
const auto data = GenerateTestData(R"(
3046+
columns {
3047+
name: "key"
3048+
type { optional_type { item { type_id: UTF8 } } }
3049+
}
3050+
columns {
3051+
name: "value"
3052+
type { optional_type { item { type_id: UTF8 } } }
3053+
}
3054+
primary_key: "key"
3055+
)", {{"a", 1}});
3056+
3057+
TPortManager portManager;
3058+
const ui16 port = portManager.GetPort();
3059+
3060+
TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
3061+
UNIT_ASSERT(s3Mock.Start());
3062+
3063+
auto delayFunc = [](TAutoPtr<IEventHandle>& ev) {
3064+
if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) {
3065+
return false;
3066+
}
3067+
3068+
return ev->Get<TEvSchemeShard::TEvModifySchemeTransaction>()->Record
3069+
.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpRestore;
3070+
};
3071+
3072+
THolder<IEventHandle> delayed;
3073+
auto prevObserver = SetDelayObserver(runtime, delayed, delayFunc);
3074+
3075+
TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
3076+
ImportFromS3Settings {
3077+
endpoint: "localhost:%d"
3078+
scheme: HTTP
3079+
items {
3080+
source_prefix: ""
3081+
destination_path: "/MyRoot/Table"
3082+
}
3083+
}
3084+
)", port));
3085+
const ui64 importId = txId;
3086+
3087+
runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing import
3088+
3089+
WaitForDelayed(runtime, delayed, prevObserver);
3090+
3091+
TestCancelImport(runtime, ++txId, "/MyRoot", importId);
3092+
3093+
auto desc = TestGetImport(runtime, importId, "/MyRoot");
3094+
auto entry = desc.GetResponse().GetEntry();
3095+
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLATION);
3096+
UNIT_ASSERT(entry.HasStartTime());
3097+
UNIT_ASSERT(!entry.HasEndTime());
3098+
3099+
runtime.Send(delayed.Release(), 0, true);
3100+
env.TestWaitNotification(runtime, importId);
3101+
3102+
desc = TestGetImport(runtime, importId, "/MyRoot", Ydb::StatusIds::CANCELLED);
3103+
entry = desc.GetResponse().GetEntry();
3104+
UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED);
3105+
UNIT_ASSERT(entry.HasStartTime());
3106+
UNIT_ASSERT(entry.HasEndTime());
3107+
UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds());
3108+
}
29523109
}
29533110

29543111
Y_UNIT_TEST_SUITE(TImportWithRebootsTests) {

ydb/public/api/protos/out/out.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/public/api/protos/ydb_monitoring.pb.h>
33
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
44
#include <ydb/public/api/protos/ydb_export.pb.h>
5+
#include <ydb/public/api/protos/ydb_import.pb.h>
56

67
#include <util/stream/output.h>
78

@@ -24,3 +25,7 @@ Y_DECLARE_OUT_SPEC(, Ydb::Monitoring::StatusFlag::Status, stream, value) {
2425
Y_DECLARE_OUT_SPEC(, Ydb::Export::ExportProgress::Progress, stream, value) {
2526
stream << Ydb::Export::ExportProgress_Progress_Name(value);
2627
}
28+
29+
Y_DECLARE_OUT_SPEC(, Ydb::Import::ImportProgress::Progress, stream, value) {
30+
stream << Ydb::Import::ImportProgress_Progress_Name(value);
31+
}

0 commit comments

Comments
 (0)