[v4,1/4] lib/rcu: add resource reclamation APIs

Message ID 20200403184142.7729-2-honnappa.nagarahalli@arm.com
State New
Headers show
Series
  • Add RCU reclamation APIs
Related show

Commit Message

Honnappa Nagarahalli April 3, 2020, 6:41 p.m.
Add resource reclamation APIs to make it simple for applications
and libraries to integrate rte_rcu library.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>

Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>

---
 lib/librte_rcu/Makefile            |   2 +-
 lib/librte_rcu/meson.build         |   2 +
 lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++
 lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-
 lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |   4 +
 lib/meson.build                    |   6 +-
 7 files changed, 498 insertions(+), 4 deletions(-)
 create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h

-- 
2.17.1

Comments

Ananyev, Konstantin April 7, 2020, 5:39 p.m. | #1
> Add resource reclamation APIs to make it simple for applications

> and libraries to integrate rte_rcu library.


Few nits, thoughts, please see below.
Apart from that - LGTM.
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>


> 

> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>

> Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>

> ---

>  lib/librte_rcu/Makefile            |   2 +-

>  lib/librte_rcu/meson.build         |   2 +

>  lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++

>  lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-

>  lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++

>  lib/librte_rcu/rte_rcu_version.map |   4 +

>  lib/meson.build                    |   6 +-

>  7 files changed, 498 insertions(+), 4 deletions(-)

>  create mode 100644 lib/librte_rcu/rcu_qsbr_pvt.h

> 

> diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile

> index c4bb28d77..95f8a57e2 100644

> --- a/lib/librte_rcu/Makefile

> +++ b/lib/librte_rcu/Makefile

> @@ -8,7 +8,7 @@ LIB = librte_rcu.a

> 

>  CFLAGS += -DALLOW_EXPERIMENTAL_API

>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3

> -LDLIBS += -lrte_eal

> +LDLIBS += -lrte_eal -lrte_ring

> 

>  EXPORT_MAP := rte_rcu_version.map

> 

> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> index 62920ba02..e280b29c1 100644

> --- a/lib/librte_rcu/meson.build

> +++ b/lib/librte_rcu/meson.build

> @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')

>  if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false

>  	ext_deps += cc.find_library('atomic')

>  endif

> +

> +deps += ['ring']

> diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h

> new file mode 100644

> index 000000000..413f28587

> --- /dev/null

> +++ b/lib/librte_rcu/rcu_qsbr_pvt.h

> @@ -0,0 +1,57 @@

> +/* SPDX-License-Identifier: BSD-3-Clause

> + * Copyright (c) 2019 Arm Limited

> + */

> +

> +#ifndef _RTE_RCU_QSBR_PVT_H_

> +#define _RTE_RCU_QSBR_PVT_H_

> +

> +/**

> + * This file is private to the RCU library. It should not be included

> + * by the user of this library.

> + */

> +

> +#ifdef __cplusplus

> +extern "C" {

> +#endif

> +

> +#include <rte_ring.h>

> +#include <rte_ring_elem.h>

> +

> +#include "rte_rcu_qsbr.h"

> +

> +/* RTE defer queue structure.

> + * This structure holds the defer queue. The defer queue is used to

> + * hold the deleted entries from the data structure that are not

> + * yet freed.

> + */

> +struct rte_rcu_qsbr_dq {

> +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/

> +	struct rte_ring *r;     /**< RCU QSBR defer queue. */

> +	uint32_t size;

> +	/**< Number of elements in the defer queue */

> +	uint32_t esize;

> +	/**< Size (in bytes) of data, including the token, stored on the

> +	 *   defer queue.

> +	 */

> +	uint32_t trigger_reclaim_limit;

> +	/**< Trigger automatic reclamation after the defer queue

> +	 *   has atleast these many resources waiting.

> +	 */

> +	uint32_t max_reclaim_size;

> +	/**< Reclaim at the max these many resources during auto

> +	 *   reclamation.

> +	 */

> +	rte_rcu_qsbr_free_resource_t free_fn;

> +	/**< Function to call to free the resource. */

> +	void *p;

> +	/**< Pointer passed to the free function. Typically, this is the

> +	 *   pointer to the data structure to which the resource to free

> +	 *   belongs.

> +	 */

> +};

> +

> +#ifdef __cplusplus

> +}

> +#endif

> +

> +#endif /* _RTE_RCU_QSBR_PVT_H_ */

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c

> index 2f3fad776..e8c1e386f 100644

> --- a/lib/librte_rcu/rte_rcu_qsbr.c

> +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> @@ -1,6 +1,6 @@

>  /* SPDX-License-Identifier: BSD-3-Clause

>   *

> - * Copyright (c) 2018 Arm Limited

> + * Copyright (c) 2018-2019 Arm Limited

>   */

> 

>  #include <stdio.h>

> @@ -18,8 +18,10 @@

>  #include <rte_per_lcore.h>

>  #include <rte_lcore.h>

>  #include <rte_errno.h>

> +#include <rte_ring_elem.h>

> 

>  #include "rte_rcu_qsbr.h"

> +#include "rcu_qsbr_pvt.h"

> 

>  /* Get the memory size of QSBR variable */

>  size_t

> @@ -270,6 +272,245 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)

>  	return 0;

>  }

> 

> +/* Create a queue used to store the data structure elements that can

> + * be freed later. This queue is referred to as 'defer queue'.

> + */

> +struct rte_rcu_qsbr_dq *

> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)

> +{

> +	struct rte_rcu_qsbr_dq *dq;

> +	uint32_t qs_fifo_size;

> +	unsigned int flags;

> +

> +	if (params == NULL || params->free_fn == NULL ||

> +		params->v == NULL || params->name == NULL ||

> +		params->size == 0 || params->esize == 0 ||

> +		(params->esize % 4 != 0)) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return NULL;

> +	}

> +	/* If auto reclamation is configured, reclaim limit

> +	 * should be a valid value.

> +	 */

> +	if ((params->trigger_reclaim_limit <= params->size) &&

> +	    (params->max_reclaim_size == 0)) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",

> +			__func__, params->size, params->trigger_reclaim_limit,

> +			params->max_reclaim_size);

> +		rte_errno = EINVAL;

> +

> +		return NULL;

> +	}

> +

> +	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),

> +			 RTE_CACHE_LINE_SIZE);

> +	if (dq == NULL) {

> +		rte_errno = ENOMEM;

> +

> +		return NULL;

> +	}

> +

> +	/* Decide the flags for the ring.

> +	 * If MT safety is requested, use RTS for ring enqueue as most

> +	 * use cases involve dq-enqueue happening on the control plane.

> +	 * Ring dequeue is always HTS due to the possibility of revert.

> +	 */

> +	flags = RING_F_MP_RTS_ENQ;

> +	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)

> +		flags = RING_F_SP_ENQ;

> +	flags |= RING_F_MC_HTS_DEQ;

> +	/* round up qs_fifo_size to next power of two that is not less than

> +	 * max_size.

> +	 */

> +	qs_fifo_size = rte_align32pow2(params->size + 1);

> +	/* Add token size to ring element size */

> +	dq->r = rte_ring_create_elem(params->name,

> +			__RTE_QSBR_TOKEN_SIZE + params->esize,

> +			qs_fifo_size, SOCKET_ID_ANY, flags);

> +	if (dq->r == NULL) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): defer queue create failed\n", __func__);

> +		rte_free(dq);

> +		return NULL;

> +	}

> +

> +	dq->v = params->v;

> +	dq->size = params->size;

> +	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;

> +	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;

> +	dq->max_reclaim_size = params->max_reclaim_size;

> +	dq->free_fn = params->free_fn;

> +	dq->p = params->p;

> +

> +	return dq;

> +}

> +

> +/* Enqueue one resource to the defer queue to free after the grace

> + * period is over.

> + */

> +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)

> +{

> +	uint64_t token;

> +	uint32_t cur_size, free_size;

> +

> +	if (dq == NULL || e == NULL) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	/* Start the grace period */

> +	token = rte_rcu_qsbr_start(dq->v);

> +

> +	/* Reclaim resources if the queue is 1/8th full. This helps


Comment about 1/8 is probably left from older version?
As I understand now it is configurable parameter.

> +	 * the queue from growing too large and allows time for reader

> +	 * threads to report their quiescent state.

> +	 */

> +	cur_size = rte_ring_count(dq->r);

> +	if (cur_size > dq->trigger_reclaim_limit) {

> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +			"%s(): Triggering reclamation\n", __func__);

> +		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);

