Skip to content

Commit

Permalink
mptcp: refine MPTCP-level ack scheduling
Browse files Browse the repository at this point in the history
Sending a timely MPTCP-level ack is somewhat difficult when
the insertion into the msk receive queue is performed
by the worker.

It needs TCP-level dup-ack to notify the MPTCP-level
ack_seq increase, as both the TCP-level ack seq and the
rcv window are unchanged.

We can actually avoid processing incoming data with the
worker, and let the subflow or recvmsg() send ack as needed.

When recvmsg() moves the skbs inside the msk receive queue,
the msk space is still unchanged, so tcp_cleanup_rbuf() could
end up skipping TCP-level ack generation. Anyway, when
__mptcp_move_skbs() is invoked, a known amount of bytes is
going to be consumed soon: we update rcv wnd computation taking
them into account.

Additionally we need to explicitly trigger tcp_cleanup_rbuf()
when recvmsg() consumes a significant amount of the receive buffer.

Reviewed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
  • Loading branch information
Paolo Abeni authored and matttbe committed Nov 19, 2020
1 parent ff15ab9 commit c77dbc7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 57 deletions.
1 change: 1 addition & 0 deletions net/mptcp/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ static bool mptcp_established_options_dss(struct sock *sk, struct sk_buff *skb,
opts->ext_copy.ack64 = 0;
}
opts->ext_copy.use_ack = 1;
WRITE_ONCE(msk->old_wspace, __mptcp_space((struct sock *)msk));

/* Add kind/length/subtype/flag overhead if mapping is not populated */
if (dss_size == 0)
Expand Down
105 changes: 51 additions & 54 deletions net/mptcp/protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,16 +407,42 @@ static void mptcp_set_timeout(const struct sock *sk, const struct sock *ssk)
mptcp_sk(sk)->timer_ival = tout > 0 ? tout : TCP_RTO_MIN;
}

static void mptcp_send_ack(struct mptcp_sock *msk)
static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow)
{
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);

/* can't send if JOIN hasn't completed yet (i.e. is usable for mptcp) */
if (subflow->request_join && !subflow->fully_established)
return false;

/* only send if our side has not closed yet */
return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT));
}

/* Schedule an MPTCP-level ack towards the peer.
 *
 * @msk:   the MPTCP socket
 * @force: when true, unconditionally emit a TCP-level ack on every
 *         subflow; when false, only invoke tcp_cleanup_rbuf() on a
 *         single subflow and let the TCP stack decide whether an ack
 *         is actually needed.
 */
static void mptcp_send_ack(struct mptcp_sock *msk, bool force)
{
struct mptcp_subflow_context *subflow;
struct sock *pick = NULL;

mptcp_for_each_subflow(msk, subflow) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);

if (force) {
lock_sock(ssk);
tcp_send_ack(ssk);
release_sock(ssk);
continue;
}

/* if the hinted ssk is still active, use it; otherwise fall
 * back to the last subflow visited
 */
pick = ssk;
if (ssk == msk->ack_hint)
break;
}
/* !force: defer the ack decision to the TCP stack on the picked subflow */
if (!force && pick) {
lock_sock(pick);
tcp_cleanup_rbuf(pick, 1);
release_sock(pick);
}
}

Expand Down Expand Up @@ -468,7 +494,7 @@ static bool mptcp_check_data_fin(struct sock *sk)

ret = true;
mptcp_set_timeout(sk, NULL);
mptcp_send_ack(msk);
mptcp_send_ack(msk, true);
mptcp_close_wake_up(sk);
}
return ret;
Expand All @@ -483,7 +509,6 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
unsigned int moved = 0;
bool more_data_avail;
struct tcp_sock *tp;
u32 old_copied_seq;
bool done = false;
int sk_rbuf;

Expand All @@ -500,7 +525,6 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,

pr_debug("msk=%p ssk=%p", msk, ssk);
tp = tcp_sk(ssk);
old_copied_seq = tp->copied_seq;
do {
u32 map_remaining, offset;
u32 seq = tp->copied_seq;
Expand Down Expand Up @@ -564,11 +588,9 @@ static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
break;
}
} while (more_data_avail);
msk->ack_hint = ssk;

