Skip to content

Commit f3df061

Browse files
committed
Keep underutilized buffer for all recv operations
1 parent 00c6e41 commit f3df061

File tree

2 files changed

+120
-71
lines changed

2 files changed

+120
-71
lines changed

erts/emulator/nifs/unix/unix_socket_syncio.c

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,12 @@ static ERL_NIF_TERM essio_sendfile_ok(ErlNifEnv* env,
325325
size_t count);
326326
#endif
327327

328+
static BOOLEAN_T recv_alloc_buf(size_t size,
329+
ErlNifBinary *bufP);
330+
static BOOLEAN_T recv_create_bin(ErlNifBinary *bufP,
331+
size_t size,
332+
ErlNifBinary *binP);
333+
328334
static BOOLEAN_T recv_check_entry(ErlNifEnv *env,
329335
ESockDescriptor *descP,
330336
ERL_NIF_TERM recvRef,
@@ -2725,7 +2731,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
27252731
int flags)
27262732
{
27272733
int saveErrno;
2728-
ErlNifBinary buf;
2734+
ErlNifBinary bin, *bufP;
27292735
ssize_t readResult;
27302736
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
27312737
ERL_NIF_TERM ret;
@@ -2746,16 +2752,8 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
27462752
return ret;
27472753
}
27482754

2749-
/* Allocate the receive buffer */
2750-
if (descP->buf.data == NULL) {
2751-
ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
2752-
}
2753-
else {
2754-
buf = descP->buf;
2755-
if (buf.size != bufSz) {
2756-
REALLOC_BIN(&buf, bufSz);
2757-
}
2758-
}
2755+
bufP = &descP->buf;
2756+
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
27592757

27602758
SSDBG( descP, ("UNIX-ESSIO", "essio_recv {%d} -> try read (%lu)\r\n",
27612759
descP->sock, (unsigned long) len) );
@@ -2764,7 +2762,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
27642762
esock_atom_read_tries, &descP->readTries, 1);
27652763

27662764
/* recv() */
2767-
readResult = sock_recv(descP->sock, buf.data, buf.size, flags);
2765+
readResult = sock_recv(descP->sock, bufP->data, bufP->size, flags);
27682766
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;
27692767

27702768
SSDBG( descP, ("UNIX-ESSIO",
@@ -2775,40 +2773,25 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
27752773
if (! recv_check_result(env, descP, sockRef, recvRef,
27762774
readResult, saveErrno, &ret) ) {
27772775
/* Keep the buffer */
2778-
descP->buf = buf;
27792776
return ret;
27802777
}
27812778
/* readResult >= 0 */
2782-
ESOCK_ASSERT( readResult <= buf.size );
27832779

2784-
if (readResult < buf.size) {
2780+
ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
2781+
2782+
if (bin.size < bufP->size) {
27852783

27862784
/* +++ We did not fill the buffer +++ */
27872785

27882786
SSDBG( descP,
27892787
("UNIX-ESSIO",
27902788
"essio_recv {%d} -> [%lu] "
27912789
"did not fill the buffer (%ld)\r\n",
2792-
descP->sock, (unsigned long) buf.size,
2793-
(long) readResult) );
2794-
2795-
if (// Less than 4K (1 page) wasted
2796-
readResult >= (buf.size & ~4095) ||
2797-
// Less than 25% wasted
2798-
readResult >= (buf.size >> 1) + (buf.size >> 2)) {
2799-
//
2800-
/* Reallocate and drop buffer */
2801-
descP->buf.data = NULL;
2802-
ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
2803-
}
2804-
else {
2805-
/* Keep buffer, copy content to new binary*/
2806-
descP->buf = buf;
2807-
ESOCK_ASSERT( ALLOC_BIN(readResult, &buf) );
2808-
sys_memcpy(buf.data, descP->buf.data, buf.size);
2809-
}
2790+
descP->sock, (unsigned long) bufP->size,
2791+
(unsigned long) bin.size) );
2792+
28102793
/* Return {ok|timeout|select|select_read, Bin} */
2811-
return recv_check_partial(env, descP, sockRef, recvRef, len, &buf);
2794+
return recv_check_partial(env, descP, sockRef, recvRef, len, &bin);
28122795

28132796
} else {
28142797

@@ -2817,11 +2800,10 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
28172800
SSDBG( descP,
28182801
("UNIX-ESSIO",
28192802
"essio_recv {%d} -> [%lu] filled the buffer\r\n",
2820-
descP->sock, (unsigned long) buf.size) );
2803+
descP->sock, (unsigned long) bin.size) );
28212804

2822-
descP->buf.data = NULL; // Drop buffer
28232805
/* Return {more|ok|select_read, Bin} */
2824-
return recv_check_full(env, descP, sockRef, recvRef, len, &buf);
2806+
return recv_check_full(env, descP, sockRef, recvRef, len, &bin);
28252807
}
28262808
}
28272809

