
[bpf-next] xsk: build skb by page

Message ID 9830fcef7159a47bae361fc213c589449f6a77d3.1608713585.git.xuanzhuo@linux.alibaba.com
State Superseded
Series: [bpf-next] xsk: build skb by page

Commit Message

Xuan Zhuo Dec. 23, 2020, 8:56 a.m. UTC
This patch constructs the skb directly from the umem pages to avoid the
memory copy overhead.

It takes into account unaligned addrs and the possibility of frame sizes
greater than a page in the future.

Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
---
 net/xdp/xsk.c | 68 ++++++++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 51 insertions(+), 17 deletions(-)

Comments

John Fastabend Dec. 31, 2020, 4:29 p.m. UTC | #1
Xuan Zhuo wrote:
> This patch constructs the skb directly from the umem pages to avoid the
> memory copy overhead.
>
> It takes into account unaligned addrs and the possibility of frame sizes
> greater than a page in the future.
>
> Signed-off-by: Xuan Zhuo <xuanzhuo@linux.alibaba.com>
> ---
>  net/xdp/xsk.c | 68 ++++++++++++++++++++++++++++++++++++++++++++---------------
>  1 file changed, 51 insertions(+), 17 deletions(-)
>
> diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
> index ac4a317..7cab40f 100644
> --- a/net/xdp/xsk.c
> +++ b/net/xdp/xsk.c
> @@ -430,6 +430,55 @@ static void xsk_destruct_skb(struct sk_buff *skb)
>  	sock_wfree(skb);
>  }
>
> +static struct sk_buff *xsk_build_skb_bypage(struct xdp_sock *xs, struct xdp_desc *desc)
> +{
> +	char *buffer;
> +	u64 addr;
> +	u32 len, offset, copy, copied;
> +	int err, i;
> +	struct page *page;
> +	struct sk_buff *skb;
> +
> +	skb = sock_alloc_send_skb(&xs->sk, 0, 1, &err);

Because this is just grabbing an skb, did you consider build_skb()?
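
For reference, a minimal sketch of the build_skb() pattern being alluded
to. This is hypothetical and not a drop-in replacement here: build_skb()
wraps an existing buffer, and that buffer must leave tailroom for struct
skb_shared_info, which a raw umem chunk does not guarantee; page, offset
and len are assumed from the surrounding loop:

	/* Wrap an existing buffer instead of allocating fresh skb data. */
	void *data = page_address(page) + offset;
	struct sk_buff *skb = build_skb(data, PAGE_SIZE);

	if (unlikely(!skb))
		return ERR_PTR(-ENOMEM);
	skb_put(skb, len);	/* payload already sits in the buffer */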

> +	if (unlikely(!skb))
> +		return NULL;

I think it would be best to push err back to the caller here with ERR_PTR().
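
Something along these lines (a sketch of the suggestion only;
sock_alloc_send_skb() fills err with a negative errno on failure):

	skb = sock_alloc_send_skb(&xs->sk, 0, 1, &err);
	if (unlikely(!skb))
		return ERR_PTR(err);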

> +
> +	addr = desc->addr;
> +	len = desc->len;
> +
> +	buffer = xsk_buff_raw_get_data(xs->pool, addr);
> +	offset = offset_in_page(buffer);
> +	addr = buffer - (char *)xs->pool->addrs;
> +
> +	for (copied = 0, i = 0; copied < len; ++i) {
> +		page = xs->pool->umem->pgs[addr >> PAGE_SHIFT];
> +
> +		get_page(page);

Is it obvious why this get_page() is needed? Maybe a small comment would
be nice. Something like, "we need to inc refcnt on page to ensure skb
does not release page from pool".
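
Applied to the loop it might look like this (the comment wording is only
a suggestion):

		/* Inc page refcnt: the skb holds a reference per frag
		 * page, so freeing the skb cannot release a page the
		 * umem pool still owns.
		 */
		get_page(page);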

> +
> +		copy = min((u32)(PAGE_SIZE - offset), len - copied);
> +

nit: take it or leave it, seems like a lot of new lines imo. I would
just put all these together. Not really important though.

> +		skb_fill_page_desc(skb, i, page, offset, copy);
> +
> +		copied += copy;
> +		addr += copy;
> +		offset = 0;
> +	}
> +
> +	skb->len += len;
> +	skb->data_len += len;
> +	skb->truesize += len;
> +
> +	refcount_add(len, &xs->sk.sk_wmem_alloc);
> +
> +	skb->dev = xs->dev;
> +	skb->priority = xs->sk.sk_priority;
> +	skb->mark = xs->sk.sk_mark;
> +	skb_shinfo(skb)->destructor_arg = (void *)(long)addr;
> +	skb->destructor = xsk_destruct_skb;
> +
> +	return skb;
> +}
> +
>  static int xsk_generic_xmit(struct sock *sk)
>  {
>  	struct xdp_sock *xs = xdp_sk(sk);
> @@ -445,40 +494,25 @@ static int xsk_generic_xmit(struct sock *sk)
>  		goto out;
>
>  	while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
> -		char *buffer;
> -		u64 addr;
> -		u32 len;
> -
>  		if (max_batch-- == 0) {
>  			err = -EAGAIN;
>  			goto out;
>  		}
>
> -		len = desc.len;
> -		skb = sock_alloc_send_skb(sk, len, 1, &err);
> +		skb = xsk_build_skb_bypage(xs, &desc);
>  		if (unlikely(!skb))

Is err set here? Either way, if skb is an ERR_PTR we can use that
here for better error handling.
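
With the helper returning ERR_PTR() as suggested above, the caller could
then do something like (sketch):

		skb = xsk_build_skb_bypage(xs, &desc);
		if (IS_ERR(skb)) {
			err = PTR_ERR(skb);
			goto out;
		}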

>  			goto out;
>
> -		skb_put(skb, len);
> -		addr = desc.addr;
> -		buffer = xsk_buff_raw_get_data(xs->pool, addr);
> -		err = skb_store_bits(skb, 0, buffer, len);
>  		/* This is the backpressure mechanism for the Tx path.
>  		 * Reserve space in the completion queue and only proceed
>  		 * if there is space in it. This avoids having to implement
>  		 * any buffering in the Tx path.
>  		 */
> -		if (unlikely(err) || xskq_prod_reserve(xs->pool->cq)) {
> +		if (xskq_prod_reserve(xs->pool->cq)) {
>  			kfree_skb(skb);

Same here, do we need to set err now that it's no longer set explicitly
above by err = skb_store_bits(...)?
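
For example (the -EAGAIN here is only a guess at a sensible errno; the
point is that err must not be left stale):

		if (xskq_prod_reserve(xs->pool->cq)) {
			err = -EAGAIN;	/* completion queue is full */
			kfree_skb(skb);
			goto out;
		}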

>  			goto out;
>  		}
>
> -		skb->dev = xs->dev;
> -		skb->priority = sk->sk_priority;
> -		skb->mark = sk->sk_mark;
> -		skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
> -		skb->destructor = xsk_destruct_skb;
> -
>  		err = __dev_direct_xmit(skb, xs->queue_id);
>  		if  (err == NETDEV_TX_BUSY) {
>  			/* Tell user-space to retry the send */
> --
> 1.8.3.1

Patch

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index ac4a317..7cab40f 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -430,6 +430,55 @@ static void xsk_destruct_skb(struct sk_buff *skb)
 	sock_wfree(skb);
 }
 
+static struct sk_buff *xsk_build_skb_bypage(struct xdp_sock *xs, struct xdp_desc *desc)
+{
+	char *buffer;
+	u64 addr;
+	u32 len, offset, copy, copied;
+	int err, i;
+	struct page *page;
+	struct sk_buff *skb;
+
+	skb = sock_alloc_send_skb(&xs->sk, 0, 1, &err);
+	if (unlikely(!skb))
+		return NULL;
+
+	addr = desc->addr;
+	len = desc->len;
+
+	buffer = xsk_buff_raw_get_data(xs->pool, addr);
+	offset = offset_in_page(buffer);
+	addr = buffer - (char *)xs->pool->addrs;
+
+	for (copied = 0, i = 0; copied < len; ++i) {
+		page = xs->pool->umem->pgs[addr >> PAGE_SHIFT];
+
+		get_page(page);
+
+		copy = min((u32)(PAGE_SIZE - offset), len - copied);
+
+		skb_fill_page_desc(skb, i, page, offset, copy);
+
+		copied += copy;
+		addr += copy;
+		offset = 0;
+	}
+
+	skb->len += len;
+	skb->data_len += len;
+	skb->truesize += len;
+
+	refcount_add(len, &xs->sk.sk_wmem_alloc);
+
+	skb->dev = xs->dev;
+	skb->priority = xs->sk.sk_priority;
+	skb->mark = xs->sk.sk_mark;
+	skb_shinfo(skb)->destructor_arg = (void *)(long)addr;
+	skb->destructor = xsk_destruct_skb;
+
+	return skb;
+}
+
 static int xsk_generic_xmit(struct sock *sk)
 {
 	struct xdp_sock *xs = xdp_sk(sk);
@@ -445,40 +494,25 @@ static int xsk_generic_xmit(struct sock *sk)
 		goto out;
 
 	while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
-		char *buffer;
-		u64 addr;
-		u32 len;
-
 		if (max_batch-- == 0) {
 			err = -EAGAIN;
 			goto out;
 		}
 
-		len = desc.len;
-		skb = sock_alloc_send_skb(sk, len, 1, &err);
+		skb = xsk_build_skb_bypage(xs, &desc);
 		if (unlikely(!skb))
 			goto out;
 
-		skb_put(skb, len);
-		addr = desc.addr;
-		buffer = xsk_buff_raw_get_data(xs->pool, addr);
-		err = skb_store_bits(skb, 0, buffer, len);
 		/* This is the backpressure mechanism for the Tx path.
 		 * Reserve space in the completion queue and only proceed
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		if (unlikely(err) || xskq_prod_reserve(xs->pool->cq)) {
+		if (xskq_prod_reserve(xs->pool->cq)) {
 			kfree_skb(skb);
 			goto out;
 		}
 
-		skb->dev = xs->dev;
-		skb->priority = sk->sk_priority;
-		skb->mark = sk->sk_mark;
-		skb_shinfo(skb)->destructor_arg = (void *)(long)desc.addr;
-		skb->destructor = xsk_destruct_skb;
-
 		err = __dev_direct_xmit(skb, xs->queue_id);
 		if  (err == NETDEV_TX_BUSY) {
 			/* Tell user-space to retry the send */
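
For reference, a self-contained userspace sketch of the frag-split
arithmetic in xsk_build_skb_bypage(); the 3000-byte page offset and
2000-byte frame length are made-up example values:

	#include <stdio.h>

	#define PAGE_SIZE 4096u

	/* A buffer starting 3000 bytes into a page with 2000 bytes of
	 * payload splits into two page fragments:
	 *   frag 0: offset=3000 len=1096
	 *   frag 1: offset=0    len=904
	 */
	int main(void)
	{
		unsigned int offset = 3000, len = 2000, copied = 0, i;

		for (i = 0; copied < len; i++) {
			unsigned int copy = PAGE_SIZE - offset;

			if (copy > len - copied)
				copy = len - copied;
			printf("frag %u: offset=%u len=%u\n", i, offset, copy);
			copied += copy;
			offset = 0;	/* later frags start at a page boundary */
		}
		return 0;
	}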