@@ -121,62 +121,72 @@ class TBufferedWriter {
121121public:
122122 TBufferedWriter (TSocketDescriptor* socket, size_t size)
123123 : Socket(socket)
124- , Buffer(size)
125- , BufferSize(size) {
126- }
127-
128- void write (const char * src, size_t length) {
129- size_t possible = std::min (length, Buffer.Avail ());
130- if (possible > 0 ) {
131- Buffer.Append (src, possible);
132- }
133- if (0 == Buffer.Avail ()) {
134- flush ();
135- }
136- size_t left = length - possible;
137- if (left >= BufferSize) {
138- if (Chunks.empty ()) {
139- // optimization for reduce memory copy
140- ssize_t res = Socket->Send (src + possible, left);
141- if (res > 0 ) {
142- left -= res;
143- possible += res;
124+ , Buffer(size) {
125+ }
126+
127+ /* *
128+ * Writes data to the socket buffer.
129+ *
130+ * This method writes the specified number of bytes from the source buffer to the internal buffer.
131+ * If the internal buffer becomes full, it flushes the buffer to the socket. The process repeats until all data is written.
132+ *
133+ * @param src A pointer to the source buffer containing the data to be written.
134+ * @param length The number of bytes to write from the source buffer.
135+ * @return The total number of bytes written to the socket. If an error occurs during writing, a negative value is returned.
136+ */
137+ [[nodiscard]] ssize_t write (const char * src, size_t length) {
138+ size_t left = length;
139+ size_t offset = 0 ;
140+ ssize_t totalWritten = 0 ;
141+ do {
142+ if (Buffer.Avail () < left) { // time to flush
143+ // flush the remains from buffer, than write straight to socket if we have a lot data
144+ if (!Empty ()) {
145+ ssize_t flushRes = flush ();
146+ if (flushRes < 0 ) {
147+ // less than zero means error
148+ return flushRes;
149+ } else {
150+ totalWritten += flushRes;
151+ }
144152 }
145- }
146- if (left > 0 ) {
147- Buffer.Reserve (left);
148- Buffer.Append (src + possible, left);
149- flush ();
150- }
151- } else if (left > 0 ) {
152- Buffer.Append (src + possible, left);
153- }
154- }
155-
156- ssize_t flush () {
157- if (!Buffer.Empty ()) {
158- Chunks.emplace_back (std::move (Buffer));
159- Buffer.Reserve (BufferSize);
160- }
161- while (!Chunks.empty ()) {
162- auto & chunk = Chunks.front ();
163- ssize_t res = Socket->Send (chunk.Data (), chunk.Size ());
164- if (res > 0 ) {
165- if (static_cast <size_t >(res) == chunk.Size ()) {
166- Chunks.pop_front ();
153+ // if we have a lot data, skip copying it to buffer, just send ot straight to socket
154+ if (left > Buffer.Capacity ()) {
155+ // we send only small batch to socket, cause we know for sure that it will be written to socket without error
156+ // there was a bug when we wrote to socket one big batch and OS closed the connection in case message was bigger than 6mb and SSL was enabled
157+ size_t bytesToSend = std::min (left, MAX_SOCKET_BATCH_SIZE);
158+ ssize_t sendRes = Send (src + offset, bytesToSend);
159+ if (sendRes <= 0 ) {
160+ // less than zero means error
161+ // exactly zero is also interpreted as error
162+ return sendRes;
163+ } else {
164+ left -= sendRes;
165+ offset += sendRes;
166+ totalWritten += sendRes;
167+ }
167168 } else {
168- chunk.Shift (res);
169+ Buffer.Append (src + offset, left);
170+ left = 0 ;
169171 }
170- } else if (-res == EINTR) {
171- continue ;
172- } else if (-res == EAGAIN || -res == EWOULDBLOCK) {
173- return 0 ;
174172 } else {
175- return res;
173+ Buffer.Append (src + offset, left);
174+ left = 0 ;
176175 }
177- }
176+ } while (left > 0 );
177+
178+ return totalWritten;
179+ }
178180
179- return 0 ;
181+ [[nodiscard]] ssize_t flush () {
182+ if (Empty ()) {
183+ return 0 ;
184+ }
185+ ssize_t res = Send (Data (), Size ());
186+ if (res > 0 ) {
187+ Buffer.Clear ();
188+ }
189+ return res;
180190 }
181191
182192 const char * Data () {
@@ -192,29 +202,30 @@ class TBufferedWriter {
192202 }
193203
194204 bool Empty () {
195- return Buffer.Empty () && Chunks. empty () ;
205+ return Buffer.Empty ();
196206 }
197207
198208private:
209+ static constexpr ui32 MAX_RETRY_ATTEMPTS = 3 ;
210+ static constexpr size_t MAX_SOCKET_BATCH_SIZE = 1_MB;
199211 TSocketDescriptor* Socket;
200212 TBuffer Buffer;
201- size_t BufferSize;
202213
203- struct Chunk {
204- Chunk (TBuffer&& buffer)
205- : Buffer(std::move(buffer))
206- , Position(0 ) {
214+ ssize_t Send (const char * data, size_t length) {
215+ ui32 retryAttemtpts = MAX_RETRY_ATTEMPTS;
216+ while (true ) {
217+ ssize_t res = Socket->Send (data, length);
218+ // retry
219+ if ((-res == EAGAIN || -res == EWOULDBLOCK || -res == EINTR) && retryAttemtpts--) {
220+ continue ;
221+ }
222+
223+ return res;
207224 }
208-
209- TBuffer Buffer;
210- size_t Position;
211-
212- const char * Data () { return Buffer.Data () + Position; }
213- size_t Size () { return Buffer.Size () - Position; }
214- void Shift (size_t size) { Position += size; }
215- };
216- std::deque<Chunk> Chunks;
217-
225+
226+ Y_UNREACHABLE ();
227+ }
218228};
219229
220230} // namespace NKikimr::NRawSocket
231+
0 commit comments