> +	}

> +

> +	/* Check if there is space for atleast 1 resource */

> +	free_size = rte_ring_free_count(dq->r);

> +	if (!free_size) {


Is there any point to do this check at all?
You are doing enqueue below and handle situation with
not enough space in the ring anyway.

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Defer queue is full\n", __func__);

> +		/* Note that the token generated above is not used.

> +		 * Other than wasting tokens, it should not cause any

> +		 * other issues.

> +		 */

> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +			"%s(): Skipped enqueuing token = %"PRIu64"\n",

> +			__func__, token);

> +

> +		rte_errno = ENOSPC;

> +		return 1;

> +	}

> +

> +	/* Enqueue the token and resource. Generating the token

> +	 * and enqueuing (token + resource) on the queue is not an

> +	 * atomic operation. This might result in tokens enqueued

> +	 * out of order on the queue. So, some tokens might wait

> +	 * longer than they are required to be reclaimed.

> +	 */

> +	char data[dq->esize];

> +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);

> +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,

> +		dq->esize - __RTE_QSBR_TOKEN_SIZE);

> +	/* Check the status as enqueue might fail since the other thread

> +	 * might have used up the freed space.

> +	 * Enqueue uses the configured flags when the DQ was created.

> +	 */

> +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Enqueue failed\n", __func__);

> +		/* Note that the token generated above is not used.

> +		 * Other than wasting tokens, it should not cause any

> +		 * other issues.

> +		 */

> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +			"%s(): Skipped enqueuing token = %"PRIu64"\n",

> +			__func__, token);

> +

> +		rte_errno = ENOSPC;

> +		return 1;

> +	}



Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid
wasting RCU tokens:

if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {
	token = rte_rcu_qsbr_start(dq->v);
	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1);
}

Though it might slowdown things if we'll have a lot of
parallel dq_enqueue. 
So not sure is it worth it or not.

> +

> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);

> +

> +	return 0;

> +}

> +

> +/* Reclaim resources from the defer queue. */

> +int

> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,

> +				unsigned int *freed, unsigned int *pending)

> +{

> +	uint32_t cnt;

> +	uint64_t token;

> +

> +	if (dq == NULL || n == 0) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	cnt = 0;

> +

> +	char e[dq->esize];

> +	/* Check reader threads quiescent state and reclaim resources */

> +	while ((cnt < n) &&

> +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,

> +					dq->esize, 1, NULL) != 0)) {


Another thought - any point to use burst_elem_start() here to retrieve more
then 1 elem in one go? Something like:
char e[32][dq->size]; 
while ((cnt < n) {
	k = RTE_MAX(32, cnt - n);
	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);
	if (k = 0)
		break;
	for (i = 0; i != k; i++) {
		memcpy(&token, e[i], sizeof(uint64_t));
		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)
			break;
	}
	k = i;
	rte_ring_dequeue_elem_finish(dq->r, k);
	for (i = 0; i != k; i++)
		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);
	n += k;
	if (k == 0)
		break;

?
Also if at enqueue we guarantee strict ordrer (via enqueue_start/enqueue_finish),
then here we probably can do _check_ from the last retrieved token here?
In theory that might help to minimize number of checks.
I.E. do:
for (i = k; i-- !=0; )  {
	memcpy(&token, e[i], sizeof(uint64_t));
	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)   
		break;
}
k = i + 1;
...

> +		memcpy(&token, e, sizeof(uint64_t));

> +

> +		/* Reclaim the resource */

> +		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {

> +			rte_ring_dequeue_finish(dq->r, 0);

> +			break;

> +		}

> +		rte_ring_dequeue_finish(dq->r, 1);

> +

> +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +			"%s(): Reclaimed token = %"PRIu64"\n",

> +			__func__, *(uint64_t *)e);

> +

> +		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);

> +

> +		cnt++;

> +	}

> +

> +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> +		"%s(): Reclaimed %u resources\n", __func__, cnt);

> +

> +	if (freed != NULL)

> +		*freed = cnt;

> +	if (pending != NULL)

> +		*pending = rte_ring_count(dq->r);

> +

> +	return 0;

> +}

> +

> +/* Delete a defer queue. */

> +int

> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)

> +{

> +	unsigned int pending;

> +

> +	if (dq == NULL) {

> +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	/* Reclaim all the resources */

> +	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);

> +	if (pending != 0) {

> +		rte_errno = EAGAIN;

> +

> +		return 1;

> +	}

> +

> +	rte_ring_free(dq->r);

> +	rte_free(dq);

> +

> +	return 0;

> +}

> +

>  int rte_rcu_log_type;

> 

>  RTE_INIT(rte_rcu_register)

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h

> index 0b5585925..213f9b029 100644

> --- a/lib/librte_rcu/rte_rcu_qsbr.h

> +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> @@ -34,6 +34,7 @@ extern "C" {

>  #include <rte_lcore.h>

>  #include <rte_debug.h>

>  #include <rte_atomic.h>

> +#include <rte_ring.h>

> 

>  extern int rte_rcu_log_type;

> 

> @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {

>  #define __RTE_QSBR_CNT_THR_OFFLINE 0

>  #define __RTE_QSBR_CNT_INIT 1

>  #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)

> +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

> 

>  /* RTE Quiescent State variable structure.

>   * This structure has two elements that vary in size based on the

> @@ -114,6 +116,84 @@ struct rte_rcu_qsbr {

>  	 */

>  } __rte_cache_aligned;

> 

> +/**

> + * Call back function called to free the resources.

> + *

> + * @param p

> + *   Pointer provided while creating the defer queue

> + * @param e

> + *   Pointer to the resource data stored on the defer queue

> + *

> + * @return

> + *   None

> + */

> +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);

> +

> +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

> +

> +/**

> + * Various flags supported.

> + */

> +/**< Enqueue and reclaim operations are multi-thread safe by default.

> + *   The call back functions registered to free the resources are

> + *   assumed to be multi-thread safe.

> + *   Set this flag is multi-thread safety is not required.

> + */

> +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

> +

> +/**

> + * Parameters used when creating the defer queue.

> + */

> +struct rte_rcu_qsbr_dq_parameters {

> +	const char *name;

> +	/**< Name of the queue. */

> +	uint32_t flags;

> +	/**< Flags to control API behaviors */

> +	uint32_t size;

> +	/**< Number of entries in queue. Typically, this will be

> +	 *   the same as the maximum number of entries supported in the

> +	 *   lock free data structure.

> +	 *   Data structures with unbounded number of entries is not

> +	 *   supported currently.

> +	 */

> +	uint32_t esize;

> +	/**< Size (in bytes) of each element in the defer queue.

> +	 *   This has to be multiple of 4B.

> +	 */

> +	uint32_t trigger_reclaim_limit;

> +	/**< Trigger automatic reclamation after the defer queue

> +	 *   has atleast these many resources waiting. This auto

> +	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API

> +	 *   call.

> +	 *   If this is greater than 'size', auto reclamation is

> +	 *   not triggered.

> +	 *   If this is set to 0, auto reclamation is triggered

> +	 *   in every call to rte_rcu_qsbr_dq_enqueue API.

> +	 */

> +	uint32_t max_reclaim_size;

> +	/**< When automatic reclamation is enabled, reclaim at the max

> +	 *   these many resources. This should contain a valid value, if

> +	 *   auto reclamation is on. Setting this to 'size' or greater will

> +	 *   reclaim all possible resources currently on the defer queue.

> +	 */

> +	rte_rcu_qsbr_free_resource_t free_fn;

> +	/**< Function to call to free the resource. */

> +	void *p;

> +	/**< Pointer passed to the free function. Typically, this is the

> +	 *   pointer to the data structure to which the resource to free

> +	 *   belongs. This can be NULL.

> +	 */

> +	struct rte_rcu_qsbr *v;

> +	/**< RCU QSBR variable to use for this defer queue */

> +};

> +

> +/* RTE defer queue structure.

> + * This structure holds the defer queue. The defer queue is used to

> + * hold the deleted entries from the data structure that are not

> + * yet freed.

> + */