@@ -2844,7 +2826,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
28442826
SOCKLEN_T fromAddrLen;
28452827
ssize_t readResult;
28462828
int saveErrno;
2847-
ErlNifBinary buf;
2829+
ErlNifBinary bin, *bufP;
28482830
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
28492831
ERL_NIF_TERM ret;
28502832

@@ -2861,8 +2843,8 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
28612843
return ret;
28622844
}
28632845

2864-
/* Allocate the receive buffer */
2865-
ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
2846+
bufP = &descP->buf;
2847+
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
28662848

28672849
ESOCK_CNT_INC(env, descP, sockRef,
28682850
esock_atom_read_tries, &descP->readTries, 1);
@@ -2871,18 +2853,19 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
28712853
sys_memzero((char*) &fromAddr, fromAddrLen);
28722854

28732855
/* recvfrom() */
2874-
readResult = sock_recvfrom(descP->sock, buf.data, buf.size, flags,
2856+
readResult = sock_recvfrom(descP->sock, bufP->data, bufP->size, flags,
28752857
&fromAddr.sa, &fromAddrLen);
28762858
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;
28772859

28782860
/* Check for errors and end of stream */
28792861
if (! recv_check_result(env, descP, sockRef, recvRef,
28802862
readResult, saveErrno, &ret) ) {
2881-
FREE_BIN(&buf);
2863+
/* Keep the buffer */
28822864
return ret;
28832865
}
28842866
/* readResult >= 0 */
2885-
ESOCK_ASSERT( readResult <= buf.size );
2867+
2868+
ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
28862869

28872870
/* The recvfrom function delivers one (1) message. If our buffer
28882871
* is too small, the message will be truncated. So, regardless
@@ -2892,18 +2875,14 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
28922875
* Encode the message and source address
28932876
*/
28942877

2895-
if (readResult < buf.size) {
2896-
ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
2897-
}
2898-
28992878
descP->rNumCnt = 0;
29002879

29012880
ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_pkg,
29022881
&descP->readPkgCnt, 1);
29032882
ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_byte,
2904-
&descP->readByteCnt, buf.size);
2905-
if (buf.size > descP->readPkgMax)
2906-
descP->readPkgMax = buf.size;
2883+
&descP->readByteCnt, bin.size);
2884+
if (bin.size > descP->readPkgMax)
2885+
descP->readPkgMax = bin.size;
29072886

29082887
esock_encode_sockaddr(env,
29092888
&fromAddr, fromAddrLen,
@@ -2913,7 +2892,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
29132892
* erlang term in env (no need to free; it will be GC:ed).
29142893
*/
29152894
/* {FromAddr, Bin} */
2916-
ret = MKT2(env, ret, MKBIN(env, &buf));
2895+
ret = MKT2(env, ret, MKBIN(env, &bin));
29172896

29182897
if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
29192898
/* Return {select_read, {FromAddr, Bin}} */
@@ -2950,8 +2929,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
29502929
size_t ctrlSz = (ctrlLen != 0 ? ctrlLen : descP->rCtrlSz);
29512930
struct msghdr msgHdr;
29522931
SysIOVec iov[1]; // Shall we always use 1?
2953-
ErlNifBinary data[1]; // Shall we always use 1?
2954-
ErlNifBinary ctrl;
2932+
ErlNifBinary ctrl, bin, *bufP;
29552933
ERL_NIF_TERM ret;
29562934
ESockAddress addr;
29572935

@@ -2970,9 +2948,10 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
29702948
return ret;
29712949
}
29722950

2973-
/* Allocate the (msg) data buffer:
2951+
/* Allocate the data buffer
29742952
*/
2975-
ESOCK_ASSERT( ALLOC_BIN(bufSz, &data[0]) );
2953+
bufP = &descP->buf;
2954+
ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
29762955

29772956
/* Allocate the ctrl (buffer):
29782957
*/
@@ -2985,8 +2964,8 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
29852964
sys_memzero((char*) &addr, addrLen);
29862965
sys_memzero((char*) &msgHdr, sizeof(msgHdr));
29872966

2988-
iov[0].iov_base = data[0].data;
2989-
iov[0].iov_len = data[0].size;
2967+
iov[0].iov_base = bufP->data;
2968+
iov[0].iov_len = bufP->size;
29902969

29912970
msgHdr.msg_name = &addr;
29922971
msgHdr.msg_namelen = addrLen;
@@ -3002,12 +2981,14 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
30022981
/* Check for errors and end of stream */
30032982
if (! recv_check_result(env, descP, sockRef, recvRef,
30042983
readResult, saveErrno, &ret) ) {
3005-
FREE_BIN(&data[0]);
2984+
/* Keep the data buffer */
30062985
FREE_BIN(&ctrl);
30072986
return ret;
30082987
}
30092988
/* readResult >= 0 */
30102989

