Skip to content

Commit a01e245

Browse files
Merge 316e8bf into 97ed61a
2 parents 97ed61a + 316e8bf commit a01e245

File tree

4 files changed

+82
-65
lines changed

4 files changed

+82
-65
lines changed

ydb/core/kafka_proxy/kafka_connection.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,10 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
570570
responseHeader.Write(writable, headerVersion);
571571
reply->Write(writable, version);
572572

573-
Buffer.flush();
573+
ssize_t res = Buffer.flush();
574+
if (res < 0) {
575+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")" << ".";
576+
}
574577

575578
KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
576579
} catch(const yexception& e) {

ydb/core/kafka_proxy/kafka_messages_int.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ TKafkaWritable& TKafkaWritable::operator<<(const TKafkaUuid& val) {
2020
}
2121

2222
void TKafkaWritable::write(const char* val, size_t length) {
23-
Buffer.write(val, length);
23+
ssize_t res = Buffer.write(val, length);
24+
if (res < 0) {
25+
ythrow yexception() << "Error during flush of the written to socket data. Error code: " << strerror(-res) << " (" << res << ")" << ".";
26+
}
2427
}
2528

2629
TKafkaReadable& TKafkaReadable::operator>>(TKafkaUuid& val) {

ydb/core/kafka_proxy/ut/ut_serialization.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,8 @@ Y_UNIT_TEST(RequestHeader_reference) {
648648
0x65, 0x72, 0x2D, 0x31};
649649

650650
TWritableBuf sb(nullptr, BUFFER_SIZE);
651-
sb.write((char*)reference, sizeof(reference));
651+
ssize_t res = sb.write((char*)reference, sizeof(reference));
652+
UNIT_ASSERT_GE(res, 0);
652653

653654
TKafkaReadable readable(sb.GetBuffer());
654655
TRequestHeaderData result;

ydb/core/raw_socket/sock_impl.h

Lines changed: 72 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -121,57 +121,82 @@ class TBufferedWriter {
121121
public:
122122
TBufferedWriter(TSocketDescriptor* socket, size_t size)
123123
: Socket(socket)
124-
, Buffer(size)
125-
, BufferSize(size) {
126-
}
127-
128-
void write(const char* src, size_t length) {
129-
size_t possible = std::min(length, Buffer.Avail());
130-
if (possible > 0) {
131-
Buffer.Append(src, possible);
132-
}
133-
if (0 == Buffer.Avail() && Socket) {
134-
flush();
135-
}
136-
size_t left = length - possible;
137-
if (left >= BufferSize) {
138-
if (Chunks.empty()) {
139-
// optimization for reduce memory copy
140-
ssize_t res = Socket->Send(src + possible, left);
141-
if (res > 0) {
142-
left -= res;
143-
possible += res;
124+
, Buffer(size) {
125+
}
126+
127+
/**
128+
* Writes data to the socket buffer.
129+
*
130+
* This method writes the specified number of bytes from the source buffer to the internal buffer.
131+
* If the internal buffer becomes full, it flushes the buffer to the socket. The process repeats until all data is written.
132+
*
133+
* @param src A pointer to the source buffer containing the data to be written.
134+
* @param length The number of bytes to write from the source buffer.
135+
* @return The total number of bytes written to the socket. If an error occurs during writing, a negative value is returned.
136+
*/
137+
[[nodiscard]] ssize_t write(const char* src, size_t length) {
138+
size_t left = length;
139+
size_t offset = 0;
140+
ssize_t totalWritten = 0;
141+
ui32 retryAttemtpts = MAX_RETRY_ATTEMPTS;
142+
do {
143+
if (Buffer.Avail() < left) { // time to flush
144+
// flush the remains from buffer, than write straight to socket if we have a lot data
145+
if (!Empty()) {
146+
ssize_t flushRes = flush();
147+
if (flushRes < 0) {
148+
// less than zero means error
149+
return flushRes;
150+
} else if (flushRes == 0) {
151+
retryAttemtpts--;
152+
continue;
153+
} else {
154+
totalWritten += flushRes;
155+
}
144156
}
157+
// if we have a lot data, skip copying it to buffer, just send ot straight to socket
158+
if (left > Buffer.Capacity()) {
159+
// we send only buffer capacity, cause we know for sure that it will be written to socket without error
160+
// there was a bug when we wrote to socket in one big batch and OS closed the connection if message was bigger than 6mb and SSL was enabled
161+
ssize_t sendRes = Socket->Send(src + offset, Buffer.Capacity());
162+
if (sendRes < 0) {
163+
// less than zero means error
164+
return sendRes;
165+
} else if (sendRes == 0) {
166+
retryAttemtpts--;
167+
continue;
168+
} else {
169+
left -= sendRes;
170+
offset += sendRes;
171+
totalWritten += sendRes;
172+
}
173+
} else {
174+
Buffer.Append(src + offset, left);
175+
left = 0;
176+
}
177+
} else {
178+
Buffer.Append(src + offset, left);
179+
left = 0;
145180
}
146-
if (left > 0) {
147-
Buffer.Reserve(left);
148-
Buffer.Append(src + possible, left);
149-
flush();
150-
}
151-
} else if (left > 0) {
152-
Buffer.Append(src + possible, left);
153-
}
181+
} while (left > 0 && retryAttemtpts > 0);
182+
183+
return totalWritten;
154184
}
155185

156-
ssize_t flush() {
157-
if (!Buffer.Empty()) {
158-
Chunks.emplace_back(std::move(Buffer));
159-
Buffer.Reserve(BufferSize);
186+
[[nodiscard]] ssize_t flush() {
187+
if (Empty()) {
188+
return 0;
160189
}
161-
while(!Chunks.empty()) {
162-
auto& chunk = Chunks.front();
163-
ssize_t res = Socket->Send(chunk.Data(), chunk.Size());
164-
if (res > 0) {
165-
if (static_cast<size_t>(res) == chunk.Size()) {
166-
Chunks.pop_front();
167-
} else {
168-
chunk.Shift(res);
169-
}
170-
} else if (-res == EINTR) {
190+
ui32 retryAttemtpts = MAX_RETRY_ATTEMPTS;
191+
while (retryAttemtpts > 0) {
192+
ssize_t res = Socket->Send(std::move(Data()), Size());
193+
if (res < 0) {
194+
return res;
195+
} else if (res == 0) {
196+
retryAttemtpts--;
171197
continue;
172-
} else if (-res == EAGAIN || -res == EWOULDBLOCK) {
173-
return 0;
174198
} else {
199+
Buffer.Clear();
175200
return res;
176201
}
177202
}
@@ -192,29 +217,14 @@ class TBufferedWriter {
192217
}
193218

194219
bool Empty() {
195-
return Buffer.Empty() && Chunks.empty();
220+
return Buffer.Empty();
196221
}
197222

198223
private:
224+
const ui32 MAX_RETRY_ATTEMPTS = 3;
199225
TSocketDescriptor* Socket;
200226
TBuffer Buffer;
201-
size_t BufferSize;
202-
203-
struct Chunk {
204-
Chunk(TBuffer&& buffer)
205-
: Buffer(std::move(buffer))
206-
, Position(0) {
207-
}
208-
209-
TBuffer Buffer;
210-
size_t Position;
211-
212-
const char* Data() { return Buffer.Data() + Position; }
213-
size_t Size() { return Buffer.Size() - Position; }
214-
void Shift(size_t size) { Position += size; }
215-
};
216-
std::deque<Chunk> Chunks;
217-
218227
};
219228

220229
} // namespace NKikimr::NRawSocket
230+

0 commit comments

Comments
 (0)