> +struct rte_rcu_qsbr_dq;

> +

>  /**

>   * @warning

>   * @b EXPERIMENTAL: this API may change without prior notice

> @@ -692,6 +772,114 @@ __rte_experimental

>  int

>  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

> 

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Create a queue used to store the data structure elements that can

> + * be freed later. This queue is referred to as 'defer queue'.

> + *

> + * @param params

> + *   Parameters to create a defer queue.

> + * @return

> + *   On success - Valid pointer to defer queue

> + *   On error - NULL

> + *   Possible rte_errno codes are:

> + *   - EINVAL - NULL parameters are passed

> + *   - ENOMEM - Not enough memory

> + */

> +__rte_experimental

> +struct rte_rcu_qsbr_dq *

> +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Enqueue one resource to the defer queue and start the grace period.

> + * The resource will be freed later after at least one grace period

> + * is over.

> + *

> + * If the defer queue is full, it will attempt to reclaim resources.

> + * It will also reclaim resources at regular intervals to avoid

> + * the defer queue from growing too big.

> + *

> + * Multi-thread safety is provided as the defer queue configuration.

> + * When multi-thread safety is requested, it is possible that the

> + * resources are not stored in their order of deletion. This results

> + * in resources being held in the defer queue longer than they should.

> + *

> + * @param dq

> + *   Defer queue to allocate an entry from.

> + * @param e

> + *   Pointer to resource data to copy to the defer queue. The size of

> + *   the data to copy is equal to the element size provided when the

> + *   defer queue was created.

> + * @return

> + *   On success - 0

> + *   On error - 1 with rte_errno set to

> + *   - EINVAL - NULL parameters are passed

> + *   - ENOSPC - Defer queue is full. This condition can not happen

> + *		if the defer queue size is equal (or larger) than the

> + *		number of elements in the data structure.

> + */

> +__rte_experimental

> +int

> +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Free quesed resources from the defer queue.

> + *

> + * This API is multi-thread safe.

> + *

> + * @param dq

> + *   Defer queue to free an entry from.

> + * @param n

> + *   Maximum number of resources to free.

> + * @param freed

> + *   Number of resources that were freed.

> + * @param pending

> + *   Number of resources pending on the defer queue. This number might not

> + *   be acurate if multi-thread safety is configured.

> + * @return

> + *   On successful reclamation of at least 1 resource - 0

> + *   On error - 1 with rte_errno set to

> + *   - EINVAL - NULL parameters are passed

> + */

> +__rte_experimental

> +int

> +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,

> +				unsigned int *freed, unsigned int *pending);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Delete a defer queue.

> + *

> + * It tries to reclaim all the resources on the defer queue.

> + * If any of the resources have not completed the grace period

> + * the reclamation stops and returns immediately. The rest of

> + * the resources are not reclaimed and the defer queue is not

> + * freed.

> + *

> + * @param dq

> + *   Defer queue to delete.

> + * @return

> + *   On success - 0

> + *   On error - 1

> + *   Possible rte_errno codes are:

> + *   - EINVAL - NULL parameters are passed

> + *   - EAGAIN - Some of the resources have not completed at least 1 grace

> + *		period, try again.

> + */

> +__rte_experimental

> +int

> +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

> +

>  #ifdef __cplusplus

>  }

>  #endif

> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map

> index f8b9ef2ab..dfac88a37 100644

> --- a/lib/librte_rcu/rte_rcu_version.map

> +++ b/lib/librte_rcu/rte_rcu_version.map

> @@ -8,6 +8,10 @@ EXPERIMENTAL {

>  	rte_rcu_qsbr_synchronize;

>  	rte_rcu_qsbr_thread_register;

>  	rte_rcu_qsbr_thread_unregister;

> +	rte_rcu_qsbr_dq_create;

> +	rte_rcu_qsbr_dq_enqueue;

> +	rte_rcu_qsbr_dq_reclaim;

> +	rte_rcu_qsbr_dq_delete;

> 

>  	local: *;

>  };

> diff --git a/lib/meson.build b/lib/meson.build

> index 9c3cc55d5..15e91a303 100644

> --- a/lib/meson.build

> +++ b/lib/meson.build

> @@ -11,7 +11,9 @@

>  libraries = [

>  	'kvargs', # eal depends on kvargs

>  	'eal', # everything depends on eal

> -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core

> +	'ring',

> +	'rcu', # rcu depends on ring

> +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core

>  	'cmdline',

>  	'metrics', # bitrate/latency stats depends on this

>  	'hash',    # efd depends on this

> @@ -22,7 +24,7 @@ libraries = [

>  	'gro', 'gso', 'ip_frag', 'jobstats',

>  	'kni', 'latencystats', 'lpm', 'member',

>  	'power', 'pdump', 'rawdev',

> -	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',

> +	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',

>  	# ipsec lib depends on net, crypto and security

>  	'ipsec',

>  	#fib lib depends on rib

> --

> 2.17.1
Honnappa Nagarahalli April 19, 2020, 11:22 p.m. | #2
<snip>

> 

> > Add resource reclamation APIs to make it simple for applications and

> > libraries to integrate rte_rcu library.

> 

> Few nits, thoughts, please see below.

> Apart from that - LGTM.

> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 

> >

> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > Reviewed-by: Ola Liljedhal <ola.liljedhal@arm.com>

> > Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>

> > ---

> >  lib/librte_rcu/Makefile            |   2 +-

> >  lib/librte_rcu/meson.build         |   2 +

> >  lib/librte_rcu/rcu_qsbr_pvt.h      |  57 +++++++

> >  lib/librte_rcu/rte_rcu_qsbr.c      | 243 ++++++++++++++++++++++++++++-

> >  lib/librte_rcu/rte_rcu_qsbr.h      | 188 ++++++++++++++++++++++

> >  lib/librte_rcu/rte_rcu_version.map |   4 +

> >  lib/meson.build                    |   6 +-

> >  7 files changed, 498 insertions(+), 4 deletions(-)  create mode

> > 100644 lib/librte_rcu/rcu_qsbr_pvt.h

> >

> > diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile index

> > c4bb28d77..95f8a57e2 100644

> > --- a/lib/librte_rcu/Makefile

> > +++ b/lib/librte_rcu/Makefile

> > @@ -8,7 +8,7 @@ LIB = librte_rcu.a

> >

> >  CFLAGS += -DALLOW_EXPERIMENTAL_API

> >  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -LDLIBS += -lrte_eal

> > +LDLIBS += -lrte_eal -lrte_ring

> >

> >  EXPORT_MAP := rte_rcu_version.map

> >

> > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> > index 62920ba02..e280b29c1 100644

> > --- a/lib/librte_rcu/meson.build

> > +++ b/lib/librte_rcu/meson.build

> > @@ -10,3 +10,5 @@ headers = files('rte_rcu_qsbr.h')  if cc.get_id() ==

> > 'clang' and dpdk_conf.get('RTE_ARCH_64') == false

> >  	ext_deps += cc.find_library('atomic')  endif

> > +

> > +deps += ['ring']

> > diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h

> > b/lib/librte_rcu/rcu_qsbr_pvt.h new file mode 100644 index

> > 000000000..413f28587

> > --- /dev/null

> > +++ b/lib/librte_rcu/rcu_qsbr_pvt.h

> > @@ -0,0 +1,57 @@

> > +/* SPDX-License-Identifier: BSD-3-Clause

> > + * Copyright (c) 2019 Arm Limited

> > + */

> > +

> > +#ifndef _RTE_RCU_QSBR_PVT_H_

> > +#define _RTE_RCU_QSBR_PVT_H_

> > +

> > +/**

> > + * This file is private to the RCU library. It should not be included

> > + * by the user of this library.

> > + */

> > +

> > +#ifdef __cplusplus

> > +extern "C" {

> > +#endif

> > +

> > +#include <rte_ring.h>

> > +#include <rte_ring_elem.h>

> > +

> > +#include "rte_rcu_qsbr.h"

> > +

> > +/* RTE defer queue structure.

> > + * This structure holds the defer queue. The defer queue is used to

> > + * hold the deleted entries from the data structure that are not

> > + * yet freed.

> > + */

> > +struct rte_rcu_qsbr_dq {

> > +	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/

> > +	struct rte_ring *r;     /**< RCU QSBR defer queue. */

> > +	uint32_t size;

> > +	/**< Number of elements in the defer queue */

> > +	uint32_t esize;

> > +	/**< Size (in bytes) of data, including the token, stored on the

> > +	 *   defer queue.

> > +	 */

> > +	uint32_t trigger_reclaim_limit;

> > +	/**< Trigger automatic reclamation after the defer queue

> > +	 *   has atleast these many resources waiting.

> > +	 */

> > +	uint32_t max_reclaim_size;

> > +	/**< Reclaim at the max these many resources during auto

> > +	 *   reclamation.

> > +	 */

> > +	rte_rcu_qsbr_free_resource_t free_fn;

> > +	/**< Function to call to free the resource. */

> > +	void *p;

> > +	/**< Pointer passed to the free function. Typically, this is the

> > +	 *   pointer to the data structure to which the resource to free

> > +	 *   belongs.

> > +	 */

> > +};

> > +

> > +#ifdef __cplusplus

> > +}

> > +#endif

> > +

> > +#endif /* _RTE_RCU_QSBR_PVT_H_ */

> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c

> > b/lib/librte_rcu/rte_rcu_qsbr.c index 2f3fad776..e8c1e386f 100644

> > --- a/lib/librte_rcu/rte_rcu_qsbr.c

> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> > @@ -1,6 +1,6 @@

> >  /* SPDX-License-Identifier: BSD-3-Clause

> >   *

> > - * Copyright (c) 2018 Arm Limited

> > + * Copyright (c) 2018-2019 Arm Limited

> >   */

> >

> >  #include <stdio.h>

> > @@ -18,8 +18,10 @@

> >  #include <rte_per_lcore.h>

> >  #include <rte_lcore.h>

> >  #include <rte_errno.h>

> > +#include <rte_ring_elem.h>

> >

> >  #include "rte_rcu_qsbr.h"

> > +#include "rcu_qsbr_pvt.h"

> >

> >  /* Get the memory size of QSBR variable */  size_t @@ -270,6 +272,245

> > @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)

> >  	return 0;

> >  }

