|
7 | 7 |
|
8 | 8 | namespace NKikimr::NKqp { |
9 | 9 |
|
10 | | -bool TBulkUpsertCommand::DeserializeFromString(const TString& info) { |
11 | | - auto lines = StringSplitter(info).SplitBySet("\n").SkipEmpty().ToList<TString>(); |
12 | | - if (lines.size() < 2 || lines.size() > 3) { |
13 | | - return false; |
| 10 | +TConclusionStatus TBulkUpsertCommand::DoExecute(TKikimrRunner& kikimr) { |
| 11 | + if (ArrowBatch->num_rows() < PartsCount) { |
| 12 | + return TConclusionStatus::Fail( |
| 13 | + "not enough records(" + ::ToString(ArrowBatch->num_rows()) + ") for split in " + ::ToString(PartsCount) + " chunks"); |
14 | 14 | } |
15 | | - TableName = Strip(lines[0]); |
16 | | - ArrowBatch = Base64Decode(Strip(lines[1])); |
17 | | - AFL_VERIFY(!!ArrowBatch); |
18 | | - if (lines.size() == 3) { |
19 | | - if (!Ydb::StatusIds_StatusCode_Parse(Strip(lines[2]), &ExpectedCode)) { |
20 | | - return false; |
21 | | - } |
22 | | - // if (lines[2] == "SUCCESS") { |
23 | | - // } else if (lines[2] = "INTERNAL_ERROR") { |
24 | | - // ExpectedCode = Ydb::StatusIds::INTERNAL_ERROR; |
25 | | - // } else if (lines[2] == "BAD_REQUEST") { |
26 | | - // ExpectedCode = Ydb::StatusIds::BAD_REQUEST; |
27 | | - // } else { |
28 | | - // return false; |
29 | | - // } |
| 15 | + ui32 cursor = 0; |
| 16 | + for (ui32 i = 0; i < PartsCount; ++i) { |
| 17 | + const ui32 size = (i + 1 != PartsCount) ? (ArrowBatch->num_rows() / PartsCount) : (ArrowBatch->num_rows() - cursor); |
| 18 | + TLocalHelper lHelper(kikimr); |
| 19 | + lHelper.SendDataViaActorSystem(TableName, ArrowBatch->Slice(cursor, size), ExpectedCode); |
| 20 | + cursor += size; |
30 | 21 | } |
31 | | - return true; |
| 22 | + AFL_VERIFY(cursor == ArrowBatch->num_rows()); |
| 23 | + return TConclusionStatus::Success(); |
32 | 24 | } |
33 | 25 |
|
34 | | -TConclusionStatus TBulkUpsertCommand::DoExecute(TKikimrRunner& kikimr) { |
35 | | - TLocalHelper lHelper(kikimr); |
36 | | - lHelper.SendDataViaActorSystem( |
37 | | - TableName, NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(ArrowBatch)), ExpectedCode); |
| 26 | +TConclusionStatus TBulkUpsertCommand::DoDeserializeProperties(const TPropertiesCollection& props) { |
| 27 | + if (props.GetFreeArgumentsCount() != 2) { |
| 28 | + return TConclusionStatus::Fail("incorrect free arguments count for BULK_UPSERTcommand"); |
| 29 | + } |
| 30 | + TableName = props.GetFreeArgumentVerified(0); |
| 31 | + ArrowBatch = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(Base64Decode(props.GetFreeArgumentVerified(1)))); |
| 32 | + if (auto value = props.GetOptional("EXPECT_STATUS")) { |
| 33 | + if (!Ydb::StatusIds_StatusCode_Parse(*value, &ExpectedCode)) { |
| 34 | + return TConclusionStatus::Fail("cannot parse EXPECT_STATUS from " + *value); |
| 35 | + } |
| 36 | + } |
| 37 | + if (auto value = props.GetOptional("PARTS_COUNT")) { |
| 38 | + if (!TryFromString<ui32>(*value, PartsCount)) { |
| 39 | + return TConclusionStatus::Fail("cannot parse PARTS_COUNT from " + *value); |
| 40 | + } |
| 41 | + } |
38 | 42 | return TConclusionStatus::Success(); |
39 | 43 | } |
40 | 44 |
|
|
0 commit comments