2990+
ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
2991+
30112992
/* The recvmsg function delivers one (1) message. If our buffer
30122993
* is to small, the message will be truncated. So, regardless
30132994
* if we filled the buffer or not, we have got what we are going
@@ -3038,7 +3019,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
30383019
descP->readPkgMax = readResult;
30393020

30403021
encode_msg(env, descP,
3041-
readResult, &msgHdr, &data[0], &ctrl,
3022+
readResult, &msgHdr, &bin, &ctrl,
30423023
&ret);
30433024

30443025
if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
@@ -6861,6 +6842,55 @@ void essio_down(ErlNifEnv* env,
68616842

68626843
/* *** Recv/recvfrom/recvmsg utility functions *** */
68636844

6845+
static
6846+
BOOLEAN_T recv_alloc_buf(size_t size,
6847+
ErlNifBinary *bufP)
6848+
{
6849+
if (bufP->data == NULL) {
6850+
return ALLOC_BIN(size, bufP);
6851+
}
6852+
else {
6853+
if (size != bufP->size)
6854+
return REALLOC_BIN(bufP, size);
6855+
else
6856+
return TRUE;
6857+
}
6858+
}
6859+
6860+
static
6861+
BOOLEAN_T recv_create_bin(ErlNifBinary *bufP, size_t size, ErlNifBinary *binP)
6862+
{
6863+
/* Don't touch bufP->size
6864+
*/
6865+
if (size >= bufP->size) {
6866+
/* Buffer full
6867+
* - use it as return binary and drop buffer
6868+
*/
6869+
ESOCK_ASSERT( bufP->size >= size );
6870+
*binP = *bufP;
6871+
bufP->data = NULL;
6872+
return TRUE;
6873+
}
6874+
else if (size >= (bufP->size & ~4095) ||
6875+
size >= (bufP->size >> 1) + (bufP->size >> 2)) {
6876+
/* Less than a 4 K page shrink or less than 25% shrink
6877+
* - reallocate and drop buffer
6878+
*/
6879+
*binP = *bufP;
6880+
bufP->data = NULL;
6881+
return REALLOC_BIN(binP, size);
6882+
}
6883+
else {
6884+
BOOLEAN_T ret;
6885+
/* Keep buffer, copy content to new allocated binary
6886+
*/
6887+
ret = ALLOC_BIN(size, binP);
6888+
if (ret)
6889+
sys_memcpy(binP->data, bufP->data, size);
6890+
return ret;
6891+
}
6892+
}
6893+
68646894
static
68656895
BOOLEAN_T recv_check_entry(ErlNifEnv *env,
68666896
ESockDescriptor *descP,

lib/kernel/src/socket.erl

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5427,20 +5427,38 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->
54275427

54285428
%%
54295429
{select_read, Bin} -> %% All data, new recv operation in progress
5430+
%% The combination of select_read and recv with time-out
5431+
%% is contradictive since the return values has no place
5432+
%% for a continuation because neither a ref nor 'nowait'
5433+
%% was given, so we handle this as if there was no select_read
5434+
%% by cancelling the new recv operation
5435+
%%
54305436
_ = cancel(SockRef, recv, Handle),
54315437
{ok, condense_buffer(Bin, Buf)};
54325438
%%
5433-
Select %% select | {select, Bin} %% No data or incomplete
5434-
when Select =:= select;
5435-
tuple_size(Select) =:= 2, element(1, Select) =:= select ->
5436-
{Length_1, Buf_1} =
5437-
if
5438-
Select =:= select ->
5439-
{Length, Buf};
5440-
true ->
5441-
Bin = element(2, Select),
5442-
{Length - byte_size(Bin), [Bin | Buf]}
5443-
end,
5439+
select ->
5440+
%%
5441+
%% There is nothing just now, but we will be notified
5442+
%% with a select message when there is something to recv
5443+
Timeout = timeout(Deadline),
5444+
receive
5445+
?socket_msg(?socket(SockRef), select, Handle) ->
5446+
if
5447+
0 < Timeout ->
5448+
%% Retry
5449+
recv_deadline(
5450+
SockRef, Length, Flags, Deadline, Buf);
5451+
true ->
5452+
recv_error(timeout, Buf)
5453+
end;
5454+
?socket_msg(_Socket, abort, {Handle, Reason}) ->
5455+
recv_error(Reason, Buf)
5456+
after Timeout ->
5457+
_ = cancel(SockRef, recv, Handle),
5458+
recv_error(timeout, Buf)
5459+
end;
5460+
{select, Bin} ->
5461+
Buf_1 = [Bin | Buf],
54445462
%%
54455463
%% There is nothing just now, but we will be notified
54465464
%% with a select message when there is something to recv
@@ -5451,7 +5469,8 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->
54515469
0 < Timeout ->
54525470
%% Retry
54535471
recv_deadline(
5454-
SockRef, Length_1, Flags, Deadline, Buf_1);
5472+
SockRef, Length - byte_size(Bin),
5473+
Flags, Deadline, Buf_1);
54555474
true ->
54565475
recv_error(timeout, Buf_1)
54575476
end;

0 commit comments

Comments
 (0)