> >

> > +/* Create a queue used to store the data structure elements that can

> > + * be freed later. This queue is referred to as 'defer queue'.

> > + */

> > +struct rte_rcu_qsbr_dq *

> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters

> > +*params) {

> > +	struct rte_rcu_qsbr_dq *dq;

> > +	uint32_t qs_fifo_size;

> > +	unsigned int flags;

> > +

> > +	if (params == NULL || params->free_fn == NULL ||

> > +		params->v == NULL || params->name == NULL ||

> > +		params->size == 0 || params->esize == 0 ||

> > +		(params->esize % 4 != 0)) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return NULL;

> > +	}

> > +	/* If auto reclamation is configured, reclaim limit

> > +	 * should be a valid value.

> > +	 */

> > +	if ((params->trigger_reclaim_limit <= params->size) &&

> > +	    (params->max_reclaim_size == 0)) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Invalid input parameter, size = %u,

> trigger_reclaim_limit = %u, max_reclaim_size = %u\n",

> > +			__func__, params->size, params-

> >trigger_reclaim_limit,

> > +			params->max_reclaim_size);

> > +		rte_errno = EINVAL;

> > +

> > +		return NULL;

> > +	}

> > +

> > +	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),

> > +			 RTE_CACHE_LINE_SIZE);

> > +	if (dq == NULL) {

> > +		rte_errno = ENOMEM;

> > +

> > +		return NULL;

> > +	}

> > +

> > +	/* Decide the flags for the ring.

> > +	 * If MT safety is requested, use RTS for ring enqueue as most

> > +	 * use cases involve dq-enqueue happening on the control plane.

> > +	 * Ring dequeue is always HTS due to the possibility of revert.

> > +	 */

> > +	flags = RING_F_MP_RTS_ENQ;

> > +	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)

> > +		flags = RING_F_SP_ENQ;

> > +	flags |= RING_F_MC_HTS_DEQ;

> > +	/* round up qs_fifo_size to next power of two that is not less than

> > +	 * max_size.

> > +	 */

> > +	qs_fifo_size = rte_align32pow2(params->size + 1);

> > +	/* Add token size to ring element size */

> > +	dq->r = rte_ring_create_elem(params->name,

> > +			__RTE_QSBR_TOKEN_SIZE + params->esize,

> > +			qs_fifo_size, SOCKET_ID_ANY, flags);

> > +	if (dq->r == NULL) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): defer queue create failed\n", __func__);

> > +		rte_free(dq);

> > +		return NULL;

> > +	}

> > +

> > +	dq->v = params->v;

> > +	dq->size = params->size;

> > +	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;

> > +	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;

> > +	dq->max_reclaim_size = params->max_reclaim_size;

> > +	dq->free_fn = params->free_fn;

> > +	dq->p = params->p;

> > +

> > +	return dq;

> > +}

> > +

> > +/* Enqueue one resource to the defer queue to free after the grace

> > + * period is over.

> > + */

> > +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e) {

> > +	uint64_t token;

> > +	uint32_t cur_size, free_size;

> > +

> > +	if (dq == NULL || e == NULL) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	/* Start the grace period */

> > +	token = rte_rcu_qsbr_start(dq->v);

> > +

> > +	/* Reclaim resources if the queue is 1/8th full. This helps

> 

> Comment about 1/8 is probably left from older version?

> As I understand now it is configurable parameter.

Ack, will correct this.

> 

> > +	 * the queue from growing too large and allows time for reader

> > +	 * threads to report their quiescent state.

> > +	 */

> > +	cur_size = rte_ring_count(dq->r);

> > +	if (cur_size > dq->trigger_reclaim_limit) {

> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +			"%s(): Triggering reclamation\n", __func__);

> > +		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL,

> NULL);

> > +	}

> > +

> > +	/* Check if there is space for atleast 1 resource */

> > +	free_size = rte_ring_free_count(dq->r);

> > +	if (!free_size) {

> 

> Is there any point to do this check at all?

> You are doing enqueue below and handle situation with not enough space in

> the ring anyway.

Ack

> 

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Defer queue is full\n", __func__);

> > +		/* Note that the token generated above is not used.

> > +		 * Other than wasting tokens, it should not cause any

> > +		 * other issues.

> > +		 */

> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",

> > +			__func__, token);

> > +

> > +		rte_errno = ENOSPC;

> > +		return 1;

> > +	}

> > +

> > +	/* Enqueue the token and resource. Generating the token

> > +	 * and enqueuing (token + resource) on the queue is not an

> > +	 * atomic operation. This might result in tokens enqueued

> > +	 * out of order on the queue. So, some tokens might wait

> > +	 * longer than they are required to be reclaimed.

> > +	 */

> > +	char data[dq->esize];

> > +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);

> > +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,

> > +		dq->esize - __RTE_QSBR_TOKEN_SIZE);

> > +	/* Check the status as enqueue might fail since the other thread

> > +	 * might have used up the freed space.

> > +	 * Enqueue uses the configured flags when the DQ was created.

> > +	 */

> > +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Enqueue failed\n", __func__);

> > +		/* Note that the token generated above is not used.

> > +		 * Other than wasting tokens, it should not cause any

> > +		 * other issues.

> > +		 */

> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",

> > +			__func__, token);

> > +

> > +		rte_errno = ENOSPC;

> > +		return 1;

> > +	}

> 

> 

> Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid

> wasting RCU tokens:

> 

> if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {

> 	token = rte_rcu_qsbr_start(dq->v);

> 	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);

> 	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }

> 

> Though it might slowdown things if we'll have a lot of parallel dq_enqueue.

> So not sure is it worth it or not.

Adding peek APIs for RTS would be better. That should take care of the parallel dw_enqueue. Not sure if I gave you the comment. My ring patch supported these APIs.

> 

> > +

> > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);

> > +

> > +	return 0;

> > +}

> > +

> > +/* Reclaim resources from the defer queue. */ int

> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,

