Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ namespace NKikimr::NHttpProxy {
"' iam token size: " << HttpContext.IamToken.size());
TMap<TString, TString> peerMetadata {
{NYmq::V1::FOLDER_ID, FolderId},
{NYmq::V1::CLOUD_ID, HttpContext.UserName ? HttpContext.UserName : CloudId},
{NYmq::V1::CLOUD_ID, CloudId ? CloudId : HttpContext.UserName },
{NYmq::V1::USER_SID, UserSid},
{NYmq::V1::REQUEST_ID, HttpContext.RequestId},
{NYmq::V1::SECURITY_TOKEN, HttpContext.SecurityToken},
Expand Down Expand Up @@ -408,7 +408,7 @@ namespace NKikimr::NHttpProxy {
ctx,
NKikimrServices::HTTP_PROXY,
"Not retrying GRPC response."
<< " Code: " << get<1>(errorAndCode)
<< " Code: " << get<1>(errorAndCode)
<< ", Error: " << get<0>(errorAndCode);
);

Expand All @@ -434,7 +434,7 @@ namespace NKikimr::NHttpProxy {
ctx,
NKikimrServices::HTTP_PROXY,
TStringBuilder() << "Got cloud auth response."
<< " FolderId: " << ev->Get()->FolderId
<< " FolderId: " << ev->Get()->FolderId
<< " CloudId: " << ev->Get()->CloudId
<< " UserSid: " << ev->Get()->Sid;
);
Expand Down Expand Up @@ -553,7 +553,7 @@ namespace NKikimr::NHttpProxy {
.Counters = nullptr,
.AWSSignature = std::move(HttpContext.GetSignature()),
.IAMToken = HttpContext.IamToken,
.FolderID = ""
.FolderID = ""
};

auto authRequestProxy = MakeHolder<NSQS::THttpProxyAuthRequestProxy>(
Expand Down
28 changes: 17 additions & 11 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
} else {
state = "Unknown";
}
return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
}

bool TPartition::IsActive() const {
Expand Down Expand Up @@ -2134,6 +2134,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)

void TPartition::CommitWriteOperations(TTransaction& t)
{
PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());

Y_ABORT_UNLESS(PersistRequest);
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());

Expand All @@ -2151,6 +2153,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
HaveWriteMsg = true;
}

PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);

if (!t.WriteInfo->BodyKeys.empty()) {
PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
Expand All @@ -2165,6 +2171,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
MaxBlobSize);

for (auto& k : t.WriteInfo->BodyKeys) {
PQ_LOG_D("add key " << k.Key.ToString());
auto write = PartitionedBlob.Add(k.Key, k.Size);
if (write && !write->Value.empty()) {
AddCmdWrite(write, PersistRequest.Get(), ctx);
Expand All @@ -2173,18 +2180,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}
}

}

if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}

if (!t.WriteInfo->BodyKeys.empty()) {
const auto& last = t.WriteInfo->BodyKeys.back();

NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <util/system/types.h>
#include <util/digest/multi.h>
#include <util/str_stl.h>
#include <util/string/builder.h>

#include <functional>

Expand Down Expand Up @@ -51,6 +52,13 @@ class TPartitionId {
}
}

TString ToString() const
{
TStringBuilder s;
s << *this;
return s;
}

bool IsSupportivePartition() const
{
return WriteId.Defined();
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,16 +1064,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
}
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
"PQ: %" PRIu64 ", Partition: %s, "
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
TabletID, Partition.ToString().c_str(),
DataKeysBody.back().Key.ToString().c_str(),
Head.Offset,
x.NewKey.ToString().c_str());
}
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"writing blob: topic '" << TopicName() << "' partition " << Partition
<< " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
);
PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
" old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
" size " << x.Size << " WTime " << ctx.Now().MilliSeconds());

CompactedKeys.emplace_back(x.NewKey, x.Size);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4667,6 +4667,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
Y_ABORT_UNLESS(next);

CheckTxState(ctx, *next);

TryWriteTxs(ctx);
}

void TPersQueue::OnInitComplete(const TActorContext& ctx)
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1247,6 +1247,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

// get records
{
WaitForDataRecords(client, shardIt);

auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
Expand All @@ -1268,6 +1270,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}

static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
int n = 0;
for (; n < 100; ++n) {
auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
if (res.GetResult().records().size()) {
break;
}
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
}

static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
TTestYdsEnv env(tableDesc, streamDesc);

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6343,6 +6343,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionPerTablet: 10 "
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
);
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
Expand Down Expand Up @@ -6865,7 +6866,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);

TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down
95 changes: 91 additions & 4 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class TFixture : public NUnitTest::TBaseFixture {

void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);

void WriteMessagesInTx(size_t big, size_t small);

const TDriver& GetDriver() const;

void CheckTabletKeys(const TString& topicName);
Expand Down Expand Up @@ -1595,21 +1597,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)

for (size_t i = 0; i < params.OldHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++oldHeadMsgCount;
}

for (size_t i = 0; i < params.BigBlobsCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++bigBlobMsgCount;
}

for (size_t i = 0; i < params.NewHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++newHeadMsgCount;
}

WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);

if (params.RestartMode == ERestartBeforeCommit) {
RestartPQTablet("topic_A", 0);
}
Expand Down Expand Up @@ -1638,7 +1641,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
start += oldHeadMsgCount;

for (size_t i = 0; i < bigBlobMsgCount; ++i) {
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
}
start += bigBlobMsgCount;

Expand Down Expand Up @@ -1903,6 +1906,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}

void TFixture::WriteMessagesInTx(size_t big, size_t small)
{
CreateTopic("topic_A", TEST_CONSUMER);

NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);

for (size_t i = 0; i < big; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

for (size_t i = 0; i < small; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

CommitTx(tx, EStatus::SUCCESS);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 1);
}


Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
{
WriteMessagesInTx(2, 202);
WriteMessagesInTx(2, 200);
WriteMessagesInTx(0, 1);
WriteMessagesInTx(4, 0);
WriteMessagesInTx(0, 1);
}

}

}
Loading