diff mbox series

[v7,1/3] rcu: add RCU library supporting QSBR mechanism

Message ID 20190423043130.18153-2-honnappa.nagarahalli@arm.com
State Superseded
Headers show
Series lib/rcu: add RCU library supporting QSBR mechanism | expand

Commit Message

Honnappa Nagarahalli April 23, 2019, 4:31 a.m. UTC
Add RCU library supporting quiescent state based memory reclamation method.
This library helps identify the quiescent state of the reader threads so
that the writers can free the memory associated with the lock less data
structures.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Steve Capper <steve.capper@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

---
 MAINTAINERS                        |   5 +
 config/common_base                 |   6 +
 lib/Makefile                       |   2 +
 lib/librte_rcu/Makefile            |  23 ++
 lib/librte_rcu/meson.build         |   7 +
 lib/librte_rcu/rte_rcu_qsbr.c      | 268 ++++++++++++
 lib/librte_rcu/rte_rcu_qsbr.h      | 639 +++++++++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |  12 +
 lib/meson.build                    |   2 +-
 mk/rte.app.mk                      |   1 +
 10 files changed, 964 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_rcu/Makefile
 create mode 100644 lib/librte_rcu/meson.build
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h
 create mode 100644 lib/librte_rcu/rte_rcu_version.map

-- 
2.17.1

Comments

Paul E. McKenney April 23, 2019, 8:10 a.m. UTC | #1
On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote:
> Add RCU library supporting quiescent state based memory reclamation method.

> This library helps identify the quiescent state of the reader threads so

> that the writers can free the memory associated with the lock less data

> structures.

> 

> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> Reviewed-by: Steve Capper <steve.capper@arm.com>

> Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>


Much better!

Acked-by: Paul E. McKenney <paulmck@linux.ibm.com>


> ---

>  MAINTAINERS                        |   5 +

>  config/common_base                 |   6 +

>  lib/Makefile                       |   2 +

>  lib/librte_rcu/Makefile            |  23 ++

>  lib/librte_rcu/meson.build         |   7 +

>  lib/librte_rcu/rte_rcu_qsbr.c      | 268 ++++++++++++

>  lib/librte_rcu/rte_rcu_qsbr.h      | 639 +++++++++++++++++++++++++++++

>  lib/librte_rcu/rte_rcu_version.map |  12 +

>  lib/meson.build                    |   2 +-

>  mk/rte.app.mk                      |   1 +

>  10 files changed, 964 insertions(+), 1 deletion(-)

>  create mode 100644 lib/librte_rcu/Makefile

>  create mode 100644 lib/librte_rcu/meson.build

>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c

>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h

>  create mode 100644 lib/librte_rcu/rte_rcu_version.map

> 

> diff --git a/MAINTAINERS b/MAINTAINERS

> index a08583471..ae54f37db 100644

> --- a/MAINTAINERS

> +++ b/MAINTAINERS

> @@ -1274,6 +1274,11 @@ F: examples/bpf/

>  F: app/test/test_bpf.c

>  F: doc/guides/prog_guide/bpf_lib.rst

>  

> +RCU - EXPERIMENTAL

> +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> +F: lib/librte_rcu/

> +F: doc/guides/prog_guide/rcu_lib.rst

> +

>  

>  Test Applications

>  -----------------

> diff --git a/config/common_base b/config/common_base

> index 7fb0dedb6..f50d26c30 100644

> --- a/config/common_base

> +++ b/config/common_base

> @@ -834,6 +834,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y

>  #

>  CONFIG_RTE_LIBRTE_TELEMETRY=n

>  

> +#

> +# Compile librte_rcu

> +#

> +CONFIG_RTE_LIBRTE_RCU=y

> +CONFIG_RTE_LIBRTE_RCU_DEBUG=n

> +

>  #

>  # Compile librte_lpm

>  #

> diff --git a/lib/Makefile b/lib/Makefile

> index 26021d0c0..791e0d991 100644

> --- a/lib/Makefile

> +++ b/lib/Makefile

> @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec

>  DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security

>  DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry

>  DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev

> +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu

> +DEPDIRS-librte_rcu := librte_eal

>  

>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

>  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni

> diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile

> new file mode 100644

> index 000000000..6aa677bd1

> --- /dev/null

> +++ b/lib/librte_rcu/Makefile

> @@ -0,0 +1,23 @@

> +# SPDX-License-Identifier: BSD-3-Clause

> +# Copyright(c) 2018 Arm Limited

> +

> +include $(RTE_SDK)/mk/rte.vars.mk

> +

> +# library name

> +LIB = librte_rcu.a

> +

> +CFLAGS += -DALLOW_EXPERIMENTAL_API

> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3

> +LDLIBS += -lrte_eal

> +

> +EXPORT_MAP := rte_rcu_version.map

> +

> +LIBABIVER := 1

> +

> +# all source are stored in SRCS-y

> +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c

> +

> +# install includes

> +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h

> +

> +include $(RTE_SDK)/mk/rte.lib.mk

> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> new file mode 100644

> index 000000000..0c2d5a2e0

> --- /dev/null

> +++ b/lib/librte_rcu/meson.build

> @@ -0,0 +1,7 @@

> +# SPDX-License-Identifier: BSD-3-Clause

> +# Copyright(c) 2018 Arm Limited

> +

> +allow_experimental_apis = true

> +

> +sources = files('rte_rcu_qsbr.c')

> +headers = files('rte_rcu_qsbr.h')

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c

> new file mode 100644

> index 000000000..50ef0ba9a

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> @@ -0,0 +1,268 @@

> +/* SPDX-License-Identifier: BSD-3-Clause

> + *

> + * Copyright (c) 2018 Arm Limited

> + */

> +

> +#include <stdio.h>

> +#include <string.h>

> +#include <stdint.h>

> +#include <errno.h>

> +

> +#include <rte_common.h>

> +#include <rte_log.h>

> +#include <rte_memory.h>

> +#include <rte_malloc.h>

> +#include <rte_eal.h>

> +#include <rte_eal_memconfig.h>

> +#include <rte_atomic.h>

> +#include <rte_per_lcore.h>

> +#include <rte_lcore.h>

> +#include <rte_errno.h>

> +

> +#include "rte_rcu_qsbr.h"

> +

> +/* Get the memory size of QSBR variable */

> +size_t __rte_experimental

> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)

> +{

> +	size_t sz;

> +

> +	if (max_threads == 0) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid max_threads %u\n",

> +			__func__, max_threads);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	sz = sizeof(struct rte_rcu_qsbr);

> +

> +	/* Add the size of quiescent state counter array */

> +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> +

> +	/* Add the size of the registered thread ID bitmap array */

> +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> +

> +	return sz;

> +}

> +

> +/* Initialize a quiescent state variable */

> +int __rte_experimental

> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)

> +{

> +	size_t sz;

> +

> +	if (v == NULL) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> +	if (sz == 1)

> +		return 1;

> +

> +	/* Set all the threads to offline */

> +	memset(v, 0, sz);

> +	v->max_threads = max_threads;

> +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> +	v->token = RTE_QSBR_CNT_INIT;

> +

> +	return 0;

> +}

> +

> +/* Register a reader thread to report its quiescent state

> + * on a QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	unsigned int i, id, success;

> +	uint64_t old_bmap, new_bmap;

> +

> +	if (v == NULL || thread_id >= v->max_threads) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	id = thread_id & RTE_QSBR_THRID_MASK;

> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +	/* Make sure that the counter for registered threads does not

> +	 * go out of sync. Hence, additional checks are required.

> +	 */

