[bpf-next,1/3] net: veth: introduce bulking for XDP_PASS

Message ID adca75284e30320e9d692d618a6349319d9340f3.1611685778.git.lorenzo@kernel.org
State New
Series: veth: add skb bulking allocation for XDP_PASS

Commit Message

Lorenzo Bianconi Jan. 26, 2021, 6:41 p.m. UTC
Introduce bulking support for XDP_PASS verdict forwarding skbs to
the networking stack

Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
---
 drivers/net/veth.c | 43 ++++++++++++++++++++++++++-----------------
 1 file changed, 26 insertions(+), 17 deletions(-)

Comments

Jesper Dangaard Brouer Jan. 28, 2021, 2:06 p.m. UTC | #1
On Tue, 26 Jan 2021 19:41:59 +0100
Lorenzo Bianconi <lorenzo@kernel.org> wrote:

> the networking stack
> 
> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
>  drivers/net/veth.c | 43 ++++++++++++++++++++++++++-----------------
>  1 file changed, 26 insertions(+), 17 deletions(-)
> 
> diff --git a/drivers/net/veth.c b/drivers/net/veth.c
> index 6e03b619c93c..23137d9966da 100644
> --- a/drivers/net/veth.c
> +++ b/drivers/net/veth.c
> @@ -35,6 +35,7 @@
>  #define VETH_XDP_HEADROOM	(XDP_PACKET_HEADROOM + NET_IP_ALIGN)
>  
>  #define VETH_XDP_TX_BULK_SIZE	16
> +#define VETH_XDP_BATCH		8
>

I suspect that VETH_XDP_BATCH = 8 is not the optimal value.

You have taken this value from the CPUMAP code, but it cannot be
generalized to this case.  The optimal value for CPUMAP is actually to
bulk dequeue 16 frames from the ptr_ring, but there is a prefetch in
one of the loops, which should not cover more than 10 entries, because
the Intel Line-Fill-Buffer cannot have more than 10 outstanding
prefetch instructions in flight.  (Yes, I measured this[1] with perf
stat when coding that.)

Could you please test with 16, to see if the results are better?

In this veth case, we will likely be running on the same CPU that
received the xdp_frames.  Thus, things are likely hot in cache, and we
don't have to care so much about moving cachelines across CPUs.  So I
don't expect it will make much difference.


[1] https://github.com/xdp-project/xdp-project/blob/master/areas/cpumap/cpumap02-optimizations.org
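
The pattern I'm referring to looks roughly like this (an illustrative
sketch, not the actual kernel/cpumap.c code; process_frame() is a
stand-in for the real per-frame work):

	/* Dequeue a bulk of 16 frames, but keep only ~10 prefetches in
	 * flight, as the Intel Line-Fill-Buffer cannot track more than
	 * that.  prefetch() comes from <linux/prefetch.h>.
	 */
	#define BULK_SIZE	16
	#define PREFETCH_MAX	10

	static void bulk_dequeue(struct ptr_ring *ring)
	{
		void *frames[BULK_SIZE];
		int i, n;

		n = __ptr_ring_consume_batched(ring, frames, BULK_SIZE);

		/* Prefetch ahead of the processing loop. */
		for (i = 0; i < n && i < PREFETCH_MAX; i++)
			prefetch(frames[i]);

		for (i = 0; i < n; i++)
			process_frame(frames[i]);
	}
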
-- 
Best regards,
  Jesper Dangaard Brouer
  MSc.CS, Principal Kernel Engineer at Red Hat
  LinkedIn: http://www.linkedin.com/in/brouer
Toshiaki Makita Jan. 28, 2021, 3:17 p.m. UTC | #2
On 2021/01/27 3:41, Lorenzo Bianconi wrote:
> Introduce bulking support for XDP_PASS verdict forwarding skbs to
> the networking stack
> 
> Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> ---
>   drivers/net/veth.c | 43 ++++++++++++++++++++++++++-----------------
>   1 file changed, 26 insertions(+), 17 deletions(-)
> 
> diff --git a/drivers/net/veth.c b/drivers/net/veth.c
> index 6e03b619c93c..23137d9966da 100644
> --- a/drivers/net/veth.c
> +++ b/drivers/net/veth.c
> @@ -35,6 +35,7 @@
>   #define VETH_XDP_HEADROOM	(XDP_PACKET_HEADROOM + NET_IP_ALIGN)
>   
>   #define VETH_XDP_TX_BULK_SIZE	16
> +#define VETH_XDP_BATCH		8
>   
>   struct veth_stats {
>   	u64	rx_drops;
> @@ -787,27 +788,35 @@ static int veth_xdp_rcv(struct veth_rq *rq, int budget,
>   	int i, done = 0;
>   
>   	for (i = 0; i < budget; i++) {
> -		void *ptr = __ptr_ring_consume(&rq->xdp_ring);
> -		struct sk_buff *skb;
> +		void *frames[VETH_XDP_BATCH];
> +		void *skbs[VETH_XDP_BATCH];
> +		int i, n_frame, n_skb = 0;

'i' is a shadowed variable. I think this may be confusing.

>   
> -		if (!ptr)
> +		n_frame = __ptr_ring_consume_batched(&rq->xdp_ring, frames,
> +						     VETH_XDP_BATCH);

This apparently exceeds the budget.
This will process budget*VETH_XDP_BATCH packets at most.
(You are probably aware of this because you return 'i' instead of 'done'?)

Also, I'm not sure we need to introduce __ptr_ring_consume_batched() here.
The function just does __ptr_ring_consume() n times.
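
For reference, it is essentially just this loop (paraphrased from
include/linux/ptr_ring.h):

	static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
						     void **array, int n)
	{
		void *ptr;
		int i;

		for (i = 0; i < n; i++) {
			ptr = __ptr_ring_consume(r);
			if (!ptr)	/* ring empty: stop early */
				break;
			array[i] = ptr;
		}

		return i;	/* entries actually consumed */
	}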

IIUC, your final code looks like this:

for (budget) {
	n_frame = __ptr_ring_consume_batched(VETH_XDP_BATCH);
	for (n_frame) {
		if (frame is XDP)
			xdpf[n_xdpf++] = to_xdp(frame);
		else
			skbs[n_skb++] = frame;
	}

	if (n_xdpf)
		veth_xdp_rcv_batch(xdpf);

	for (n_skb) {
		skb = veth_xdp_rcv_skb(skbs[i]);
		napi_gro_receive(skb);
	}
}

Your code processes VETH_XDP_BATCH packets at a time, no matter whether each
of them is an xdp_frame or an skb, but I think you actually want to process
VETH_XDP_BATCH xdp_frames at a time?
Then why not do it like this?

for (budget) {
	ptr = __ptr_ring_consume();
	if (ptr is XDP) {
		if (n_xdpf >= VETH_XDP_BATCH) {
			veth_xdp_rcv_batch(xdpf);
			n_xdpf = 0;
		}
		xdpf[n_xdpf++] = to_xdp(ptr);
	} else {
		skb = veth_xdp_rcv_skb(ptr);
		napi_gro_receive(skb);
	}
}
if (n_xdpf)
	veth_xdp_rcv_batch(xdpf);

Toshiaki Makita
Lorenzo Bianconi Jan. 28, 2021, 5:41 p.m. UTC | #3
> On 2021/01/27 3:41, Lorenzo Bianconi wrote:
> > Introduce bulking support for XDP_PASS verdict forwarding skbs to
> > the networking stack
> > 
> > Signed-off-by: Lorenzo Bianconi <lorenzo@kernel.org>
> > ---
> >   drivers/net/veth.c | 43 ++++++++++++++++++++++++++-----------------
> >   1 file changed, 26 insertions(+), 17 deletions(-)
> > 
> > diff --git a/drivers/net/veth.c b/drivers/net/veth.c
> > index 6e03b619c93c..23137d9966da 100644
> > --- a/drivers/net/veth.c
> > +++ b/drivers/net/veth.c
> > @@ -35,6 +35,7 @@
> >   #define VETH_XDP_HEADROOM	(XDP_PACKET_HEADROOM + NET_IP_ALIGN)
> >   #define VETH_XDP_TX_BULK_SIZE	16
> > +#define VETH_XDP_BATCH		8
> >   struct veth_stats {
> >   	u64	rx_drops;
> > @@ -787,27 +788,35 @@ static int veth_xdp_rcv(struct veth_rq *rq, int budget,
> >   	int i, done = 0;
> >   	for (i = 0; i < budget; i++) {
> > -		void *ptr = __ptr_ring_consume(&rq->xdp_ring);
> > -		struct sk_buff *skb;
> > +		void *frames[VETH_XDP_BATCH];
> > +		void *skbs[VETH_XDP_BATCH];
> > +		int i, n_frame, n_skb = 0;
> 
> 'i' is a shadowed variable. I think this may be confusing.

ack, I will fix it in v2

> 
> > -		if (!ptr)
> > +		n_frame = __ptr_ring_consume_batched(&rq->xdp_ring, frames,
> > +						     VETH_XDP_BATCH);
> 
> This apparently exceeds the budget.
> This will process budget*VETH_XDP_BATCH packets at most.
> (You are probably aware of this because you return 'i' instead of 'done'?)

right, I will fix it in v2

> 
> Also, I'm not sure we need to introduce __ptr_ring_consume_batched() here.
> The function just does __ptr_ring_consume() n times.
> 
> IIUC, your final code looks like this:
> 
> for (budget) {
> 	n_frame = __ptr_ring_consume_batched(VETH_XDP_BATCH);
> 	for (n_frame) {
> 		if (frame is XDP)
> 			xdpf[n_xdpf++] = to_xdp(frame);
> 		else
> 			skbs[n_skb++] = frame;
> 	}
> 
> 	if (n_xdpf)
> 		veth_xdp_rcv_batch(xdpf);
> 
> 	for (n_skb) {
> 		skb = veth_xdp_rcv_skb(skbs[i]);
> 		napi_gro_receive(skb);
> 	}
> }
> 
> Your code processes VETH_XDP_BATCH packets at a time, no matter whether each
> of them is an xdp_frame or an skb, but I think you actually want to process
> VETH_XDP_BATCH xdp_frames at a time?
> Then why not do it like this?
> 
> for (budget) {
> 	ptr = __ptr_ring_consume();
> 	if (ptr is XDP) {
> 		if (n_xdpf >= VETH_XDP_BATCH) {
> 			veth_xdp_rcv_batch(xdpf);
> 			n_xdpf = 0;
> 		}
> 		xdpf[n_xdpf++] = to_xdp(ptr);
> 	} else {
> 		skb = veth_xdp_rcv_skb(ptr);
> 		napi_gro_receive(skb);
> 	}
> }
> if (n_xdpf)
> 	veth_xdp_rcv_batch(xdpf);

I agree, the code is more readable. I will fix it in v2.
I guess we can drop patch 2/3 and squash patch 1/3 and 3/3.

Regards,
Lorenzo

> 
> Toshiaki Makita
Patch

diff --git a/drivers/net/veth.c b/drivers/net/veth.c
index 6e03b619c93c..23137d9966da 100644
--- a/drivers/net/veth.c
+++ b/drivers/net/veth.c
@@ -35,6 +35,7 @@ 
 #define VETH_XDP_HEADROOM	(XDP_PACKET_HEADROOM + NET_IP_ALIGN)
 
 #define VETH_XDP_TX_BULK_SIZE	16
+#define VETH_XDP_BATCH		8
 
 struct veth_stats {
 	u64	rx_drops;
@@ -787,27 +788,35 @@  static int veth_xdp_rcv(struct veth_rq *rq, int budget,
 	int i, done = 0;
 
 	for (i = 0; i < budget; i++) {
-		void *ptr = __ptr_ring_consume(&rq->xdp_ring);
-		struct sk_buff *skb;
+		void *frames[VETH_XDP_BATCH];
+		void *skbs[VETH_XDP_BATCH];
+		int i, n_frame, n_skb = 0;
 
-		if (!ptr)
+		n_frame = __ptr_ring_consume_batched(&rq->xdp_ring, frames,
+						     VETH_XDP_BATCH);
+		if (!n_frame)
 			break;
 
-		if (veth_is_xdp_frame(ptr)) {
-			struct xdp_frame *frame = veth_ptr_to_xdp(ptr);
+		for (i = 0; i < n_frame; i++) {
+			void *f = frames[i];
+			struct sk_buff *skb;
 
-			stats->xdp_bytes += frame->len;
-			skb = veth_xdp_rcv_one(rq, frame, bq, stats);
-		} else {
-			skb = ptr;
-			stats->xdp_bytes += skb->len;
-			skb = veth_xdp_rcv_skb(rq, skb, bq, stats);
-		}
-
-		if (skb)
-			napi_gro_receive(&rq->xdp_napi, skb);
+			if (veth_is_xdp_frame(f)) {
+				struct xdp_frame *frame = veth_ptr_to_xdp(f);
 
-		done++;
+				stats->xdp_bytes += frame->len;
+				skb = veth_xdp_rcv_one(rq, frame, bq, stats);
+			} else {
+				skb = f;
+				stats->xdp_bytes += skb->len;
+				skb = veth_xdp_rcv_skb(rq, skb, bq, stats);
+			}
+			if (skb)
+				skbs[n_skb++] = skb;
+		}
+		for (i = 0; i < n_skb; i++)
+			napi_gro_receive(&rq->xdp_napi, skbs[i]);
+		done += n_frame;
 	}
 
 	u64_stats_update_begin(&rq->stats.syncp);
@@ -818,7 +827,7 @@  static int veth_xdp_rcv(struct veth_rq *rq, int budget,
 	rq->stats.vs.xdp_packets += done;
 	u64_stats_update_end(&rq->stats.syncp);
 
-	return done;
+	return i;
 }
 
 static int veth_poll(struct napi_struct *napi, int budget)