diff mbox series

[bpf-next,1/2] xsk: update rings for load-acquire/store-release semantics

Message ID 20210301104318.263262-2-bjorn.topel@gmail.com
State New
Series load-acquire/store-release semantics for AF_XDP rings

Commit Message

Björn Töpel March 1, 2021, 10:43 a.m. UTC
From: Björn Töpel <bjorn.topel@intel.com>

Currently, the AF_XDP rings use smp_{r,w,}mb() fences on the
kernel-side. By updating the rings to load-acquire/store-release
semantics, the full memory barrier (smp_mb()) on the consumer side can
be replaced by a store-release, with improved performance as a nice
side-effect.

Note that this change does *not* require similar changes on the
libbpf/userland side; they are, however, recommended [1].

On x86-64 systems, removing the smp_mb() on the Rx and Tx side
increases the performance of the l2fwd AF_XDP xdpsock sample by
1%. Weakly-ordered platforms, such as ARM64, might benefit even more.

[1] https://lore.kernel.org/bpf/20200316184423.GA14143@willie-the-truck/

Signed-off-by: Björn Töpel <bjorn.topel@intel.com>
---
 net/xdp/xsk_queue.h | 27 +++++++++++----------------
 1 file changed, 11 insertions(+), 16 deletions(-)

Comments

Björn Töpel March 2, 2021, 8:04 a.m. UTC | #1
On 2021-03-01 17:08, Toke Høiland-Jørgensen wrote:
> Björn Töpel <bjorn.topel@gmail.com> writes:
>
>> [...]
>>
>> - * if (LOAD ->consumer) {           LOAD ->producer
>> - *                    (A)           smp_rmb()       (C)
>> + * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
>
> Why is LOAD.acq not needed on the consumer side?
>


You mean why LOAD.acq is not needed on the *producer* side, i.e. for
the ->consumer index? The ->consumer load is a control dependency for
the data store, so there is no ordering constraint needed on the
->consumer load at the producer side. If there's no space, no data is
written. So, no barrier is needed there -- at least that has been my
perspective.

This is very similar to the buffer in
Documentation/core-api/circular-buffers.rst. Roping in Paul for some
guidance.


Björn

> -Toke

Patch

diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..e24279d8d845 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@  struct xsk_queue {
 	u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings is a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer (B)      STORE.rel ->consumer (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
@@ -227,15 +226,13 @@  static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-	smp_mb(); /* D, matches A */
-	WRITE_ONCE(q->ring->consumer, q->cached_cons);
+	smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matches A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
 	/* Refresh the local pointer */
-	q->cached_prod = READ_ONCE(q->ring->producer);
-	smp_rmb(); /* C, matches B */
+	q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +394,7 @@  static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-	smp_wmb(); /* B, matches C */
-
-	WRITE_ONCE(q->ring->producer, idx);
+	smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)