From patchwork Mon Feb 24 20:39:31 2020
X-Patchwork-Submitter: Honnappa Nagarahalli
X-Patchwork-Id: 183779
From: Honnappa Nagarahalli
To: olivier.matz@6wind.com, konstantin.ananyev@intel.com
Cc: honnappa.nagarahalli@arm.com, gavin.hu@arm.com, dev@dpdk.org, nd@arm.com
Date: Mon, 24 Feb 2020 14:39:31 -0600
Message-Id: <20200224203931.21256-2-honnappa.nagarahalli@arm.com>
In-Reply-To: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
References: <20200224203931.21256-1-honnappa.nagarahalli@arm.com>
Subject: [dpdk-dev] [RFC 1/1] lib/ring: add scatter gather and serial dequeue APIs

Add scatter gather APIs to avoid intermediate memcpy. Serial dequeue APIs are added to support access to ring elements before actual dequeue.

Signed-off-by: Honnappa Nagarahalli
Reviewed-by: Gavin Hu
Reviewed-by: Ola Liljedahl
---
 lib/librte_ring/Makefile           |   1 +
 lib/librte_ring/meson.build        |   1 +
 lib/librte_ring/rte_ring_c11_mem.h |  98 +++++++
 lib/librte_ring/rte_ring_elem_sg.h | 417 +++++++++++++++++++++++++++++
 lib/librte_ring/rte_ring_generic.h |  93 +++++++
 5 files changed, 610 insertions(+)
 create mode 100644 lib/librte_ring/rte_ring_elem_sg.h

--
2.17.1

diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
index 917c560ad..824e4a9bb 100644
--- a/lib/librte_ring/Makefile
+++ b/lib/librte_ring/Makefile
@@ -17,6 +17,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
 					rte_ring_elem.h \
+					rte_ring_elem_sg.h \
 					rte_ring_generic.h \
 					rte_ring_c11_mem.h
 
diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
index f2f3ccc88..30115ad7c 100644
--- a/lib/librte_ring/meson.build
+++ b/lib/librte_ring/meson.build
@@ -4,6 +4,7 @@
 sources = files('rte_ring.c')
 headers = files('rte_ring.h',
 		'rte_ring_elem.h',
+		'rte_ring_elem_sg.h',
 		'rte_ring_c11_mem.h',
 		'rte_ring_generic.h')
 
diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h
index 0fb73a337..dcae8bcc0 100644
--- a/lib/librte_ring/rte_ring_c11_mem.h
+++ b/lib/librte_ring/rte_ring_c11_mem.h
@@ -178,4 +178,102 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head if there are no
+ * prior reserved elements on the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements to dequeue, i.e. how far should the head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	uint32_t prod_tail;
+	uint32_t cons_tail;
+	int success;
+
+	/* move cons.head atomically */
+	*old_head = __atomic_load_n(&r->cons.head, __ATOMIC_RELAXED);
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		/* Load the cons.tail and ensure that it is the
+		 * same as cons.head. load-acquire synchronizes
+		 * with the store-release in update_tail.
+		 */
+		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);
+		if (*old_head != cons_tail) {
+			rte_pause();
+			*old_head = __atomic_load_n(&r->cons.head,
+							__ATOMIC_RELAXED);
+			success = 0;
+			continue;
+		}
+
+		/* this load-acquire synchronize with store-release of ht->tail
+		 * in update_tail.
+		 */
+		prod_tail = __atomic_load_n(&r->prod.tail,
+					__ATOMIC_ACQUIRE);
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (prod_tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc)
+			r->cons.head = *new_head, success = 1;
+		else
+			/* on failure, *old_head will be updated */
+			success = __atomic_compare_exchange_n(&r->cons.head,
+							old_head, *new_head,
+							0, __ATOMIC_RELAXED,
+							__ATOMIC_RELAXED);
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal Discard reserved ring elements
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	__atomic_store_n(&ht->head, ht->tail, __ATOMIC_RELAXED);
+}
+
 #endif /* _RTE_RING_C11_MEM_H_ */
diff --git a/lib/librte_ring/rte_ring_elem_sg.h b/lib/librte_ring/rte_ring_elem_sg.h
new file mode 100644
index 000000000..a73f4fbfe
--- /dev/null
+++ b/lib/librte_ring/rte_ring_elem_sg.h
@@ -0,0 +1,417 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2020 Arm Limited
+ * Copyright (c) 2010-2017 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_ELEM_SG_H_
+#define _RTE_RING_ELEM_SG_H_
+
+/**
+ * @file
+ * RTE Ring with
+ * 1) user defined element size
+ * 2) scatter gather feature to copy objects to/from the ring
+ * 3) ability to reserve, consume/discard elements in the ring
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "rte_ring.h"
+#include "rte_ring_elem.h"
+
+/* Between load and load. there might be cpu reorder in weak model
+ * (powerpc/arm).
+ * There are 2 choices for the users
+ * 1.use rmb() memory barrier
+ * 2.use one-direction load_acquire/store_release barrier,defined by
+ * CONFIG_RTE_USE_C11_MEM_MODEL=y
+ * It depends on performance test results.
+ * By default, move common functions to rte_ring_generic.h
+ */
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_ring_c11_mem.h"
+#else
+#include "rte_ring_generic.h"
+#endif
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr_64(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	uint64_t *ring = (uint64_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		/* Elements that fit before the end of the ring */
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr_128(struct rte_ring *r, uint32_t head,
+	uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	uint32_t idx = head & r->mask;
+	rte_int128_t *ring = (rte_int128_t *)&r[1];
+
+	*dst1 = ring + idx;
+	*n1 = num;
+
+	if (idx + num > r->size) {
+		/* Elements that fit before the end of the ring */
+		*n1 = r->size - idx;
+		*dst2 = ring;
+	}
+}
+
+static __rte_always_inline void
+__rte_ring_get_elem_addr(struct rte_ring *r, uint32_t head,
+	uint32_t esize, uint32_t num, void **dst1, uint32_t *n1, void **dst2)
+{
+	if (esize == 8)
+		return __rte_ring_get_elem_addr_64(r, head,
+						num, dst1, n1, dst2);
+	else if (esize == 16)
+		return __rte_ring_get_elem_addr_128(r, head,
+						num, dst1, n1, dst2);
+	else {
+		uint32_t idx, scale, nr_idx;
+		uint32_t *ring = (uint32_t *)&r[1];
+
+		/* Normalize to uint32_t */
+		scale = esize / sizeof(uint32_t);
+		idx = head & r->mask;
+		nr_idx = idx * scale;
+
+		*dst1 = ring + nr_idx;
+		*n1 = num;
+
+		if (idx + num > r->size) {
+			/* Elements that fit before the end of the ring */
+			*n1 = r->size - idx;
+			*dst2 = ring;
+		}
+	}
+}
+
+/**
+ * @internal Reserve ring elements to enqueue several objects on the ring
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of elements to reserve in the ring.
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED: Reserve a fixed number of elements from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible from ring
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer reserve
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param free_space
+ *   returns the amount of space after the reserve operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst1
+ *   Pointer to location in the ring to copy the data.
+ * @param n1
+ *   Number of elements to copy at dst1
+ * @param dst2
+ *   In case of ring wrap around, this pointer provides the location to
+ *   copy the remaining elements. The number of elements to copy at this
+ *   location is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		unsigned int is_sp, unsigned int *old_head,
+		unsigned int *new_head, unsigned int *free_space,
+		void **dst1, unsigned int *n1, void **dst2)
+{
+	uint32_t free_entries;
+
+	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
+			old_head, new_head, &free_entries);
+
+	if (n == 0)
+		goto end;
+
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, dst1, n1, dst2);
+
+	if (free_space != NULL)
+		*free_space = free_entries - n;
+
+end:
+	return n;
+}
+
+/**
+ * @internal Consume previously reserved ring elements (for enqueue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation.
+ * @param new_head
+ *   Producer's head index after reservation.
+ * @param is_sp
+ *   Indicates whether to use single producer or multi-producer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_enqueue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sp)
+{
+	update_tail(&r->prod, old_head, new_head, is_sp, 1);
+}
+
+/**
+ * Reserve one element for enqueuing one object on a ring
+ * (multi-producers safe). Application must call
+ * 'rte_ring_mp_enqueue_elem_commit' to complete the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Producer's head index before reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param new_head
+ *   Producer's head index after reservation. The same should be passed to
+ *   'rte_ring_mp_enqueue_elem_commit' function.
+ * @param free_space
+ *   Returns the amount of space after the reservation operation has finished.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param dst
+ *   Pointer to location in the ring to copy the data to.
+ * @return
+ *   - 0: Success; one element reserved.
+ *   - -ENOBUFS: Not enough room in the ring to reserve; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_mp_enqueue_elem_reserve(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *free_space, void **dst)
+{
+	unsigned int n;
+
+	return __rte_ring_do_enqueue_elem_reserve(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, 0, old_head, new_head,
+			free_space, dst, &n, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Consume previously reserved elements (for enqueue) in a ring
+ * (multi-producers safe). This API completes the enqueue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Producer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ * @param new_head
+ *   Producer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_mp_enqueue_elem_reserve' was called.
+ */
+static __rte_always_inline void
+rte_ring_mp_enqueue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	__rte_ring_do_enqueue_elem_commit(r, old_head, new_head, 0);
+}
+
+/**
+ * @internal Reserve elements to dequeue several objects on the ring.
+ * This function blocks if there are elements reserved already.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param n
+ *   The number of objects to reserve in the ring
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED: Reserve fixed number of elements in a ring
+ *   RTE_RING_QUEUE_VARIABLE: Reserve as many elements as possible in a ring
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param available
+ *   returns the number of remaining ring elements after the reservation
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src1
+ *   Pointer to location in the ring to copy the data from.
+ * @param n1
+ *   Number of elements to copy from src1
+ * @param src2
+ *   In case of wrap around in the ring, this pointer provides the location
+ *   to copy the remaining elements from. The number of elements to copy from
+ *   this pointer is equal to (number of elements reserved - n1)
+ * @return
+ *   Actual number of elements reserved.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_do_dequeue_elem_reserve_serial(struct rte_ring *r,
+		unsigned int esize, unsigned int n,
+		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src1, unsigned int *n1,
+		void **src2)
+{
+	uint32_t entries;
+
+	n = __rte_ring_move_cons_head_serial(r, is_sc, n, behavior,
+			old_head, new_head, &entries);
+
+	if (n == 0)
+		goto end;
+
+	__rte_ring_get_elem_addr(r, *old_head, esize, n, src1, n1, src2);
+
+	if (available != NULL)
+		*available = entries - n;
+
+end:
+	return n;
+}
+
+/**
+ * @internal Consume previously reserved ring elements (for dequeue)
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation.
+ * @param new_head
+ *   Consumer's head index after reservation.
+ * @param is_sc
+ *   Indicates whether to use single consumer or multi-consumer head update
+ */
+static __rte_always_inline void
+__rte_ring_do_dequeue_elem_commit(struct rte_ring *r,
+		unsigned int old_head, unsigned int new_head,
+		unsigned int is_sc)
+{
+	update_tail(&r->cons, old_head, new_head, is_sc, 1);
+}
+
+/**
+ * Reserve one element on a ring for dequeue. This function blocks if there
+ * are elements reserved already. Application must call
+ * 'rte_ring_dequeue_elem_commit' or
+ * 'rte_ring_dequeue_elem_revert_serial' to complete the dequeue operation.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param esize
+ *   The size of ring element, in bytes. It must be a multiple of 4.
+ *   This must be the same value used while creating the ring. Otherwise
+ *   the results are undefined.
+ * @param old_head
+ *   Consumer's head index before reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param new_head
+ *   Consumer's head index after reservation. The same should be passed to
+ *   'rte_ring_dequeue_elem_commit' function.
+ * @param available
+ *   Returns the number of remaining ring elements after the reservation.
+ *   It is not updated if the number of reserved elements is zero.
+ * @param src
+ *   Pointer to location in the ring to copy the data from.
+ * @return
+ *   - 0: Success; one element reserved.
+ *   - -ENOBUFS: No entry available in the ring; no element is reserved.
+ */
+static __rte_always_inline int
+rte_ring_dequeue_elem_reserve_serial(struct rte_ring *r, unsigned int esize,
+		unsigned int *old_head, unsigned int *new_head,
+		unsigned int *available, void **src)
+{
+	unsigned int n;
+
+	return __rte_ring_do_dequeue_elem_reserve_serial(r, esize, 1,
+			RTE_RING_QUEUE_FIXED, r->cons.single, old_head,
+			new_head, available, src, &n, NULL) ? 0 : -ENOBUFS;
+}
+
+/**
+ * Consume previously reserved elements (for dequeue) in a ring
+ * (multi-consumer safe).
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @param old_head
+ *   Consumer's head index before reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
+ * @param new_head
+ *   Consumer's head index after reservation. This value was returned
+ *   when the API 'rte_ring_dequeue_elem_reserve_xxx' was called.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_commit(struct rte_ring *r, unsigned int old_head,
+		unsigned int new_head)
+{
+	__rte_ring_do_dequeue_elem_commit(r, old_head, new_head,
+			r->cons.single);
+}
+
+/**
+ * Discard previously reserved elements (for dequeue) in a ring.
+ *
+ * @warning
+ *   This API can be called only if the ring elements were reserved
+ *   using the 'rte_ring_dequeue_elem_reserve_serial' API.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ */
+static __rte_always_inline void
+rte_ring_dequeue_elem_revert_serial(struct rte_ring *r)
+{
+	__rte_ring_revert_head(&r->cons);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_ELEM_SG_H_ */
diff --git a/lib/librte_ring/rte_ring_generic.h b/lib/librte_ring/rte_ring_generic.h
index 953cdbbd5..8d7a7ffcc 100644
--- a/lib/librte_ring/rte_ring_generic.h
+++ b/lib/librte_ring/rte_ring_generic.h
@@ -170,4 +170,97 @@ __rte_ring_move_cons_head(struct rte_ring *r, unsigned int is_sc,
 	return n;
 }
 
+/**
+ * @internal This function updates the consumer head if there are no
+ * prior reserved elements on the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure
+ * @param is_sc
+ *   Indicates whether multi-consumer path is needed or not
+ * @param n
+ *   The number of elements we will want to dequeue, i.e. how far should the
+ *   head be moved
+ * @param behavior
+ *   RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
+ * @param old_head
+ *   Returns head value as it was before the move, i.e. where dequeue starts
+ * @param new_head
+ *   Returns the current/new head value i.e. where dequeue finishes
+ * @param entries
+ *   Returns the number of entries in the ring BEFORE head was moved
+ * @return
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
+ */
+static __rte_always_inline unsigned int
+__rte_ring_move_cons_head_serial(struct rte_ring *r, unsigned int is_sc,
+		unsigned int n, enum rte_ring_queue_behavior behavior,
+		uint32_t *old_head, uint32_t *new_head,
+		uint32_t *entries)
+{
+	unsigned int max = n;
+	int success;
+
+	/* move cons.head atomically */
+	do {
+		/* Restore n as it may change every loop */
+		n = max;
+
+		*old_head = r->cons.head;
+
+		/* add rmb barrier to avoid load/load reorder in weak
+		 * memory model. It is noop on x86
+		 */
+		rte_smp_rmb();
+
+		/* Ensure that cons.tail and cons.head are the same */
+		if (*old_head != r->cons.tail) {
+			rte_pause();
+
+			success = 0;
+			continue;
+		}
+
+		/* The subtraction is done between two unsigned 32bits value
+		 * (the result is always modulo 32 bits even if we have
+		 * cons_head > prod_tail). So 'entries' is always between 0
+		 * and size(ring)-1.
+		 */
+		*entries = (r->prod.tail - *old_head);
+
+		/* Set the actual entries for dequeue */
+		if (n > *entries)
+			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
+
+		if (unlikely(n == 0))
+			return 0;
+
+		*new_head = *old_head + n;
+		if (is_sc) {
+			r->cons.head = *new_head;
+			rte_smp_rmb();
+			success = 1;
+		} else {
+			success = rte_atomic32_cmpset(&r->cons.head, *old_head,
+					*new_head);
+		}
+	} while (unlikely(success == 0));
+	return n;
+}
+
+/**
+ * @internal This function updates the head to match the tail
+ *
+ * @param ht
+ *   A pointer to the ring's head-tail structure
+ */
+static __rte_always_inline void
+__rte_ring_revert_head(struct rte_ring_headtail *ht)
+{
+	/* Discard the reserved ring elements. */
+	ht->head = ht->tail;
+}
+
 #endif /* _RTE_RING_GENERIC_H_ */
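
For readers who want to see the reserve/commit/revert flow from this patch in isolation, here is a minimal single-threaded sketch. It is not DPDK code: every `toy_*` name and `TOY_RING_SIZE` are hypothetical, the ring holds `uint64_t` slots only, and atomics, memory ordering and the multi-producer/consumer paths are left out on purpose. It assumes that `n1` is the number of elements that fit before the end of the ring (`size - idx`), with the remainder starting at the second pointer, as the `dst2`/`src2` parameter descriptions imply.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_RING_SIZE 8u	/* hypothetical; must be a power of two */

/* Hypothetical single-threaded stand-in for struct rte_ring. */
struct toy_ring {
	uint32_t size, mask, capacity;
	uint32_t prod_head, prod_tail;
	uint32_t cons_head, cons_tail;
	uint64_t slots[TOY_RING_SIZE];
};

static void toy_ring_init(struct toy_ring *r)
{
	memset(r, 0, sizeof(*r));
	r->size = TOY_RING_SIZE;
	r->mask = TOY_RING_SIZE - 1;
	r->capacity = r->mask;	/* usable slots, as in rte_ring */
}

/* Split [head, head + num) into at most two contiguous segments. */
static void toy_get_elem_addr(struct toy_ring *r, uint32_t head, uint32_t num,
		uint64_t **seg1, uint32_t *n1, uint64_t **seg2)
{
	uint32_t idx = head & r->mask;

	*seg1 = &r->slots[idx];
	*n1 = num;
	*seg2 = NULL;
	if (idx + num > r->size) {
		*n1 = r->size - idx;	/* elements before the wrap point */
		*seg2 = r->slots;	/* remaining num - n1 elements */
	}
}

/* Reserve n slots for enqueue; all-or-nothing (FIXED behavior). */
static uint32_t toy_enqueue_reserve(struct toy_ring *r, uint32_t n,
		uint32_t *new_head, uint64_t **dst1, uint32_t *n1,
		uint64_t **dst2)
{
	uint32_t free_entries = r->capacity + r->cons_tail - r->prod_head;

	if (n == 0 || n > free_entries)
		return 0;
	toy_get_elem_addr(r, r->prod_head, n, dst1, n1, dst2);
	*new_head = r->prod_head + n;
	r->prod_head = *new_head;
	return n;
}

/* Publish the reserved slots to the consumer. */
static void toy_enqueue_commit(struct toy_ring *r, uint32_t new_head)
{
	r->prod_tail = new_head;
}

/* Serial reserve: refuses while an earlier reservation is still open.
 * The patch spins in that case; this toy returns 0 instead.
 */
static uint32_t toy_dequeue_reserve_serial(struct toy_ring *r, uint32_t n,
		uint32_t *new_head, uint64_t **src1, uint32_t *n1,
		uint64_t **src2)
{
	uint32_t entries = r->prod_tail - r->cons_head;

	if (r->cons_head != r->cons_tail || n == 0 || n > entries)
		return 0;
	toy_get_elem_addr(r, r->cons_head, n, src1, n1, src2);
	*new_head = r->cons_head + n;
	r->cons_head = *new_head;
	return n;
}

/* Complete the dequeue: the reserved slots become free. */
static void toy_dequeue_commit(struct toy_ring *r, uint32_t new_head)
{
	r->cons_tail = new_head;
}

/* Discard the reservation: the elements stay in the ring. */
static void toy_dequeue_revert_serial(struct toy_ring *r)
{
	r->cons_head = r->cons_tail;
}
```

A caller would reserve, write or read the elements in place through the two segment pointers, and then either commit or (for a serial dequeue) revert. Because the data is accessed directly in ring storage, no intermediate buffer or memcpy is needed, which is the motivation stated in the commit message.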