> > +				unsigned int *freed, unsigned int *pending) {

> > +	uint32_t cnt;

> > +	uint64_t token;

> > +

> > +	if (dq == NULL || n == 0) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	cnt = 0;

> > +

> > +	char e[dq->esize];

> > +	/* Check reader threads quiescent state and reclaim resources */

> > +	while ((cnt < n) &&

> > +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,

> > +					dq->esize, 1, NULL) != 0)) {

> 

> Another thought - any point to use burst_elem_start() here to retrieve more

> then 1 elem in one go? Something like:

I think it makes sense.

> char e[32][dq->size];

> while ((cnt < n) {

> 	k = RTE_MAX(32, cnt - n);

> 	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);

> 	if (k = 0)

> 		break;

> 	for (i = 0; i != k; i++) {

> 		memcpy(&token, e[i], sizeof(uint64_t));

> 		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)

> 			break;

> 	}

> 	k = i;

> 	rte_ring_dequeue_elem_finish(dq->r, k);

> 	for (i = 0; i != k; i++)

> 		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);

I think it also makes sense to change the free_fn to take 'n' number of tokens.

> 	n += k;

> 	if (k == 0)

> 		break;

> 

> ?

> Also if at enqueue we guarantee strict ordrer (via

> enqueue_start/enqueue_finish), then here we probably can do _check_ from

> the last retrieved token here?

> In theory that might help to minimize number of checks.

> I.E. do:

> for (i = k; i-- !=0; )  {

> 	memcpy(&token, e[i], sizeof(uint64_t));

> 	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)

There is a higher chance that later tokens are not acked. This introduces more polling of the counters.
The rte_rcu_qsbr_check has an optimization. While acking the current token, it will also caches the greatest token acked. It uses the cached token for the subsequent calls. I think this provides a better optimization.

> 		break;

> }

> k = i + 1;

> ...

> 

> > +		memcpy(&token, e, sizeof(uint64_t));

> > +

> > +		/* Reclaim the resource */

> > +		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {

> > +			rte_ring_dequeue_finish(dq->r, 0);

> > +			break;

> > +		}

> > +		rte_ring_dequeue_finish(dq->r, 1);

> > +

> > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +			"%s(): Reclaimed token = %"PRIu64"\n",

> > +			__func__, *(uint64_t *)e);

> > +

> > +		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);

> > +

> > +		cnt++;

> > +	}

> > +

> > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > +		"%s(): Reclaimed %u resources\n", __func__, cnt);

> > +

> > +	if (freed != NULL)

> > +		*freed = cnt;

> > +	if (pending != NULL)

> > +		*pending = rte_ring_count(dq->r);

> > +

> > +	return 0;

> > +}

> > +

> > +/* Delete a defer queue. */

> > +int

> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq) {

> > +	unsigned int pending;

> > +

> > +	if (dq == NULL) {

> > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	/* Reclaim all the resources */

> > +	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);

> > +	if (pending != 0) {

> > +		rte_errno = EAGAIN;

> > +

> > +		return 1;

> > +	}

> > +

> > +	rte_ring_free(dq->r);

> > +	rte_free(dq);

> > +

> > +	return 0;

> > +}

> > +

> >  int rte_rcu_log_type;

> >

> >  RTE_INIT(rte_rcu_register)

> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > b/lib/librte_rcu/rte_rcu_qsbr.h index 0b5585925..213f9b029 100644

> > --- a/lib/librte_rcu/rte_rcu_qsbr.h

> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > @@ -34,6 +34,7 @@ extern "C" {

> >  #include <rte_lcore.h>

> >  #include <rte_debug.h>

> >  #include <rte_atomic.h>

> > +#include <rte_ring.h>

> >

> >  extern int rte_rcu_log_type;

> >

> > @@ -84,6 +85,7 @@ struct rte_rcu_qsbr_cnt {  #define

> > __RTE_QSBR_CNT_THR_OFFLINE 0  #define __RTE_QSBR_CNT_INIT 1

> #define

> > __RTE_QSBR_CNT_MAX ((uint64_t)~0)

> > +#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

> >

> >  /* RTE Quiescent State variable structure.

> >   * This structure has two elements that vary in size based on the @@

> > -114,6 +116,84 @@ struct rte_rcu_qsbr {

> >  	 */

> >  } __rte_cache_aligned;

> >

> > +/**

> > + * Call back function called to free the resources.

> > + *

> > + * @param p

> > + *   Pointer provided while creating the defer queue

> > + * @param e

> > + *   Pointer to the resource data stored on the defer queue

> > + *

> > + * @return

> > + *   None

> > + */

> > +typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);

> > +

> > +#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

> > +

> > +/**

> > + * Various flags supported.

> > + */

> > +/**< Enqueue and reclaim operations are multi-thread safe by default.

> > + *   The call back functions registered to free the resources are

> > + *   assumed to be multi-thread safe.

> > + *   Set this flag is multi-thread safety is not required.

> > + */

> > +#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

> > +

> > +/**

> > + * Parameters used when creating the defer queue.

> > + */

> > +struct rte_rcu_qsbr_dq_parameters {

> > +	const char *name;

> > +	/**< Name of the queue. */

> > +	uint32_t flags;

> > +	/**< Flags to control API behaviors */

> > +	uint32_t size;

> > +	/**< Number of entries in queue. Typically, this will be

> > +	 *   the same as the maximum number of entries supported in the

> > +	 *   lock free data structure.

> > +	 *   Data structures with unbounded number of entries is not

> > +	 *   supported currently.

> > +	 */

> > +	uint32_t esize;

> > +	/**< Size (in bytes) of each element in the defer queue.

> > +	 *   This has to be multiple of 4B.

> > +	 */

> > +	uint32_t trigger_reclaim_limit;

> > +	/**< Trigger automatic reclamation after the defer queue

> > +	 *   has atleast these many resources waiting. This auto

> > +	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API

> > +	 *   call.

> > +	 *   If this is greater than 'size', auto reclamation is

> > +	 *   not triggered.

> > +	 *   If this is set to 0, auto reclamation is triggered

> > +	 *   in every call to rte_rcu_qsbr_dq_enqueue API.

> > +	 */

> > +	uint32_t max_reclaim_size;

> > +	/**< When automatic reclamation is enabled, reclaim at the max

> > +	 *   these many resources. This should contain a valid value, if

> > +	 *   auto reclamation is on. Setting this to 'size' or greater will

> > +	 *   reclaim all possible resources currently on the defer queue.

> > +	 */

> > +	rte_rcu_qsbr_free_resource_t free_fn;

> > +	/**< Function to call to free the resource. */

> > +	void *p;

> > +	/**< Pointer passed to the free function. Typically, this is the

> > +	 *   pointer to the data structure to which the resource to free

> > +	 *   belongs. This can be NULL.

> > +	 */

> > +	struct rte_rcu_qsbr *v;

> > +	/**< RCU QSBR variable to use for this defer queue */ };

> > +

> > +/* RTE defer queue structure.

> > + * This structure holds the defer queue. The defer queue is used to

> > + * hold the deleted entries from the data structure that are not

> > + * yet freed.

> > + */

> > +struct rte_rcu_qsbr_dq;

> > +

> >  /**

> >   * @warning

> >   * @b EXPERIMENTAL: this API may change without prior notice @@

> > -692,6 +772,114 @@ __rte_experimental  int  rte_rcu_qsbr_dump(FILE *f,

> > struct rte_rcu_qsbr *v);

> >

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Create a queue used to store the data structure elements that can

> > + * be freed later. This queue is referred to as 'defer queue'.

> > + *

> > + * @param params

> > + *   Parameters to create a defer queue.

> > + * @return

> > + *   On success - Valid pointer to defer queue

> > + *   On error - NULL

> > + *   Possible rte_errno codes are:

> > + *   - EINVAL - NULL parameters are passed

> > + *   - ENOMEM - Not enough memory

> > + */

> > +__rte_experimental

> > +struct rte_rcu_qsbr_dq *

> > +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters

> > +*params);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Enqueue one resource to the defer queue and start the grace period.

> > + * The resource will be freed later after at least one grace period

> > + * is over.

> > + *

> > + * If the defer queue is full, it will attempt to reclaim resources.

> > + * It will also reclaim resources at regular intervals to avoid

> > + * the defer queue from growing too big.

> > + *

> > + * Multi-thread safety is provided as the defer queue configuration.

> > + * When multi-thread safety is requested, it is possible that the

> > + * resources are not stored in their order of deletion. This results

