Message ID | 20210323003808.16074-5-xiyou.wangcong@gmail.com |
---|---|
State | New |
Headers | show |
Series | sockmap: introduce BPF_SK_SKB_VERDICT and support UDP | expand |
Cong Wang wrote: > From: Cong Wang <cong.wang@bytedance.com> > > We do not have to lock the sock to avoid losing sk_socket, > instead we can purge all the ingress queues when we close > the socket. Sending or receiving packets after orphaning > socket makes no sense. > > We do purge these queues when psock refcnt reaches zero but > here we want to purge them explicitly in sock_map_close(). > There are also some nasty race conditions on testing bit > SK_PSOCK_TX_ENABLED and queuing/canceling the psock work, > we can expand psock->ingress_lock a bit to protect them too. > > Cc: John Fastabend <john.fastabend@gmail.com> > Cc: Daniel Borkmann <daniel@iogearbox.net> > Cc: Jakub Sitnicki <jakub@cloudflare.com> > Cc: Lorenz Bauer <lmb@cloudflare.com> > Signed-off-by: Cong Wang <cong.wang@bytedance.com> > --- > include/linux/skmsg.h | 1 + > net/core/skmsg.c | 51 +++++++++++++++++++++++++++---------------- > net/core/sock_map.c | 1 + > 3 files changed, 34 insertions(+), 19 deletions(-) > > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h > index f2d45a73b2b2..cf23e6e2cf54 100644 > --- a/include/linux/skmsg.h > +++ b/include/linux/skmsg.h > @@ -347,6 +347,7 @@ static inline void sk_psock_report_error(struct sk_psock *psock, int err) > } > > struct sk_psock *sk_psock_init(struct sock *sk, int node); > +void sk_psock_stop(struct sk_psock *psock, bool wait); > > #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER) > int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock); > diff --git a/net/core/skmsg.c b/net/core/skmsg.c > index 305dddc51857..9176add87643 100644 > --- a/net/core/skmsg.c > +++ b/net/core/skmsg.c > @@ -497,7 +497,7 @@ static int sk_psock_handle_skb(struct sk_psock *psock, struct sk_buff *skb, > if (!ingress) { > if (!sock_writeable(psock->sk)) > return -EAGAIN; > - return skb_send_sock_locked(psock->sk, skb, off, len); > + return skb_send_sock(psock->sk, skb, off, len); > } > return sk_psock_skb_ingress(psock, skb); > } > @@ -511,8 +511,6 @@ static void sk_psock_backlog(struct work_struct *work) > u32 len, off; > int ret; Hi Cong, I'm trying to understand if the workqueue logic will somehow prevent the following, CPU0 CPU1 work dequeue sk_psock_backlog() ... do backlog ... also maybe sleep schedule_work() work_dequeue sk_psock_backlog() <----- multiple runners --------> work_complete It seems we could get multiple instances of sk_psock_backlog(), unless the max_active is set to 1 in __queue_work() which would push us through the WORK_STRUCT_DELAYED state. At least thats my initial read. Before it didn't matter because we had the sock_lock to ensure we have only a single runner here. I need to study the workqueue code here to be sure, but I'm thinking this might a problem unless we set up the workqueue correctly. Do you have any extra details on why above can't happen thanks. > > - /* Lock sock to avoid losing sk_socket during loop. */ > - lock_sock(psock->sk); > if (state->skb) { > skb = state->skb; > len = state->len; > @@ -529,7 +527,7 @@ static void sk_psock_backlog(struct work_struct *work) > skb_bpf_redirect_clear(skb); > do { > ret = -EIO; > - if (likely(psock->sk->sk_socket)) > + if (!sock_flag(psock->sk, SOCK_DEAD)) > ret = sk_psock_handle_skb(psock, skb, off, > len, ingress); > if (ret <= 0) { Thanks, John
On Thu, Mar 25, 2021 at 7:10 PM John Fastabend <john.fastabend@gmail.com> wrote: > Hi Cong, > > I'm trying to understand if the workqueue logic will somehow prevent the > following, > > CPU0 CPU1 > > work dequeue > sk_psock_backlog() > ... do backlog > ... also maybe sleep > > schedule_work() > work_dequeue > sk_psock_backlog() > > <----- multiple runners --------> > > work_complete > > It seems we could get multiple instances of sk_psock_backlog(), unless > the max_active is set to 1 in __queue_work() which would push us through > the WORK_STRUCT_DELAYED state. At least thats my initial read. Before > it didn't matter because we had the sock_lock to ensure we have only a > single runner here. > > I need to study the workqueue code here to be sure, but I'm thinking > this might a problem unless we set up the workqueue correctly. > > Do you have any extra details on why above can't happen thanks. Very good question! I thought a same work callback is never executed concurrently, but after reading the workqueue code, actually I agree with you on this, that is, a same work callback can be executed concurrently on different CPU's. Limiting max_active to 1 is not a solution here, as we still want to keep different items running concurrently. Therefore, we still need a mutex here, just to protect this scenario. I will add a psock->work_mutex inside sk_psock_backlog(). Thanks!
diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h index f2d45a73b2b2..cf23e6e2cf54 100644 --- a/include/linux/skmsg.h +++ b/include/linux/skmsg.h @@ -347,6 +347,7 @@ static inline void sk_psock_report_error(struct sk_psock *psock, int err) } struct sk_psock *sk_psock_init(struct sock *sk, int node); +void sk_psock_stop(struct sk_psock *psock, bool wait); #if IS_ENABLED(CONFIG_BPF_STREAM_PARSER) int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock); diff --git a/net/core/skmsg.c b/net/core/skmsg.c index 305dddc51857..9176add87643 100644 --- a/net/core/skmsg.c +++ b/net/core/skmsg.c @@ -497,7 +497,7 @@ static int sk_psock_handle_skb(struct sk_psock *psock, struct sk_buff *skb, if (!ingress) { if (!sock_writeable(psock->sk)) return -EAGAIN; - return skb_send_sock_locked(psock->sk, skb, off, len); + return skb_send_sock(psock->sk, skb, off, len); } return sk_psock_skb_ingress(psock, skb); } @@ -511,8 +511,6 @@ static void sk_psock_backlog(struct work_struct *work) u32 len, off; int ret; - /* Lock sock to avoid losing sk_socket during loop. */ - lock_sock(psock->sk); if (state->skb) { skb = state->skb; len = state->len; @@ -529,7 +527,7 @@ static void sk_psock_backlog(struct work_struct *work) skb_bpf_redirect_clear(skb); do { ret = -EIO; - if (likely(psock->sk->sk_socket)) + if (!sock_flag(psock->sk, SOCK_DEAD)) ret = sk_psock_handle_skb(psock, skb, off, len, ingress); if (ret <= 0) { @@ -537,13 +535,13 @@ static void sk_psock_backlog(struct work_struct *work) state->skb = skb; state->len = len; state->off = off; - goto end; + return; } /* Hard errors break pipe and stop xmit. */ sk_psock_report_error(psock, ret ? -ret : EPIPE); sk_psock_clear_state(psock, SK_PSOCK_TX_ENABLED); kfree_skb(skb); - goto end; + return; } off += ret; len -= ret; @@ -552,8 +550,6 @@ static void sk_psock_backlog(struct work_struct *work) if (!ingress) kfree_skb(skb); } -end: - release_sock(psock->sk); } struct sk_psock *sk_psock_init(struct sock *sk, int node) @@ -631,7 +627,7 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) } } -static void sk_psock_zap_ingress(struct sk_psock *psock) +static void __sk_psock_zap_ingress(struct sk_psock *psock) { struct sk_buff *skb; @@ -639,9 +635,7 @@ static void sk_psock_zap_ingress(struct sk_psock *psock) skb_bpf_redirect_clear(skb); kfree_skb(skb); } - spin_lock_bh(&psock->ingress_lock); __sk_psock_purge_ingress_msg(psock); - spin_unlock_bh(&psock->ingress_lock); } static void sk_psock_link_destroy(struct sk_psock *psock) @@ -654,6 +648,18 @@ static void sk_psock_link_destroy(struct sk_psock *psock) } } +void sk_psock_stop(struct sk_psock *psock, bool wait) +{ + spin_lock_bh(&psock->ingress_lock); + sk_psock_clear_state(psock, SK_PSOCK_TX_ENABLED); + sk_psock_cork_free(psock); + __sk_psock_zap_ingress(psock); + spin_unlock_bh(&psock->ingress_lock); + + if (wait) + cancel_work_sync(&psock->work); +} + static void sk_psock_done_strp(struct sk_psock *psock); static void sk_psock_destroy_deferred(struct work_struct *gc) @@ -670,7 +676,6 @@ static void sk_psock_destroy_deferred(struct work_struct *gc) sk_psock_link_destroy(psock); sk_psock_cork_free(psock); - sk_psock_zap_ingress(psock); if (psock->sk_redir) sock_put(psock->sk_redir); @@ -688,8 +693,7 @@ static void sk_psock_destroy(struct rcu_head *rcu) void sk_psock_drop(struct sock *sk, struct sk_psock *psock) { - sk_psock_cork_free(psock); - sk_psock_zap_ingress(psock); + sk_psock_stop(psock, false); write_lock_bh(&sk->sk_callback_lock); sk_psock_restore_proto(sk, psock); @@ -699,7 +703,6 @@ void sk_psock_drop(struct sock *sk, struct sk_psock *psock) else if (psock->progs.stream_verdict) sk_psock_stop_verdict(sk, psock); write_unlock_bh(&sk->sk_callback_lock); - sk_psock_clear_state(psock, SK_PSOCK_TX_ENABLED); call_rcu(&psock->rcu, sk_psock_destroy); } @@ -770,14 +773,20 @@ static void sk_psock_skb_redirect(struct sk_buff *skb) * error that caused the pipe to break. We can't send a packet on * a socket that is in this state so we drop the skb. */ - if (!psock_other || sock_flag(sk_other, SOCK_DEAD) || - !sk_psock_test_state(psock_other, SK_PSOCK_TX_ENABLED)) { + if (!psock_other || sock_flag(sk_other, SOCK_DEAD)) { + kfree_skb(skb); + return; + } + spin_lock_bh(&psock_other->ingress_lock); + if (!sk_psock_test_state(psock_other, SK_PSOCK_TX_ENABLED)) { + spin_unlock_bh(&psock_other->ingress_lock); kfree_skb(skb); return; } skb_queue_tail(&psock_other->ingress_skb, skb); schedule_work(&psock_other->work); + spin_unlock_bh(&psock_other->ingress_lock); } static void sk_psock_tls_verdict_apply(struct sk_buff *skb, struct sock *sk, int verdict) @@ -845,8 +854,12 @@ static void sk_psock_verdict_apply(struct sk_psock *psock, err = sk_psock_skb_ingress_self(psock, skb); } if (err < 0) { - skb_queue_tail(&psock->ingress_skb, skb); - schedule_work(&psock->work); + spin_lock_bh(&psock->ingress_lock); + if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) { + skb_queue_tail(&psock->ingress_skb, skb); + schedule_work(&psock->work); + } + spin_unlock_bh(&psock->ingress_lock); } break; case __SK_REDIRECT: diff --git a/net/core/sock_map.c b/net/core/sock_map.c index dd53a7771d7e..e564fdeaada1 100644 --- a/net/core/sock_map.c +++ b/net/core/sock_map.c @@ -1540,6 +1540,7 @@ void sock_map_close(struct sock *sk, long timeout) saved_close = psock->saved_close; sock_map_remove_links(sk, psock); rcu_read_unlock(); + sk_psock_stop(psock, true); release_sock(sk); saved_close(sk, timeout); }