> +	/* Check if the thread is already registered */

> +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_RELAXED);

> +	if (old_bmap & 1UL << id)

> +		return 0;

> +

> +	do {

> +		new_bmap = old_bmap | (1UL << id);

> +		success = __atomic_compare_exchange(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					&old_bmap, &new_bmap, 0,

> +					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

> +

> +		if (success)

> +			__atomic_fetch_add(&v->num_threads,

> +						1, __ATOMIC_RELAXED);

> +		else if (old_bmap & (1UL << id))

> +			/* Someone else registered this thread.

> +			 * Counter should not be incremented.

> +			 */

> +			return 0;

> +	} while (success == 0);

> +

> +	return 0;

> +}

> +

> +/* Remove a reader thread, from the list of threads reporting their

> + * quiescent state on a QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	unsigned int i, id, success;

> +	uint64_t old_bmap, new_bmap;

> +

> +	if (v == NULL || thread_id >= v->max_threads) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	id = thread_id & RTE_QSBR_THRID_MASK;

> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +	/* Make sure that the counter for registered threads does not

> +	 * go out of sync. Hence, additional checks are required.

> +	 */

> +	/* Check if the thread is already unregistered */

> +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_RELAXED);

> +	if (old_bmap & ~(1UL << id))

> +		return 0;

> +

> +	do {

> +		new_bmap = old_bmap & ~(1UL << id);

> +		/* Make sure any loads of the shared data structure are

> +		 * completed before removal of the thread from the list of

> +		 * reporting threads.

> +		 */

> +		success = __atomic_compare_exchange(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					&old_bmap, &new_bmap, 0,

> +					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

> +

> +		if (success)

> +			__atomic_fetch_sub(&v->num_threads,

> +						1, __ATOMIC_RELAXED);

> +		else if (old_bmap & ~(1UL << id))

> +			/* Someone else unregistered this thread.

> +			 * Counter should not be incremented.

> +			 */

> +			return 0;

> +	} while (success == 0);

> +

> +	return 0;

> +}

> +

> +/* Wait till the reader threads have entered quiescent state. */

> +void __rte_experimental

> +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL);

> +

> +	t = rte_rcu_qsbr_start(v);

> +

> +	/* If the current thread has readside critical section,

> +	 * update its quiescent state status.

> +	 */

> +	if (thread_id != RTE_QSBR_THRID_INVALID)

> +		rte_rcu_qsbr_quiescent(v, thread_id);

> +

> +	/* Wait for other readers to enter quiescent state */

> +	rte_rcu_qsbr_check(v, t, true);

> +}

> +

> +/* Dump the details of a single quiescent state variable to a file. */

> +int __rte_experimental

> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)

> +{

> +	uint64_t bmap;

> +	uint32_t i, t, id;

> +

> +	if (v == NULL || f == NULL) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	fprintf(f, "\nQuiescent State Variable @%p\n", v);

> +

> +	fprintf(f, "  QS variable memory size = %lu\n",

> +				rte_rcu_qsbr_get_memsize(v->max_threads));

> +	fprintf(f, "  Given # max threads = %u\n", v->max_threads);

> +	fprintf(f, "  Current # threads = %u\n", v->num_threads);

> +

> +	fprintf(f, "  Registered thread ID mask = 0x");

> +	for (i = 0; i < v->num_elems; i++)

> +		fprintf(f, "%lx", __atomic_load_n(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_ACQUIRE));

> +	fprintf(f, "\n");

> +

> +	fprintf(f, "  Token = %lu\n",

> +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

> +

> +	fprintf(f, "Quiescent State Counts for readers:\n");

> +	for (i = 0; i < v->num_elems; i++) {

> +		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_ACQUIRE);

> +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> +		while (bmap) {

> +			t = __builtin_ctzl(bmap);

> +			fprintf(f, "thread ID = %d, count = %lu, lock count = %u\n",

> +				id + t,

> +				__atomic_load_n(

> +					&v->qsbr_cnt[id + t].cnt,

> +					__ATOMIC_RELAXED),

> +				__atomic_load_n(

> +					&v->qsbr_cnt[id + t].lock_cnt,

> +					__ATOMIC_RELAXED));

> +			bmap &= ~(1UL << t);

> +		}

> +	}

> +

> +	return 0;

> +}

> +

> +int rcu_log_type;

> +

> +RTE_INIT(rte_rcu_register)

> +{

> +	rcu_log_type = rte_log_register("lib.rcu");

> +	if (rcu_log_type >= 0)

> +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);

> +}

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h

> new file mode 100644

> index 000000000..58fb95ed0

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> @@ -0,0 +1,639 @@

> +/* SPDX-License-Identifier: BSD-3-Clause

> + * Copyright (c) 2018 Arm Limited

> + */

> +

> +#ifndef _RTE_RCU_QSBR_H_

> +#define _RTE_RCU_QSBR_H_

> +

> +/**

> + * @file

> + * RTE Quiescent State Based Reclamation (QSBR)

> + *

> + * Quiescent State (QS) is any point in the thread execution

> + * where the thread does not hold a reference to a data structure

> + * in shared memory. While using lock-less data structures, the writer

> + * can safely free memory once all the reader threads have entered

> + * quiescent state.

> + *

> + * This library provides the ability for the readers to report quiescent

> + * state and for the writers to identify when all the readers have

> + * entered quiescent state.

> + */

> +

> +#ifdef __cplusplus