*bytes += moved;
if (tp->copied_seq != old_copied_seq)
tcp_cleanup_rbuf(ssk, 1);

return done;
}

Expand Down Expand Up @@ -672,19 +694,8 @@ void mptcp_data_ready(struct sock *sk, struct sock *ssk)
if (atomic_read(&sk->sk_rmem_alloc) > sk_rbuf)
goto wake;

if (move_skbs_to_msk(msk, ssk))
goto wake;
move_skbs_to_msk(msk, ssk);

/* mptcp socket is owned, release_cb should retry */
if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED,
&sk->sk_tsq_flags)) {
sock_hold(sk);

/* need to try again, its possible release_cb() has already
* been called after the test_and_set_bit() above.
*/
move_skbs_to_msk(msk, ssk);
}
wake:
if (wake)
sk->sk_data_ready(sk);
Expand Down Expand Up @@ -1095,18 +1106,6 @@ static void mptcp_nospace(struct mptcp_sock *msk)
mptcp_clean_una((struct sock *)msk);
}

static bool mptcp_subflow_active(struct mptcp_subflow_context *subflow)
{
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);

/* can't send if JOIN hasn't completed yet (i.e. is usable for mptcp) */
if (subflow->request_join && !subflow->fully_established)
return false;

/* only send if our side has not closed yet */
return ((1 << ssk->sk_state) & (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT));
}

#define MPTCP_SEND_BURST_SIZE ((1 << 16) - \
sizeof(struct tcphdr) - \
MAX_TCP_OPTION_SPACE - \
Expand Down Expand Up @@ -1534,7 +1533,7 @@ static void mptcp_rcv_space_adjust(struct mptcp_sock *msk, int copied)
msk->rcvq_space.time = mstamp;
}

static bool __mptcp_move_skbs(struct mptcp_sock *msk)
static bool __mptcp_move_skbs(struct mptcp_sock *msk, unsigned int rcv)
{
unsigned int moved = 0;
bool done;
Expand All @@ -1553,12 +1552,16 @@ static bool __mptcp_move_skbs(struct mptcp_sock *msk)

slowpath = lock_sock_fast(ssk);
done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
if (moved && rcv) {
WRITE_ONCE(msk->rmem_pending, min(rcv, moved));
tcp_cleanup_rbuf(ssk, 1);
WRITE_ONCE(msk->rmem_pending, 0);
}
unlock_sock_fast(ssk, slowpath);
} while (!done);

if (mptcp_ofo_queue(msk) || moved > 0) {
if (!mptcp_check_data_fin((struct sock *)msk))
mptcp_send_ack(msk);
mptcp_check_data_fin((struct sock *)msk);
return true;
}
return false;
Expand All @@ -1582,8 +1585,8 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
__mptcp_flush_join_list(msk);

while (len > (size_t)copied) {
int bytes_read;
for (;;) {
int bytes_read, old_space;

bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied);
if (unlikely(bytes_read < 0)) {
Expand All @@ -1595,9 +1598,14 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
copied += bytes_read;

if (skb_queue_empty(&sk->sk_receive_queue) &&
__mptcp_move_skbs(msk))
__mptcp_move_skbs(msk, len - copied))
continue;

/* be sure to advertise window change */
old_space = READ_ONCE(msk->old_wspace);
if ((tcp_space(sk) - old_space) >= old_space)
mptcp_send_ack(msk, false);

/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
*/
Expand Down Expand Up @@ -1650,7 +1658,7 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
/* .. race-breaker: ssk might have gotten new data
* after last __mptcp_move_skbs() returned false.
*/
if (unlikely(__mptcp_move_skbs(msk)))
if (unlikely(__mptcp_move_skbs(msk, 0)))
set_bit(MPTCP_DATA_READY, &msk->flags);
} else if (unlikely(!test_bit(MPTCP_DATA_READY, &msk->flags))) {
/* data to read but mptcp_wait_data() cleared DATA_READY */
Expand Down Expand Up @@ -1881,7 +1889,6 @@ static void mptcp_worker(struct work_struct *work)
if (test_and_clear_bit(MPTCP_WORK_CLOSE_SUBFLOW, &msk->flags))
__mptcp_close_subflow(msk);

__mptcp_move_skbs(msk);
if (mptcp_send_head(sk))
mptcp_push_pending(sk, 0);

Expand Down Expand Up @@ -1965,6 +1972,7 @@ static int __mptcp_init_sock(struct sock *sk)
msk->out_of_order_queue = RB_ROOT;
msk->first_pending = NULL;

msk->ack_hint = NULL;
msk->first = NULL;
inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss;

Expand Down Expand Up @@ -2500,8 +2508,7 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}