> > + * in resources being held in the defer queue longer than they should.

> > + *

> > + * @param dq

> > + *   Defer queue to allocate an entry from.

> > + * @param e

> > + *   Pointer to resource data to copy to the defer queue. The size of

> > + *   the data to copy is equal to the element size provided when the

> > + *   defer queue was created.

> > + * @return

> > + *   On success - 0

> > + *   On error - 1 with rte_errno set to

> > + *   - EINVAL - NULL parameters are passed

> > + *   - ENOSPC - Defer queue is full. This condition can not happen

> > + *		if the defer queue size is equal (or larger) than the

> > + *		number of elements in the data structure.

> > + */

> > +__rte_experimental

> > +int

> > +rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Free quesed resources from the defer queue.

> > + *

> > + * This API is multi-thread safe.

> > + *

> > + * @param dq

> > + *   Defer queue to free an entry from.

> > + * @param n

> > + *   Maximum number of resources to free.

> > + * @param freed

> > + *   Number of resources that were freed.

> > + * @param pending

> > + *   Number of resources pending on the defer queue. This number might

> not

> > + *   be acurate if multi-thread safety is configured.

> > + * @return

> > + *   On successful reclamation of at least 1 resource - 0

> > + *   On error - 1 with rte_errno set to

> > + *   - EINVAL - NULL parameters are passed

> > + */

> > +__rte_experimental

> > +int

> > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,

> > +				unsigned int *freed, unsigned int *pending);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Delete a defer queue.

> > + *

> > + * It tries to reclaim all the resources on the defer queue.

> > + * If any of the resources have not completed the grace period

> > + * the reclamation stops and returns immediately. The rest of

> > + * the resources are not reclaimed and the defer queue is not

> > + * freed.

> > + *

> > + * @param dq

> > + *   Defer queue to delete.

> > + * @return

> > + *   On success - 0

> > + *   On error - 1

> > + *   Possible rte_errno codes are:

> > + *   - EINVAL - NULL parameters are passed

> > + *   - EAGAIN - Some of the resources have not completed at least 1 grace

> > + *		period, try again.

> > + */

> > +__rte_experimental

> > +int

> > +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

> > +

> >  #ifdef __cplusplus

> >  }

> >  #endif

> > diff --git a/lib/librte_rcu/rte_rcu_version.map

> > b/lib/librte_rcu/rte_rcu_version.map

> > index f8b9ef2ab..dfac88a37 100644

> > --- a/lib/librte_rcu/rte_rcu_version.map

> > +++ b/lib/librte_rcu/rte_rcu_version.map

> > @@ -8,6 +8,10 @@ EXPERIMENTAL {

> >  	rte_rcu_qsbr_synchronize;

> >  	rte_rcu_qsbr_thread_register;

> >  	rte_rcu_qsbr_thread_unregister;

> > +	rte_rcu_qsbr_dq_create;

> > +	rte_rcu_qsbr_dq_enqueue;

> > +	rte_rcu_qsbr_dq_reclaim;

> > +	rte_rcu_qsbr_dq_delete;

> >

> >  	local: *;

> >  };

> > diff --git a/lib/meson.build b/lib/meson.build index

> > 9c3cc55d5..15e91a303 100644

> > --- a/lib/meson.build

> > +++ b/lib/meson.build

> > @@ -11,7 +11,9 @@

> >  libraries = [

> >  	'kvargs', # eal depends on kvargs

> >  	'eal', # everything depends on eal

> > -	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core

> > +	'ring',

> > +	'rcu', # rcu depends on ring

> > +	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core

> >  	'cmdline',

> >  	'metrics', # bitrate/latency stats depends on this

> >  	'hash',    # efd depends on this

> > @@ -22,7 +24,7 @@ libraries = [

> >  	'gro', 'gso', 'ip_frag', 'jobstats',

> >  	'kni', 'latencystats', 'lpm', 'member',

> >  	'power', 'pdump', 'rawdev',

> > -	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',

> > +	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',

> >  	# ipsec lib depends on net, crypto and security

> >  	'ipsec',

> >  	#fib lib depends on rib

> > --

> > 2.17.1
Ananyev, Konstantin April 20, 2020, 8:19 a.m. | #3
> > > +

> > > +	/* Enqueue the token and resource. Generating the token

> > > +	 * and enqueuing (token + resource) on the queue is not an

> > > +	 * atomic operation. This might result in tokens enqueued

> > > +	 * out of order on the queue. So, some tokens might wait

> > > +	 * longer than they are required to be reclaimed.

> > > +	 */

> > > +	char data[dq->esize];

> > > +	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);

> > > +	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,

> > > +		dq->esize - __RTE_QSBR_TOKEN_SIZE);

> > > +	/* Check the status as enqueue might fail since the other thread

> > > +	 * might have used up the freed space.

> > > +	 * Enqueue uses the configured flags when the DQ was created.

> > > +	 */

> > > +	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {

> > > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > > +			"%s(): Enqueue failed\n", __func__);

> > > +		/* Note that the token generated above is not used.

> > > +		 * Other than wasting tokens, it should not cause any

> > > +		 * other issues.

> > > +		 */

> > > +		rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > > +			"%s(): Skipped enqueuing token = %"PRIu64"\n",

> > > +			__func__, token);

> > > +

> > > +		rte_errno = ENOSPC;

> > > +		return 1;

> > > +	}

> >

> >

> > Just as a thought: in theory if we'll use MP_HTS(/SP) ring we can avoid

> > wasting RCU tokens:

> >

> > if (rte_ring_enqueue_elem_bulk_start(dq->r, 1, NULL) != 0) {

> > 	token = rte_rcu_qsbr_start(dq->v);

> > 	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);

> > 	rte_ring_enqueue_elem_finish(dq->r, data, dq->esize, 1); }

> >

> > Though it might slowdown things if we'll have a lot of parallel dq_enqueue.

> > So not sure is it worth it or not.

> Adding peek APIs for RTS would be better. That should take care of the parallel dw_enqueue. Not sure if I gave you the comment. My ring

> patch supported these APIs.


AFAIK, peek API is not possible for RTS mode.
Probably you are talking about Scatter-Gather API introduced in your RFC
(_reserve_; update ring entries manually; _commit_)?
Anyway, if there is no much value in my idea above, then feel free to drop it.

> 

> >

> > > +

> > > +	rte_log(RTE_LOG_INFO, rte_rcu_log_type,

> > > +		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);

> > > +

> > > +	return 0;

> > > +}

> > > +

> > > +/* Reclaim resources from the defer queue. */ int

> > > +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,