> +extern "C" {

> +#endif

> +

> +#include <stdio.h>

> +#include <stdint.h>

> +#include <errno.h>

> +#include <rte_common.h>

> +#include <rte_memory.h>

> +#include <rte_lcore.h>

> +#include <rte_debug.h>

> +#include <rte_atomic.h>

> +

> +extern int rcu_log_type;

> +

> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG

> +#define RCU_DP_LOG(level, fmt, args...) \

> +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> +		"%s(): " fmt "\n", __func__, ## args)

> +#else

> +#define RCU_DP_LOG(level, fmt, args...)

> +#endif

> +

> +#if defined(RTE_LIBRTE_RCU_DEBUG)

> +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\

> +	if (v->qsbr_cnt[thread_id].lock_cnt) \

> +		rte_log(RTE_LOG_ ## level, rcu_log_type, \

> +			"%s(): " fmt "\n", __func__, ## args); \

> +} while (0)

> +#else

> +#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)

> +#endif

> +

> +/* Registered thread IDs are stored as a bitmap of 64b element array.

> + * Given thread id needs to be converted to index into the array and

> + * the id within the array element.

> + */

> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> +#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)

> +#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

> +#define RTE_QSBR_THRID_INDEX_SHIFT 6

> +#define RTE_QSBR_THRID_MASK 0x3f

> +#define RTE_QSBR_THRID_INVALID 0xffffffff

> +

> +/* Worker thread counter */

> +struct rte_rcu_qsbr_cnt {

> +	uint64_t cnt;

> +	/**< Quiescent state counter. Value 0 indicates the thread is offline

> +	 *   64b counter is used to avoid adding more code to address

> +	 *   counter overflow. Changing this to 32b would require additional

> +	 *   changes to various APIs.

> +	 */

> +	uint32_t lock_cnt;

> +	/**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */

> +} __rte_cache_aligned;

> +

> +#define RTE_QSBR_CNT_THR_OFFLINE 0

> +#define RTE_QSBR_CNT_INIT 1

> +

> +/* RTE Quiescent State variable structure.

> + * This structure has two elements that vary in size based on the

> + * 'max_threads' parameter.

> + * 1) Quiescent state counter array

> + * 2) Register thread ID array

> + */

> +struct rte_rcu_qsbr {

> +	uint64_t token __rte_cache_aligned;

> +	/**< Counter to allow for multiple concurrent quiescent state queries */

> +

> +	uint32_t num_elems __rte_cache_aligned;

> +	/**< Number of elements in the thread ID array */

> +	uint32_t num_threads;

> +	/**< Number of threads currently using this QS variable */

> +	uint32_t max_threads;

> +	/**< Maximum number of threads using this QS variable */

> +

> +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> +	/**< Quiescent state counter array of 'max_threads' elements */

> +

> +	/**< Registered thread IDs are stored in a bitmap array,

> +	 *   after the quiescent state counter array.

> +	 */

> +} __rte_cache_aligned;

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Return the size of the memory occupied by a Quiescent State variable.

> + *

> + * @param max_threads

> + *   Maximum number of threads reporting quiescent state on this variable.

> + * @return

> + *   On success - size of memory in bytes required for this QS variable.

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - max_threads is 0

> + */

> +size_t __rte_experimental

> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Initialize a Quiescent State (QS) variable.

> + *

> + * @param v

> + *   QS variable

> + * @param max_threads

> + *   Maximum number of threads reporting quiescent state on this variable.

> + *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.

> + * @return

> + *   On success - 0

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - max_threads is 0 or 'v' is NULL.

> + *

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Register a reader thread to report its quiescent state

> + * on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + * Any reader thread that wants to report its quiescent state must

> + * call this API. This can be called during initialization or as part

> + * of the packet processing loop.

> + *

> + * Note that rte_rcu_qsbr_thread_online must be called before the

> + * thread updates its quiescent state using rte_rcu_qsbr_quiescent.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will report its quiescent state on

> + *   the QS variable. thread_id is a value between 0 and (max_threads - 1).

> + *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Remove a reader thread, from the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * This API can be called from the reader threads during shutdown.

> + * Ongoing quiescent state queries will stop waiting for the status from this

> + * unregistered reader thread.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will stop reporting its quiescent

> + *   state on the QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Add a registered reader thread, to the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * Any registered reader thread that wants to report its quiescent state must

> + * call this API before calling rte_rcu_qsbr_quiescent. This can be called

> + * during initialization or as part of the packet processing loop.

> + *

> + * The reader thread must call rte_rcu_thread_offline API, before

> + * calling any functions that block, to ensure that rte_rcu_qsbr_check

> + * API does not wait indefinitely for the reader thread to update its QS.

> + *

> + * The reader thread must call rte_rcu_thread_online API, after the blocking

> + * function call returns, to ensure that rte_rcu_qsbr_check API

> + * waits for the reader thread to update its quiescent state.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will report its quiescent state on

> + *   the QS variable.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	/* Copy the current value of token.

> +	 * The fence at the end of the function will ensure that

> +	 * the following will not move down after the load of any shared

> +	 * data structure.

> +	 */

> +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

> +

> +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure

> +	 * 'cnt' (64b) is accessed atomically.

> +	 */

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +		t, __ATOMIC_RELAXED);

> +

> +	/* The subsequent load of the data structure should not

> +	 * move above the store. Hence a store-load barrier

> +	 * is required.

> +	 * If the load of the data structure moves above the store,

> +	 * writer might not see that the reader is online, even though

> +	 * the reader is referencing the shared data structure.

> +	 */

> +#ifdef RTE_ARCH_X86_64

> +	/* rte_smp_mb() for x86 is lighter */

> +	rte_smp_mb();

> +#else

> +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

> +#endif

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Remove a registered reader thread from the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * This can be called during initialization or as part of the packet

> + * processing loop.

> + *

> + * The reader thread must call rte_rcu_thread_offline API, before

> + * calling any functions that block, to ensure that rte_rcu_qsbr_check

> + * API does not wait indefinitely for the reader thread to update its QS.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   rte_rcu_qsbr_check API will not wait for the reader thread with

> + *   this thread ID to report its quiescent state on the QS variable.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	/* The reader can go offline only after the load of the

> +	 * data structure is completed. i.e. any load of the

> +	 * data strcture can not move after this store.

> +	 */

> +

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Acquire a lock for accessing a shared data structure.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * This API is provided to aid debugging. This should be called before

> + * accessing a shared data structure.

> + *

> + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.

> + * Similarly rte_rcu_qsbr_unlock will decrement the counter. When the

> + * rte_rcu_qsbr_check API will verify that this counter is 0.

> + *

> + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread id

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_lock(struct rte_rcu_qsbr *v __rte_unused,

> +			unsigned int thread_id __rte_unused)

> +{

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +#if defined(RTE_LIBRTE_RCU_DEBUG)

> +	/* Increment the lock counter */

> +	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,

> +				1, __ATOMIC_ACQUIRE);

> +#endif

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Release a lock after accessing a shared data structure.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * This API is provided to aid debugging. This should be called after

> + * accessing a shared data structure.

> + *

> + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will

> + * decrement a lock counter. rte_rcu_qsbr_check API will verify that this

> + * counter is 0.

> + *

> + * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread id

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_unlock(struct rte_rcu_qsbr *v __rte_unused,

> +			unsigned int thread_id __rte_unused)

> +{

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +#if defined(RTE_LIBRTE_RCU_DEBUG)

> +	/* Decrement the lock counter */

> +	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,

> +				1, __ATOMIC_RELEASE);

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,

> +				"Lock counter %u. Nested locks?\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +#endif

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Ask the reader threads to report the quiescent state

> + * status.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe and can be called from worker threads.

> + *

> + * @param v

> + *   QS variable

> + * @return

> + *   - This is the token for this call of the API. This should be

> + *     passed to rte_rcu_qsbr_check API.

> + */

> +static __rte_always_inline uint64_t __rte_experimental

> +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL);

> +

> +	/* Release the changes to the shared data structure.

> +	 * This store release will ensure that changes to any data

> +	 * structure are visible to the workers before the token

> +	 * update is visible.

> +	 */

> +	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

> +

> +	return t;

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Update quiescent state for a reader thread.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * All the reader threads registered to report their quiescent state

> + * on the QS variable must call this API.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Update the quiescent state for the reader with this thread ID.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	/* Acquire the changes to the shared data structure released

> +	 * by rte_rcu_qsbr_start.

> +	 * Later loads of the shared data structure should not move

> +	 * above this load. Hence, use load-acquire.

> +	 */

> +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

> +

> +	/* Inform the writer that updates are visible to this reader.

> +	 * Prior loads of the shared data structure should not move

> +	 * beyond this store. Hence use store-release.

> +	 */

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +			 t, __ATOMIC_RELEASE);

> +

> +	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",

> +		__func__, t, thread_id);

> +}

> +

> +/* Check the quiescent state counter for registered threads only, assuming

> + * that not all threads have registered.

> + */

> +static __rte_always_inline int

> +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> +{

> +	uint32_t i, j, id;

> +	uint64_t bmap;

> +	uint64_t c;

> +	uint64_t *reg_thread_id;

> +

> +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> +		i < v->num_elems;

> +		i++, reg_thread_id++) {

> +		/* Load the current registered thread bit map before

> +		 * loading the reader thread quiescent state counters.

> +		 */

> +		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);