#define MPTCP_DEFERRED_ALL (TCPF_DELACK_TIMER_DEFERRED | \
TCPF_WRITE_TIMER_DEFERRED)
#define MPTCP_DEFERRED_ALL (TCPF_WRITE_TIMER_DEFERRED)

/* this is very alike tcp_release_cb() but we must handle differently a
* different set of events
Expand All @@ -2519,16 +2526,6 @@ static void mptcp_release_cb(struct sock *sk)

sock_release_ownership(sk);

if (flags & TCPF_DELACK_TIMER_DEFERRED) {
struct mptcp_sock *msk = mptcp_sk(sk);
struct sock *ssk;

ssk = mptcp_subflow_recv_lookup(msk);
if (!ssk || sk->sk_state == TCP_CLOSE ||
!schedule_work(&msk->work))
__sock_put(sk);
}

if (flags & TCPF_WRITE_TIMER_DEFERRED) {
mptcp_retransmit_handler(sk);
__sock_put(sk);
Expand Down
8 changes: 8 additions & 0 deletions net/mptcp/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,20 @@ struct mptcp_sock {
u64 rcv_data_fin_seq;
struct sock *last_snd;
int snd_burst;
int old_wspace;
atomic64_t snd_una;
atomic64_t wnd_end;
unsigned long timer_ival;
u32 token;
int rmem_pending;
unsigned long flags;
bool can_ack;
bool fully_established;
bool rcv_data_fin;
bool snd_data_fin_enable;
bool use_64bit_ack; /* Set when we received a 64-bit DSN */
spinlock_t join_list_lock;
struct sock *ack_hint;
struct work_struct work;
struct sk_buff *ooo_last_skb;
struct rb_root out_of_order_queue;
Expand Down Expand Up @@ -258,6 +261,11 @@ static inline struct mptcp_sock *mptcp_sk(const struct sock *sk)
return (struct mptcp_sock *)sk;
}

static inline int __mptcp_space(const struct sock *sk)
{
return tcp_space(sk) + READ_ONCE(mptcp_sk(sk)->rmem_pending);
}

static inline struct mptcp_data_frag *mptcp_send_head(const struct sock *sk)
{
const struct mptcp_sock *msk = mptcp_sk(sk);
Expand Down
4 changes: 1 addition & 3 deletions net/mptcp/subflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -850,8 +850,6 @@ static void mptcp_subflow_discard_data(struct sock *ssk, struct sk_buff *skb,
sk_eat_skb(ssk, skb);
if (mptcp_subflow_get_map_offset(subflow) >= subflow->map_data_len)
subflow->map_valid = 0;
if (incr)
tcp_cleanup_rbuf(ssk, incr);
}

static bool subflow_check_data_avail(struct sock *ssk)
Expand Down Expand Up @@ -973,7 +971,7 @@ void mptcp_space(const struct sock *ssk, int *space, int *full_space)
const struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
const struct sock *sk = subflow->conn;

*space = tcp_space(sk);
*space = __mptcp_space(sk);
*full_space = tcp_full_space(sk);
}

Expand Down

0 comments on commit c77dbc7

Please sign in to comment.