Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
responseHeader.Write(writable, headerVersion);
reply->Write(writable, version);

Buffer.flush();
ssize_t res = Buffer.flush();
if (res < 0) {
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
}

KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
} catch(const yexception& e) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages_int.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
}

void TKafkaWritable::write(const char* val, size_t length) {
Buffer.write(val, length);
ssize_t res = Buffer.write(val, length);
if (res < 0) {
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")";
}
}

TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kafka_proxy/ut/ut_serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ Y_UNIT_TEST(RequestHeader_reference) {
0x65, 0x72, 0x2D, 0x31};

TWritableBuf sb(nullptr, BUFFER_SIZE);
sb.write((char*)reference, sizeof(reference));
ssize_t res = sb.write((char*)reference, sizeof(reference));
UNIT_ASSERT_GE(res, 0);

TKafkaReadable readable(sb.GetBuffer());
TRequestHeaderData result;
Expand Down
143 changes: 77 additions & 66 deletions ydb/core/raw_socket/sock_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,62 +121,72 @@ class TBufferedWriter {
public:
TBufferedWriter(TSocketDescriptor* socket, size_t size)
: Socket(socket)
, Buffer(size)
, BufferSize(size) {
}

void write(const char* src, size_t length) {
size_t possible = std::min(length, Buffer.Avail());
if (possible > 0) {
Buffer.Append(src, possible);
}
if (0 == Buffer.Avail()) {
flush();
}
size_t left = length - possible;
if (left >= BufferSize) {
if (Chunks.empty()) {
// optimization for reduce memory copy
ssize_t res = Socket->Send(src + possible, left);
if (res > 0) {
left -= res;
possible += res;
, Buffer(size) {
}

/**
* Writes data to the socket buffer.
*
* This method writes the specified number of bytes from the source buffer to the internal buffer.
* If the internal buffer becomes full, it flushes the buffer to the socket. The process repeats until all data is written.
*
* @param src A pointer to the source buffer containing the data to be written.
* @param length The number of bytes to write from the source buffer.
* @return The total number of bytes written to the socket. If an error occurs during writing, a negative value is returned.
*/
[[nodiscard]] ssize_t write(const char* src, size_t length) {
size_t left = length;
size_t offset = 0;
ssize_t totalWritten = 0;
do {
if (Buffer.Avail() < left) { // time to flush
// flush the remains from buffer, than write straight to socket if we have a lot data
if (!Empty()) {
ssize_t flushRes = flush();
if (flushRes < 0) {
// less than zero means error
return flushRes;
} else {
totalWritten += flushRes;
}
}
}
if (left > 0) {
Buffer.Reserve(left);
Buffer.Append(src + possible, left);
flush();
}
} else if (left > 0) {
Buffer.Append(src + possible, left);
}
}

ssize_t flush() {
if (!Buffer.Empty()) {
Chunks.emplace_back(std::move(Buffer));
Buffer.Reserve(BufferSize);
}
while(!Chunks.empty()) {
auto& chunk = Chunks.front();
ssize_t res = Socket->Send(chunk.Data(), chunk.Size());
if (res > 0) {
if (static_cast<size_t>(res) == chunk.Size()) {
Chunks.pop_front();
// if we have a lot data, skip copying it to buffer, just send ot straight to socket
if (left > Buffer.Capacity()) {
// we send only small batch to socket, cause we know for sure that it will be written to socket without error
// there was a bug when we wrote to socket one big batch and OS closed the connection in case message was bigger than 6mb and SSL was enabled
size_t bytesToSend = std::min(left, MAX_SOCKET_BATCH_SIZE);
ssize_t sendRes = Send(src + offset, bytesToSend);
if (sendRes <= 0) {
// less than zero means error
// exactly zero is also interpreted as error
return sendRes;
} else {
left -= sendRes;
offset += sendRes;
totalWritten += sendRes;
}
} else {
chunk.Shift(res);
Buffer.Append(src + offset, left);
left = 0;
}
} else if (-res == EINTR) {
continue;
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
return 0;
} else {
return res;
Buffer.Append(src + offset, left);
left = 0;
}
}
} while (left > 0);

return totalWritten;
}

return 0;
[[nodiscard]] ssize_t flush() {
if (Empty()) {
return 0;
}
ssize_t res = Send(Data(), Size());
if (res > 0) {
Buffer.Clear();
}
return res;
}

const char* Data() {
Expand All @@ -192,29 +202,30 @@ class TBufferedWriter {
}

bool Empty() {
return Buffer.Empty() && Chunks.empty();
return Buffer.Empty();
}

private:
static constexpr ui32 MAX_RETRY_ATTEMPTS = 3;
static constexpr size_t MAX_SOCKET_BATCH_SIZE = 1_MB;
TSocketDescriptor* Socket;
TBuffer Buffer;
size_t BufferSize;

struct Chunk {
Chunk(TBuffer&& buffer)
: Buffer(std::move(buffer))
, Position(0) {
ssize_t Send(const char* data, size_t length) {
ui32 retryAttemtpts = MAX_RETRY_ATTEMPTS;
while (true) {
ssize_t res = Socket->Send(data, length);
// retry
if ((-res == EAGAIN || -res == EWOULDBLOCK || -res == EINTR) && retryAttemtpts--) {
continue;
}

return res;
}

TBuffer Buffer;
size_t Position;

const char* Data() { return Buffer.Data() + Position; }
size_t Size() { return Buffer.Size() - Position; }
void Shift(size_t size) { Position += size; }
};
std::deque<Chunk> Chunks;


Y_UNREACHABLE();
}
};

} // namespace NKikimr::NRawSocket

Loading