> +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +		while (bmap) {

> +			j = __builtin_ctzl(bmap);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",

> +				__func__, t, wait, bmap, id + j);

> +			c = __atomic_load_n(

> +					&v->qsbr_cnt[id + j].cnt,

> +					__ATOMIC_ACQUIRE);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",

> +				__func__, t, wait, c, id+j);

> +			/* Counter is not checked for wrap-around condition

> +			 * as it is a 64b counter.

> +			 */

> +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) {

> +				/* This thread is not in quiescent state */

> +				if (!wait)

> +					return 0;

> +

> +				rte_pause();

> +				/* This thread might have unregistered.

> +				 * Re-read the bitmap.

> +				 */

> +				bmap = __atomic_load_n(reg_thread_id,

> +						__ATOMIC_ACQUIRE);

> +

> +				continue;

> +			}

> +

> +			bmap &= ~(1UL << j);

> +		}

> +	}

> +

> +	return 1;

> +}

> +

> +/* Check the quiescent state counter for all threads, assuming that

> + * all the threads have registered.

> + */

> +static __rte_always_inline int

> +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> +{

> +	uint32_t i;

> +	struct rte_rcu_qsbr_cnt *cnt;

> +	uint64_t c;

> +

> +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {

> +		RCU_DP_LOG(DEBUG,

> +			"%s: check: token = %lu, wait = %d, Thread ID = %d",

> +			__func__, t, wait, i);

> +		while (1) {

> +			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",

> +				__func__, t, wait, c, i);

> +			/* Counter is not checked for wrap-around condition

> +			 * as it is a 64b counter.

> +			 */

> +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t))

> +				break;

> +

> +			/* This thread is not in quiescent state */

> +			if (!wait)

> +				return 0;

> +

> +			rte_pause();

> +		}

> +	}

> +

> +	return 1;

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Checks if all the reader threads have entered the quiescent state

> + * referenced by token.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe and can be called from the worker threads as well.

> + *

> + * If this API is called with 'wait' set to true, the following

> + * factors must be considered:

> + *

> + * 1) If the calling thread is also reporting the status on the

> + * same QS variable, it must update the quiescent state status, before

> + * calling this API.

> + *

> + * 2) In addition, while calling from multiple threads, only

> + * one of those threads can be reporting the quiescent state status

> + * on a given QS variable.

> + *

> + * @param v

> + *   QS variable

> + * @param t

> + *   Token returned by rte_rcu_qsbr_start API

> + * @param wait

> + *   If true, block till all the reader threads have completed entering

> + *   the quiescent state referenced by token 't'.

> + * @return

> + *   - 0 if all reader threads have NOT passed through specified number

> + *     of quiescent states.

> + *   - 1 if all reader threads have passed through specified number

> + *     of quiescent states.

> + */

> +static __rte_always_inline int __rte_experimental

> +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> +{

> +	RTE_ASSERT(v != NULL);

> +

> +	if (likely(v->num_threads == v->max_threads))

> +		return __rcu_qsbr_check_all(v, t, wait);

> +	else

> +		return __rcu_qsbr_check_selective(v, t, wait);

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Wait till the reader threads have entered quiescent state.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * This API can be thought of as a wrapper around rte_rcu_qsbr_start and

> + * rte_rcu_qsbr_check APIs.

> + *

> + * If this API is called from multiple threads, only one of

> + * those threads can be reporting the quiescent state status on a

> + * given QS variable.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Thread ID of the caller if it is registered to report quiescent state

> + *   on this QS variable (i.e. the calling thread is also part of the

> + *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.

> + */

> +void __rte_experimental

> +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Dump the details of a single QS variables to a file.

> + *

> + * It is NOT multi-thread safe.

> + *

> + * @param f

> + *   A pointer to a file for output

> + * @param v

> + *   QS variable

> + * @return

> + *   On success - 0

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - NULL parameters are passed

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

> +

> +#ifdef __cplusplus

> +}

> +#endif

> +

> +#endif /* _RTE_RCU_QSBR_H_ */

> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map

> new file mode 100644

> index 000000000..5ea8524db

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_version.map

> @@ -0,0 +1,12 @@

> +EXPERIMENTAL {

> +	global:

> +

> +	rte_rcu_qsbr_get_memsize;

> +	rte_rcu_qsbr_init;

> +	rte_rcu_qsbr_thread_register;

> +	rte_rcu_qsbr_thread_unregister;

> +	rte_rcu_qsbr_synchronize;

> +	rte_rcu_qsbr_dump;

> +

> +	local: *;

> +};

> diff --git a/lib/meson.build b/lib/meson.build

> index 595314d7d..67be10659 100644

> --- a/lib/meson.build

> +++ b/lib/meson.build

> @@ -22,7 +22,7 @@ libraries = [

>  	'gro', 'gso', 'ip_frag', 'jobstats',

>  	'kni', 'latencystats', 'lpm', 'member',

>  	'power', 'pdump', 'rawdev',

> -	'reorder', 'sched', 'security', 'stack', 'vhost',

> +	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',

>  	#ipsec lib depends on crypto and security

>  	'ipsec',

>  	# add pkt framework libs which use other libs from above

> diff --git a/mk/rte.app.mk b/mk/rte.app.mk

> index abea16d48..ebe6d48a7 100644

> --- a/mk/rte.app.mk

> +++ b/mk/rte.app.mk

> @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -lrte_eal

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched

> +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu

>  

>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni

> -- 

> 2.17.1

>
Honnappa Nagarahalli April 23, 2019, 9:23 p.m. UTC | #2
> 

> On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote:

> > Add RCU library supporting quiescent state based memory reclamation

> method.

> > This library helps identify the quiescent state of the reader threads

> > so that the writers can free the memory associated with the lock less

> > data structures.

> >

> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > Reviewed-by: Steve Capper <steve.capper@arm.com>

> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 

> Much better!

> 

> Acked-by: Paul E. McKenney <paulmck@linux.ibm.com>

> 

Thanks a lot, appreciate your feedback.

Any views from maintainers on including this library into RC3? IMO, this library is independent and should not affect existing code.
Ruifeng Wang April 24, 2019, 10:03 a.m. UTC | #3
Hi Honnappa,

> -----Original Message-----

> From: dev <dev-bounces@dpdk.org> On Behalf Of Honnappa Nagarahalli

> Sent: Tuesday, April 23, 2019 12:31

> To: konstantin.ananyev@intel.com; stephen@networkplumber.org;

> paulmck@linux.ibm.com; marko.kovacevic@intel.com; dev@dpdk.org

> Cc: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>; Gavin Hu (Arm

> Technology China) <Gavin.Hu@arm.com>; Dharmik Thakkar

> <Dharmik.Thakkar@arm.com>; Malvika Gupta <Malvika.Gupta@arm.com>

> Subject: [dpdk-dev] [PATCH v7 1/3] rcu: add RCU library supporting QSBR

> mechanism

> 


*snip*

> +int __rte_experimental

> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int

> +thread_id) {

> +	unsigned int i, id, success;

> +	uint64_t old_bmap, new_bmap;

> +

> +	if (v == NULL || thread_id >= v->max_threads) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",

> +				v->qsbr_cnt[thread_id].lock_cnt);

> +

> +	id = thread_id & RTE_QSBR_THRID_MASK;

> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +	/* Make sure that the counter for registered threads does not

> +	 * go out of sync. Hence, additional checks are required.

> +	 */

> +	/* Check if the thread is already unregistered */

> +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_RELAXED);