> > > +				unsigned int *freed, unsigned int *pending) {

> > > +	uint32_t cnt;

> > > +	uint64_t token;

> > > +

> > > +	if (dq == NULL || n == 0) {

> > > +		rte_log(RTE_LOG_ERR, rte_rcu_log_type,

> > > +			"%s(): Invalid input parameter\n", __func__);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	cnt = 0;

> > > +

> > > +	char e[dq->esize];

> > > +	/* Check reader threads quiescent state and reclaim resources */

> > > +	while ((cnt < n) &&

> > > +		(rte_ring_dequeue_bulk_elem_start(dq->r, e,

> > > +					dq->esize, 1, NULL) != 0)) {

> >

> > Another thought - any point to use burst_elem_start() here to retrieve more

> > then 1 elem in one go? Something like:

> I think it makes sense.

> 

> > char e[32][dq->size];

> > while ((cnt < n) {

> > 	k = RTE_MAX(32, cnt - n);

> > 	k = rte_ring_dequeue_burst_elem_start(dq->r, e, dq->esize, k, NULL);

> > 	if (k = 0)

> > 		break;

> > 	for (i = 0; i != k; i++) {

> > 		memcpy(&token, e[i], sizeof(uint64_t));

> > 		if (rte_rcu_qsbr_check(dq->v, token, false) != 1)

> > 			break;

> > 	}

> > 	k = i;

> > 	rte_ring_dequeue_elem_finish(dq->r, k);

> > 	for (i = 0; i != k; i++)

> > 		dq->free_fn(dq->p, e[i] + __RTE_QSBR_TOKEN_SIZE);

> I think it also makes sense to change the free_fn to take 'n' number of tokens.

> 

> > 	n += k;

> > 	if (k == 0)

> > 		break;

> >

> > ?

> > Also if at enqueue we guarantee strict ordrer (via

> > enqueue_start/enqueue_finish), then here we probably can do _check_ from

> > the last retrieved token here?

> > In theory that might help to minimize number of checks.

> > I.E. do:

> > for (i = k; i-- !=0; )  {

> > 	memcpy(&token, e[i], sizeof(uint64_t));

> > 	if (rte_rcu_qsbr_check(dq->v, token, false) != 1)

> There is a higher chance that later tokens are not acked. This introduces more polling of the counters.

> The rte_rcu_qsbr_check has an optimization. While acking the current token, it will also caches the greatest token acked. It uses the cached

> token for the subsequent calls. I think this provides a better optimization.


Ok.

Patch

diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
index c4bb28d77..95f8a57e2 100644
--- a/lib/librte_rcu/Makefile
+++ b/lib/librte_rcu/Makefile
@@ -8,7 +8,7 @@  LIB = librte_rcu.a
 
 CFLAGS += -DALLOW_EXPERIMENTAL_API
 CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
-LDLIBS += -lrte_eal
+LDLIBS += -lrte_eal -lrte_ring
 
 EXPORT_MAP := rte_rcu_version.map
 
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
index 62920ba02..e280b29c1 100644
--- a/lib/librte_rcu/meson.build
+++ b/lib/librte_rcu/meson.build
@@ -10,3 +10,5 @@  headers = files('rte_rcu_qsbr.h')
 if cc.get_id() == 'clang' and dpdk_conf.get('RTE_ARCH_64') == false
 	ext_deps += cc.find_library('atomic')
 endif
+
+deps += ['ring']
diff --git a/lib/librte_rcu/rcu_qsbr_pvt.h b/lib/librte_rcu/rcu_qsbr_pvt.h
new file mode 100644
index 000000000..413f28587
--- /dev/null
+++ b/lib/librte_rcu/rcu_qsbr_pvt.h
@@ -0,0 +1,57 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_PVT_H_
+#define _RTE_RCU_QSBR_PVT_H_
+
+/**
+ * This file is private to the RCU library. It should not be included
+ * by the user of this library.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <rte_ring.h>
+#include <rte_ring_elem.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq {
+	struct rte_rcu_qsbr *v; /**< RCU QSBR variable used by this queue.*/
+	struct rte_ring *r;     /**< RCU QSBR defer queue. */
+	uint32_t size;
+	/**< Number of elements in the defer queue */
+	uint32_t esize;
+	/**< Size (in bytes) of data, including the token, stored on the
+	 *   defer queue.
+	 */
+	uint32_t trigger_reclaim_limit;
+	/**< Trigger automatic reclamation after the defer queue
+	 *   has atleast these many resources waiting.
+	 */
+	uint32_t max_reclaim_size;
+	/**< Reclaim at the max these many resources during auto
+	 *   reclamation.
+	 */
+	rte_rcu_qsbr_free_resource_t free_fn;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs.
+	 */
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_PVT_H_ */
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index 2f3fad776..e8c1e386f 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -1,6 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2018-2019 Arm Limited
  */
 
 #include <stdio.h>
@@ -18,8 +18,10 @@ 
 #include <rte_per_lcore.h>
 #include <rte_lcore.h>
 #include <rte_errno.h>
+#include <rte_ring_elem.h>
 
 #include "rte_rcu_qsbr.h"
+#include "rcu_qsbr_pvt.h"
 
 /* Get the memory size of QSBR variable */
 size_t
@@ -270,6 +272,245 @@  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 	return 0;
 }
 
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+	struct rte_rcu_qsbr_dq *dq;
+	uint32_t qs_fifo_size;
+	unsigned int flags;
+
+	if (params == NULL || params->free_fn == NULL ||
+		params->v == NULL || params->name == NULL ||
+		params->size == 0 || params->esize == 0 ||
+		(params->esize % 4 != 0)) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return NULL;
+	}
+	/* If auto reclamation is configured, reclaim limit
+	 * should be a valid value.
+	 */
+	if ((params->trigger_reclaim_limit <= params->size) &&
+	    (params->max_reclaim_size == 0)) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
+			__func__, params->size, params->trigger_reclaim_limit,
+			params->max_reclaim_size);
+		rte_errno = EINVAL;
+
+		return NULL;
+	}
+
+	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
+			 RTE_CACHE_LINE_SIZE);
+	if (dq == NULL) {
+		rte_errno = ENOMEM;
+
+		return NULL;
+	}
+
+	/* Decide the flags for the ring.
+	 * If MT safety is requested, use RTS for ring enqueue as most
+	 * use cases involve dq-enqueue happening on the control plane.
+	 * Ring dequeue is always HTS due to the possibility of revert.
+	 */
+	flags = RING_F_MP_RTS_ENQ;
+	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
+		flags = RING_F_SP_ENQ;
+	flags |= RING_F_MC_HTS_DEQ;
+	/* round up qs_fifo_size to next power of two that is not less than
+	 * max_size.
+	 */
+	qs_fifo_size = rte_align32pow2(params->size + 1);
+	/* Add token size to ring element size */
+	dq->r = rte_ring_create_elem(params->name,
+			__RTE_QSBR_TOKEN_SIZE + params->esize,
+			qs_fifo_size, SOCKET_ID_ANY, flags);
+	if (dq->r == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): defer queue create failed\n", __func__);
+		rte_free(dq);
+		return NULL;
+	}
+
+	dq->v = params->v;
+	dq->size = params->size;
+	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
+	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
+	dq->max_reclaim_size = params->max_reclaim_size;
+	dq->free_fn = params->free_fn;
+	dq->p = params->p;
+
+	return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
+{
+	uint64_t token;
+	uint32_t cur_size, free_size;
+
+	if (dq == NULL || e == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Start the grace period */
+	token = rte_rcu_qsbr_start(dq->v);
+
+	/* Reclaim resources if the queue is 1/8th full. This helps
+	 * the queue from growing too large and allows time for reader
+	 * threads to report their quiescent state.
+	 */
+	cur_size = rte_ring_count(dq->r);
+	if (cur_size > dq->trigger_reclaim_limit) {
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Triggering reclamation\n", __func__);
+		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, NULL, NULL);
+	}
+
+	/* Check if there is space for atleast 1 resource */
+	free_size = rte_ring_free_count(dq->r);
+	if (!free_size) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Defer queue is full\n", __func__);
+		/* Note that the token generated above is not used.
+		 * Other than wasting tokens, it should not cause any
+		 * other issues.
+		 */
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Skipped enqueuing token = %"PRIu64"\n",
+			__func__, token);
+
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	/* Enqueue the token and resource. Generating the token
+	 * and enqueuing (token + resource) on the queue is not an
+	 * atomic operation. This might result in tokens enqueued
+	 * out of order on the queue. So, some tokens might wait
+	 * longer than they are required to be reclaimed.
+	 */
+	char data[dq->esize];
+	memcpy(data, &token, __RTE_QSBR_TOKEN_SIZE);
+	memcpy(data + __RTE_QSBR_TOKEN_SIZE, e,
+		dq->esize - __RTE_QSBR_TOKEN_SIZE);
+	/* Check the status as enqueue might fail since the other thread
+	 * might have used up the freed space.
+	 * Enqueue uses the configured flags when the DQ was created.
+	 */
+	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Enqueue failed\n", __func__);
+		/* Note that the token generated above is not used.
+		 * Other than wasting tokens, it should not cause any
+		 * other issues.
+		 */
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Skipped enqueuing token = %"PRIu64"\n",
+			__func__, token);
+
+		rte_errno = ENOSPC;
+		return 1;
+	}
+
+	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+		"%s(): Enqueued token = %"PRIu64"\n", __func__, token);
+
+	return 0;
+}
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+				unsigned int *freed, unsigned int *pending)
+{
+	uint32_t cnt;
+	uint64_t token;
+
+	if (dq == NULL || n == 0) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	cnt = 0;
+
+	char e[dq->esize];
+	/* Check reader threads quiescent state and reclaim resources */
+	while ((cnt < n) &&
+		(rte_ring_dequeue_bulk_elem_start(dq->r, e,
+					dq->esize, 1, NULL) != 0)) {
+		memcpy(&token, e, sizeof(uint64_t));
+
+		/* Reclaim the resource */
+		if (rte_rcu_qsbr_check(dq->v, token, false) != 1) {
+			rte_ring_dequeue_finish(dq->r, 0);
+			break;
+		}
+		rte_ring_dequeue_finish(dq->r, 1);
+
+		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+			"%s(): Reclaimed token = %"PRIu64"\n",
+			__func__, *(uint64_t *)e);
+
+		dq->free_fn(dq->p, e + __RTE_QSBR_TOKEN_SIZE);
+
+		cnt++;
+	}
+
+	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+		"%s(): Reclaimed %u resources\n", __func__, cnt);
+
+	if (freed != NULL)
+		*freed = cnt;
+	if (pending != NULL)
+		*pending = rte_ring_count(dq->r);
+
+	return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+	unsigned int pending;
+
+	if (dq == NULL) {
+		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	/* Reclaim all the resources */
+	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending);
+	if (pending != 0) {
+		rte_errno = EAGAIN;
+
+		return 1;
+	}
+
+	rte_ring_free(dq->r);
+	rte_free(dq);
+
+	return 0;
+}
+
 int rte_rcu_log_type;
 
 RTE_INIT(rte_rcu_register)
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index 0b5585925..213f9b029 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -34,6 +34,7 @@  extern "C" {
 #include <rte_lcore.h>
 #include <rte_debug.h>
 #include <rte_atomic.h>
+#include <rte_ring.h>
 
 extern int rte_rcu_log_type;
 
@@ -84,6 +85,7 @@  struct rte_rcu_qsbr_cnt {
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
+#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -114,6 +116,84 @@  struct rte_rcu_qsbr {
 	 */
 } __rte_cache_aligned;
 
+/**
+ * Call back function called to free the resources.
+ *
+ * @param p
+ *   Pointer provided while creating the defer queue
+ * @param e
+ *   Pointer to the resource data stored on the defer queue
+ *
+ * @return
+ *   None
+ */
+typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e);
+
+#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
+
+/**
+ * Various flags supported.
+ */
+/**< Enqueue and reclaim operations are multi-thread safe by default.
+ *   The call back functions registered to free the resources are
+ *   assumed to be multi-thread safe.
+ *   Set this flag is multi-thread safety is not required.
+ */
+#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
+
+/**
+ * Parameters used when creating the defer queue.
+ */
+struct rte_rcu_qsbr_dq_parameters {
+	const char *name;
+	/**< Name of the queue. */
+	uint32_t flags;
+	/**< Flags to control API behaviors */
+	uint32_t size;
+	/**< Number of entries in queue. Typically, this will be
+	 *   the same as the maximum number of entries supported in the
+	 *   lock free data structure.
+	 *   Data structures with unbounded number of entries is not
+	 *   supported currently.
+	 */
+	uint32_t esize;
+	/**< Size (in bytes) of each element in the defer queue.
+	 *   This has to be multiple of 4B.
+	 */
+	uint32_t trigger_reclaim_limit;
+	/**< Trigger automatic reclamation after the defer queue
+	 *   has atleast these many resources waiting. This auto
+	 *   reclamation is triggered in rte_rcu_qsbr_dq_enqueue API
+	 *   call.
+	 *   If this is greater than 'size', auto reclamation is
+	 *   not triggered.
+	 *   If this is set to 0, auto reclamation is triggered
+	 *   in every call to rte_rcu_qsbr_dq_enqueue API.
+	 */
+	uint32_t max_reclaim_size;
+	/**< When automatic reclamation is enabled, reclaim at the max
+	 *   these many resources. This should contain a valid value, if
+	 *   auto reclamation is on. Setting this to 'size' or greater will
+	 *   reclaim all possible resources currently on the defer queue.
+	 */
+	rte_rcu_qsbr_free_resource_t free_fn;
+	/**< Function to call to free the resource. */
+	void *p;
+	/**< Pointer passed to the free function. Typically, this is the
+	 *   pointer to the data structure to which the resource to free
+	 *   belongs. This can be NULL.
+	 */
+	struct rte_rcu_qsbr *v;
+	/**< RCU QSBR variable to use for this defer queue */
+};
+
+/* RTE defer queue structure.
+ * This structure holds the defer queue. The defer queue is used to
+ * hold the deleted entries from the data structure that are not
+ * yet freed.
+ */
+struct rte_rcu_qsbr_dq;
+
 /**
  * @warning
  * @b EXPERIMENTAL: this API may change without prior notice
@@ -692,6 +772,114 @@  __rte_experimental
 int
 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
 
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ *
+ * @param params
+ *   Parameters to create a defer queue.
+ * @return
+ *   On success - Valid pointer to defer queue
+ *   On error - NULL
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOMEM - Not enough memory
+ */
+__rte_experimental
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Enqueue one resource to the defer queue and start the grace period.
+ * The resource will be freed later after at least one grace period
+ * is over.
+ *
+ * If the defer queue is full, it will attempt to reclaim resources.
+ * It will also reclaim resources at regular intervals to avoid
+ * the defer queue from growing too big.
+ *
+ * Multi-thread safety is provided as the defer queue configuration.
+ * When multi-thread safety is requested, it is possible that the
+ * resources are not stored in their order of deletion. This results
+ * in resources being held in the defer queue longer than they should.
+ *
+ * @param dq
+ *   Defer queue to allocate an entry from.
+ * @param e
+ *   Pointer to resource data to copy to the defer queue. The size of
+ *   the data to copy is equal to the element size provided when the
+ *   defer queue was created.
+ * @return
+ *   On success - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ *   - ENOSPC - Defer queue is full. This condition can not happen
+ *		if the defer queue size is equal (or larger) than the
+ *		number of elements in the data structure.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Free quesed resources from the defer queue.
+ *
+ * This API is multi-thread safe.
+ *
+ * @param dq
+ *   Defer queue to free an entry from.
+ * @param n
+ *   Maximum number of resources to free.
+ * @param freed
+ *   Number of resources that were freed.
+ * @param pending
+ *   Number of resources pending on the defer queue. This number might not
+ *   be acurate if multi-thread safety is configured.
+ * @return
+ *   On successful reclamation of at least 1 resource - 0
+ *   On error - 1 with rte_errno set to
+ *   - EINVAL - NULL parameters are passed
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+				unsigned int *freed, unsigned int *pending);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Delete a defer queue.
+ *
+ * It tries to reclaim all the resources on the defer queue.
+ * If any of the resources have not completed the grace period
+ * the reclamation stops and returns immediately. The rest of
+ * the resources are not reclaimed and the defer queue is not
+ * freed.
+ *
+ * @param dq
+ *   Defer queue to delete.
+ * @return
+ *   On success - 0
+ *   On error - 1
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ *   - EAGAIN - Some of the resources have not completed at least 1 grace
+ *		period, try again.
+ */
+__rte_experimental
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
index f8b9ef2ab..dfac88a37 100644
--- a/lib/librte_rcu/rte_rcu_version.map
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -8,6 +8,10 @@  EXPERIMENTAL {
 	rte_rcu_qsbr_synchronize;
 	rte_rcu_qsbr_thread_register;
 	rte_rcu_qsbr_thread_unregister;
+	rte_rcu_qsbr_dq_create;
+	rte_rcu_qsbr_dq_enqueue;
+	rte_rcu_qsbr_dq_reclaim;
+	rte_rcu_qsbr_dq_delete;
 
 	local: *;
 };
diff --git a/lib/meson.build b/lib/meson.build
index 9c3cc55d5..15e91a303 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -11,7 +11,9 @@ 
 libraries = [
 	'kvargs', # eal depends on kvargs
 	'eal', # everything depends on eal
-	'ring', 'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
+	'ring',
+	'rcu', # rcu depends on ring
+	'mempool', 'mbuf', 'net', 'meter', 'ethdev', 'pci', # core
 	'cmdline',
 	'metrics', # bitrate/latency stats depends on this
 	'hash',    # efd depends on this
@@ -22,7 +24,7 @@  libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'rcu', 'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
+	'rib', 'reorder', 'sched', 'security', 'stack', 'vhost',
 	# ipsec lib depends on net, crypto and security
 	'ipsec',
 	#fib lib depends on rib