> +	if (old_bmap & ~(1UL << id))


If I understand correctly, here should be (!(old_bmap & 1UL << id))
Can you please check?

> +		return 0;

> +

> +	do {

> +		new_bmap = old_bmap & ~(1UL << id);

> +		/* Make sure any loads of the shared data structure are

> +		 * completed before removal of the thread from the list of

> +		 * reporting threads.

> +		 */

> +		success = __atomic_compare_exchange(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					&old_bmap, &new_bmap, 0,

> +					__ATOMIC_RELEASE,

> __ATOMIC_RELAXED);

> +

> +		if (success)

> +			__atomic_fetch_sub(&v->num_threads,

> +						1, __ATOMIC_RELAXED);

> +		else if (old_bmap & ~(1UL << id))


Same comment as previous.

> +			/* Someone else unregistered this thread.

> +			 * Counter should not be incremented.

> +			 */

> +			return 0;

> +	} while (success == 0);

> +

> +	return 0;

> +}


*snip*
Jerin Jacob Kollanukkaran April 24, 2019, 8:02 p.m. UTC | #4
> -----Original Message-----

> From: dev <dev-bounces@dpdk.org> On Behalf Of Honnappa Nagarahalli

> Sent: Wednesday, April 24, 2019 2:53 AM

> To: paulmck@linux.ibm.com

> Cc: konstantin.ananyev@intel.com; stephen@networkplumber.org;

> marko.kovacevic@intel.com; dev@dpdk.org; Gavin Hu (Arm Technology China)

> <Gavin.Hu@arm.com>; Dharmik Thakkar <Dharmik.Thakkar@arm.com>;

> Malvika Gupta <Malvika.Gupta@arm.com>; Honnappa Nagarahalli

> <Honnappa.Nagarahalli@arm.com>; bruce.richardson@intel.com; nd

> <nd@arm.com>; thomas@monjalon.net; nd <nd@arm.com>

> Subject: Re: [dpdk-dev] [PATCH v7 1/3] rcu: add RCU library supporting QSBR

> mechanism

> 

> >

> > On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli wrote:

> > > Add RCU library supporting quiescent state based memory reclamation

> > method.

> > > This library helps identify the quiescent state of the reader

> > > threads so that the writers can free the memory associated with the

> > > lock less data structures.

> > >

> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > > Reviewed-by: Steve Capper <steve.capper@arm.com>

> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> >

> > Much better!

> >

> > Acked-by: Paul E. McKenney <paulmck@linux.ibm.com>

> >

> Thanks a lot, appreciate your feedback.

> 

> Any views from maintainers on including this library into RC3? IMO, this library is

> independent and should not affect existing code.


Tested rcu_qsbr_autotest and rcu_qsbr_perf_autotest UT on a armv8.2 machine(octeontx2).
Found rcu_qsbr_perf_autotest() runs successfully on 24 cores.
There is come issue with rcu_qsbr_autotest on 24 cores. It works fine upto 20 cores.

Please find below the success log, failure log and core dump.

[master][dpdk.org] $ echo "rcu_qsbr_autotest" | sudo ./build/app/test -c 0xfffff0
EAL: Detected 24 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL: VFIO support initialized
APP: HPET is not enabled, using TSC as default timer
RTE>>rcu_qsbr_autotest

Test rte_rcu_qsbr_thread_register()
rte_rcu_qsbr_get_memsize(): Invalid max_threads 0

Test rte_rcu_qsbr_init()
rte_rcu_qsbr_init(): Invalid input parameter

Test rte_rcu_qsbr_thread_register()
rte_rcu_qsbr_thread_register(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter

Test rte_rcu_qsbr_thread_unregister()
rte_rcu_qsbr_thread_unregister(): Invalid input parameter
rte_rcu_qsbr_thread_unregister(): Invalid input parameter
rte_rcu_qsbr_thread_unregister(): Invalid input parameter

Test rte_rcu_qsbr_start()

Test rte_rcu_qsbr_check()
Test rte_rcu_qsbr_synchronize()

Test rte_rcu_qsbr_dump()
rte_rcu_qsbr_dump(): Invalid input parameter
rte_rcu_qsbr_dump(): Invalid input parameter
rte_rcu_qsbr_dump(): Invalid input parameter

Quiescent State Variable @0x13ff94100
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 0
  Registered thread ID mask = 0x00
  Token = 1
Quiescent State Counts for readers:

Quiescent State Variable @0x13ff94100
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 1
  Registered thread ID mask = 0x200
  Token = 1
Quiescent State Counts for readers:
thread ID = 5, count = 0, lock count = 0

Quiescent State Variable @0x13ff8ff00
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 2
  Registered thread ID mask = 0xc00
  Token = 1
Quiescent State Counts for readers:
thread ID = 6, count = 0, lock count = 0
thread ID = 7, count = 0, lock count = 0

Test rte_rcu_qsbr_thread_online()

Test rte_rcu_qsbr_thread_offline()

Functional tests
Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries
Test: 8 writers, 4 QSBR variable, simultaneous QSBR queries

Test OK

[master] [dpdk.org] $ echo "rcu_qsbr_autotest" | sudo ./build/app/test -c 0xffffff
EAL: Detected 24 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL: VFIO support initialized
APP: HPET is not enabled, using TSC as default timer
RTE>>rcu_qsbr_autotest

Test rte_rcu_qsbr_thread_register()
rte_rcu_qsbr_get_memsize(): Invalid max_threads 0

Test rte_rcu_qsbr_init()
rte_rcu_qsbr_init(): Invalid input parameter

Test rte_rcu_qsbr_thread_register()
rte_rcu_qsbr_thread_register(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter

Test rte_rcu_qsbr_thread_unregister()
rte_rcu_qsbr_thread_unregister(): Invalid input parameter
rte_rcu_qsbr_thread_unregister(): Invalid input parameter
rte_rcu_qsbr_thread_unregister(): Invalid input parameter

Test rte_rcu_qsbr_start()

Test rte_rcu_qsbr_check()

Test rte_rcu_qsbr_synchronize()

Test rte_rcu_qsbr_dump()
rte_rcu_qsbr_dump(): Invalid input parameter
rte_rcu_qsbr_dump(): Invalid input parameter
rte_rcu_qsbr_dump(): Invalid input parameter

Quiescent State Variable @0x13ff94100
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 0
  Registered thread ID mask = 0x00
  Token = 1
Quiescent State Counts for readers:

Quiescent State Variable @0x13ff94100
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 1
  Registered thread ID mask = 0x20
  Token = 1
Quiescent State Counts for readers:
thread ID = 1, count = 0, lock count = 0

Quiescent State Variable @0x13ff8ff00
  QS variable memory size = 16768
  Given # max threads = 128
  Current # threads = 2
  Registered thread ID mask = 0xc0
  Token = 1
Quiescent State Counts for readers:
thread ID = 2, count = 0, lock count = 0
thread ID = 3, count = 0, lock count = 0

Test rte_rcu_qsbr_thread_online()

Test rte_rcu_qsbr_thread_offline()

Functional tests
Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries
Test: 10 writers, 5 QSBR variable, simultaneous QSBR queries
rte_rcu_qsbr_init(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter
rte_rcu_qsbr_thread_register(): Invalid input parameter
Segmentation fault


[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Core was generated by `./build/app/test -c 0xffffff'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  rte_rcu_qsbr_thread_online (thread_id=<optimized out>, v=0x0) at /home/jerin/dpdk.org/build/include/rte_rcu_qsbr.h:238
/home/jerin/dpdk.org/lib/librte_rcu/rte_rcu_qsbr.h:238:7712:beg:0x564df4
[Current thread is 1 (Thread 0xffff9c06d900 (LWP 1938))]
(gdb) bt
#0  rte_rcu_qsbr_thread_online (thread_id=<optimized out>, v=0x0) at /home/jerin/dpdk.org/build/include/rte_rcu_qsbr.h:238
#1  test_rcu_qsbr_reader (arg=<optimized out>) at /home/jerin/dpdk.org/app/test/test_rcu_qsbr.c:641
#2  0x0000000000652430 in eal_thread_loop (arg=<optimized out>) at /home/jerin/dpdk.org/lib/librte_eal/linux/eal/eal_thread.c:153
#3  0x0000ffffa13a756c in start_thread () from /usr/lib/libpthread.so.0
#4  0x0000ffffa11e301c in thread_start () from /usr/lib/libc.so.6
Honnappa Nagarahalli April 25, 2019, 5:15 a.m. UTC | #5
> > >

> > > On Mon, Apr 22, 2019 at 11:31:28PM -0500, Honnappa Nagarahalli

> wrote:

> > > > Add RCU library supporting quiescent state based memory

> > > > reclamation

> > > method.

> > > > This library helps identify the quiescent state of the reader

> > > > threads so that the writers can free the memory associated with

> > > > the lock less data structures.

> > > >

> > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > > > Reviewed-by: Steve Capper <steve.capper@arm.com>

> > > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> > > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> > >

> > > Much better!

> > >

> > > Acked-by: Paul E. McKenney <paulmck@linux.ibm.com>

> > >

> > Thanks a lot, appreciate your feedback.

> >

> > Any views from maintainers on including this library into RC3? IMO,

> > this library is independent and should not affect existing code.

> 

> Tested rcu_qsbr_autotest and rcu_qsbr_perf_autotest UT on a armv8.2

> machine(octeontx2).

> Found rcu_qsbr_perf_autotest() runs successfully on 24 cores.

> There is come issue with rcu_qsbr_autotest on 24 cores. It works fine upto 20

> cores.

Thanks Jerin for running the test cases. I could reproduce the issue. It is a test case issue, will fix in next version.
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index a08583471..ae54f37db 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1274,6 +1274,11 @@  F: examples/bpf/
 F: app/test/test_bpf.c
 F: doc/guides/prog_guide/bpf_lib.rst
 
+RCU - EXPERIMENTAL
+M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
+F: lib/librte_rcu/
+F: doc/guides/prog_guide/rcu_lib.rst
+
 
 Test Applications
 -----------------
diff --git a/config/common_base b/config/common_base
index 7fb0dedb6..f50d26c30 100644
--- a/config/common_base
+++ b/config/common_base
@@ -834,6 +834,12 @@  CONFIG_RTE_LIBRTE_LATENCY_STATS=y
 #
 CONFIG_RTE_LIBRTE_TELEMETRY=n
 
+#
+# Compile librte_rcu
+#
+CONFIG_RTE_LIBRTE_RCU=y
+CONFIG_RTE_LIBRTE_RCU_DEBUG=n
+
 #
 # Compile librte_lpm
 #
diff --git a/lib/Makefile b/lib/Makefile
index 26021d0c0..791e0d991 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -111,6 +111,8 @@  DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec
 DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security
 DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry
 DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev
+DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu
+DEPDIRS-librte_rcu := librte_eal
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
new file mode 100644
index 000000000..6aa677bd1
--- /dev/null
+++ b/lib/librte_rcu/Makefile
@@ -0,0 +1,23 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_rcu.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_rcu_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
new file mode 100644
index 000000000..0c2d5a2e0
--- /dev/null
+++ b/lib/librte_rcu/meson.build
@@ -0,0 +1,7 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+allow_experimental_apis = true
+
+sources = files('rte_rcu_qsbr.c')
+headers = files('rte_rcu_qsbr.h')
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
new file mode 100644
index 000000000..50ef0ba9a
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -0,0 +1,268 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* Get the memory size of QSBR variable */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+	size_t sz;
+
+	if (max_threads == 0) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid max_threads %u\n",
+			__func__, max_threads);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	sz = sizeof(struct rte_rcu_qsbr);
+
+	/* Add the size of quiescent state counter array */
+	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+	/* Add the size of the registered thread ID bitmap array */
+	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
+
+	return sz;
+}
+
+/* Initialize a quiescent state variable */
+int __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
+{
+	size_t sz;
+
+	if (v == NULL) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	sz = rte_rcu_qsbr_get_memsize(max_threads);
+	if (sz == 1)
+		return 1;
+
+	/* Set all the threads to offline */
+	memset(v, 0, sz);
+	v->max_threads = max_threads;
+	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+	v->token = RTE_QSBR_CNT_INIT;
+
+	return 0;
+}
+
+/* Register a reader thread to report its quiescent state
+ * on a QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id, success;
+	uint64_t old_bmap, new_bmap;
+
+	if (v == NULL || thread_id >= v->max_threads) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Make sure that the counter for registered threads does not
+	 * go out of sync. Hence, additional checks are required.
+	 */
+	/* Check if the thread is already registered */
+	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_RELAXED);
+	if (old_bmap & 1UL << id)
+		return 0;
+
+	do {
+		new_bmap = old_bmap | (1UL << id);
+		success = __atomic_compare_exchange(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					&old_bmap, &new_bmap, 0,
+					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+
+		if (success)
+			__atomic_fetch_add(&v->num_threads,
+						1, __ATOMIC_RELAXED);
+		else if (old_bmap & (1UL << id))
+			/* Someone else registered this thread.
+			 * Counter should not be incremented.
+			 */
+			return 0;
+	} while (success == 0);
+
+	return 0;
+}
+
+/* Remove a reader thread, from the list of threads reporting their
+ * quiescent state on a QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id, success;
+	uint64_t old_bmap, new_bmap;
+
+	if (v == NULL || thread_id >= v->max_threads) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Make sure that the counter for registered threads does not
+	 * go out of sync. Hence, additional checks are required.
+	 */
+	/* Check if the thread is already unregistered */
+	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_RELAXED);
+	if (old_bmap & ~(1UL << id))
+		return 0;
+
+	do {
+		new_bmap = old_bmap & ~(1UL << id);
+		/* Make sure any loads of the shared data structure are
+		 * completed before removal of the thread from the list of
+		 * reporting threads.
+		 */
+		success = __atomic_compare_exchange(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					&old_bmap, &new_bmap, 0,
+					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+
+		if (success)
+			__atomic_fetch_sub(&v->num_threads,
+						1, __ATOMIC_RELAXED);
+		else if (old_bmap & ~(1UL << id))
+			/* Someone else unregistered this thread.
+			 * Counter should not be incremented.
+			 */
+			return 0;
+	} while (success == 0);
+
+	return 0;
+}
+
+/* Wait till the reader threads have entered quiescent state. */
+void __rte_experimental
+rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	t = rte_rcu_qsbr_start(v);
+
+	/* If the current thread has readside critical section,
+	 * update its quiescent state status.
+	 */
+	if (thread_id != RTE_QSBR_THRID_INVALID)
+		rte_rcu_qsbr_quiescent(v, thread_id);
+
+	/* Wait for other readers to enter quiescent state */
+	rte_rcu_qsbr_check(v, t, true);
+}
+
+/* Dump the details of a single quiescent state variable to a file. */
+int __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
+{
+	uint64_t bmap;
+	uint32_t i, t, id;
+
+	if (v == NULL || f == NULL) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	fprintf(f, "\nQuiescent State Variable @%p\n", v);
+
+	fprintf(f, "  QS variable memory size = %lu\n",
+				rte_rcu_qsbr_get_memsize(v->max_threads));
+	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
+	fprintf(f, "  Current # threads = %u\n", v->num_threads);
+
+	fprintf(f, "  Registered thread ID mask = 0x");
+	for (i = 0; i < v->num_elems; i++)
+		fprintf(f, "%lx", __atomic_load_n(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_ACQUIRE));
+	fprintf(f, "\n");
+
+	fprintf(f, "  Token = %lu\n",
+			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
+
+	fprintf(f, "Quiescent State Counts for readers:\n");
+	for (i = 0; i < v->num_elems; i++) {
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
+		while (bmap) {
+			t = __builtin_ctzl(bmap);
+			fprintf(f, "thread ID = %d, count = %lu, lock count = %u\n",
+				id + t,
+				__atomic_load_n(
+					&v->qsbr_cnt[id + t].cnt,
+					__ATOMIC_RELAXED),
+				__atomic_load_n(
+					&v->qsbr_cnt[id + t].lock_cnt,
+					__ATOMIC_RELAXED));
+			bmap &= ~(1UL << t);
+		}
+	}
+
+	return 0;
+}
+
+int rcu_log_type;
+
+RTE_INIT(rte_rcu_register)
+{
+	rcu_log_type = rte_log_register("lib.rcu");
+	if (rcu_log_type >= 0)
+		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
+}
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
new file mode 100644
index 000000000..58fb95ed0
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -0,0 +1,639 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_H_
+#define _RTE_RCU_QSBR_H_
+
+/**
+ * @file
+ * RTE Quiescent State Based Reclamation (QSBR)
+ *
+ * Quiescent State (QS) is any point in the thread execution
+ * where the thread does not hold a reference to a data structure
+ * in shared memory. While using lock-less data structures, the writer
+ * can safely free memory once all the reader threads have entered
+ * quiescent state.
+ *
+ * This library provides the ability for the readers to report quiescent
+ * state and for the writers to identify when all the readers have
+ * entered quiescent state.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_debug.h>
+#include <rte_atomic.h>
+
+extern int rcu_log_type;
+
+#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
+#define RCU_DP_LOG(level, fmt, args...) \
+	rte_log(RTE_LOG_ ## level, rcu_log_type, \
+		"%s(): " fmt "\n", __func__, ## args)
+#else
+#define RCU_DP_LOG(level, fmt, args...)
+#endif
+
+#if defined(RTE_LIBRTE_RCU_DEBUG)
+#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
+	if (v->qsbr_cnt[thread_id].lock_cnt) \
+		rte_log(RTE_LOG_ ## level, rcu_log_type, \
+			"%s(): " fmt "\n", __func__, ## args); \
+} while (0)
+#else
+#define RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
+#endif
+
+/* Registered thread IDs are stored as a bitmap of 64b element array.
+ * Given thread id needs to be converted to index into the array and
+ * the id within the array element.
+ */
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
+	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
+		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
+#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
+	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
+#define RTE_QSBR_THRID_INVALID 0xffffffff
+
+/* Worker thread counter */
+struct rte_rcu_qsbr_cnt {
+	uint64_t cnt;
+	/**< Quiescent state counter. Value 0 indicates the thread is offline
+	 *   64b counter is used to avoid adding more code to address
+	 *   counter overflow. Changing this to 32b would require additional
+	 *   changes to various APIs.
+	 */
+	uint32_t lock_cnt;
+	/**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
+} __rte_cache_aligned;
+
+#define RTE_QSBR_CNT_THR_OFFLINE 0
+#define RTE_QSBR_CNT_INIT 1
+
+/* RTE Quiescent State variable structure.
+ * This structure has two elements that vary in size based on the
+ * 'max_threads' parameter.
+ * 1) Quiescent state counter array
+ * 2) Register thread ID array
+ */
+struct rte_rcu_qsbr {
+	uint64_t token __rte_cache_aligned;
+	/**< Counter to allow for multiple concurrent quiescent state queries */
+
+	uint32_t num_elems __rte_cache_aligned;
+	/**< Number of elements in the thread ID array */
+	uint32_t num_threads;
+	/**< Number of threads currently using this QS variable */
+	uint32_t max_threads;
+	/**< Maximum number of threads using this QS variable */
+
+	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
+	/**< Quiescent state counter array of 'max_threads' elements */
+
+	/**< Registered thread IDs are stored in a bitmap array,
+	 *   after the quiescent state counter array.
+	 */
+} __rte_cache_aligned;
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting quiescent state on this variable.
+ * @return
+ *   On success - size of memory in bytes required for this QS variable.
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - max_threads is 0
+ */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Initialize a Quiescent State (QS) variable.
+ *
+ * @param v
+ *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting quiescent state on this variable.
+ *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - max_threads is 0 or 'v' is NULL.
+ *
+ */
+int __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Register a reader thread to report its quiescent state
+ * on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ * Any reader thread that wants to report its quiescent state must
+ * call this API. This can be called during initialization or as part
+ * of the packet processing loop.
+ *
+ * Note that rte_rcu_qsbr_thread_online must be called before the
+ * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
+ *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a reader thread, from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be called from the reader threads during shutdown.
+ * Ongoing quiescent state queries will stop waiting for the status from this
+ * unregistered reader thread.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will stop reporting its quiescent
+ *   state on the QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Add a registered reader thread, to the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * Any registered reader thread that wants to report its quiescent state must
+ * call this API before calling rte_rcu_qsbr_quiescent. This can be called
+ * during initialization or as part of the packet processing loop.
+ *
+ * The reader thread must call rte_rcu_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * The reader thread must call rte_rcu_thread_online API, after the blocking
+ * function call returns, to ensure that rte_rcu_qsbr_check API
+ * waits for the reader thread to update its quiescent state.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	/* Copy the current value of token.
+	 * The fence at the end of the function will ensure that
+	 * the following will not move down after the load of any shared
+	 * data structure.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
+
+	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
+	 * 'cnt' (64b) is accessed atomically.
+	 */
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+		t, __ATOMIC_RELAXED);
+
+	/* The subsequent load of the data structure should not
+	 * move above the store. Hence a store-load barrier
+	 * is required.
+	 * If the load of the data structure moves above the store,
+	 * writer might not see that the reader is online, even though
+	 * the reader is referencing the shared data structure.
+	 */
+#ifdef RTE_ARCH_X86_64
+	/* rte_smp_mb() for x86 is lighter */
+	rte_smp_mb();
+#else
+	__atomic_thread_fence(__ATOMIC_SEQ_CST);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a registered reader thread from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This can be called during initialization or as part of the packet
+ * processing loop.
+ *
+ * The reader thread must call rte_rcu_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   rte_rcu_qsbr_check API will not wait for the reader thread with
+ *   this thread ID to report its quiescent state on the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	/* The reader can go offline only after the load of the
+	 * data structure is completed. i.e. any load of the
+	 * data strcture can not move after this store.
+	 */
+
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Acquire a lock for accessing a shared data structure.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This API is provided to aid debugging. This should be called before
+ * accessing a shared data structure.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
+ * Similarly rte_rcu_qsbr_unlock will decrement the counter. When the
+ * rte_rcu_qsbr_check API will verify that this counter is 0.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread id
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_lock(struct rte_rcu_qsbr *v __rte_unused,
+			unsigned int thread_id __rte_unused)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+#if defined(RTE_LIBRTE_RCU_DEBUG)
+	/* Increment the lock counter */
+	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
+				1, __ATOMIC_ACQUIRE);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Release a lock after accessing a shared data structure.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This API is provided to aid debugging. This should be called after
+ * accessing a shared data structure.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
+ * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
+ * counter is 0.
+ *
+ * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread id
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_unlock(struct rte_rcu_qsbr *v __rte_unused,
+			unsigned int thread_id __rte_unused)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+#if defined(RTE_LIBRTE_RCU_DEBUG)
+	/* Decrement the lock counter */
+	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
+				1, __ATOMIC_RELEASE);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
+				"Lock counter %u. Nested locks?\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Ask the reader threads to report the quiescent state
+ * status.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from worker threads.
+ *
+ * @param v
+ *   QS variable
+ * @return
+ *   - This is the token for this call of the API. This should be
+ *     passed to rte_rcu_qsbr_check API.
+ */
+static __rte_always_inline uint64_t __rte_experimental
+rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	/* Release the changes to the shared data structure.
+	 * This store release will ensure that changes to any data
+	 * structure are visible to the workers before the token
+	 * update is visible.
+	 */
+	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
+
+	return t;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Update quiescent state for a reader thread.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * All the reader threads registered to report their quiescent state
+ * on the QS variable must call this API.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Update the quiescent state for the reader with this thread ID.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
+				v->qsbr_cnt[thread_id].lock_cnt);
+
+	/* Acquire the changes to the shared data structure released
+	 * by rte_rcu_qsbr_start.
+	 * Later loads of the shared data structure should not move
+	 * above this load. Hence, use load-acquire.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
+
+	/* Inform the writer that updates are visible to this reader.
+	 * Prior loads of the shared data structure should not move
+	 * beyond this store. Hence use store-release.
+	 */
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+			 t, __ATOMIC_RELEASE);
+
+	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",
+		__func__, t, thread_id);
+}
+
+/* Check the quiescent state counter for registered threads only, assuming
+ * that not all threads have registered.
+ */
+static __rte_always_inline int
+__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	uint32_t i, j, id;
+	uint64_t bmap;
+	uint64_t c;
+	uint64_t *reg_thread_id;
+
+	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);
+		i < v->num_elems;
+		i++, reg_thread_id++) {
+		/* Load the current registered thread bit map before
+		 * loading the reader thread quiescent state counters.
+		 */
+		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
+
+		while (bmap) {
+			j = __builtin_ctzl(bmap);
+			RCU_DP_LOG(DEBUG,
+				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",
+				__func__, t, wait, bmap, id + j);
+			c = __atomic_load_n(
+					&v->qsbr_cnt[id + j].cnt,
+					__ATOMIC_ACQUIRE);
+			RCU_DP_LOG(DEBUG,
+				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
+				__func__, t, wait, c, id+j);
+			/* Counter is not checked for wrap-around condition
+			 * as it is a 64b counter.
+			 */
+			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
+				/* This thread is not in quiescent state */
+				if (!wait)
+					return 0;
+
+				rte_pause();
+				/* This thread might have unregistered.
+				 * Re-read the bitmap.
+				 */
+				bmap = __atomic_load_n(reg_thread_id,
+						__ATOMIC_ACQUIRE);
+
+				continue;
+			}
+
+			bmap &= ~(1UL << j);
+		}
+	}
+
+	return 1;
+}
+
+/* Check the quiescent state counter for all threads, assuming that
+ * all the threads have registered.
+ */
+static __rte_always_inline int
+__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	uint32_t i;
+	struct rte_rcu_qsbr_cnt *cnt;
+	uint64_t c;
+
+	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
+		RCU_DP_LOG(DEBUG,
+			"%s: check: token = %lu, wait = %d, Thread ID = %d",
+			__func__, t, wait, i);
+		while (1) {
+			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
+			RCU_DP_LOG(DEBUG,
+				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
+				__func__, t, wait, c, i);
+			/* Counter is not checked for wrap-around condition
+			 * as it is a 64b counter.
+			 */
+			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t))
+				break;
+
+			/* This thread is not in quiescent state */
+			if (!wait)
+				return 0;
+
+			rte_pause();
+		}
+	}
+
+	return 1;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Checks if all the reader threads have entered the quiescent state
+ * referenced by token.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from the worker threads as well.
+ *
+ * If this API is called with 'wait' set to true, the following
+ * factors must be considered:
+ *
+ * 1) If the calling thread is also reporting the status on the
+ * same QS variable, it must update the quiescent state status, before
+ * calling this API.
+ *
+ * 2) In addition, while calling from multiple threads, only
+ * one of those threads can be reporting the quiescent state status
+ * on a given QS variable.
+ *
+ * @param v
+ *   QS variable
+ * @param t
+ *   Token returned by rte_rcu_qsbr_start API
+ * @param wait
+ *   If true, block till all the reader threads have completed entering
+ *   the quiescent state referenced by token 't'.
+ * @return
+ *   - 0 if all reader threads have NOT passed through specified number
+ *     of quiescent states.
+ *   - 1 if all reader threads have passed through specified number
+ *     of quiescent states.
+ */
+static __rte_always_inline int __rte_experimental
+rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	RTE_ASSERT(v != NULL);
+
+	if (likely(v->num_threads == v->max_threads))
+		return __rcu_qsbr_check_all(v, t, wait);
+	else
+		return __rcu_qsbr_check_selective(v, t, wait);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Wait till the reader threads have entered quiescent state.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
+ * rte_rcu_qsbr_check APIs.
+ *
+ * If this API is called from multiple threads, only one of
+ * those threads can be reporting the quiescent state status on a
+ * given QS variable.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Thread ID of the caller if it is registered to report quiescent state
+ *   on this QS variable (i.e. the calling thread is also part of the
+ *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
+ */
+void __rte_experimental
+rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Dump the details of a single QS variables to a file.
+ *
+ * It is NOT multi-thread safe.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param v
+ *   QS variable
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ */
+int __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
new file mode 100644
index 000000000..5ea8524db
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -0,0 +1,12 @@ 
+EXPERIMENTAL {
+	global:
+
+	rte_rcu_qsbr_get_memsize;
+	rte_rcu_qsbr_init;
+	rte_rcu_qsbr_thread_register;
+	rte_rcu_qsbr_thread_unregister;
+	rte_rcu_qsbr_synchronize;
+	rte_rcu_qsbr_dump;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 595314d7d..67be10659 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -22,7 +22,7 @@  libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'reorder', 'sched', 'security', 'stack', 'vhost',
+	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',
 	#ipsec lib depends on crypto and security
 	'ipsec',
 	# add pkt framework libs which use other libs from above
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index abea16d48..ebe6d48a7 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -97,6 +97,7 @@  _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -lrte_eal
 _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline
 _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder
 _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched
+_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni