diff mbox series

[v4,1/3] rcu: add RCU library supporting QSBR mechanism

Message ID 20190410112006.21644-2-honnappa.nagarahalli@arm.com
State Superseded
Headers show
Series lib/rcu: add RCU library supporting QSBR mechanism | expand

Commit Message

Honnappa Nagarahalli April 10, 2019, 11:20 a.m. UTC
Add RCU library supporting quiescent state based memory reclamation method.
This library helps identify the quiescent state of the reader threads so
that the writers can free the memory associated with the lock less data
structures.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Steve Capper <steve.capper@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

---
 MAINTAINERS                        |   5 +
 config/common_base                 |   6 +
 lib/Makefile                       |   2 +
 lib/librte_rcu/Makefile            |  23 ++
 lib/librte_rcu/meson.build         |   5 +
 lib/librte_rcu/rte_rcu_qsbr.c      | 237 ++++++++++++
 lib/librte_rcu/rte_rcu_qsbr.h      | 554 +++++++++++++++++++++++++++++
 lib/librte_rcu/rte_rcu_version.map |  11 +
 lib/meson.build                    |   2 +-
 mk/rte.app.mk                      |   1 +
 10 files changed, 845 insertions(+), 1 deletion(-)
 create mode 100644 lib/librte_rcu/Makefile
 create mode 100644 lib/librte_rcu/meson.build
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c
 create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h
 create mode 100644 lib/librte_rcu/rte_rcu_version.map

-- 
2.17.1

Comments

Paul E. McKenney April 10, 2019, 6:14 p.m. UTC | #1
On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli wrote:
> Add RCU library supporting quiescent state based memory reclamation method.

> This library helps identify the quiescent state of the reader threads so

> that the writers can free the memory associated with the lock less data

> structures.


I don't see any sign of read-side markers (rcu_read_lock() and
rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

Yes, strictly speaking, these are not needed for QSBR to operate, but they
make it way easier to maintain and debug code using RCU.  For example,
given the read-side markers, you can check for errors like having a call
to rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.
Without those read-side markers, life can be quite hard and you will
really hate yourself for failing to have provided them.

Some additional questions and comments interspersed.

							Thanx, Paul

> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> Reviewed-by: Steve Capper <steve.capper@arm.com>

> Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> ---

>  MAINTAINERS                        |   5 +

>  config/common_base                 |   6 +

>  lib/Makefile                       |   2 +

>  lib/librte_rcu/Makefile            |  23 ++

>  lib/librte_rcu/meson.build         |   5 +

>  lib/librte_rcu/rte_rcu_qsbr.c      | 237 ++++++++++++

>  lib/librte_rcu/rte_rcu_qsbr.h      | 554 +++++++++++++++++++++++++++++

>  lib/librte_rcu/rte_rcu_version.map |  11 +

>  lib/meson.build                    |   2 +-

>  mk/rte.app.mk                      |   1 +

>  10 files changed, 845 insertions(+), 1 deletion(-)

>  create mode 100644 lib/librte_rcu/Makefile

>  create mode 100644 lib/librte_rcu/meson.build

>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr.c

>  create mode 100644 lib/librte_rcu/rte_rcu_qsbr.h

>  create mode 100644 lib/librte_rcu/rte_rcu_version.map

> 

> diff --git a/MAINTAINERS b/MAINTAINERS

> index 9774344dd..6e9766eed 100644

> --- a/MAINTAINERS

> +++ b/MAINTAINERS

> @@ -1267,6 +1267,11 @@ F: examples/bpf/

>  F: app/test/test_bpf.c

>  F: doc/guides/prog_guide/bpf_lib.rst

>  

> +RCU - EXPERIMENTAL

> +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> +F: lib/librte_rcu/

> +F: doc/guides/prog_guide/rcu_lib.rst

> +

>  

>  Test Applications

>  -----------------

> diff --git a/config/common_base b/config/common_base

> index 8da08105b..ad70c79e1 100644

> --- a/config/common_base

> +++ b/config/common_base

> @@ -829,6 +829,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y

>  #

>  CONFIG_RTE_LIBRTE_TELEMETRY=n

>  

> +#

> +# Compile librte_rcu

> +#

> +CONFIG_RTE_LIBRTE_RCU=y

> +CONFIG_RTE_LIBRTE_RCU_DEBUG=n

> +

>  #

>  # Compile librte_lpm

>  #

> diff --git a/lib/Makefile b/lib/Makefile

> index 26021d0c0..791e0d991 100644

> --- a/lib/Makefile

> +++ b/lib/Makefile

> @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec

>  DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security

>  DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry

>  DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev

> +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu

> +DEPDIRS-librte_rcu := librte_eal

>  

>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

>  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni

> diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile

> new file mode 100644

> index 000000000..6aa677bd1

> --- /dev/null

> +++ b/lib/librte_rcu/Makefile

> @@ -0,0 +1,23 @@

> +# SPDX-License-Identifier: BSD-3-Clause

> +# Copyright(c) 2018 Arm Limited

> +

> +include $(RTE_SDK)/mk/rte.vars.mk

> +

> +# library name

> +LIB = librte_rcu.a

> +

> +CFLAGS += -DALLOW_EXPERIMENTAL_API

> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3

> +LDLIBS += -lrte_eal

> +

> +EXPORT_MAP := rte_rcu_version.map

> +

> +LIBABIVER := 1

> +

> +# all source are stored in SRCS-y

> +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c

> +

> +# install includes

> +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h

> +

> +include $(RTE_SDK)/mk/rte.lib.mk

> diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> new file mode 100644

> index 000000000..c009ae4b7

> --- /dev/null

> +++ b/lib/librte_rcu/meson.build

> @@ -0,0 +1,5 @@

> +# SPDX-License-Identifier: BSD-3-Clause

> +# Copyright(c) 2018 Arm Limited

> +

> +sources = files('rte_rcu_qsbr.c')

> +headers = files('rte_rcu_qsbr.h')

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c

> new file mode 100644

> index 000000000..53d08446a

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> @@ -0,0 +1,237 @@

> +/* SPDX-License-Identifier: BSD-3-Clause

> + *

> + * Copyright (c) 2018 Arm Limited

> + */

> +

> +#include <stdio.h>

> +#include <string.h>

> +#include <stdint.h>

> +#include <errno.h>

> +

> +#include <rte_common.h>

> +#include <rte_log.h>

> +#include <rte_memory.h>

> +#include <rte_malloc.h>

> +#include <rte_eal.h>

> +#include <rte_eal_memconfig.h>

> +#include <rte_atomic.h>

> +#include <rte_per_lcore.h>

> +#include <rte_lcore.h>

> +#include <rte_errno.h>

> +

> +#include "rte_rcu_qsbr.h"

> +

> +/* Get the memory size of QSBR variable */

> +size_t __rte_experimental

> +rte_rcu_qsbr_get_memsize(uint32_t max_threads)

> +{

> +	size_t sz;

> +

> +	if (max_threads == 0) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid max_threads %u\n",

> +			__func__, max_threads);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	sz = sizeof(struct rte_rcu_qsbr);

> +

> +	/* Add the size of quiescent state counter array */

> +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> +

> +	/* Add the size of the registered thread ID bitmap array */

> +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> +

> +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);


Given that you align here, should you also align in the earlier steps
in the computation of sz?

> +}

> +

> +/* Initialize a quiescent state variable */

> +int __rte_experimental

> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)

> +{

> +	size_t sz;

> +

> +	if (v == NULL) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> +	if (sz == 1)

> +		return 1;

> +

> +	/* Set all the threads to offline */

> +	memset(v, 0, sz);


We calculate sz here, but it looks like the caller must also calculate it
in order to correctly allocate the memory referenced by the "v" argument
to this function, with bad things happening if the two calculations get
different results.  Should "v" instead be allocated within this function
to avoid this sort of problem?

> +	v->max_threads = max_threads;

> +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> +	v->token = RTE_QSBR_CNT_INIT;

> +

> +	return 0;

> +}

> +

> +/* Register a reader thread to report its quiescent state

> + * on a QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	unsigned int i, id, success;

> +	uint64_t old_bmap, new_bmap;

> +

> +	if (v == NULL || thread_id >= v->max_threads) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	id = thread_id & RTE_QSBR_THRID_MASK;

> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +	/* Make sure that the counter for registered threads does not

> +	 * go out of sync. Hence, additional checks are required.

> +	 */

> +	/* Check if the thread is already registered */

> +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_RELAXED);

> +	if (old_bmap & 1UL << id)

> +		return 0;

> +

> +	do {

> +		new_bmap = old_bmap | (1UL << id);

> +		success = __atomic_compare_exchange(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					&old_bmap, &new_bmap, 0,

> +					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

> +

> +		if (success)

> +			__atomic_fetch_add(&v->num_threads,

> +						1, __ATOMIC_RELAXED);

> +		else if (old_bmap & (1UL << id))

> +			/* Someone else registered this thread.

> +			 * Counter should not be incremented.

> +			 */

> +			return 0;

> +	} while (success == 0);


This would be simpler if threads were required to register themselves.
Maybe you have use cases requiring registration of other threads, but
this capability is adding significant complexity, so it might be worth
some thought.

> +	return 0;

> +}

> +

> +/* Remove a reader thread, from the list of threads reporting their

> + * quiescent state on a QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	unsigned int i, id, success;

> +	uint64_t old_bmap, new_bmap;

> +

> +	if (v == NULL || thread_id >= v->max_threads) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	id = thread_id & RTE_QSBR_THRID_MASK;

> +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +	/* Make sure that the counter for registered threads does not

> +	 * go out of sync. Hence, additional checks are required.

> +	 */

> +	/* Check if the thread is already unregistered */

> +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_RELAXED);

> +	if (old_bmap & ~(1UL << id))

> +		return 0;

> +

> +	do {

> +		new_bmap = old_bmap & ~(1UL << id);

> +		/* Make sure any loads of the shared data structure are

> +		 * completed before removal of the thread from the list of

> +		 * reporting threads.

> +		 */

> +		success = __atomic_compare_exchange(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					&old_bmap, &new_bmap, 0,

> +					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

> +

> +		if (success)

> +			__atomic_fetch_sub(&v->num_threads,

> +						1, __ATOMIC_RELAXED);

> +		else if (old_bmap & ~(1UL << id))

> +			/* Someone else unregistered this thread.

> +			 * Counter should not be incremented.

> +			 */

> +			return 0;

> +	} while (success == 0);


Ditto!

> +	return 0;

> +}

> +

> +/* Dump the details of a single quiescent state variable to a file. */

> +int __rte_experimental

> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)

> +{

> +	uint64_t bmap;

> +	uint32_t i, t;

> +

> +	if (v == NULL || f == NULL) {

> +		rte_log(RTE_LOG_ERR, rcu_log_type,

> +			"%s(): Invalid input parameter\n", __func__);

> +		rte_errno = EINVAL;

> +

> +		return 1;

> +	}

> +

> +	fprintf(f, "\nQuiescent State Variable @%p\n", v);

> +

> +	fprintf(f, "  QS variable memory size = %lu\n",

> +				rte_rcu_qsbr_get_memsize(v->max_threads));

> +	fprintf(f, "  Given # max threads = %u\n", v->max_threads);

> +	fprintf(f, "  Current # threads = %u\n", v->num_threads);

> +

> +	fprintf(f, "  Registered thread ID mask = 0x");

> +	for (i = 0; i < v->num_elems; i++)

> +		fprintf(f, "%lx", __atomic_load_n(

> +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_ACQUIRE));

> +	fprintf(f, "\n");

> +

> +	fprintf(f, "  Token = %lu\n",

> +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

> +

> +	fprintf(f, "Quiescent State Counts for readers:\n");

> +	for (i = 0; i < v->num_elems; i++) {

> +		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> +					__ATOMIC_ACQUIRE);

> +		while (bmap) {

> +			t = __builtin_ctzl(bmap);

> +			fprintf(f, "thread ID = %d, count = %lu\n", t,

> +				__atomic_load_n(

> +					&v->qsbr_cnt[i].cnt,

> +					__ATOMIC_RELAXED));

> +			bmap &= ~(1UL << t);

> +		}

> +	}

> +

> +	return 0;

> +}

> +

> +int rcu_log_type;

> +

> +RTE_INIT(rte_rcu_register)

> +{

> +	rcu_log_type = rte_log_register("lib.rcu");

> +	if (rcu_log_type >= 0)

> +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);

> +}

> diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h

> new file mode 100644

> index 000000000..ff696aeab

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> @@ -0,0 +1,554 @@

> +/* SPDX-License-Identifier: BSD-3-Clause

> + * Copyright (c) 2018 Arm Limited

> + */

> +

> +#ifndef _RTE_RCU_QSBR_H_

> +#define _RTE_RCU_QSBR_H_

> +

> +/**

> + * @file

> + * RTE Quiescent State Based Reclamation (QSBR)

> + *

> + * Quiescent State (QS) is any point in the thread execution

> + * where the thread does not hold a reference to a data structure

> + * in shared memory. While using lock-less data structures, the writer

> + * can safely free memory once all the reader threads have entered

> + * quiescent state.

> + *

> + * This library provides the ability for the readers to report quiescent

> + * state and for the writers to identify when all the readers have

> + * entered quiescent state.

> + */

> +

> +#ifdef __cplusplus

> +extern "C" {

> +#endif

> +

> +#include <stdio.h>

> +#include <stdint.h>

> +#include <errno.h>

> +#include <rte_common.h>

> +#include <rte_memory.h>

> +#include <rte_lcore.h>

> +#include <rte_debug.h>

> +#include <rte_atomic.h>

> +

> +extern int rcu_log_type;

> +

> +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG

> +#define RCU_DP_LOG(level, fmt, args...) \

> +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> +		"%s(): " fmt "\n", __func__, ## args)

> +#else

> +#define RCU_DP_LOG(level, fmt, args...)

> +#endif

> +

> +/* Registered thread IDs are stored as a bitmap of 64b element array.

> + * Given thread id needs to be converted to index into the array and

> + * the id within the array element.

> + */

> +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> +#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)

> +#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

> +#define RTE_QSBR_THRID_INDEX_SHIFT 6

> +#define RTE_QSBR_THRID_MASK 0x3f

> +#define RTE_QSBR_THRID_INVALID 0xffffffff

> +

> +/* Worker thread counter */

> +struct rte_rcu_qsbr_cnt {

> +	uint64_t cnt;

> +	/**< Quiescent state counter. Value 0 indicates the thread is offline */

> +} __rte_cache_aligned;

> +

> +#define RTE_QSBR_CNT_THR_OFFLINE 0

> +#define RTE_QSBR_CNT_INIT 1

> +

> +/* RTE Quiescent State variable structure.

> + * This structure has two elements that vary in size based on the

> + * 'max_threads' parameter.

> + * 1) Quiescent state counter array

> + * 2) Register thread ID array

> + */

> +struct rte_rcu_qsbr {

> +	uint64_t token __rte_cache_aligned;

> +	/**< Counter to allow for multiple concurrent quiescent state queries */

> +

> +	uint32_t num_elems __rte_cache_aligned;

> +	/**< Number of elements in the thread ID array */

> +	uint32_t num_threads;

> +	/**< Number of threads currently using this QS variable */

> +	uint32_t max_threads;

> +	/**< Maximum number of threads using this QS variable */

> +

> +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> +	/**< Quiescent state counter array of 'max_threads' elements */

> +

> +	/**< Registered thread IDs are stored in a bitmap array,

> +	 *   after the quiescent state counter array.

> +	 */

> +} __rte_cache_aligned;

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Return the size of the memory occupied by a Quiescent State variable.

> + *

> + * @param max_threads

> + *   Maximum number of threads reporting quiescent state on this variable.

> + * @return

> + *   On success - size of memory in bytes required for this QS variable.

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - max_threads is 0

> + */

> +size_t __rte_experimental

> +rte_rcu_qsbr_get_memsize(uint32_t max_threads);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Initialize a Quiescent State (QS) variable.

> + *

> + * @param v

> + *   QS variable

> + * @param max_threads

> + *   Maximum number of threads reporting quiescent state on this variable.

> + *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.

> + * @return

> + *   On success - 0

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - max_threads is 0 or 'v' is NULL.

> + *

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Register a reader thread to report its quiescent state

> + * on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + * Any reader thread that wants to report its quiescent state must

> + * call this API. This can be called during initialization or as part

> + * of the packet processing loop.

> + *

> + * Note that rte_rcu_qsbr_thread_online must be called before the

> + * thread updates its quiescent state using rte_rcu_qsbr_quiescent.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will report its quiescent state on

> + *   the QS variable. thread_id is a value between 0 and (max_threads - 1).

> + *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Remove a reader thread, from the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * This API can be called from the reader threads during shutdown.

> + * Ongoing quiescent state queries will stop waiting for the status from this

> + * unregistered reader thread.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will stop reporting its quiescent

> + *   state on the QS variable.

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Add a registered reader thread, to the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * Any registered reader thread that wants to report its quiescent state must

> + * call this API before calling rte_rcu_qsbr_quiescent. This can be called

> + * during initialization or as part of the packet processing loop.

> + *

> + * The reader thread must call rte_rcu_thread_offline API, before

> + * calling any functions that block, to ensure that rte_rcu_qsbr_check

> + * API does not wait indefinitely for the reader thread to update its QS.

> + *

> + * The reader thread must call rte_rcu_thread_online API, after the blocking

> + * function call returns, to ensure that rte_rcu_qsbr_check API

> + * waits for the reader thread to update its quiescent state.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Reader thread with this thread ID will report its quiescent state on

> + *   the QS variable.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)


I am not clear on why this function should be inline.  Or do you have use
cases where threads go offline and come back online extremely frequently?

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	/* Copy the current value of token.

> +	 * The fence at the end of the function will ensure that

> +	 * the following will not move down after the load of any shared

> +	 * data structure.

> +	 */

> +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

> +

> +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure

> +	 * 'cnt' (64b) is accessed atomically.

> +	 */

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +		t, __ATOMIC_RELAXED);

> +

> +	/* The subsequent load of the data structure should not

> +	 * move above the store. Hence a store-load barrier

> +	 * is required.

> +	 * If the load of the data structure moves above the store,

> +	 * writer might not see that the reader is online, even though

> +	 * the reader is referencing the shared data structure.

> +	 */

> +#ifdef RTE_ARCH_X86_64

> +	/* rte_smp_mb() for x86 is lighter */

> +	rte_smp_mb();

> +#else

> +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

> +#endif

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Remove a registered reader thread from the list of threads reporting their

> + * quiescent state on a QS variable.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe.

> + *

> + * This can be called during initialization or as part of the packet

> + * processing loop.

> + *

> + * The reader thread must call rte_rcu_thread_offline API, before

> + * calling any functions that block, to ensure that rte_rcu_qsbr_check

> + * API does not wait indefinitely for the reader thread to update its QS.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   rte_rcu_qsbr_check API will not wait for the reader thread with

> + *   this thread ID to report its quiescent state on the QS variable.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)


Same here on inlining.

> +{

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	/* The reader can go offline only after the load of the

> +	 * data structure is completed. i.e. any load of the

> +	 * data strcture can not move after this store.

> +	 */

> +

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Ask the reader threads to report the quiescent state

> + * status.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe and can be called from worker threads.

> + *

> + * @param v

> + *   QS variable

> + * @return

> + *   - This is the token for this call of the API. This should be

> + *     passed to rte_rcu_qsbr_check API.

> + */

> +static __rte_always_inline uint64_t __rte_experimental

> +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL);

> +

> +	/* Release the changes to the shared data structure.

> +	 * This store release will ensure that changes to any data

> +	 * structure are visible to the workers before the token

> +	 * update is visible.

> +	 */

> +	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

> +

> +	return t;

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Update quiescent state for a reader thread.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * All the reader threads registered to report their quiescent state

> + * on the QS variable must call this API.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Update the quiescent state for the reader with this thread ID.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> +

> +	/* Acquire the changes to the shared data structure released

> +	 * by rte_rcu_qsbr_start.

> +	 * Later loads of the shared data structure should not move

> +	 * above this load. Hence, use load-acquire.

> +	 */

> +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

> +

> +	/* Inform the writer that updates are visible to this reader.

> +	 * Prior loads of the shared data structure should not move

> +	 * beyond this store. Hence use store-release.

> +	 */

> +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> +			 t, __ATOMIC_RELEASE);

> +

> +	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",

> +		__func__, t, thread_id);

> +}

> +

> +/* Check the quiescent state counter for registered threads only, assuming

> + * that not all threads have registered.

> + */

> +static __rte_always_inline int

> +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> +{

> +	uint32_t i, j, id;

> +	uint64_t bmap;

> +	uint64_t c;

> +	uint64_t *reg_thread_id;

> +

> +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> +		i < v->num_elems;

> +		i++, reg_thread_id++) {

> +		/* Load the current registered thread bit map before

> +		 * loading the reader thread quiescent state counters.

> +		 */

> +		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);

> +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> +

> +		while (bmap) {

> +			j = __builtin_ctzl(bmap);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",

> +				__func__, t, wait, bmap, id + j);

> +			c = __atomic_load_n(

> +					&v->qsbr_cnt[id + j].cnt,

> +					__ATOMIC_ACQUIRE);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",

> +				__func__, t, wait, c, id+j);

> +			/* Counter is not checked for wrap-around condition

> +			 * as it is a 64b counter.

> +			 */

> +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) {


This assumes that a 64-bit counter won't overflow, which is close enough
to true given current CPU clock frequencies.  ;-)

> +				/* This thread is not in quiescent state */

> +				if (!wait)

> +					return 0;

> +

> +				rte_pause();

> +				/* This thread might have unregistered.

> +				 * Re-read the bitmap.

> +				 */

> +				bmap = __atomic_load_n(reg_thread_id,

> +						__ATOMIC_ACQUIRE);

> +

> +				continue;

> +			}

> +

> +			bmap &= ~(1UL << j);

> +		}

> +	}

> +

> +	return 1;

> +}

> +

> +/* Check the quiescent state counter for all threads, assuming that

> + * all the threads have registered.

> + */

> +static __rte_always_inline int

> +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)


Does checking the bitmap really take long enough to make this worthwhile
as a separate function?  I would think that the bitmap-checking time
would be lost in the noise of cache misses from the ->cnt loads.

Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in
the absence of readers, you might see __rcu_qsbr_check_all() being a
bit faster.  But is that really what DPDK does?

> +{

> +	uint32_t i;

> +	struct rte_rcu_qsbr_cnt *cnt;

> +	uint64_t c;

> +

> +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {

> +		RCU_DP_LOG(DEBUG,

> +			"%s: check: token = %lu, wait = %d, Thread ID = %d",

> +			__func__, t, wait, i);

> +		while (1) {

> +			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);

> +			RCU_DP_LOG(DEBUG,

> +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",

> +				__func__, t, wait, c, i);

> +			/* Counter is not checked for wrap-around condition

> +			 * as it is a 64b counter.

> +			 */

> +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t))

> +				break;

> +

> +			/* This thread is not in quiescent state */

> +			if (!wait)

> +				return 0;

> +

> +			rte_pause();

> +		}

> +	}

> +

> +	return 1;

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Checks if all the reader threads have entered the quiescent state

> + * referenced by token.

> + *

> + * This is implemented as a lock-free function. It is multi-thread

> + * safe and can be called from the worker threads as well.

> + *

> + * If this API is called with 'wait' set to true, the following

> + * factors must be considered:

> + *

> + * 1) If the calling thread is also reporting the status on the

> + * same QS variable, it must update the quiescent state status, before

> + * calling this API.

> + *

> + * 2) In addition, while calling from multiple threads, only

> + * one of those threads can be reporting the quiescent state status

> + * on a given QS variable.

> + *

> + * @param v

> + *   QS variable

> + * @param t

> + *   Token returned by rte_rcu_qsbr_start API

> + * @param wait

> + *   If true, block till all the reader threads have completed entering

> + *   the quiescent state referenced by token 't'.

> + * @return

> + *   - 0 if all reader threads have NOT passed through specified number

> + *     of quiescent states.

> + *   - 1 if all reader threads have passed through specified number

> + *     of quiescent states.

> + */

> +static __rte_always_inline int __rte_experimental

> +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> +{

> +	RTE_ASSERT(v != NULL);

> +

> +	if (likely(v->num_threads == v->max_threads))

> +		return __rcu_qsbr_check_all(v, t, wait);

> +	else

> +		return __rcu_qsbr_check_selective(v, t, wait);

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Wait till the reader threads have entered quiescent state.

> + *

> + * This is implemented as a lock-free function. It is multi-thread safe.

> + * This API can be thought of as a wrapper around rte_rcu_qsbr_start and

> + * rte_rcu_qsbr_check APIs.

> + *

> + * If this API is called from multiple threads, only one of

> + * those threads can be reporting the quiescent state status on a

> + * given QS variable.

> + *

> + * @param v

> + *   QS variable

> + * @param thread_id

> + *   Thread ID of the caller if it is registered to report quiescent state

> + *   on this QS variable (i.e. the calling thread is also part of the

> + *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.

> + */

> +static __rte_always_inline void __rte_experimental

> +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)

> +{

> +	uint64_t t;

> +

> +	RTE_ASSERT(v != NULL);

> +

> +	t = rte_rcu_qsbr_start(v);

> +

> +	/* If the current thread has readside critical section,

> +	 * update its quiescent state status.

> +	 */

> +	if (thread_id != RTE_QSBR_THRID_INVALID)

> +		rte_rcu_qsbr_quiescent(v, thread_id);

> +

> +	/* Wait for other readers to enter quiescent state */

> +	rte_rcu_qsbr_check(v, t, true);


And you are presumably relying on 64-bit counters to avoid the need to
execute the above code twice in succession.  Which again works given
current CPU clock rates combined with system and human lifespans.
Otherwise, there are interesting race conditions that can happen, so
don't try this with a 32-bit counter!!!

(But think of the great^N grandchildren!!!)

More seriously, a comment warning people not to make the counter be 32
bits is in order.

> +}

> +

> +/**

> + * @warning

> + * @b EXPERIMENTAL: this API may change without prior notice

> + *

> + * Dump the details of a single QS variables to a file.

> + *

> + * It is NOT multi-thread safe.

> + *

> + * @param f

> + *   A pointer to a file for output

> + * @param v

> + *   QS variable

> + * @return

> + *   On success - 0

> + *   On error - 1 with error code set in rte_errno.

> + *   Possible rte_errno codes are:

> + *   - EINVAL - NULL parameters are passed

> + */

> +int __rte_experimental

> +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

> +

> +#ifdef __cplusplus

> +}

> +#endif

> +

> +#endif /* _RTE_RCU_QSBR_H_ */

> diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map

> new file mode 100644

> index 000000000..ad8cb517c

> --- /dev/null

> +++ b/lib/librte_rcu/rte_rcu_version.map

> @@ -0,0 +1,11 @@

> +EXPERIMENTAL {

> +	global:

> +

> +	rte_rcu_qsbr_get_memsize;

> +	rte_rcu_qsbr_init;

> +	rte_rcu_qsbr_thread_register;

> +	rte_rcu_qsbr_thread_unregister;

> +	rte_rcu_qsbr_dump;

> +

> +	local: *;

> +};

> diff --git a/lib/meson.build b/lib/meson.build

> index 595314d7d..67be10659 100644

> --- a/lib/meson.build

> +++ b/lib/meson.build

> @@ -22,7 +22,7 @@ libraries = [

>  	'gro', 'gso', 'ip_frag', 'jobstats',

>  	'kni', 'latencystats', 'lpm', 'member',

>  	'power', 'pdump', 'rawdev',

> -	'reorder', 'sched', 'security', 'stack', 'vhost',

> +	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',

>  	#ipsec lib depends on crypto and security

>  	'ipsec',

>  	# add pkt framework libs which use other libs from above

> diff --git a/mk/rte.app.mk b/mk/rte.app.mk

> index 7d994bece..e93cc366d 100644

> --- a/mk/rte.app.mk

> +++ b/mk/rte.app.mk

> @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -lrte_eal

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched

> +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu

>  

>  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

>  _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni

> -- 

> 2.17.1

>
Honnappa Nagarahalli April 11, 2019, 4:35 a.m. UTC | #2
Hi Paul,
	Thank you for your feedback.

> -----Original Message-----

> From: Paul E. McKenney <paulmck@linux.ibm.com>

> Sent: Wednesday, April 10, 2019 1:15 PM

> To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>

> Cc: konstantin.ananyev@intel.com; stephen@networkplumber.org;

> marko.kovacevic@intel.com; dev@dpdk.org; Gavin Hu (Arm Technology

> China) <Gavin.Hu@arm.com>; Dharmik Thakkar

> <Dharmik.Thakkar@arm.com>; Malvika Gupta <Malvika.Gupta@arm.com>

> Subject: Re: [PATCH v4 1/3] rcu: add RCU library supporting QSBR

> mechanism

> 

> On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli wrote:

> > Add RCU library supporting quiescent state based memory reclamation

> method.

> > This library helps identify the quiescent state of the reader threads

> > so that the writers can free the memory associated with the lock less

> > data structures.

> 

> I don't see any sign of read-side markers (rcu_read_lock() and

> rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

> 

> Yes, strictly speaking, these are not needed for QSBR to operate, but they

These APIs would be empty for QSBR.

> make it way easier to maintain and debug code using RCU.  For example,

> given the read-side markers, you can check for errors like having a call to

> rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.

> Without those read-side markers, life can be quite hard and you will really

> hate yourself for failing to have provided them.

Want to make sure I understood this, do you mean the application would mark before and after accessing the shared data structure on the reader side?

rte_rcu_qsbr_lock()
<begin access shared data structure>
...
...
<end access shared data structure>
rte_rcu_qsbr_unlock()

If someone is debugging this code, they have to make sure that there is an unlock for every lock and there is no call to rte_rcu_qsbr_quiescent in between.
It sounds good to me. Obviously, they will not add any additional cycles as well.
Please let me know if my understanding is correct.

> 

> Some additional questions and comments interspersed.

> 

> 							Thanx, Paul

> 

> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > Reviewed-by: Steve Capper <steve.capper@arm.com>

> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> > ---

> >  MAINTAINERS                        |   5 +

> >  config/common_base                 |   6 +

> >  lib/Makefile                       |   2 +

> >  lib/librte_rcu/Makefile            |  23 ++

> >  lib/librte_rcu/meson.build         |   5 +

> >  lib/librte_rcu/rte_rcu_qsbr.c      | 237 ++++++++++++

> >  lib/librte_rcu/rte_rcu_qsbr.h      | 554

> +++++++++++++++++++++++++++++

> >  lib/librte_rcu/rte_rcu_version.map |  11 +

> >  lib/meson.build                    |   2 +-

> >  mk/rte.app.mk                      |   1 +

> >  10 files changed, 845 insertions(+), 1 deletion(-)  create mode

> > 100644 lib/librte_rcu/Makefile  create mode 100644

> > lib/librte_rcu/meson.build  create mode 100644

> > lib/librte_rcu/rte_rcu_qsbr.c  create mode 100644

> > lib/librte_rcu/rte_rcu_qsbr.h  create mode 100644

> > lib/librte_rcu/rte_rcu_version.map

> >

> > diff --git a/MAINTAINERS b/MAINTAINERS index 9774344dd..6e9766eed

> > 100644

> > --- a/MAINTAINERS

> > +++ b/MAINTAINERS

> > @@ -1267,6 +1267,11 @@ F: examples/bpf/

> >  F: app/test/test_bpf.c

> >  F: doc/guides/prog_guide/bpf_lib.rst

> >

> > +RCU - EXPERIMENTAL

> > +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > +F: lib/librte_rcu/

> > +F: doc/guides/prog_guide/rcu_lib.rst

> > +

> >

> >  Test Applications

> >  -----------------

> > diff --git a/config/common_base b/config/common_base index

> > 8da08105b..ad70c79e1 100644

> > --- a/config/common_base

> > +++ b/config/common_base

> > @@ -829,6 +829,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y  #

> > CONFIG_RTE_LIBRTE_TELEMETRY=n

> >

> > +#

> > +# Compile librte_rcu

> > +#

> > +CONFIG_RTE_LIBRTE_RCU=y

> > +CONFIG_RTE_LIBRTE_RCU_DEBUG=n

> > +

> >  #

> >  # Compile librte_lpm

> >  #

> > diff --git a/lib/Makefile b/lib/Makefile index 26021d0c0..791e0d991

> > 100644

> > --- a/lib/Makefile

> > +++ b/lib/Makefile

> > @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) +=

> librte_ipsec

> > DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev

> > librte_security

> >  DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry

> > DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev

> > +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu DEPDIRS-librte_rcu :=

> > +librte_eal

> >

> >  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

> >  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git

> > a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile new file mode

> > 100644 index 000000000..6aa677bd1

> > --- /dev/null

> > +++ b/lib/librte_rcu/Makefile

> > @@ -0,0 +1,23 @@

> > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Arm

> > +Limited

> > +

> > +include $(RTE_SDK)/mk/rte.vars.mk

> > +

> > +# library name

> > +LIB = librte_rcu.a

> > +

> > +CFLAGS += -DALLOW_EXPERIMENTAL_API

> > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal

> > +

> > +EXPORT_MAP := rte_rcu_version.map

> > +

> > +LIBABIVER := 1

> > +

> > +# all source are stored in SRCS-y

> > +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c

> > +

> > +# install includes

> > +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h

> > +

> > +include $(RTE_SDK)/mk/rte.lib.mk

> > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> > new file mode 100644 index 000000000..c009ae4b7

> > --- /dev/null

> > +++ b/lib/librte_rcu/meson.build

> > @@ -0,0 +1,5 @@

> > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Arm

> > +Limited

> > +

> > +sources = files('rte_rcu_qsbr.c')

> > +headers = files('rte_rcu_qsbr.h')

> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c

> > b/lib/librte_rcu/rte_rcu_qsbr.c new file mode 100644 index

> > 000000000..53d08446a

> > --- /dev/null

> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> > @@ -0,0 +1,237 @@

> > +/* SPDX-License-Identifier: BSD-3-Clause

> > + *

> > + * Copyright (c) 2018 Arm Limited

> > + */

> > +

> > +#include <stdio.h>

> > +#include <string.h>

> > +#include <stdint.h>

> > +#include <errno.h>

> > +

> > +#include <rte_common.h>

> > +#include <rte_log.h>

> > +#include <rte_memory.h>

> > +#include <rte_malloc.h>

> > +#include <rte_eal.h>

> > +#include <rte_eal_memconfig.h>

> > +#include <rte_atomic.h>

> > +#include <rte_per_lcore.h>

> > +#include <rte_lcore.h>

> > +#include <rte_errno.h>

> > +

> > +#include "rte_rcu_qsbr.h"

> > +

> > +/* Get the memory size of QSBR variable */ size_t __rte_experimental

> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads) {

> > +	size_t sz;

> > +

> > +	if (max_threads == 0) {

> > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > +			"%s(): Invalid max_threads %u\n",

> > +			__func__, max_threads);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	sz = sizeof(struct rte_rcu_qsbr);

> > +

> > +	/* Add the size of quiescent state counter array */

> > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> > +

> > +	/* Add the size of the registered thread ID bitmap array */

> > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> > +

> > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

> 

> Given that you align here, should you also align in the earlier steps in the

> computation of sz?

Agree. I will remove the align here and keep the earlier one as the intent is to align the thread ID array.

> 

> > +}

> > +

> > +/* Initialize a quiescent state variable */ int __rte_experimental

> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) {

> > +	size_t sz;

> > +

> > +	if (v == NULL) {

> > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> > +	if (sz == 1)

> > +		return 1;

> > +

> > +	/* Set all the threads to offline */

> > +	memset(v, 0, sz);

> 

> We calculate sz here, but it looks like the caller must also calculate it in

> order to correctly allocate the memory referenced by the "v" argument to

> this function, with bad things happening if the two calculations get

> different results.  Should "v" instead be allocated within this function to

> avoid this sort of problem?

Earlier version allocated the memory with-in this library. However, it was decided to go with the current implementation as it provides flexibility for the application to manage the memory as it sees fit. For ex: it could allocate this as part of another structure in a single allocation. This also falls inline with similar approach taken in other libraries. 

> 

> > +	v->max_threads = max_threads;

> > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> > +	v->token = RTE_QSBR_CNT_INIT;

> > +

> > +	return 0;

> > +}

> > +

> > +/* Register a reader thread to report its quiescent state

> > + * on a QS variable.

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id) {

> > +	unsigned int i, id, success;

> > +	uint64_t old_bmap, new_bmap;

> > +

> > +	if (v == NULL || thread_id >= v->max_threads) {

> > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > +

> > +	/* Make sure that the counter for registered threads does not

> > +	 * go out of sync. Hence, additional checks are required.

> > +	 */

> > +	/* Check if the thread is already registered */

> > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > +					__ATOMIC_RELAXED);

> > +	if (old_bmap & 1UL << id)

> > +		return 0;

> > +

> > +	do {

> > +		new_bmap = old_bmap | (1UL << id);

> > +		success = __atomic_compare_exchange(

> > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > +					&old_bmap, &new_bmap, 0,

> > +					__ATOMIC_RELEASE,

> __ATOMIC_RELAXED);

> > +

> > +		if (success)

> > +			__atomic_fetch_add(&v->num_threads,

> > +						1, __ATOMIC_RELAXED);

> > +		else if (old_bmap & (1UL << id))

> > +			/* Someone else registered this thread.

> > +			 * Counter should not be incremented.

> > +			 */

> > +			return 0;

> > +	} while (success == 0);

> 

> This would be simpler if threads were required to register themselves.

> Maybe you have use cases requiring registration of other threads, but this

> capability is adding significant complexity, so it might be worth some

> thought.

> 

It was simple earlier (__atomic_fetch_or). The complexity is added as 'num_threads' should not go out of sync.

> > +	return 0;

> > +}

> > +

> > +/* Remove a reader thread, from the list of threads reporting their

> > + * quiescent state on a QS variable.

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id) {

> > +	unsigned int i, id, success;

> > +	uint64_t old_bmap, new_bmap;

> > +

> > +	if (v == NULL || thread_id >= v->max_threads) {

> > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > +

> > +	/* Make sure that the counter for registered threads does not

> > +	 * go out of sync. Hence, additional checks are required.

> > +	 */

> > +	/* Check if the thread is already unregistered */

> > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > +					__ATOMIC_RELAXED);

> > +	if (old_bmap & ~(1UL << id))

> > +		return 0;

> > +

> > +	do {

> > +		new_bmap = old_bmap & ~(1UL << id);

> > +		/* Make sure any loads of the shared data structure are

> > +		 * completed before removal of the thread from the list of

> > +		 * reporting threads.

> > +		 */

> > +		success = __atomic_compare_exchange(

> > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > +					&old_bmap, &new_bmap, 0,

> > +					__ATOMIC_RELEASE,

> __ATOMIC_RELAXED);

> > +

> > +		if (success)

> > +			__atomic_fetch_sub(&v->num_threads,

> > +						1, __ATOMIC_RELAXED);

> > +		else if (old_bmap & ~(1UL << id))

> > +			/* Someone else unregistered this thread.

> > +			 * Counter should not be incremented.

> > +			 */

> > +			return 0;

> > +	} while (success == 0);

> 

> Ditto!

> 

> > +	return 0;

> > +}

> > +

> > +/* Dump the details of a single quiescent state variable to a file.

> > +*/ int __rte_experimental rte_rcu_qsbr_dump(FILE *f, struct

> > +rte_rcu_qsbr *v) {

> > +	uint64_t bmap;

> > +	uint32_t i, t;

> > +

> > +	if (v == NULL || f == NULL) {

> > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > +			"%s(): Invalid input parameter\n", __func__);

> > +		rte_errno = EINVAL;

> > +

> > +		return 1;

> > +	}

> > +

> > +	fprintf(f, "\nQuiescent State Variable @%p\n", v);

> > +

> > +	fprintf(f, "  QS variable memory size = %lu\n",

> > +				rte_rcu_qsbr_get_memsize(v-

> >max_threads));

> > +	fprintf(f, "  Given # max threads = %u\n", v->max_threads);

> > +	fprintf(f, "  Current # threads = %u\n", v->num_threads);

> > +

> > +	fprintf(f, "  Registered thread ID mask = 0x");

> > +	for (i = 0; i < v->num_elems; i++)

> > +		fprintf(f, "%lx", __atomic_load_n(

> > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > +					__ATOMIC_ACQUIRE));

> > +	fprintf(f, "\n");

> > +

> > +	fprintf(f, "  Token = %lu\n",

> > +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

> > +

> > +	fprintf(f, "Quiescent State Counts for readers:\n");

> > +	for (i = 0; i < v->num_elems; i++) {

> > +		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v,

> i),

> > +					__ATOMIC_ACQUIRE);

> > +		while (bmap) {

> > +			t = __builtin_ctzl(bmap);

> > +			fprintf(f, "thread ID = %d, count = %lu\n", t,

> > +				__atomic_load_n(

> > +					&v->qsbr_cnt[i].cnt,

> > +					__ATOMIC_RELAXED));

> > +			bmap &= ~(1UL << t);

> > +		}

> > +	}

> > +

> > +	return 0;

> > +}

> > +

> > +int rcu_log_type;

> > +

> > +RTE_INIT(rte_rcu_register)

> > +{

> > +	rcu_log_type = rte_log_register("lib.rcu");

> > +	if (rcu_log_type >= 0)

> > +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR); }

> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index

> > 000000000..ff696aeab

> > --- /dev/null

> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > @@ -0,0 +1,554 @@

> > +/* SPDX-License-Identifier: BSD-3-Clause

> > + * Copyright (c) 2018 Arm Limited

> > + */

> > +

> > +#ifndef _RTE_RCU_QSBR_H_

> > +#define _RTE_RCU_QSBR_H_

> > +

> > +/**

> > + * @file

> > + * RTE Quiescent State Based Reclamation (QSBR)

> > + *

> > + * Quiescent State (QS) is any point in the thread execution

> > + * where the thread does not hold a reference to a data structure

> > + * in shared memory. While using lock-less data structures, the

> > +writer

> > + * can safely free memory once all the reader threads have entered

> > + * quiescent state.

> > + *

> > + * This library provides the ability for the readers to report

> > +quiescent

> > + * state and for the writers to identify when all the readers have

> > + * entered quiescent state.

> > + */

> > +

> > +#ifdef __cplusplus

> > +extern "C" {

> > +#endif

> > +

> > +#include <stdio.h>

> > +#include <stdint.h>

> > +#include <errno.h>

> > +#include <rte_common.h>

> > +#include <rte_memory.h>

> > +#include <rte_lcore.h>

> > +#include <rte_debug.h>

> > +#include <rte_atomic.h>

> > +

> > +extern int rcu_log_type;

> > +

> > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define RCU_DP_LOG(level,

> fmt,

> > +args...) \

> > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> > +		"%s(): " fmt "\n", __func__, ## args) #else #define

> > +RCU_DP_LOG(level, fmt, args...) #endif

> > +

> > +/* Registered thread IDs are stored as a bitmap of 64b element array.

> > + * Given thread id needs to be converted to index into the array and

> > + * the id within the array element.

> > + */

> > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> #define

> > +RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3,

> RTE_CACHE_LINE_SIZE) #define

> > +RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i) #define

> > +RTE_QSBR_THRID_INDEX_SHIFT 6 #define RTE_QSBR_THRID_MASK 0x3f

> #define

> > +RTE_QSBR_THRID_INVALID 0xffffffff

> > +

> > +/* Worker thread counter */

> > +struct rte_rcu_qsbr_cnt {

> > +	uint64_t cnt;

> > +	/**< Quiescent state counter. Value 0 indicates the thread is

> > +offline */ } __rte_cache_aligned;

> > +

> > +#define RTE_QSBR_CNT_THR_OFFLINE 0

> > +#define RTE_QSBR_CNT_INIT 1

> > +

> > +/* RTE Quiescent State variable structure.

> > + * This structure has two elements that vary in size based on the

> > + * 'max_threads' parameter.

> > + * 1) Quiescent state counter array

> > + * 2) Register thread ID array

> > + */

> > +struct rte_rcu_qsbr {

> > +	uint64_t token __rte_cache_aligned;

> > +	/**< Counter to allow for multiple concurrent quiescent state

> > +queries */

> > +

> > +	uint32_t num_elems __rte_cache_aligned;

> > +	/**< Number of elements in the thread ID array */

> > +	uint32_t num_threads;

> > +	/**< Number of threads currently using this QS variable */

> > +	uint32_t max_threads;

> > +	/**< Maximum number of threads using this QS variable */

> > +

> > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> > +	/**< Quiescent state counter array of 'max_threads' elements */

> > +

> > +	/**< Registered thread IDs are stored in a bitmap array,

> > +	 *   after the quiescent state counter array.

> > +	 */

> > +} __rte_cache_aligned;

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Return the size of the memory occupied by a Quiescent State

> variable.

> > + *

> > + * @param max_threads

> > + *   Maximum number of threads reporting quiescent state on this

> variable.

> > + * @return

> > + *   On success - size of memory in bytes required for this QS variable.

> > + *   On error - 1 with error code set in rte_errno.

> > + *   Possible rte_errno codes are:

> > + *   - EINVAL - max_threads is 0

> > + */

> > +size_t __rte_experimental

> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Initialize a Quiescent State (QS) variable.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param max_threads

> > + *   Maximum number of threads reporting quiescent state on this

> variable.

> > + *   This should be the same value as passed to

> rte_rcu_qsbr_get_memsize.

> > + * @return

> > + *   On success - 0

> > + *   On error - 1 with error code set in rte_errno.

> > + *   Possible rte_errno codes are:

> > + *   - EINVAL - max_threads is 0 or 'v' is NULL.

> > + *

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Register a reader thread to report its quiescent state

> > + * on a QS variable.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread

> > + * safe.

> > + * Any reader thread that wants to report its quiescent state must

> > + * call this API. This can be called during initialization or as part

> > + * of the packet processing loop.

> > + *

> > + * Note that rte_rcu_qsbr_thread_online must be called before the

> > + * thread updates its quiescent state using rte_rcu_qsbr_quiescent.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   Reader thread with this thread ID will report its quiescent state on

> > + *   the QS variable. thread_id is a value between 0 and (max_threads -

> 1).

> > + *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Remove a reader thread, from the list of threads reporting their

> > + * quiescent state on a QS variable.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread safe.

> > + * This API can be called from the reader threads during shutdown.

> > + * Ongoing quiescent state queries will stop waiting for the status

> > +from this

> > + * unregistered reader thread.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   Reader thread with this thread ID will stop reporting its quiescent

> > + *   state on the QS variable.

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id);

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Add a registered reader thread, to the list of threads reporting

> > +their

> > + * quiescent state on a QS variable.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread

> > + * safe.

> > + *

> > + * Any registered reader thread that wants to report its quiescent

> > +state must

> > + * call this API before calling rte_rcu_qsbr_quiescent. This can be

> > +called

> > + * during initialization or as part of the packet processing loop.

> > + *

> > + * The reader thread must call rte_rcu_thread_offline API, before

> > + * calling any functions that block, to ensure that

> > +rte_rcu_qsbr_check

> > + * API does not wait indefinitely for the reader thread to update its QS.

> > + *

> > + * The reader thread must call rte_rcu_thread_online API, after the

> > +blocking

> > + * function call returns, to ensure that rte_rcu_qsbr_check API

> > + * waits for the reader thread to update its quiescent state.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   Reader thread with this thread ID will report its quiescent state on

> > + *   the QS variable.

> > + */

> > +static __rte_always_inline void __rte_experimental

> > +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id)

> 

> I am not clear on why this function should be inline.  Or do you have use

> cases where threads go offline and come back online extremely frequently?

Yes, there are use cases where the function call to receive the packets can block.

> 

> > +{

> > +	uint64_t t;

> > +

> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > +

> > +	/* Copy the current value of token.

> > +	 * The fence at the end of the function will ensure that

> > +	 * the following will not move down after the load of any shared

> > +	 * data structure.

> > +	 */

> > +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

> > +

> > +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure

> > +	 * 'cnt' (64b) is accessed atomically.

> > +	 */

> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > +		t, __ATOMIC_RELAXED);

> > +

> > +	/* The subsequent load of the data structure should not

> > +	 * move above the store. Hence a store-load barrier

> > +	 * is required.

> > +	 * If the load of the data structure moves above the store,

> > +	 * writer might not see that the reader is online, even though

> > +	 * the reader is referencing the shared data structure.

> > +	 */

> > +#ifdef RTE_ARCH_X86_64

> > +	/* rte_smp_mb() for x86 is lighter */

> > +	rte_smp_mb();

> > +#else

> > +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

> > +#endif

> > +}

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Remove a registered reader thread from the list of threads

> > +reporting their

> > + * quiescent state on a QS variable.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread

> > + * safe.

> > + *

> > + * This can be called during initialization or as part of the packet

> > + * processing loop.

> > + *

> > + * The reader thread must call rte_rcu_thread_offline API, before

> > + * calling any functions that block, to ensure that

> > +rte_rcu_qsbr_check

> > + * API does not wait indefinitely for the reader thread to update its QS.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   rte_rcu_qsbr_check API will not wait for the reader thread with

> > + *   this thread ID to report its quiescent state on the QS variable.

> > + */

> > +static __rte_always_inline void __rte_experimental

> > +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id)

> 

> Same here on inlining.

> 

> > +{

> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > +

> > +	/* The reader can go offline only after the load of the

> > +	 * data structure is completed. i.e. any load of the

> > +	 * data strcture can not move after this store.

> > +	 */

> > +

> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > +		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE); }

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Ask the reader threads to report the quiescent state

> > + * status.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread

> > + * safe and can be called from worker threads.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @return

> > + *   - This is the token for this call of the API. This should be

> > + *     passed to rte_rcu_qsbr_check API.

> > + */

> > +static __rte_always_inline uint64_t __rte_experimental

> > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v) {

> > +	uint64_t t;

> > +

> > +	RTE_ASSERT(v != NULL);

> > +

> > +	/* Release the changes to the shared data structure.

> > +	 * This store release will ensure that changes to any data

> > +	 * structure are visible to the workers before the token

> > +	 * update is visible.

> > +	 */

> > +	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

> > +

> > +	return t;

> > +}

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Update quiescent state for a reader thread.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread safe.

> > + * All the reader threads registered to report their quiescent state

> > + * on the QS variable must call this API.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   Update the quiescent state for the reader with this thread ID.

> > + */

> > +static __rte_always_inline void __rte_experimental

> > +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id) {

> > +	uint64_t t;

> > +

> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > +

> > +	/* Acquire the changes to the shared data structure released

> > +	 * by rte_rcu_qsbr_start.

> > +	 * Later loads of the shared data structure should not move

> > +	 * above this load. Hence, use load-acquire.

> > +	 */

> > +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

> > +

> > +	/* Inform the writer that updates are visible to this reader.

> > +	 * Prior loads of the shared data structure should not move

> > +	 * beyond this store. Hence use store-release.

> > +	 */

> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > +			 t, __ATOMIC_RELEASE);

> > +

> > +	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",

> > +		__func__, t, thread_id);

> > +}

> > +

> > +/* Check the quiescent state counter for registered threads only,

> > +assuming

> > + * that not all threads have registered.

> > + */

> > +static __rte_always_inline int

> > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool

> > +wait) {

> > +	uint32_t i, j, id;

> > +	uint64_t bmap;

> > +	uint64_t c;

> > +	uint64_t *reg_thread_id;

> > +

> > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> > +		i < v->num_elems;

> > +		i++, reg_thread_id++) {

> > +		/* Load the current registered thread bit map before

> > +		 * loading the reader thread quiescent state counters.

> > +		 */

> > +		bmap = __atomic_load_n(reg_thread_id,

> __ATOMIC_ACQUIRE);

> > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> > +

> > +		while (bmap) {

> > +			j = __builtin_ctzl(bmap);

> > +			RCU_DP_LOG(DEBUG,

> > +				"%s: check: token = %lu, wait = %d, Bit Map

> = 0x%lx, Thread ID = %d",

> > +				__func__, t, wait, bmap, id + j);

> > +			c = __atomic_load_n(

> > +					&v->qsbr_cnt[id + j].cnt,

> > +					__ATOMIC_ACQUIRE);

> > +			RCU_DP_LOG(DEBUG,

> > +				"%s: status: token = %lu, wait = %d, Thread

> QS cnt = %lu, Thread ID = %d",

> > +				__func__, t, wait, c, id+j);

> > +			/* Counter is not checked for wrap-around

> condition

> > +			 * as it is a 64b counter.

> > +			 */

> > +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c

> < t)) {

> 

> This assumes that a 64-bit counter won't overflow, which is close enough

> to true given current CPU clock frequencies.  ;-)

> 

> > +				/* This thread is not in quiescent state */

> > +				if (!wait)

> > +					return 0;

> > +

> > +				rte_pause();

> > +				/* This thread might have unregistered.

> > +				 * Re-read the bitmap.

> > +				 */

> > +				bmap = __atomic_load_n(reg_thread_id,

> > +						__ATOMIC_ACQUIRE);

> > +

> > +				continue;

> > +			}

> > +

> > +			bmap &= ~(1UL << j);

> > +		}

> > +	}

> > +

> > +	return 1;

> > +}

> > +

> > +/* Check the quiescent state counter for all threads, assuming that

> > + * all the threads have registered.

> > + */

> > +static __rte_always_inline int

> > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> 

> Does checking the bitmap really take long enough to make this worthwhile

> as a separate function?  I would think that the bitmap-checking time

> would be lost in the noise of cache misses from the ->cnt loads.

It avoids accessing one cache line. I think this is where the savings are (may be in theory). This is the most probable use case.
On the other hand, __rcu_qsbr_check_selective() will result in savings (depending on how many threads are currently registered) by avoiding accessing unwanted counters.

> 

> Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in the

> absence of readers, you might see __rcu_qsbr_check_all() being a bit

> faster.  But is that really what DPDK does?

I see improvements in the synthetic test case (similar to the one you have described, around 27%). However, in the more practical test cases I do not see any difference.
 
> 

> > +{

> > +	uint32_t i;

> > +	struct rte_rcu_qsbr_cnt *cnt;

> > +	uint64_t c;

> > +

> > +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {

> > +		RCU_DP_LOG(DEBUG,

> > +			"%s: check: token = %lu, wait = %d, Thread ID = %d",

> > +			__func__, t, wait, i);

> > +		while (1) {

> > +			c = __atomic_load_n(&cnt->cnt,

> __ATOMIC_ACQUIRE);

> > +			RCU_DP_LOG(DEBUG,

> > +				"%s: status: token = %lu, wait = %d, Thread

> QS cnt = %lu, Thread ID = %d",

> > +				__func__, t, wait, c, i);

> > +			/* Counter is not checked for wrap-around

> condition

> > +			 * as it is a 64b counter.

> > +			 */

> > +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >=

> t))

> > +				break;

> > +

> > +			/* This thread is not in quiescent state */

> > +			if (!wait)

> > +				return 0;

> > +

> > +			rte_pause();

> > +		}

> > +	}

> > +

> > +	return 1;

> > +}

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Checks if all the reader threads have entered the quiescent state

> > + * referenced by token.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread

> > + * safe and can be called from the worker threads as well.

> > + *

> > + * If this API is called with 'wait' set to true, the following

> > + * factors must be considered:

> > + *

> > + * 1) If the calling thread is also reporting the status on the

> > + * same QS variable, it must update the quiescent state status,

> > +before

> > + * calling this API.

> > + *

> > + * 2) In addition, while calling from multiple threads, only

> > + * one of those threads can be reporting the quiescent state status

> > + * on a given QS variable.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param t

> > + *   Token returned by rte_rcu_qsbr_start API

> > + * @param wait

> > + *   If true, block till all the reader threads have completed entering

> > + *   the quiescent state referenced by token 't'.

> > + * @return

> > + *   - 0 if all reader threads have NOT passed through specified number

> > + *     of quiescent states.

> > + *   - 1 if all reader threads have passed through specified number

> > + *     of quiescent states.

> > + */

> > +static __rte_always_inline int __rte_experimental

> > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) {

> > +	RTE_ASSERT(v != NULL);

> > +

> > +	if (likely(v->num_threads == v->max_threads))

> > +		return __rcu_qsbr_check_all(v, t, wait);

> > +	else

> > +		return __rcu_qsbr_check_selective(v, t, wait); }

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Wait till the reader threads have entered quiescent state.

> > + *

> > + * This is implemented as a lock-free function. It is multi-thread safe.

> > + * This API can be thought of as a wrapper around rte_rcu_qsbr_start

> > +and

> > + * rte_rcu_qsbr_check APIs.

> > + *

> > + * If this API is called from multiple threads, only one of

> > + * those threads can be reporting the quiescent state status on a

> > + * given QS variable.

> > + *

> > + * @param v

> > + *   QS variable

> > + * @param thread_id

> > + *   Thread ID of the caller if it is registered to report quiescent state

> > + *   on this QS variable (i.e. the calling thread is also part of the

> > + *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.

> > + */

> > +static __rte_always_inline void __rte_experimental

> > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int

> > +thread_id) {

> > +	uint64_t t;

> > +

> > +	RTE_ASSERT(v != NULL);

> > +

> > +	t = rte_rcu_qsbr_start(v);

> > +

> > +	/* If the current thread has readside critical section,

> > +	 * update its quiescent state status.

> > +	 */

> > +	if (thread_id != RTE_QSBR_THRID_INVALID)

> > +		rte_rcu_qsbr_quiescent(v, thread_id);

> > +

> > +	/* Wait for other readers to enter quiescent state */

> > +	rte_rcu_qsbr_check(v, t, true);

> 

> And you are presumably relying on 64-bit counters to avoid the need to

> execute the above code twice in succession.  Which again works given

> current CPU clock rates combined with system and human lifespans.

> Otherwise, there are interesting race conditions that can happen, so don't

> try this with a 32-bit counter!!!

Yes. I am relying on 64-bit counters to avoid having to spend cycles (and time).

> 

> (But think of the great^N grandchildren!!!)

(It is an interesting thought. I wonder what would happen to all the code we are writing today 😊)

> 

> More seriously, a comment warning people not to make the counter be 32

> bits is in order.

Agree, I will add it in the structure definition.

> 

> > +}

> > +

> > +/**

> > + * @warning

> > + * @b EXPERIMENTAL: this API may change without prior notice

> > + *

> > + * Dump the details of a single QS variables to a file.

> > + *

> > + * It is NOT multi-thread safe.

> > + *

> > + * @param f

> > + *   A pointer to a file for output

> > + * @param v

> > + *   QS variable

> > + * @return

> > + *   On success - 0

> > + *   On error - 1 with error code set in rte_errno.

> > + *   Possible rte_errno codes are:

> > + *   - EINVAL - NULL parameters are passed

> > + */

> > +int __rte_experimental

> > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

> > +

> > +#ifdef __cplusplus

> > +}

> > +#endif

> > +

> > +#endif /* _RTE_RCU_QSBR_H_ */

> > diff --git a/lib/librte_rcu/rte_rcu_version.map

> > b/lib/librte_rcu/rte_rcu_version.map

> > new file mode 100644

> > index 000000000..ad8cb517c

> > --- /dev/null

> > +++ b/lib/librte_rcu/rte_rcu_version.map

> > @@ -0,0 +1,11 @@

> > +EXPERIMENTAL {

> > +	global:

> > +

> > +	rte_rcu_qsbr_get_memsize;

> > +	rte_rcu_qsbr_init;

> > +	rte_rcu_qsbr_thread_register;

> > +	rte_rcu_qsbr_thread_unregister;

> > +	rte_rcu_qsbr_dump;

> > +

> > +	local: *;

> > +};

> > diff --git a/lib/meson.build b/lib/meson.build index

> > 595314d7d..67be10659 100644

> > --- a/lib/meson.build

> > +++ b/lib/meson.build

> > @@ -22,7 +22,7 @@ libraries = [

> >  	'gro', 'gso', 'ip_frag', 'jobstats',

> >  	'kni', 'latencystats', 'lpm', 'member',

> >  	'power', 'pdump', 'rawdev',

> > -	'reorder', 'sched', 'security', 'stack', 'vhost',

> > +	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',

> >  	#ipsec lib depends on crypto and security

> >  	'ipsec',

> >  	# add pkt framework libs which use other libs from above diff --git

> > a/mk/rte.app.mk b/mk/rte.app.mk index 7d994bece..e93cc366d 100644

> > --- a/mk/rte.app.mk

> > +++ b/mk/rte.app.mk

> > @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -

> lrte_eal

> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline

> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder

> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched

> > +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu

> >

> >  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni

> > --

> > 2.17.1

> >
Paul E. McKenney April 11, 2019, 3:26 p.m. UTC | #3
On Thu, Apr 11, 2019 at 04:35:04AM +0000, Honnappa Nagarahalli wrote:
> Hi Paul,

> 	Thank you for your feedback.

> 

> > -----Original Message-----

> > From: Paul E. McKenney <paulmck@linux.ibm.com>

> > Sent: Wednesday, April 10, 2019 1:15 PM

> > To: Honnappa Nagarahalli <Honnappa.Nagarahalli@arm.com>

> > Cc: konstantin.ananyev@intel.com; stephen@networkplumber.org;

> > marko.kovacevic@intel.com; dev@dpdk.org; Gavin Hu (Arm Technology

> > China) <Gavin.Hu@arm.com>; Dharmik Thakkar

> > <Dharmik.Thakkar@arm.com>; Malvika Gupta <Malvika.Gupta@arm.com>

> > Subject: Re: [PATCH v4 1/3] rcu: add RCU library supporting QSBR

> > mechanism

> > 

> > On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli wrote:

> > > Add RCU library supporting quiescent state based memory reclamation

> > method.

> > > This library helps identify the quiescent state of the reader threads

> > > so that the writers can free the memory associated with the lock less

> > > data structures.

> > 

> > I don't see any sign of read-side markers (rcu_read_lock() and

> > rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

> > 

> > Yes, strictly speaking, these are not needed for QSBR to operate, but they

> These APIs would be empty for QSBR.

> 

> > make it way easier to maintain and debug code using RCU.  For example,

> > given the read-side markers, you can check for errors like having a call to

> > rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.

> > Without those read-side markers, life can be quite hard and you will really

> > hate yourself for failing to have provided them.

> 

> Want to make sure I understood this, do you mean the application would mark before and after accessing the shared data structure on the reader side?

> 

> rte_rcu_qsbr_lock()

> <begin access shared data structure>

> ...

> ...

> <end access shared data structure>

> rte_rcu_qsbr_unlock()


Yes, that is the idea.

> If someone is debugging this code, they have to make sure that there is an unlock for every lock and there is no call to rte_rcu_qsbr_quiescent in between.

> It sounds good to me. Obviously, they will not add any additional cycles as well.

> Please let me know if my understanding is correct.


Yes.  And in some sort of debug mode, you could capture the counter at
rte_rcu_qsbr_lock() time and check it at rte_rcu_qsbr_unlock() time.  If
the counter has advanced too far (more than one, if I am not too confused)
there is a bug.  Also in debug mode, you could have rte_rcu_qsbr_lock()
increment a per-thread counter and rte_rcu_qsbr_unlock() decrement it.
If the counter is non-zero at a quiescent state, there is a bug.
And so on.

> > Some additional questions and comments interspersed.

> > 

> > 							Thanx, Paul

> > 

> > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > > Reviewed-by: Steve Capper <steve.capper@arm.com>

> > > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > > Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

> > > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> > > ---

> > >  MAINTAINERS                        |   5 +

> > >  config/common_base                 |   6 +

> > >  lib/Makefile                       |   2 +

> > >  lib/librte_rcu/Makefile            |  23 ++

> > >  lib/librte_rcu/meson.build         |   5 +

> > >  lib/librte_rcu/rte_rcu_qsbr.c      | 237 ++++++++++++

> > >  lib/librte_rcu/rte_rcu_qsbr.h      | 554

> > +++++++++++++++++++++++++++++

> > >  lib/librte_rcu/rte_rcu_version.map |  11 +

> > >  lib/meson.build                    |   2 +-

> > >  mk/rte.app.mk                      |   1 +

> > >  10 files changed, 845 insertions(+), 1 deletion(-)  create mode

> > > 100644 lib/librte_rcu/Makefile  create mode 100644

> > > lib/librte_rcu/meson.build  create mode 100644

> > > lib/librte_rcu/rte_rcu_qsbr.c  create mode 100644

> > > lib/librte_rcu/rte_rcu_qsbr.h  create mode 100644

> > > lib/librte_rcu/rte_rcu_version.map

> > >

> > > diff --git a/MAINTAINERS b/MAINTAINERS index 9774344dd..6e9766eed

> > > 100644

> > > --- a/MAINTAINERS

> > > +++ b/MAINTAINERS

> > > @@ -1267,6 +1267,11 @@ F: examples/bpf/

> > >  F: app/test/test_bpf.c

> > >  F: doc/guides/prog_guide/bpf_lib.rst

> > >

> > > +RCU - EXPERIMENTAL

> > > +M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > > +F: lib/librte_rcu/

> > > +F: doc/guides/prog_guide/rcu_lib.rst

> > > +

> > >

> > >  Test Applications

> > >  -----------------

> > > diff --git a/config/common_base b/config/common_base index

> > > 8da08105b..ad70c79e1 100644

> > > --- a/config/common_base

> > > +++ b/config/common_base

> > > @@ -829,6 +829,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y  #

> > > CONFIG_RTE_LIBRTE_TELEMETRY=n

> > >

> > > +#

> > > +# Compile librte_rcu

> > > +#

> > > +CONFIG_RTE_LIBRTE_RCU=y

> > > +CONFIG_RTE_LIBRTE_RCU_DEBUG=n

> > > +

> > >  #

> > >  # Compile librte_lpm

> > >  #

> > > diff --git a/lib/Makefile b/lib/Makefile index 26021d0c0..791e0d991

> > > 100644

> > > --- a/lib/Makefile

> > > +++ b/lib/Makefile

> > > @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) +=

> > librte_ipsec

> > > DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev

> > > librte_security

> > >  DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry

> > > DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev

> > > +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu DEPDIRS-librte_rcu :=

> > > +librte_eal

> > >

> > >  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

> > >  DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git

> > > a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile new file mode

> > > 100644 index 000000000..6aa677bd1

> > > --- /dev/null

> > > +++ b/lib/librte_rcu/Makefile

> > > @@ -0,0 +1,23 @@

> > > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Arm

> > > +Limited

> > > +

> > > +include $(RTE_SDK)/mk/rte.vars.mk

> > > +

> > > +# library name

> > > +LIB = librte_rcu.a

> > > +

> > > +CFLAGS += -DALLOW_EXPERIMENTAL_API

> > > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 LDLIBS += -lrte_eal

> > > +

> > > +EXPORT_MAP := rte_rcu_version.map

> > > +

> > > +LIBABIVER := 1

> > > +

> > > +# all source are stored in SRCS-y

> > > +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c

> > > +

> > > +# install includes

> > > +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h

> > > +

> > > +include $(RTE_SDK)/mk/rte.lib.mk

> > > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build

> > > new file mode 100644 index 000000000..c009ae4b7

> > > --- /dev/null

> > > +++ b/lib/librte_rcu/meson.build

> > > @@ -0,0 +1,5 @@

> > > +# SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2018 Arm

> > > +Limited

> > > +

> > > +sources = files('rte_rcu_qsbr.c')

> > > +headers = files('rte_rcu_qsbr.h')

> > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c

> > > b/lib/librte_rcu/rte_rcu_qsbr.c new file mode 100644 index

> > > 000000000..53d08446a

> > > --- /dev/null

> > > +++ b/lib/librte_rcu/rte_rcu_qsbr.c

> > > @@ -0,0 +1,237 @@

> > > +/* SPDX-License-Identifier: BSD-3-Clause

> > > + *

> > > + * Copyright (c) 2018 Arm Limited

> > > + */

> > > +

> > > +#include <stdio.h>

> > > +#include <string.h>

> > > +#include <stdint.h>

> > > +#include <errno.h>

> > > +

> > > +#include <rte_common.h>

> > > +#include <rte_log.h>

> > > +#include <rte_memory.h>

> > > +#include <rte_malloc.h>

> > > +#include <rte_eal.h>

> > > +#include <rte_eal_memconfig.h>

> > > +#include <rte_atomic.h>

> > > +#include <rte_per_lcore.h>

> > > +#include <rte_lcore.h>

> > > +#include <rte_errno.h>

> > > +

> > > +#include "rte_rcu_qsbr.h"

> > > +

> > > +/* Get the memory size of QSBR variable */ size_t __rte_experimental

> > > +rte_rcu_qsbr_get_memsize(uint32_t max_threads) {

> > > +	size_t sz;

> > > +

> > > +	if (max_threads == 0) {

> > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > +			"%s(): Invalid max_threads %u\n",

> > > +			__func__, max_threads);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	sz = sizeof(struct rte_rcu_qsbr);

> > > +

> > > +	/* Add the size of quiescent state counter array */

> > > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> > > +

> > > +	/* Add the size of the registered thread ID bitmap array */

> > > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> > > +

> > > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

> > 

> > Given that you align here, should you also align in the earlier steps in the

> > computation of sz?

> 

> Agree. I will remove the align here and keep the earlier one as the intent is to align the thread ID array.


Sounds good!

> > > +}

> > > +

> > > +/* Initialize a quiescent state variable */ int __rte_experimental

> > > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) {

> > > +	size_t sz;

> > > +

> > > +	if (v == NULL) {

> > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > +			"%s(): Invalid input parameter\n", __func__);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> > > +	if (sz == 1)

> > > +		return 1;

> > > +

> > > +	/* Set all the threads to offline */

> > > +	memset(v, 0, sz);

> > 

> > We calculate sz here, but it looks like the caller must also calculate it in

> > order to correctly allocate the memory referenced by the "v" argument to

> > this function, with bad things happening if the two calculations get

> > different results.  Should "v" instead be allocated within this function to

> > avoid this sort of problem?

> 

> Earlier version allocated the memory with-in this library. However, it was decided to go with the current implementation as it provides flexibility for the application to manage the memory as it sees fit. For ex: it could allocate this as part of another structure in a single allocation. This also falls inline with similar approach taken in other libraries. 


So the allocator APIs vary too much to allow a pointer to the desired
allocator function to be passed in?  Or do you also want to allow static
allocation?  If the latter, would a DEFINE_RTE_RCU_QSBR() be of use?

> > > +	v->max_threads = max_threads;

> > > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> > > +	v->token = RTE_QSBR_CNT_INIT;

> > > +

> > > +	return 0;

> > > +}

> > > +

> > > +/* Register a reader thread to report its quiescent state

> > > + * on a QS variable.

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id) {

> > > +	unsigned int i, id, success;

> > > +	uint64_t old_bmap, new_bmap;

> > > +

> > > +	if (v == NULL || thread_id >= v->max_threads) {

> > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > +			"%s(): Invalid input parameter\n", __func__);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > > +

> > > +	/* Make sure that the counter for registered threads does not

> > > +	 * go out of sync. Hence, additional checks are required.

> > > +	 */

> > > +	/* Check if the thread is already registered */

> > > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > +					__ATOMIC_RELAXED);

> > > +	if (old_bmap & 1UL << id)

> > > +		return 0;

> > > +

> > > +	do {

> > > +		new_bmap = old_bmap | (1UL << id);

> > > +		success = __atomic_compare_exchange(

> > > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > +					&old_bmap, &new_bmap, 0,

> > > +					__ATOMIC_RELEASE,

> > __ATOMIC_RELAXED);

> > > +

> > > +		if (success)

> > > +			__atomic_fetch_add(&v->num_threads,

> > > +						1, __ATOMIC_RELAXED);

> > > +		else if (old_bmap & (1UL << id))

> > > +			/* Someone else registered this thread.

> > > +			 * Counter should not be incremented.

> > > +			 */

> > > +			return 0;

> > > +	} while (success == 0);

> > 

> > This would be simpler if threads were required to register themselves.

> > Maybe you have use cases requiring registration of other threads, but this

> > capability is adding significant complexity, so it might be worth some

> > thought.

> > 

> It was simple earlier (__atomic_fetch_or). The complexity is added as 'num_threads' should not go out of sync.


Hmmm...

So threads are allowed to register other threads?  Or is there some other
reason that concurrent registration is required?

> > > +	return 0;

> > > +}

> > > +

> > > +/* Remove a reader thread, from the list of threads reporting their

> > > + * quiescent state on a QS variable.

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id) {

> > > +	unsigned int i, id, success;

> > > +	uint64_t old_bmap, new_bmap;

> > > +

> > > +	if (v == NULL || thread_id >= v->max_threads) {

> > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > +			"%s(): Invalid input parameter\n", __func__);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > > +

> > > +	/* Make sure that the counter for registered threads does not

> > > +	 * go out of sync. Hence, additional checks are required.

> > > +	 */

> > > +	/* Check if the thread is already unregistered */

> > > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > +					__ATOMIC_RELAXED);

> > > +	if (old_bmap & ~(1UL << id))

> > > +		return 0;

> > > +

> > > +	do {

> > > +		new_bmap = old_bmap & ~(1UL << id);

> > > +		/* Make sure any loads of the shared data structure are

> > > +		 * completed before removal of the thread from the list of

> > > +		 * reporting threads.

> > > +		 */

> > > +		success = __atomic_compare_exchange(

> > > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > +					&old_bmap, &new_bmap, 0,

> > > +					__ATOMIC_RELEASE,

> > __ATOMIC_RELAXED);

> > > +

> > > +		if (success)

> > > +			__atomic_fetch_sub(&v->num_threads,

> > > +						1, __ATOMIC_RELAXED);

> > > +		else if (old_bmap & ~(1UL << id))

> > > +			/* Someone else unregistered this thread.

> > > +			 * Counter should not be incremented.

> > > +			 */

> > > +			return 0;

> > > +	} while (success == 0);

> > 

> > Ditto!

> > 

> > > +	return 0;

> > > +}

> > > +

> > > +/* Dump the details of a single quiescent state variable to a file.

> > > +*/ int __rte_experimental rte_rcu_qsbr_dump(FILE *f, struct

> > > +rte_rcu_qsbr *v) {

> > > +	uint64_t bmap;

> > > +	uint32_t i, t;

> > > +

> > > +	if (v == NULL || f == NULL) {

> > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > +			"%s(): Invalid input parameter\n", __func__);

> > > +		rte_errno = EINVAL;

> > > +

> > > +		return 1;

> > > +	}

> > > +

> > > +	fprintf(f, "\nQuiescent State Variable @%p\n", v);

> > > +

> > > +	fprintf(f, "  QS variable memory size = %lu\n",

> > > +				rte_rcu_qsbr_get_memsize(v-

> > >max_threads));

> > > +	fprintf(f, "  Given # max threads = %u\n", v->max_threads);

> > > +	fprintf(f, "  Current # threads = %u\n", v->num_threads);

> > > +

> > > +	fprintf(f, "  Registered thread ID mask = 0x");

> > > +	for (i = 0; i < v->num_elems; i++)

> > > +		fprintf(f, "%lx", __atomic_load_n(

> > > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > +					__ATOMIC_ACQUIRE));

> > > +	fprintf(f, "\n");

> > > +

> > > +	fprintf(f, "  Token = %lu\n",

> > > +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

> > > +

> > > +	fprintf(f, "Quiescent State Counts for readers:\n");

> > > +	for (i = 0; i < v->num_elems; i++) {

> > > +		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v,

> > i),

> > > +					__ATOMIC_ACQUIRE);

> > > +		while (bmap) {

> > > +			t = __builtin_ctzl(bmap);

> > > +			fprintf(f, "thread ID = %d, count = %lu\n", t,

> > > +				__atomic_load_n(

> > > +					&v->qsbr_cnt[i].cnt,

> > > +					__ATOMIC_RELAXED));

> > > +			bmap &= ~(1UL << t);

> > > +		}

> > > +	}

> > > +

> > > +	return 0;

> > > +}

> > > +

> > > +int rcu_log_type;

> > > +

> > > +RTE_INIT(rte_rcu_register)

> > > +{

> > > +	rcu_log_type = rte_log_register("lib.rcu");

> > > +	if (rcu_log_type >= 0)

> > > +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR); }

> > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index

> > > 000000000..ff696aeab

> > > --- /dev/null

> > > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > > @@ -0,0 +1,554 @@

> > > +/* SPDX-License-Identifier: BSD-3-Clause

> > > + * Copyright (c) 2018 Arm Limited

> > > + */

> > > +

> > > +#ifndef _RTE_RCU_QSBR_H_

> > > +#define _RTE_RCU_QSBR_H_

> > > +

> > > +/**

> > > + * @file

> > > + * RTE Quiescent State Based Reclamation (QSBR)

> > > + *

> > > + * Quiescent State (QS) is any point in the thread execution

> > > + * where the thread does not hold a reference to a data structure

> > > + * in shared memory. While using lock-less data structures, the

> > > +writer

> > > + * can safely free memory once all the reader threads have entered

> > > + * quiescent state.

> > > + *

> > > + * This library provides the ability for the readers to report

> > > +quiescent

> > > + * state and for the writers to identify when all the readers have

> > > + * entered quiescent state.

> > > + */

> > > +

> > > +#ifdef __cplusplus

> > > +extern "C" {

> > > +#endif

> > > +

> > > +#include <stdio.h>

> > > +#include <stdint.h>

> > > +#include <errno.h>

> > > +#include <rte_common.h>

> > > +#include <rte_memory.h>

> > > +#include <rte_lcore.h>

> > > +#include <rte_debug.h>

> > > +#include <rte_atomic.h>

> > > +

> > > +extern int rcu_log_type;

> > > +

> > > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define RCU_DP_LOG(level,

> > fmt,

> > > +args...) \

> > > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> > > +		"%s(): " fmt "\n", __func__, ## args) #else #define

> > > +RCU_DP_LOG(level, fmt, args...) #endif

> > > +

> > > +/* Registered thread IDs are stored as a bitmap of 64b element array.

> > > + * Given thread id needs to be converted to index into the array and

> > > + * the id within the array element.

> > > + */

> > > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> > #define

> > > +RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> > > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> > > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3,

> > RTE_CACHE_LINE_SIZE) #define

> > > +RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> > > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i) #define

> > > +RTE_QSBR_THRID_INDEX_SHIFT 6 #define RTE_QSBR_THRID_MASK 0x3f

> > #define

> > > +RTE_QSBR_THRID_INVALID 0xffffffff

> > > +

> > > +/* Worker thread counter */

> > > +struct rte_rcu_qsbr_cnt {

> > > +	uint64_t cnt;

> > > +	/**< Quiescent state counter. Value 0 indicates the thread is

> > > +offline */ } __rte_cache_aligned;

> > > +

> > > +#define RTE_QSBR_CNT_THR_OFFLINE 0

> > > +#define RTE_QSBR_CNT_INIT 1

> > > +

> > > +/* RTE Quiescent State variable structure.

> > > + * This structure has two elements that vary in size based on the

> > > + * 'max_threads' parameter.

> > > + * 1) Quiescent state counter array

> > > + * 2) Register thread ID array

> > > + */

> > > +struct rte_rcu_qsbr {

> > > +	uint64_t token __rte_cache_aligned;

> > > +	/**< Counter to allow for multiple concurrent quiescent state

> > > +queries */

> > > +

> > > +	uint32_t num_elems __rte_cache_aligned;

> > > +	/**< Number of elements in the thread ID array */

> > > +	uint32_t num_threads;

> > > +	/**< Number of threads currently using this QS variable */

> > > +	uint32_t max_threads;

> > > +	/**< Maximum number of threads using this QS variable */

> > > +

> > > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> > > +	/**< Quiescent state counter array of 'max_threads' elements */

> > > +

> > > +	/**< Registered thread IDs are stored in a bitmap array,

> > > +	 *   after the quiescent state counter array.

> > > +	 */

> > > +} __rte_cache_aligned;

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Return the size of the memory occupied by a Quiescent State

> > variable.

> > > + *

> > > + * @param max_threads

> > > + *   Maximum number of threads reporting quiescent state on this

> > variable.

> > > + * @return

> > > + *   On success - size of memory in bytes required for this QS variable.

> > > + *   On error - 1 with error code set in rte_errno.

> > > + *   Possible rte_errno codes are:

> > > + *   - EINVAL - max_threads is 0

> > > + */

> > > +size_t __rte_experimental

> > > +rte_rcu_qsbr_get_memsize(uint32_t max_threads);

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Initialize a Quiescent State (QS) variable.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param max_threads

> > > + *   Maximum number of threads reporting quiescent state on this

> > variable.

> > > + *   This should be the same value as passed to

> > rte_rcu_qsbr_get_memsize.

> > > + * @return

> > > + *   On success - 0

> > > + *   On error - 1 with error code set in rte_errno.

> > > + *   Possible rte_errno codes are:

> > > + *   - EINVAL - max_threads is 0 or 'v' is NULL.

> > > + *

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Register a reader thread to report its quiescent state

> > > + * on a QS variable.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread

> > > + * safe.

> > > + * Any reader thread that wants to report its quiescent state must

> > > + * call this API. This can be called during initialization or as part

> > > + * of the packet processing loop.

> > > + *

> > > + * Note that rte_rcu_qsbr_thread_online must be called before the

> > > + * thread updates its quiescent state using rte_rcu_qsbr_quiescent.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   Reader thread with this thread ID will report its quiescent state on

> > > + *   the QS variable. thread_id is a value between 0 and (max_threads -

> > 1).

> > > + *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id);

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Remove a reader thread, from the list of threads reporting their

> > > + * quiescent state on a QS variable.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread safe.

> > > + * This API can be called from the reader threads during shutdown.

> > > + * Ongoing quiescent state queries will stop waiting for the status

> > > +from this

> > > + * unregistered reader thread.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   Reader thread with this thread ID will stop reporting its quiescent

> > > + *   state on the QS variable.

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id);

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Add a registered reader thread, to the list of threads reporting

> > > +their

> > > + * quiescent state on a QS variable.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread

> > > + * safe.

> > > + *

> > > + * Any registered reader thread that wants to report its quiescent

> > > +state must

> > > + * call this API before calling rte_rcu_qsbr_quiescent. This can be

> > > +called

> > > + * during initialization or as part of the packet processing loop.

> > > + *

> > > + * The reader thread must call rte_rcu_thread_offline API, before

> > > + * calling any functions that block, to ensure that

> > > +rte_rcu_qsbr_check

> > > + * API does not wait indefinitely for the reader thread to update its QS.

> > > + *

> > > + * The reader thread must call rte_rcu_thread_online API, after the

> > > +blocking

> > > + * function call returns, to ensure that rte_rcu_qsbr_check API

> > > + * waits for the reader thread to update its quiescent state.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   Reader thread with this thread ID will report its quiescent state on

> > > + *   the QS variable.

> > > + */

> > > +static __rte_always_inline void __rte_experimental

> > > +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id)

> > 

> > I am not clear on why this function should be inline.  Or do you have use

> > cases where threads go offline and come back online extremely frequently?

> 

> Yes, there are use cases where the function call to receive the packets can block.


OK.

> > > +{

> > > +	uint64_t t;

> > > +

> > > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > > +

> > > +	/* Copy the current value of token.

> > > +	 * The fence at the end of the function will ensure that

> > > +	 * the following will not move down after the load of any shared

> > > +	 * data structure.

> > > +	 */

> > > +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

> > > +

> > > +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure

> > > +	 * 'cnt' (64b) is accessed atomically.

> > > +	 */

> > > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > > +		t, __ATOMIC_RELAXED);

> > > +

> > > +	/* The subsequent load of the data structure should not

> > > +	 * move above the store. Hence a store-load barrier

> > > +	 * is required.

> > > +	 * If the load of the data structure moves above the store,

> > > +	 * writer might not see that the reader is online, even though

> > > +	 * the reader is referencing the shared data structure.

> > > +	 */

> > > +#ifdef RTE_ARCH_X86_64

> > > +	/* rte_smp_mb() for x86 is lighter */

> > > +	rte_smp_mb();

> > > +#else

> > > +	__atomic_thread_fence(__ATOMIC_SEQ_CST);

> > > +#endif

> > > +}

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Remove a registered reader thread from the list of threads

> > > +reporting their

> > > + * quiescent state on a QS variable.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread

> > > + * safe.

> > > + *

> > > + * This can be called during initialization or as part of the packet

> > > + * processing loop.

> > > + *

> > > + * The reader thread must call rte_rcu_thread_offline API, before

> > > + * calling any functions that block, to ensure that

> > > +rte_rcu_qsbr_check

> > > + * API does not wait indefinitely for the reader thread to update its QS.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   rte_rcu_qsbr_check API will not wait for the reader thread with

> > > + *   this thread ID to report its quiescent state on the QS variable.

> > > + */

> > > +static __rte_always_inline void __rte_experimental

> > > +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id)

> > 

> > Same here on inlining.

> > 

> > > +{

> > > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > > +

> > > +	/* The reader can go offline only after the load of the

> > > +	 * data structure is completed. i.e. any load of the

> > > +	 * data strcture can not move after this store.

> > > +	 */

> > > +

> > > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > > +		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE); }

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Ask the reader threads to report the quiescent state

> > > + * status.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread

> > > + * safe and can be called from worker threads.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @return

> > > + *   - This is the token for this call of the API. This should be

> > > + *     passed to rte_rcu_qsbr_check API.

> > > + */

> > > +static __rte_always_inline uint64_t __rte_experimental

> > > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v) {

> > > +	uint64_t t;

> > > +

> > > +	RTE_ASSERT(v != NULL);

> > > +

> > > +	/* Release the changes to the shared data structure.

> > > +	 * This store release will ensure that changes to any data

> > > +	 * structure are visible to the workers before the token

> > > +	 * update is visible.

> > > +	 */

> > > +	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

> > > +

> > > +	return t;

> > > +}

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Update quiescent state for a reader thread.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread safe.

> > > + * All the reader threads registered to report their quiescent state

> > > + * on the QS variable must call this API.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   Update the quiescent state for the reader with this thread ID.

> > > + */

> > > +static __rte_always_inline void __rte_experimental

> > > +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id) {

> > > +	uint64_t t;

> > > +

> > > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

> > > +

> > > +	/* Acquire the changes to the shared data structure released

> > > +	 * by rte_rcu_qsbr_start.

> > > +	 * Later loads of the shared data structure should not move

> > > +	 * above this load. Hence, use load-acquire.

> > > +	 */

> > > +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

> > > +

> > > +	/* Inform the writer that updates are visible to this reader.

> > > +	 * Prior loads of the shared data structure should not move

> > > +	 * beyond this store. Hence use store-release.

> > > +	 */

> > > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,

> > > +			 t, __ATOMIC_RELEASE);

> > > +

> > > +	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",

> > > +		__func__, t, thread_id);

> > > +}

> > > +

> > > +/* Check the quiescent state counter for registered threads only,

> > > +assuming

> > > + * that not all threads have registered.

> > > + */

> > > +static __rte_always_inline int

> > > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool

> > > +wait) {

> > > +	uint32_t i, j, id;

> > > +	uint64_t bmap;

> > > +	uint64_t c;

> > > +	uint64_t *reg_thread_id;

> > > +

> > > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> > > +		i < v->num_elems;

> > > +		i++, reg_thread_id++) {

> > > +		/* Load the current registered thread bit map before

> > > +		 * loading the reader thread quiescent state counters.

> > > +		 */

> > > +		bmap = __atomic_load_n(reg_thread_id,

> > __ATOMIC_ACQUIRE);

> > > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> > > +

> > > +		while (bmap) {

> > > +			j = __builtin_ctzl(bmap);

> > > +			RCU_DP_LOG(DEBUG,

> > > +				"%s: check: token = %lu, wait = %d, Bit Map

> > = 0x%lx, Thread ID = %d",

> > > +				__func__, t, wait, bmap, id + j);

> > > +			c = __atomic_load_n(

> > > +					&v->qsbr_cnt[id + j].cnt,

> > > +					__ATOMIC_ACQUIRE);

> > > +			RCU_DP_LOG(DEBUG,

> > > +				"%s: status: token = %lu, wait = %d, Thread

> > QS cnt = %lu, Thread ID = %d",

> > > +				__func__, t, wait, c, id+j);

> > > +			/* Counter is not checked for wrap-around

> > condition

> > > +			 * as it is a 64b counter.

> > > +			 */

> > > +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c

> > < t)) {

> > 

> > This assumes that a 64-bit counter won't overflow, which is close enough

> > to true given current CPU clock frequencies.  ;-)

> > 

> > > +				/* This thread is not in quiescent state */

> > > +				if (!wait)

> > > +					return 0;

> > > +

> > > +				rte_pause();

> > > +				/* This thread might have unregistered.

> > > +				 * Re-read the bitmap.

> > > +				 */

> > > +				bmap = __atomic_load_n(reg_thread_id,

> > > +						__ATOMIC_ACQUIRE);

> > > +

> > > +				continue;

> > > +			}

> > > +

> > > +			bmap &= ~(1UL << j);

> > > +		}

> > > +	}

> > > +

> > > +	return 1;

> > > +}

> > > +

> > > +/* Check the quiescent state counter for all threads, assuming that

> > > + * all the threads have registered.

> > > + */

> > > +static __rte_always_inline int

> > > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)

> > 

> > Does checking the bitmap really take long enough to make this worthwhile

> > as a separate function?  I would think that the bitmap-checking time

> > would be lost in the noise of cache misses from the ->cnt loads.

> 

> It avoids accessing one cache line. I think this is where the savings are (may be in theory). This is the most probable use case.

> On the other hand, __rcu_qsbr_check_selective() will result in savings (depending on how many threads are currently registered) by avoiding accessing unwanted counters.


Do you really expect to be calling this function on any kind of fastpath?

> > Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in the

> > absence of readers, you might see __rcu_qsbr_check_all() being a bit

> > faster.  But is that really what DPDK does?

> I see improvements in the synthetic test case (similar to the one you have described, around 27%). However, in the more practical test cases I do not see any difference.


If the performance improvement only occurs in a synthetic test case,
does it really make sense to optimize for it?

> > > +{

> > > +	uint32_t i;

> > > +	struct rte_rcu_qsbr_cnt *cnt;

> > > +	uint64_t c;

> > > +

> > > +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {

> > > +		RCU_DP_LOG(DEBUG,

> > > +			"%s: check: token = %lu, wait = %d, Thread ID = %d",

> > > +			__func__, t, wait, i);

> > > +		while (1) {

> > > +			c = __atomic_load_n(&cnt->cnt,

> > __ATOMIC_ACQUIRE);

> > > +			RCU_DP_LOG(DEBUG,

> > > +				"%s: status: token = %lu, wait = %d, Thread

> > QS cnt = %lu, Thread ID = %d",

> > > +				__func__, t, wait, c, i);

> > > +			/* Counter is not checked for wrap-around

> > condition

> > > +			 * as it is a 64b counter.

> > > +			 */

> > > +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >=

> > t))

> > > +				break;

> > > +

> > > +			/* This thread is not in quiescent state */

> > > +			if (!wait)

> > > +				return 0;

> > > +

> > > +			rte_pause();

> > > +		}

> > > +	}

> > > +

> > > +	return 1;

> > > +}

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Checks if all the reader threads have entered the quiescent state

> > > + * referenced by token.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread

> > > + * safe and can be called from the worker threads as well.

> > > + *

> > > + * If this API is called with 'wait' set to true, the following

> > > + * factors must be considered:

> > > + *

> > > + * 1) If the calling thread is also reporting the status on the

> > > + * same QS variable, it must update the quiescent state status,

> > > +before

> > > + * calling this API.

> > > + *

> > > + * 2) In addition, while calling from multiple threads, only

> > > + * one of those threads can be reporting the quiescent state status

> > > + * on a given QS variable.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param t

> > > + *   Token returned by rte_rcu_qsbr_start API

> > > + * @param wait

> > > + *   If true, block till all the reader threads have completed entering

> > > + *   the quiescent state referenced by token 't'.

> > > + * @return

> > > + *   - 0 if all reader threads have NOT passed through specified number

> > > + *     of quiescent states.

> > > + *   - 1 if all reader threads have passed through specified number

> > > + *     of quiescent states.

> > > + */

> > > +static __rte_always_inline int __rte_experimental

> > > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) {

> > > +	RTE_ASSERT(v != NULL);

> > > +

> > > +	if (likely(v->num_threads == v->max_threads))

> > > +		return __rcu_qsbr_check_all(v, t, wait);

> > > +	else

> > > +		return __rcu_qsbr_check_selective(v, t, wait); }

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Wait till the reader threads have entered quiescent state.

> > > + *

> > > + * This is implemented as a lock-free function. It is multi-thread safe.

> > > + * This API can be thought of as a wrapper around rte_rcu_qsbr_start

> > > +and

> > > + * rte_rcu_qsbr_check APIs.

> > > + *

> > > + * If this API is called from multiple threads, only one of

> > > + * those threads can be reporting the quiescent state status on a

> > > + * given QS variable.

> > > + *

> > > + * @param v

> > > + *   QS variable

> > > + * @param thread_id

> > > + *   Thread ID of the caller if it is registered to report quiescent state

> > > + *   on this QS variable (i.e. the calling thread is also part of the

> > > + *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.

> > > + */

> > > +static __rte_always_inline void __rte_experimental

> > > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int

> > > +thread_id) {

> > > +	uint64_t t;

> > > +

> > > +	RTE_ASSERT(v != NULL);

> > > +

> > > +	t = rte_rcu_qsbr_start(v);

> > > +

> > > +	/* If the current thread has readside critical section,

> > > +	 * update its quiescent state status.

> > > +	 */

> > > +	if (thread_id != RTE_QSBR_THRID_INVALID)

> > > +		rte_rcu_qsbr_quiescent(v, thread_id);

> > > +

> > > +	/* Wait for other readers to enter quiescent state */

> > > +	rte_rcu_qsbr_check(v, t, true);

> > 

> > And you are presumably relying on 64-bit counters to avoid the need to

> > execute the above code twice in succession.  Which again works given

> > current CPU clock rates combined with system and human lifespans.

> > Otherwise, there are interesting race conditions that can happen, so don't

> > try this with a 32-bit counter!!!

> 

> Yes. I am relying on 64-bit counters to avoid having to spend cycles (and time).

> 

> > (But think of the great^N grandchildren!!!)

> 

> (It is an interesting thought. I wonder what would happen to all the code we are writing today 😊)


I suspect that most systems will be rebooted more than once per decade,
so unless CPU core clock rates manage to go up another order of magnitude,
we should be just fine.

Famous last words!  ;-)

> > More seriously, a comment warning people not to make the counter be 32

> > bits is in order.

> Agree, I will add it in the structure definition.


Sounds good!

							Thanx, Paul

> > > +}

> > > +

> > > +/**

> > > + * @warning

> > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > + *

> > > + * Dump the details of a single QS variables to a file.

> > > + *

> > > + * It is NOT multi-thread safe.

> > > + *

> > > + * @param f

> > > + *   A pointer to a file for output

> > > + * @param v

> > > + *   QS variable

> > > + * @return

> > > + *   On success - 0

> > > + *   On error - 1 with error code set in rte_errno.

> > > + *   Possible rte_errno codes are:

> > > + *   - EINVAL - NULL parameters are passed

> > > + */

> > > +int __rte_experimental

> > > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

> > > +

> > > +#ifdef __cplusplus

> > > +}

> > > +#endif

> > > +

> > > +#endif /* _RTE_RCU_QSBR_H_ */

> > > diff --git a/lib/librte_rcu/rte_rcu_version.map

> > > b/lib/librte_rcu/rte_rcu_version.map

> > > new file mode 100644

> > > index 000000000..ad8cb517c

> > > --- /dev/null

> > > +++ b/lib/librte_rcu/rte_rcu_version.map

> > > @@ -0,0 +1,11 @@

> > > +EXPERIMENTAL {

> > > +	global:

> > > +

> > > +	rte_rcu_qsbr_get_memsize;

> > > +	rte_rcu_qsbr_init;

> > > +	rte_rcu_qsbr_thread_register;

> > > +	rte_rcu_qsbr_thread_unregister;

> > > +	rte_rcu_qsbr_dump;

> > > +

> > > +	local: *;

> > > +};

> > > diff --git a/lib/meson.build b/lib/meson.build index

> > > 595314d7d..67be10659 100644

> > > --- a/lib/meson.build

> > > +++ b/lib/meson.build

> > > @@ -22,7 +22,7 @@ libraries = [

> > >  	'gro', 'gso', 'ip_frag', 'jobstats',

> > >  	'kni', 'latencystats', 'lpm', 'member',

> > >  	'power', 'pdump', 'rawdev',

> > > -	'reorder', 'sched', 'security', 'stack', 'vhost',

> > > +	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',

> > >  	#ipsec lib depends on crypto and security

> > >  	'ipsec',

> > >  	# add pkt framework libs which use other libs from above diff --git

> > > a/mk/rte.app.mk b/mk/rte.app.mk index 7d994bece..e93cc366d 100644

> > > --- a/mk/rte.app.mk

> > > +++ b/mk/rte.app.mk

> > > @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -

> > lrte_eal

> > >  _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline

> > >  _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder

> > >  _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched

> > > +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu

> > >

> > >  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)

> > >  _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni

> > > --

> > > 2.17.1

> > >

>
Honnappa Nagarahalli April 12, 2019, 8:21 p.m. UTC | #4
<snip>

> > >

> > > On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli

> wrote:

> > > > Add RCU library supporting quiescent state based memory

> > > > reclamation

> > > method.

> > > > This library helps identify the quiescent state of the reader

> > > > threads so that the writers can free the memory associated with

> > > > the lock less data structures.

> > >

> > > I don't see any sign of read-side markers (rcu_read_lock() and

> > > rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

> > >

> > > Yes, strictly speaking, these are not needed for QSBR to operate,

> > > but they

> > These APIs would be empty for QSBR.

> >

> > > make it way easier to maintain and debug code using RCU.  For

> > > example, given the read-side markers, you can check for errors like

> > > having a call to

> > > rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.

> > > Without those read-side markers, life can be quite hard and you will

> > > really hate yourself for failing to have provided them.

> >

> > Want to make sure I understood this, do you mean the application

> would mark before and after accessing the shared data structure on the

> reader side?

> >

> > rte_rcu_qsbr_lock()

> > <begin access shared data structure>

> > ...

> > ...

> > <end access shared data structure>

> > rte_rcu_qsbr_unlock()

> 

> Yes, that is the idea.

> 

> > If someone is debugging this code, they have to make sure that there is

> an unlock for every lock and there is no call to rte_rcu_qsbr_quiescent in

> between.

> > It sounds good to me. Obviously, they will not add any additional cycles

> as well.

> > Please let me know if my understanding is correct.

> 

> Yes.  And in some sort of debug mode, you could capture the counter at

> rte_rcu_qsbr_lock() time and check it at rte_rcu_qsbr_unlock() time.  If the

> counter has advanced too far (more than one, if I am not too confused)

> there is a bug.  Also in debug mode, you could have rte_rcu_qsbr_lock()

> increment a per-thread counter and rte_rcu_qsbr_unlock() decrement it.

> If the counter is non-zero at a quiescent state, there is a bug.

> And so on.

> 

Added this in V5

<snip>

> > > > +

> > > > +/* Get the memory size of QSBR variable */ size_t

> > > > +__rte_experimental rte_rcu_qsbr_get_memsize(uint32_t

> max_threads) {

> > > > +	size_t sz;

> > > > +

> > > > +	if (max_threads == 0) {

> > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > +			"%s(): Invalid max_threads %u\n",

> > > > +			__func__, max_threads);

> > > > +		rte_errno = EINVAL;

> > > > +

> > > > +		return 1;

> > > > +	}

> > > > +

> > > > +	sz = sizeof(struct rte_rcu_qsbr);

> > > > +

> > > > +	/* Add the size of quiescent state counter array */

> > > > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> > > > +

> > > > +	/* Add the size of the registered thread ID bitmap array */

> > > > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> > > > +

> > > > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

> > >

> > > Given that you align here, should you also align in the earlier

> > > steps in the computation of sz?

> >

> > Agree. I will remove the align here and keep the earlier one as the intent

> is to align the thread ID array.

> 

> Sounds good!

Added this in V5

> 

> > > > +}

> > > > +

> > > > +/* Initialize a quiescent state variable */ int

> > > > +__rte_experimental rte_rcu_qsbr_init(struct rte_rcu_qsbr *v,

> uint32_t max_threads) {

> > > > +	size_t sz;

> > > > +

> > > > +	if (v == NULL) {

> > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > +		rte_errno = EINVAL;

> > > > +

> > > > +		return 1;

> > > > +	}

> > > > +

> > > > +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> > > > +	if (sz == 1)

> > > > +		return 1;

> > > > +

> > > > +	/* Set all the threads to offline */

> > > > +	memset(v, 0, sz);

> > >

> > > We calculate sz here, but it looks like the caller must also

> > > calculate it in order to correctly allocate the memory referenced by

> > > the "v" argument to this function, with bad things happening if the

> > > two calculations get different results.  Should "v" instead be

> > > allocated within this function to avoid this sort of problem?

> >

> > Earlier version allocated the memory with-in this library. However, it was

> decided to go with the current implementation as it provides flexibility for

> the application to manage the memory as it sees fit. For ex: it could

> allocate this as part of another structure in a single allocation. This also

> falls inline with similar approach taken in other libraries.

> 

> So the allocator APIs vary too much to allow a pointer to the desired

> allocator function to be passed in?  Or do you also want to allow static

> allocation?  If the latter, would a DEFINE_RTE_RCU_QSBR() be of use?

> 

This is done to allow for allocation of memory for QS variable as part of a another bigger data structure. This will help in not fragmenting the memory. For ex:

struct xyz {
    rte_ring *ring;
    rte_rcu_qsbr *v;
    abc *t;
};
struct xyz c;

Memory for the above structure can be allocated in one chunk by calculating the size required.

In some use cases static allocation might be enough as 'max_threads' might be a compile time constant. I am not sure on how to support both dynamic and static 'max_threads'.

> > > > +	v->max_threads = max_threads;

> > > > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> > > > +	v->token = RTE_QSBR_CNT_INIT;

> > > > +

> > > > +	return 0;

> > > > +}

> > > > +

> > > > +/* Register a reader thread to report its quiescent state

> > > > + * on a QS variable.

> > > > + */

> > > > +int __rte_experimental

> > > > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > > > +thread_id) {

> > > > +	unsigned int i, id, success;

> > > > +	uint64_t old_bmap, new_bmap;

> > > > +

> > > > +	if (v == NULL || thread_id >= v->max_threads) {

> > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > +		rte_errno = EINVAL;

> > > > +

> > > > +		return 1;

> > > > +	}

> > > > +

> > > > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > > > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > > > +

> > > > +	/* Make sure that the counter for registered threads does not

> > > > +	 * go out of sync. Hence, additional checks are required.

> > > > +	 */

> > > > +	/* Check if the thread is already registered */

> > > > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > +					__ATOMIC_RELAXED);

> > > > +	if (old_bmap & 1UL << id)

> > > > +		return 0;

> > > > +

> > > > +	do {

> > > > +		new_bmap = old_bmap | (1UL << id);

> > > > +		success = __atomic_compare_exchange(

> > > > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > +					&old_bmap, &new_bmap, 0,

> > > > +					__ATOMIC_RELEASE,

> > > __ATOMIC_RELAXED);

> > > > +

> > > > +		if (success)

> > > > +			__atomic_fetch_add(&v->num_threads,

> > > > +						1, __ATOMIC_RELAXED);

> > > > +		else if (old_bmap & (1UL << id))

> > > > +			/* Someone else registered this thread.

> > > > +			 * Counter should not be incremented.

> > > > +			 */

> > > > +			return 0;

> > > > +	} while (success == 0);

> > >

> > > This would be simpler if threads were required to register themselves.

> > > Maybe you have use cases requiring registration of other threads,

> > > but this capability is adding significant complexity, so it might be

> > > worth some thought.

> > >

> > It was simple earlier (__atomic_fetch_or). The complexity is added as

> 'num_threads' should not go out of sync.

> 

> Hmmm...

> 

> So threads are allowed to register other threads?  Or is there some other

> reason that concurrent registration is required?

> 

Yes, control plane threads can register the fast path threads. Though, I am not sure how useful it is. I did not want to add the restriction. I expect that reader threads will register themselves. The reader threads require concurrent registration as they all will be running in parallel.
If the requirement of keeping track of the number of threads registered currently goes away, then this function will be simple.

<snip>

> > > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > > > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index

> > > > 000000000..ff696aeab

> > > > --- /dev/null

> > > > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > > > @@ -0,0 +1,554 @@

> > > > +/* SPDX-License-Identifier: BSD-3-Clause

> > > > + * Copyright (c) 2018 Arm Limited  */

> > > > +

> > > > +#ifndef _RTE_RCU_QSBR_H_

> > > > +#define _RTE_RCU_QSBR_H_

> > > > +

> > > > +/**

> > > > + * @file

> > > > + * RTE Quiescent State Based Reclamation (QSBR)

> > > > + *

> > > > + * Quiescent State (QS) is any point in the thread execution

> > > > + * where the thread does not hold a reference to a data structure

> > > > + * in shared memory. While using lock-less data structures, the

> > > > +writer

> > > > + * can safely free memory once all the reader threads have

> > > > +entered

> > > > + * quiescent state.

> > > > + *

> > > > + * This library provides the ability for the readers to report

> > > > +quiescent

> > > > + * state and for the writers to identify when all the readers

> > > > +have

> > > > + * entered quiescent state.

> > > > + */

> > > > +

> > > > +#ifdef __cplusplus

> > > > +extern "C" {

> > > > +#endif

> > > > +

> > > > +#include <stdio.h>

> > > > +#include <stdint.h>

> > > > +#include <errno.h>

> > > > +#include <rte_common.h>

> > > > +#include <rte_memory.h>

> > > > +#include <rte_lcore.h>

> > > > +#include <rte_debug.h>

> > > > +#include <rte_atomic.h>

> > > > +

> > > > +extern int rcu_log_type;

> > > > +

> > > > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define

> RCU_DP_LOG(level,

> > > fmt,

> > > > +args...) \

> > > > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> > > > +		"%s(): " fmt "\n", __func__, ## args) #else #define

> > > > +RCU_DP_LOG(level, fmt, args...) #endif

> > > > +

> > > > +/* Registered thread IDs are stored as a bitmap of 64b element

> array.

> > > > + * Given thread id needs to be converted to index into the array

> > > > +and

> > > > + * the id within the array element.

> > > > + */

> > > > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> > > #define

> > > > +RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> > > > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> > > > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3,

> > > RTE_CACHE_LINE_SIZE) #define

> > > > +RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> > > > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

> > > > +#define RTE_QSBR_THRID_INDEX_SHIFT 6 #define

> RTE_QSBR_THRID_MASK

> > > > +0x3f

> > > #define

> > > > +RTE_QSBR_THRID_INVALID 0xffffffff

> > > > +

> > > > +/* Worker thread counter */

> > > > +struct rte_rcu_qsbr_cnt {

> > > > +	uint64_t cnt;

> > > > +	/**< Quiescent state counter. Value 0 indicates the thread is

> > > > +offline */ } __rte_cache_aligned;

> > > > +

> > > > +#define RTE_QSBR_CNT_THR_OFFLINE 0 #define

> RTE_QSBR_CNT_INIT 1

> > > > +

> > > > +/* RTE Quiescent State variable structure.

> > > > + * This structure has two elements that vary in size based on the

> > > > + * 'max_threads' parameter.

> > > > + * 1) Quiescent state counter array

> > > > + * 2) Register thread ID array

> > > > + */

> > > > +struct rte_rcu_qsbr {

> > > > +	uint64_t token __rte_cache_aligned;

> > > > +	/**< Counter to allow for multiple concurrent quiescent state

> > > > +queries */

> > > > +

> > > > +	uint32_t num_elems __rte_cache_aligned;

> > > > +	/**< Number of elements in the thread ID array */

> > > > +	uint32_t num_threads;

> > > > +	/**< Number of threads currently using this QS variable */

> > > > +	uint32_t max_threads;

> > > > +	/**< Maximum number of threads using this QS variable */

> > > > +

> > > > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> > > > +	/**< Quiescent state counter array of 'max_threads' elements */

> > > > +

> > > > +	/**< Registered thread IDs are stored in a bitmap array,

> > > > +	 *   after the quiescent state counter array.

> > > > +	 */

> > > > +} __rte_cache_aligned;

> > > > +


<snip>

> > > > +

> > > > +/* Check the quiescent state counter for registered threads only,

> > > > +assuming

> > > > + * that not all threads have registered.

> > > > + */

> > > > +static __rte_always_inline int

> > > > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t,

> > > > +bool

> > > > +wait) {

> > > > +	uint32_t i, j, id;

> > > > +	uint64_t bmap;

> > > > +	uint64_t c;

> > > > +	uint64_t *reg_thread_id;

> > > > +

> > > > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> > > > +		i < v->num_elems;

> > > > +		i++, reg_thread_id++) {

> > > > +		/* Load the current registered thread bit map before

> > > > +		 * loading the reader thread quiescent state counters.

> > > > +		 */

> > > > +		bmap = __atomic_load_n(reg_thread_id,

> > > __ATOMIC_ACQUIRE);

> > > > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> > > > +

> > > > +		while (bmap) {

> > > > +			j = __builtin_ctzl(bmap);

> > > > +			RCU_DP_LOG(DEBUG,

> > > > +				"%s: check: token = %lu, wait = %d, Bit Map

> > > = 0x%lx, Thread ID = %d",

> > > > +				__func__, t, wait, bmap, id + j);

> > > > +			c = __atomic_load_n(

> > > > +					&v->qsbr_cnt[id + j].cnt,

> > > > +					__ATOMIC_ACQUIRE);

> > > > +			RCU_DP_LOG(DEBUG,

> > > > +				"%s: status: token = %lu, wait = %d, Thread

> > > QS cnt = %lu, Thread ID = %d",

> > > > +				__func__, t, wait, c, id+j);

> > > > +			/* Counter is not checked for wrap-around

> > > condition

> > > > +			 * as it is a 64b counter.

> > > > +			 */

> > > > +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c

> > > < t)) {

> > >

> > > This assumes that a 64-bit counter won't overflow, which is close

> > > enough to true given current CPU clock frequencies.  ;-)

> > >

> > > > +				/* This thread is not in quiescent state */

> > > > +				if (!wait)

> > > > +					return 0;

> > > > +

> > > > +				rte_pause();

> > > > +				/* This thread might have unregistered.

> > > > +				 * Re-read the bitmap.

> > > > +				 */

> > > > +				bmap = __atomic_load_n(reg_thread_id,

> > > > +						__ATOMIC_ACQUIRE);

> > > > +

> > > > +				continue;

> > > > +			}

> > > > +

> > > > +			bmap &= ~(1UL << j);

> > > > +		}

> > > > +	}

> > > > +

> > > > +	return 1;

> > > > +}

> > > > +

> > > > +/* Check the quiescent state counter for all threads, assuming

> > > > +that

> > > > + * all the threads have registered.

> > > > + */

> > > > +static __rte_always_inline int

> > > > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool

> > > > +wait)

> > >

> > > Does checking the bitmap really take long enough to make this

> > > worthwhile as a separate function?  I would think that the

> > > bitmap-checking time would be lost in the noise of cache misses from

> the ->cnt loads.

> >

> > It avoids accessing one cache line. I think this is where the savings are

> (may be in theory). This is the most probable use case.

> > On the other hand, __rcu_qsbr_check_selective() will result in savings

> (depending on how many threads are currently registered) by avoiding

> accessing unwanted counters.

> 

> Do you really expect to be calling this function on any kind of fastpath?


Yes. For some of the libraries (rte_hash), the writer is on the fast path.

> 

> > > Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in

> > > the absence of readers, you might see __rcu_qsbr_check_all() being a

> > > bit faster.  But is that really what DPDK does?

> > I see improvements in the synthetic test case (similar to the one you

> have described, around 27%). However, in the more practical test cases I

> do not see any difference.

> 

> If the performance improvement only occurs in a synthetic test case, does

> it really make sense to optimize for it?

I had to fix few issues in the performance test cases and added more to do the comparison. These changes are in v5.
There are 4 performance tests involving this API.
1) 1 Writer, 'N' readers
     Writer: qsbr_start, qsbr_check(wait = true)
     Readers: qsbr_quiescent
2) 'N' writers
     Writers: qsbr_start, qsbr_check(wait == false)
3) 1 Writer, 'N' readers (this test uses the lock-free rte_hash data structure)
     Writer: hash_del, qsbr_start, qsbr_check(wait = true), validate that the reader was able to complete its work successfully
     Readers: thread_online, hash_lookup, access the pointer - do some work on it, qsbr_quiescent, thread_offline
4) Same as test 3) but qsbr_check (wait == false)

There are 2 sets of these tests.
a) QS variable is created with number of threads same as number of readers - this will exercise __rcu_qsbr_check_all
b) QS variable is created with 128 threads, number of registered threads is same as in a) - this will exercise __rcu_qsbr_check_selective

Following are the results on x86 (E5-2660 v4 @ 2.00GHz) comparing from a) to b) (on x86 in my setup, the results are not very stable between runs)
1) 25%
2) -3%
3) -0.4%
4) 1.38%

Following are the results on an Arm system comparing from a) to b) (results are not pretty stable between runs)
1) -3.45%
2) 0%
3) -0.03%
4) -0.04%

Konstantin, is it possible to run the tests on your setup and look at the results?

> 

> > > > +{

> > > > +	uint32_t i;

> > > > +	struct rte_rcu_qsbr_cnt *cnt;

> > > > +	uint64_t c;

> > > > +

> > > > +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {

> > > > +		RCU_DP_LOG(DEBUG,

> > > > +			"%s: check: token = %lu, wait = %d, Thread ID = %d",

> > > > +			__func__, t, wait, i);

> > > > +		while (1) {

> > > > +			c = __atomic_load_n(&cnt->cnt,

> > > __ATOMIC_ACQUIRE);

> > > > +			RCU_DP_LOG(DEBUG,

> > > > +				"%s: status: token = %lu, wait = %d, Thread

> > > QS cnt = %lu, Thread ID = %d",

> > > > +				__func__, t, wait, c, i);

> > > > +			/* Counter is not checked for wrap-around

> > > condition

> > > > +			 * as it is a 64b counter.

> > > > +			 */

> > > > +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >=

> > > t))

> > > > +				break;

> > > > +

> > > > +			/* This thread is not in quiescent state */

> > > > +			if (!wait)

> > > > +				return 0;

> > > > +

> > > > +			rte_pause();

> > > > +		}

> > > > +	}

> > > > +

> > > > +	return 1;

> > > > +}

> > > > +

> > > > +/**

> > > > + * @warning

> > > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > > + *

> > > > + * Checks if all the reader threads have entered the quiescent

> > > > +state

> > > > + * referenced by token.

> > > > + *

> > > > + * This is implemented as a lock-free function. It is

> > > > +multi-thread

> > > > + * safe and can be called from the worker threads as well.

> > > > + *

> > > > + * If this API is called with 'wait' set to true, the following

> > > > + * factors must be considered:

> > > > + *

> > > > + * 1) If the calling thread is also reporting the status on the

> > > > + * same QS variable, it must update the quiescent state status,

> > > > +before

> > > > + * calling this API.

> > > > + *

> > > > + * 2) In addition, while calling from multiple threads, only

> > > > + * one of those threads can be reporting the quiescent state

> > > > +status

> > > > + * on a given QS variable.

> > > > + *

> > > > + * @param v

> > > > + *   QS variable

> > > > + * @param t

> > > > + *   Token returned by rte_rcu_qsbr_start API

> > > > + * @param wait

> > > > + *   If true, block till all the reader threads have completed entering

> > > > + *   the quiescent state referenced by token 't'.

> > > > + * @return

> > > > + *   - 0 if all reader threads have NOT passed through specified

> number

> > > > + *     of quiescent states.

> > > > + *   - 1 if all reader threads have passed through specified number

> > > > + *     of quiescent states.

> > > > + */

> > > > +static __rte_always_inline int __rte_experimental

> > > > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) {

> > > > +	RTE_ASSERT(v != NULL);

> > > > +

> > > > +	if (likely(v->num_threads == v->max_threads))

> > > > +		return __rcu_qsbr_check_all(v, t, wait);

> > > > +	else

> > > > +		return __rcu_qsbr_check_selective(v, t, wait); }

> > > > +

> > > > +/**

> > > > + * @warning

> > > > + * @b EXPERIMENTAL: this API may change without prior notice

> > > > + *

> > > > + * Wait till the reader threads have entered quiescent state.

> > > > + *

> > > > + * This is implemented as a lock-free function. It is multi-thread safe.

> > > > + * This API can be thought of as a wrapper around

> > > > +rte_rcu_qsbr_start and

> > > > + * rte_rcu_qsbr_check APIs.

> > > > + *

> > > > + * If this API is called from multiple threads, only one of

> > > > + * those threads can be reporting the quiescent state status on a

> > > > + * given QS variable.

> > > > + *

> > > > + * @param v

> > > > + *   QS variable

> > > > + * @param thread_id

> > > > + *   Thread ID of the caller if it is registered to report quiescent state

> > > > + *   on this QS variable (i.e. the calling thread is also part of the

> > > > + *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.

> > > > + */

> > > > +static __rte_always_inline void __rte_experimental

> > > > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int

> > > > +thread_id) {

> > > > +	uint64_t t;

> > > > +

> > > > +	RTE_ASSERT(v != NULL);

> > > > +

> > > > +	t = rte_rcu_qsbr_start(v);

> > > > +

> > > > +	/* If the current thread has readside critical section,

> > > > +	 * update its quiescent state status.

> > > > +	 */

> > > > +	if (thread_id != RTE_QSBR_THRID_INVALID)

> > > > +		rte_rcu_qsbr_quiescent(v, thread_id);

> > > > +

> > > > +	/* Wait for other readers to enter quiescent state */

> > > > +	rte_rcu_qsbr_check(v, t, true);

> > >

> > > And you are presumably relying on 64-bit counters to avoid the need

> > > to execute the above code twice in succession.  Which again works

> > > given current CPU clock rates combined with system and human

> lifespans.

> > > Otherwise, there are interesting race conditions that can happen, so

> > > don't try this with a 32-bit counter!!!

> >

> > Yes. I am relying on 64-bit counters to avoid having to spend cycles (and

> time).

> >

> > > (But think of the great^N grandchildren!!!)

> >

> > (It is an interesting thought. I wonder what would happen to all the

> > code we are writing today 😊)

> 

> I suspect that most systems will be rebooted more than once per decade,

> so unless CPU core clock rates manage to go up another order of

> magnitude, we should be just fine.

> 

> Famous last words!  ;-)

> 

> > > More seriously, a comment warning people not to make the counter

> be

> > > 32 bits is in order.

> > Agree, I will add it in the structure definition.

> 

> Sounds good!

Done in V5

<snip>
Ananyev, Konstantin April 15, 2019, 4:51 p.m. UTC | #5
> > > >

> > > > On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli

> > wrote:

> > > > > Add RCU library supporting quiescent state based memory

> > > > > reclamation

> > > > method.

> > > > > This library helps identify the quiescent state of the reader

> > > > > threads so that the writers can free the memory associated with

> > > > > the lock less data structures.

> > > >

> > > > I don't see any sign of read-side markers (rcu_read_lock() and

> > > > rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

> > > >

> > > > Yes, strictly speaking, these are not needed for QSBR to operate,

> > > > but they

> > > These APIs would be empty for QSBR.

> > >

> > > > make it way easier to maintain and debug code using RCU.  For

> > > > example, given the read-side markers, you can check for errors like

> > > > having a call to

> > > > rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.

> > > > Without those read-side markers, life can be quite hard and you will

> > > > really hate yourself for failing to have provided them.

> > >

> > > Want to make sure I understood this, do you mean the application

> > would mark before and after accessing the shared data structure on the

> > reader side?

> > >

> > > rte_rcu_qsbr_lock()

> > > <begin access shared data structure>

> > > ...

> > > ...

> > > <end access shared data structure>

> > > rte_rcu_qsbr_unlock()

> >

> > Yes, that is the idea.

> >

> > > If someone is debugging this code, they have to make sure that there is

> > an unlock for every lock and there is no call to rte_rcu_qsbr_quiescent in

> > between.

> > > It sounds good to me. Obviously, they will not add any additional cycles

> > as well.

> > > Please let me know if my understanding is correct.

> >

> > Yes.  And in some sort of debug mode, you could capture the counter at

> > rte_rcu_qsbr_lock() time and check it at rte_rcu_qsbr_unlock() time.  If the

> > counter has advanced too far (more than one, if I am not too confused)

> > there is a bug.  Also in debug mode, you could have rte_rcu_qsbr_lock()

> > increment a per-thread counter and rte_rcu_qsbr_unlock() decrement it.

> > If the counter is non-zero at a quiescent state, there is a bug.

> > And so on.

> >

> Added this in V5

> 

> <snip>

> 

> > > > > +

> > > > > +/* Get the memory size of QSBR variable */ size_t

> > > > > +__rte_experimental rte_rcu_qsbr_get_memsize(uint32_t

> > max_threads) {

> > > > > +	size_t sz;

> > > > > +

> > > > > +	if (max_threads == 0) {

> > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > +			"%s(): Invalid max_threads %u\n",

> > > > > +			__func__, max_threads);

> > > > > +		rte_errno = EINVAL;

> > > > > +

> > > > > +		return 1;

> > > > > +	}

> > > > > +

> > > > > +	sz = sizeof(struct rte_rcu_qsbr);

> > > > > +

> > > > > +	/* Add the size of quiescent state counter array */

> > > > > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> > > > > +

> > > > > +	/* Add the size of the registered thread ID bitmap array */

> > > > > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> > > > > +

> > > > > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

> > > >

> > > > Given that you align here, should you also align in the earlier

> > > > steps in the computation of sz?

> > >

> > > Agree. I will remove the align here and keep the earlier one as the intent

> > is to align the thread ID array.

> >

> > Sounds good!

> Added this in V5

> 

> >

> > > > > +}

> > > > > +

> > > > > +/* Initialize a quiescent state variable */ int

> > > > > +__rte_experimental rte_rcu_qsbr_init(struct rte_rcu_qsbr *v,

> > uint32_t max_threads) {

> > > > > +	size_t sz;

> > > > > +

> > > > > +	if (v == NULL) {

> > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > > +		rte_errno = EINVAL;

> > > > > +

> > > > > +		return 1;

> > > > > +	}

> > > > > +

> > > > > +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> > > > > +	if (sz == 1)

> > > > > +		return 1;

> > > > > +

> > > > > +	/* Set all the threads to offline */

> > > > > +	memset(v, 0, sz);

> > > >

> > > > We calculate sz here, but it looks like the caller must also

> > > > calculate it in order to correctly allocate the memory referenced by

> > > > the "v" argument to this function, with bad things happening if the

> > > > two calculations get different results.  Should "v" instead be

> > > > allocated within this function to avoid this sort of problem?

> > >

> > > Earlier version allocated the memory with-in this library. However, it was

> > decided to go with the current implementation as it provides flexibility for

> > the application to manage the memory as it sees fit. For ex: it could

> > allocate this as part of another structure in a single allocation. This also

> > falls inline with similar approach taken in other libraries.

> >

> > So the allocator APIs vary too much to allow a pointer to the desired

> > allocator function to be passed in?  Or do you also want to allow static

> > allocation?  If the latter, would a DEFINE_RTE_RCU_QSBR() be of use?

> >

> This is done to allow for allocation of memory for QS variable as part of a another bigger data structure. This will help in not fragmenting

> the memory. For ex:

> 

> struct xyz {

>     rte_ring *ring;

>     rte_rcu_qsbr *v;

>     abc *t;

> };

> struct xyz c;

> 

> Memory for the above structure can be allocated in one chunk by calculating the size required.

> 

> In some use cases static allocation might be enough as 'max_threads' might be a compile time constant. I am not sure on how to support

> both dynamic and static 'max_threads'.


Same thought here-  would be good to have a static initializer (DEFINE_RTE_RCU_QSBR),
but that means new compile time limit ('max_threads') - thing that we try to avoid.

> 

> > > > > +	v->max_threads = max_threads;

> > > > > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> > > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> > > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> > > > > +	v->token = RTE_QSBR_CNT_INIT;

> > > > > +

> > > > > +	return 0;

> > > > > +}

> > > > > +

> > > > > +/* Register a reader thread to report its quiescent state

> > > > > + * on a QS variable.

> > > > > + */

> > > > > +int __rte_experimental

> > > > > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int

> > > > > +thread_id) {

> > > > > +	unsigned int i, id, success;

> > > > > +	uint64_t old_bmap, new_bmap;

> > > > > +

> > > > > +	if (v == NULL || thread_id >= v->max_threads) {

> > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > > +		rte_errno = EINVAL;

> > > > > +

> > > > > +		return 1;

> > > > > +	}

> > > > > +

> > > > > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > > > > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > > > > +

> > > > > +	/* Make sure that the counter for registered threads does not

> > > > > +	 * go out of sync. Hence, additional checks are required.

> > > > > +	 */

> > > > > +	/* Check if the thread is already registered */

> > > > > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > > +					__ATOMIC_RELAXED);

> > > > > +	if (old_bmap & 1UL << id)

> > > > > +		return 0;

> > > > > +

> > > > > +	do {

> > > > > +		new_bmap = old_bmap | (1UL << id);

> > > > > +		success = __atomic_compare_exchange(

> > > > > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > > +					&old_bmap, &new_bmap, 0,

> > > > > +					__ATOMIC_RELEASE,

> > > > __ATOMIC_RELAXED);

> > > > > +

> > > > > +		if (success)

> > > > > +			__atomic_fetch_add(&v->num_threads,

> > > > > +						1, __ATOMIC_RELAXED);

> > > > > +		else if (old_bmap & (1UL << id))

> > > > > +			/* Someone else registered this thread.

> > > > > +			 * Counter should not be incremented.

> > > > > +			 */

> > > > > +			return 0;

> > > > > +	} while (success == 0);

> > > >

> > > > This would be simpler if threads were required to register themselves.

> > > > Maybe you have use cases requiring registration of other threads,

> > > > but this capability is adding significant complexity, so it might be

> > > > worth some thought.

> > > >

> > > It was simple earlier (__atomic_fetch_or). The complexity is added as

> > 'num_threads' should not go out of sync.

> >

> > Hmmm...

> >

> > So threads are allowed to register other threads?  Or is there some other

> > reason that concurrent registration is required?

> >

> Yes, control plane threads can register the fast path threads. Though, I am not sure how useful it is. I did not want to add the restriction. I

> expect that reader threads will register themselves. The reader threads require concurrent registration as they all will be running in parallel.

> If the requirement of keeping track of the number of threads registered currently goes away, then this function will be simple.

> 

> <snip>

> 

> > > > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > > > > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index

> > > > > 000000000..ff696aeab

> > > > > --- /dev/null

> > > > > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > > > > @@ -0,0 +1,554 @@

> > > > > +/* SPDX-License-Identifier: BSD-3-Clause

> > > > > + * Copyright (c) 2018 Arm Limited  */

> > > > > +

> > > > > +#ifndef _RTE_RCU_QSBR_H_

> > > > > +#define _RTE_RCU_QSBR_H_

> > > > > +

> > > > > +/**

> > > > > + * @file

> > > > > + * RTE Quiescent State Based Reclamation (QSBR)

> > > > > + *

> > > > > + * Quiescent State (QS) is any point in the thread execution

> > > > > + * where the thread does not hold a reference to a data structure

> > > > > + * in shared memory. While using lock-less data structures, the

> > > > > +writer

> > > > > + * can safely free memory once all the reader threads have

> > > > > +entered

> > > > > + * quiescent state.

> > > > > + *

> > > > > + * This library provides the ability for the readers to report

> > > > > +quiescent

> > > > > + * state and for the writers to identify when all the readers

> > > > > +have

> > > > > + * entered quiescent state.

> > > > > + */

> > > > > +

> > > > > +#ifdef __cplusplus

> > > > > +extern "C" {

> > > > > +#endif

> > > > > +

> > > > > +#include <stdio.h>

> > > > > +#include <stdint.h>

> > > > > +#include <errno.h>

> > > > > +#include <rte_common.h>

> > > > > +#include <rte_memory.h>

> > > > > +#include <rte_lcore.h>

> > > > > +#include <rte_debug.h>

> > > > > +#include <rte_atomic.h>

> > > > > +

> > > > > +extern int rcu_log_type;

> > > > > +

> > > > > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define

> > RCU_DP_LOG(level,

> > > > fmt,

> > > > > +args...) \

> > > > > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> > > > > +		"%s(): " fmt "\n", __func__, ## args) #else #define

> > > > > +RCU_DP_LOG(level, fmt, args...) #endif

> > > > > +

> > > > > +/* Registered thread IDs are stored as a bitmap of 64b element

> > array.

> > > > > + * Given thread id needs to be converted to index into the array

> > > > > +and

> > > > > + * the id within the array element.

> > > > > + */

> > > > > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> > > > #define

> > > > > +RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> > > > > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> > > > > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3,

> > > > RTE_CACHE_LINE_SIZE) #define

> > > > > +RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> > > > > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

> > > > > +#define RTE_QSBR_THRID_INDEX_SHIFT 6 #define

> > RTE_QSBR_THRID_MASK

> > > > > +0x3f

> > > > #define

> > > > > +RTE_QSBR_THRID_INVALID 0xffffffff

> > > > > +

> > > > > +/* Worker thread counter */

> > > > > +struct rte_rcu_qsbr_cnt {

> > > > > +	uint64_t cnt;

> > > > > +	/**< Quiescent state counter. Value 0 indicates the thread is

> > > > > +offline */ } __rte_cache_aligned;

> > > > > +

> > > > > +#define RTE_QSBR_CNT_THR_OFFLINE 0 #define

> > RTE_QSBR_CNT_INIT 1

> > > > > +

> > > > > +/* RTE Quiescent State variable structure.

> > > > > + * This structure has two elements that vary in size based on the

> > > > > + * 'max_threads' parameter.

> > > > > + * 1) Quiescent state counter array

> > > > > + * 2) Register thread ID array

> > > > > + */

> > > > > +struct rte_rcu_qsbr {

> > > > > +	uint64_t token __rte_cache_aligned;

> > > > > +	/**< Counter to allow for multiple concurrent quiescent state

> > > > > +queries */

> > > > > +

> > > > > +	uint32_t num_elems __rte_cache_aligned;

> > > > > +	/**< Number of elements in the thread ID array */

> > > > > +	uint32_t num_threads;

> > > > > +	/**< Number of threads currently using this QS variable */

> > > > > +	uint32_t max_threads;

> > > > > +	/**< Maximum number of threads using this QS variable */

> > > > > +

> > > > > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> > > > > +	/**< Quiescent state counter array of 'max_threads' elements */

> > > > > +

> > > > > +	/**< Registered thread IDs are stored in a bitmap array,

> > > > > +	 *   after the quiescent state counter array.

> > > > > +	 */

> > > > > +} __rte_cache_aligned;

> > > > > +

> 

> <snip>

> 

> > > > > +

> > > > > +/* Check the quiescent state counter for registered threads only,

> > > > > +assuming

> > > > > + * that not all threads have registered.

> > > > > + */

> > > > > +static __rte_always_inline int

> > > > > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t,

> > > > > +bool

> > > > > +wait) {

> > > > > +	uint32_t i, j, id;

> > > > > +	uint64_t bmap;

> > > > > +	uint64_t c;

> > > > > +	uint64_t *reg_thread_id;

> > > > > +

> > > > > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);

> > > > > +		i < v->num_elems;

> > > > > +		i++, reg_thread_id++) {

> > > > > +		/* Load the current registered thread bit map before

> > > > > +		 * loading the reader thread quiescent state counters.

> > > > > +		 */

> > > > > +		bmap = __atomic_load_n(reg_thread_id,

> > > > __ATOMIC_ACQUIRE);

> > > > > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> > > > > +

> > > > > +		while (bmap) {

> > > > > +			j = __builtin_ctzl(bmap);

> > > > > +			RCU_DP_LOG(DEBUG,

> > > > > +				"%s: check: token = %lu, wait = %d, Bit Map

> > > > = 0x%lx, Thread ID = %d",

> > > > > +				__func__, t, wait, bmap, id + j);

> > > > > +			c = __atomic_load_n(

> > > > > +					&v->qsbr_cnt[id + j].cnt,

> > > > > +					__ATOMIC_ACQUIRE);

> > > > > +			RCU_DP_LOG(DEBUG,

> > > > > +				"%s: status: token = %lu, wait = %d, Thread

> > > > QS cnt = %lu, Thread ID = %d",

> > > > > +				__func__, t, wait, c, id+j);

> > > > > +			/* Counter is not checked for wrap-around

> > > > condition

> > > > > +			 * as it is a 64b counter.

> > > > > +			 */

> > > > > +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c

> > > > < t)) {

> > > >

> > > > This assumes that a 64-bit counter won't overflow, which is close

> > > > enough to true given current CPU clock frequencies.  ;-)

> > > >

> > > > > +				/* This thread is not in quiescent state */

> > > > > +				if (!wait)

> > > > > +					return 0;

> > > > > +

> > > > > +				rte_pause();

> > > > > +				/* This thread might have unregistered.

> > > > > +				 * Re-read the bitmap.

> > > > > +				 */

> > > > > +				bmap = __atomic_load_n(reg_thread_id,

> > > > > +						__ATOMIC_ACQUIRE);

> > > > > +

> > > > > +				continue;

> > > > > +			}

> > > > > +

> > > > > +			bmap &= ~(1UL << j);

> > > > > +		}

> > > > > +	}

> > > > > +

> > > > > +	return 1;

> > > > > +}

> > > > > +

> > > > > +/* Check the quiescent state counter for all threads, assuming

> > > > > +that

> > > > > + * all the threads have registered.

> > > > > + */

> > > > > +static __rte_always_inline int

> > > > > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool

> > > > > +wait)

> > > >

> > > > Does checking the bitmap really take long enough to make this

> > > > worthwhile as a separate function?  I would think that the

> > > > bitmap-checking time would be lost in the noise of cache misses from

> > the ->cnt loads.

> > >

> > > It avoids accessing one cache line. I think this is where the savings are

> > (may be in theory). This is the most probable use case.

> > > On the other hand, __rcu_qsbr_check_selective() will result in savings

> > (depending on how many threads are currently registered) by avoiding

> > accessing unwanted counters.

> >

> > Do you really expect to be calling this function on any kind of fastpath?

> 

> Yes. For some of the libraries (rte_hash), the writer is on the fast path.

> 

> >

> > > > Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in

> > > > the absence of readers, you might see __rcu_qsbr_check_all() being a

> > > > bit faster.  But is that really what DPDK does?

> > > I see improvements in the synthetic test case (similar to the one you

> > have described, around 27%). However, in the more practical test cases I

> > do not see any difference.

> >

> > If the performance improvement only occurs in a synthetic test case, does

> > it really make sense to optimize for it?

> I had to fix few issues in the performance test cases and added more to do the comparison. These changes are in v5.

> There are 4 performance tests involving this API.

> 1) 1 Writer, 'N' readers

>      Writer: qsbr_start, qsbr_check(wait = true)

>      Readers: qsbr_quiescent

> 2) 'N' writers

>      Writers: qsbr_start, qsbr_check(wait == false)

> 3) 1 Writer, 'N' readers (this test uses the lock-free rte_hash data structure)

>      Writer: hash_del, qsbr_start, qsbr_check(wait = true), validate that the reader was able to complete its work successfully

>      Readers: thread_online, hash_lookup, access the pointer - do some work on it, qsbr_quiescent, thread_offline

> 4) Same as test 3) but qsbr_check (wait == false)

> 

> There are 2 sets of these tests.

> a) QS variable is created with number of threads same as number of readers - this will exercise __rcu_qsbr_check_all

> b) QS variable is created with 128 threads, number of registered threads is same as in a) - this will exercise __rcu_qsbr_check_selective

> 

> Following are the results on x86 (E5-2660 v4 @ 2.00GHz) comparing from a) to b) (on x86 in my setup, the results are not very stable

> between runs)

> 1) 25%

> 2) -3%

> 3) -0.4%

> 4) 1.38%

> 

> Following are the results on an Arm system comparing from a) to b) (results are not pretty stable between runs)

> 1) -3.45%

> 2) 0%

> 3) -0.03%

> 4) -0.04%

> 

> Konstantin, is it possible to run the tests on your setup and look at the results?


I did run V5 on my box (SKX 2.1 GHz) with 17 lcores (1 physical core per thread).
Didn't notice any siginifcatn fluctuations between runs, output below.

>rcu_qsbr_perf_autotesESC[0Kt

Number of cores provided = 17
Perf test with all reader threads registered
--------------------------------------------

Perf Test: 16 Readers/1 Writer('wait' in qsbr_check == true)
Total RCU updates = 65707232899
Cycles per 1000 updates: 18482
Total RCU checks = 20000000
Cycles per 1000 checks: 3794991

Perf Test: 17 Readers
Total RCU updates = 1700000000
Cycles per 1000 updates: 2128

Perf test: 17 Writers ('wait' in qsbr_check == false)
Total RCU checks = 340000000
Cycles per 1000 checks: 10030

Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Blocking QSBR Check
Following numbers include calls to rte_hash functions
Cycles per 1 update(online/update/offline): 1984696
Cycles per 1 check(start, check): 2619002

Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Non-Blocking QSBR check
Following numbers include calls to rte_hash functions
Cycles per 1 update(online/update/offline): 2028030
Cycles per 1 check(start, check): 2876667

Perf test with some of reader threads registered
------------------------------------------------

Perf Test: 16 Readers/1 Writer('wait' in qsbr_check == true)
Total RCU updates = 68850073055
Cycles per 1000 updates: 25490
Total RCU checks = 20000000
Cycles per 1000 checks: 5484403

Perf Test: 17 Readers
Total RCU updates = 1700000000
Cycles per 1000 updates: 2127

Perf test: 17 Writers ('wait' in qsbr_check == false)
Total RCU checks = 340000000
Cycles per 1000 checks: 10034

Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Blocking QSBR Check
Following numbers include calls to rte_hash functions
Cycles per 1 update(online/update/offline): 3604489
Cycles per 1 check(start, check): 7077372

Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Non-Blocking QSBR check
Following numbers include calls to rte_hash functions
Cycles per 1 update(online/update/offline): 3936831
Cycles per 1 check(start, check): 7262738


Test OK

Konstantin
Honnappa Nagarahalli April 15, 2019, 7:46 p.m. UTC | #6
> 

> > > > >

> > > > > On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli

> > > wrote:

> > > > > > Add RCU library supporting quiescent state based memory

> > > > > > reclamation

> > > > > method.

> > > > > > This library helps identify the quiescent state of the reader

> > > > > > threads so that the writers can free the memory associated

> > > > > > with the lock less data structures.

> > > > >

> > > > > I don't see any sign of read-side markers (rcu_read_lock() and

> > > > > rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).

> > > > >

> > > > > Yes, strictly speaking, these are not needed for QSBR to

> > > > > operate, but they

> > > > These APIs would be empty for QSBR.

> > > >

> > > > > make it way easier to maintain and debug code using RCU.  For

> > > > > example, given the read-side markers, you can check for errors

> > > > > like having a call to

> > > > > rte_rcu_qsbr_quiescent() in the middle of a reader quite easily.

> > > > > Without those read-side markers, life can be quite hard and you

> > > > > will really hate yourself for failing to have provided them.

> > > >

> > > > Want to make sure I understood this, do you mean the application

> > > would mark before and after accessing the shared data structure on

> > > the reader side?

> > > >

> > > > rte_rcu_qsbr_lock()

> > > > <begin access shared data structure> ...

> > > > ...

> > > > <end access shared data structure>

> > > > rte_rcu_qsbr_unlock()

> > >

> > > Yes, that is the idea.

> > >

> > > > If someone is debugging this code, they have to make sure that

> > > > there is

> > > an unlock for every lock and there is no call to

> > > rte_rcu_qsbr_quiescent in between.

> > > > It sounds good to me. Obviously, they will not add any additional

> > > > cycles

> > > as well.

> > > > Please let me know if my understanding is correct.

> > >

> > > Yes.  And in some sort of debug mode, you could capture the counter

> > > at

> > > rte_rcu_qsbr_lock() time and check it at rte_rcu_qsbr_unlock() time.

> > > If the counter has advanced too far (more than one, if I am not too

> > > confused) there is a bug.  Also in debug mode, you could have

> > > rte_rcu_qsbr_lock() increment a per-thread counter and

> rte_rcu_qsbr_unlock() decrement it.

> > > If the counter is non-zero at a quiescent state, there is a bug.

> > > And so on.

> > >

> > Added this in V5

> >

> > <snip>

> >

> > > > > > +

> > > > > > +/* Get the memory size of QSBR variable */ size_t

> > > > > > +__rte_experimental rte_rcu_qsbr_get_memsize(uint32_t

> > > max_threads) {

> > > > > > +	size_t sz;

> > > > > > +

> > > > > > +	if (max_threads == 0) {

> > > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > > +			"%s(): Invalid max_threads %u\n",

> > > > > > +			__func__, max_threads);

> > > > > > +		rte_errno = EINVAL;

> > > > > > +

> > > > > > +		return 1;

> > > > > > +	}

> > > > > > +

> > > > > > +	sz = sizeof(struct rte_rcu_qsbr);

> > > > > > +

> > > > > > +	/* Add the size of quiescent state counter array */

> > > > > > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

> > > > > > +

> > > > > > +	/* Add the size of the registered thread ID bitmap array */

> > > > > > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

> > > > > > +

> > > > > > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

> > > > >

> > > > > Given that you align here, should you also align in the earlier

> > > > > steps in the computation of sz?

> > > >

> > > > Agree. I will remove the align here and keep the earlier one as

> > > > the intent

> > > is to align the thread ID array.

> > >

> > > Sounds good!

> > Added this in V5

> >

> > >

> > > > > > +}

> > > > > > +

> > > > > > +/* Initialize a quiescent state variable */ int

> > > > > > +__rte_experimental rte_rcu_qsbr_init(struct rte_rcu_qsbr *v,

> > > uint32_t max_threads) {

> > > > > > +	size_t sz;

> > > > > > +

> > > > > > +	if (v == NULL) {

> > > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > > > +		rte_errno = EINVAL;

> > > > > > +

> > > > > > +		return 1;

> > > > > > +	}

> > > > > > +

> > > > > > +	sz = rte_rcu_qsbr_get_memsize(max_threads);

> > > > > > +	if (sz == 1)

> > > > > > +		return 1;

> > > > > > +

> > > > > > +	/* Set all the threads to offline */

> > > > > > +	memset(v, 0, sz);

> > > > >

> > > > > We calculate sz here, but it looks like the caller must also

> > > > > calculate it in order to correctly allocate the memory

> > > > > referenced by the "v" argument to this function, with bad things

> > > > > happening if the two calculations get different results.  Should

> > > > > "v" instead be allocated within this function to avoid this sort of

> problem?

> > > >

> > > > Earlier version allocated the memory with-in this library.

> > > > However, it was

> > > decided to go with the current implementation as it provides

> > > flexibility for the application to manage the memory as it sees fit.

> > > For ex: it could allocate this as part of another structure in a

> > > single allocation. This also falls inline with similar approach taken in

> other libraries.

> > >

> > > So the allocator APIs vary too much to allow a pointer to the

> > > desired allocator function to be passed in?  Or do you also want to

> > > allow static allocation?  If the latter, would a DEFINE_RTE_RCU_QSBR()

> be of use?

> > >

> > This is done to allow for allocation of memory for QS variable as part

> > of a another bigger data structure. This will help in not fragmenting the

> memory. For ex:

> >

> > struct xyz {

> >     rte_ring *ring;

> >     rte_rcu_qsbr *v;

> >     abc *t;

> > };

> > struct xyz c;

> >

> > Memory for the above structure can be allocated in one chunk by

> calculating the size required.

> >

> > In some use cases static allocation might be enough as 'max_threads'

> > might be a compile time constant. I am not sure on how to support both

> dynamic and static 'max_threads'.

> 

> Same thought here-  would be good to have a static initializer

> (DEFINE_RTE_RCU_QSBR), but that means new compile time limit

> ('max_threads') - thing that we try to avoid.

> 

> >

> > > > > > +	v->max_threads = max_threads;

> > > > > > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,

> > > > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /

> > > > > > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;

> > > > > > +	v->token = RTE_QSBR_CNT_INIT;

> > > > > > +

> > > > > > +	return 0;

> > > > > > +}

> > > > > > +

> > > > > > +/* Register a reader thread to report its quiescent state

> > > > > > + * on a QS variable.

> > > > > > + */

> > > > > > +int __rte_experimental

> > > > > > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned

> > > > > > +int

> > > > > > +thread_id) {

> > > > > > +	unsigned int i, id, success;

> > > > > > +	uint64_t old_bmap, new_bmap;

> > > > > > +

> > > > > > +	if (v == NULL || thread_id >= v->max_threads) {

> > > > > > +		rte_log(RTE_LOG_ERR, rcu_log_type,

> > > > > > +			"%s(): Invalid input parameter\n", __func__);

> > > > > > +		rte_errno = EINVAL;

> > > > > > +

> > > > > > +		return 1;

> > > > > > +	}

> > > > > > +

> > > > > > +	id = thread_id & RTE_QSBR_THRID_MASK;

> > > > > > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;

> > > > > > +

> > > > > > +	/* Make sure that the counter for registered threads does

> not

> > > > > > +	 * go out of sync. Hence, additional checks are required.

> > > > > > +	 */

> > > > > > +	/* Check if the thread is already registered */

> > > > > > +	old_bmap =

> __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > > > +					__ATOMIC_RELAXED);

> > > > > > +	if (old_bmap & 1UL << id)

> > > > > > +		return 0;

> > > > > > +

> > > > > > +	do {

> > > > > > +		new_bmap = old_bmap | (1UL << id);

> > > > > > +		success = __atomic_compare_exchange(

> > > > > > +

> 	RTE_QSBR_THRID_ARRAY_ELM(v, i),

> > > > > > +					&old_bmap, &new_bmap, 0,

> > > > > > +					__ATOMIC_RELEASE,

> > > > > __ATOMIC_RELAXED);

> > > > > > +

> > > > > > +		if (success)

> > > > > > +			__atomic_fetch_add(&v->num_threads,

> > > > > > +						1,

> __ATOMIC_RELAXED);

> > > > > > +		else if (old_bmap & (1UL << id))

> > > > > > +			/* Someone else registered this thread.

> > > > > > +			 * Counter should not be incremented.

> > > > > > +			 */

> > > > > > +			return 0;

> > > > > > +	} while (success == 0);

> > > > >

> > > > > This would be simpler if threads were required to register

> themselves.

> > > > > Maybe you have use cases requiring registration of other

> > > > > threads, but this capability is adding significant complexity,

> > > > > so it might be worth some thought.

> > > > >

> > > > It was simple earlier (__atomic_fetch_or). The complexity is added

> > > > as

> > > 'num_threads' should not go out of sync.

> > >

> > > Hmmm...

> > >

> > > So threads are allowed to register other threads?  Or is there some

> > > other reason that concurrent registration is required?

> > >

> > Yes, control plane threads can register the fast path threads. Though,

> > I am not sure how useful it is. I did not want to add the restriction. I

> expect that reader threads will register themselves. The reader threads

> require concurrent registration as they all will be running in parallel.

> > If the requirement of keeping track of the number of threads registered

> currently goes away, then this function will be simple.

> >

> > <snip>

> >

> > > > > > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h

> > > > > > b/lib/librte_rcu/rte_rcu_qsbr.h new file mode 100644 index

> > > > > > 000000000..ff696aeab

> > > > > > --- /dev/null

> > > > > > +++ b/lib/librte_rcu/rte_rcu_qsbr.h

> > > > > > @@ -0,0 +1,554 @@

> > > > > > +/* SPDX-License-Identifier: BSD-3-Clause

> > > > > > + * Copyright (c) 2018 Arm Limited  */

> > > > > > +

> > > > > > +#ifndef _RTE_RCU_QSBR_H_

> > > > > > +#define _RTE_RCU_QSBR_H_

> > > > > > +

> > > > > > +/**

> > > > > > + * @file

> > > > > > + * RTE Quiescent State Based Reclamation (QSBR)

> > > > > > + *

> > > > > > + * Quiescent State (QS) is any point in the thread execution

> > > > > > + * where the thread does not hold a reference to a data

> > > > > > +structure

> > > > > > + * in shared memory. While using lock-less data structures,

> > > > > > +the writer

> > > > > > + * can safely free memory once all the reader threads have

> > > > > > +entered

> > > > > > + * quiescent state.

> > > > > > + *

> > > > > > + * This library provides the ability for the readers to

> > > > > > +report quiescent

> > > > > > + * state and for the writers to identify when all the readers

> > > > > > +have

> > > > > > + * entered quiescent state.

> > > > > > + */

> > > > > > +

> > > > > > +#ifdef __cplusplus

> > > > > > +extern "C" {

> > > > > > +#endif

> > > > > > +

> > > > > > +#include <stdio.h>

> > > > > > +#include <stdint.h>

> > > > > > +#include <errno.h>

> > > > > > +#include <rte_common.h>

> > > > > > +#include <rte_memory.h>

> > > > > > +#include <rte_lcore.h>

> > > > > > +#include <rte_debug.h>

> > > > > > +#include <rte_atomic.h>

> > > > > > +

> > > > > > +extern int rcu_log_type;

> > > > > > +

> > > > > > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG #define

> > > RCU_DP_LOG(level,

> > > > > fmt,

> > > > > > +args...) \

> > > > > > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \

> > > > > > +		"%s(): " fmt "\n", __func__, ## args) #else #define

> > > > > > +RCU_DP_LOG(level, fmt, args...) #endif

> > > > > > +

> > > > > > +/* Registered thread IDs are stored as a bitmap of 64b

> > > > > > +element

> > > array.

> > > > > > + * Given thread id needs to be converted to index into the

> > > > > > +array and

> > > > > > + * the id within the array element.

> > > > > > + */

> > > > > > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

> > > > > #define

> > > > > > +RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \

> > > > > > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \

> > > > > > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3,

> > > > > RTE_CACHE_LINE_SIZE) #define

> > > > > > +RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \

> > > > > > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)

> > > > > > +#define RTE_QSBR_THRID_INDEX_SHIFT 6 #define

> > > RTE_QSBR_THRID_MASK

> > > > > > +0x3f

> > > > > #define

> > > > > > +RTE_QSBR_THRID_INVALID 0xffffffff

> > > > > > +

> > > > > > +/* Worker thread counter */

> > > > > > +struct rte_rcu_qsbr_cnt {

> > > > > > +	uint64_t cnt;

> > > > > > +	/**< Quiescent state counter. Value 0 indicates the thread

> > > > > > +is offline */ } __rte_cache_aligned;

> > > > > > +

> > > > > > +#define RTE_QSBR_CNT_THR_OFFLINE 0 #define

> > > RTE_QSBR_CNT_INIT 1

> > > > > > +

> > > > > > +/* RTE Quiescent State variable structure.

> > > > > > + * This structure has two elements that vary in size based on

> > > > > > +the

> > > > > > + * 'max_threads' parameter.

> > > > > > + * 1) Quiescent state counter array

> > > > > > + * 2) Register thread ID array  */ struct rte_rcu_qsbr {

> > > > > > +	uint64_t token __rte_cache_aligned;

> > > > > > +	/**< Counter to allow for multiple concurrent quiescent

> > > > > > +state queries */

> > > > > > +

> > > > > > +	uint32_t num_elems __rte_cache_aligned;

> > > > > > +	/**< Number of elements in the thread ID array */

> > > > > > +	uint32_t num_threads;

> > > > > > +	/**< Number of threads currently using this QS variable */

> > > > > > +	uint32_t max_threads;

> > > > > > +	/**< Maximum number of threads using this QS variable */

> > > > > > +

> > > > > > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;

> > > > > > +	/**< Quiescent state counter array of 'max_threads'

> elements

> > > > > > +*/

> > > > > > +

> > > > > > +	/**< Registered thread IDs are stored in a bitmap array,

> > > > > > +	 *   after the quiescent state counter array.

> > > > > > +	 */

> > > > > > +} __rte_cache_aligned;

> > > > > > +

> >

> > <snip>

> >

> > > > > > +

> > > > > > +/* Check the quiescent state counter for registered threads

> > > > > > +only, assuming

> > > > > > + * that not all threads have registered.

> > > > > > + */

> > > > > > +static __rte_always_inline int

> > > > > > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t

> > > > > > +t, bool

> > > > > > +wait) {

> > > > > > +	uint32_t i, j, id;

> > > > > > +	uint64_t bmap;

> > > > > > +	uint64_t c;

> > > > > > +	uint64_t *reg_thread_id;

> > > > > > +

> > > > > > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v,

> 0);

> > > > > > +		i < v->num_elems;

> > > > > > +		i++, reg_thread_id++) {

> > > > > > +		/* Load the current registered thread bit map

> before

> > > > > > +		 * loading the reader thread quiescent state

> counters.

> > > > > > +		 */

> > > > > > +		bmap = __atomic_load_n(reg_thread_id,

> > > > > __ATOMIC_ACQUIRE);

> > > > > > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;

> > > > > > +

> > > > > > +		while (bmap) {

> > > > > > +			j = __builtin_ctzl(bmap);

> > > > > > +			RCU_DP_LOG(DEBUG,

> > > > > > +				"%s: check: token = %lu, wait = %d,

> Bit Map

> > > > > = 0x%lx, Thread ID = %d",

> > > > > > +				__func__, t, wait, bmap, id + j);

> > > > > > +			c = __atomic_load_n(

> > > > > > +					&v->qsbr_cnt[id + j].cnt,

> > > > > > +					__ATOMIC_ACQUIRE);

> > > > > > +			RCU_DP_LOG(DEBUG,

> > > > > > +				"%s: status: token = %lu, wait = %d,

> Thread

> > > > > QS cnt = %lu, Thread ID = %d",

> > > > > > +				__func__, t, wait, c, id+j);

> > > > > > +			/* Counter is not checked for wrap-around

> > > > > condition

> > > > > > +			 * as it is a 64b counter.

> > > > > > +			 */

> > > > > > +			if (unlikely(c !=

> RTE_QSBR_CNT_THR_OFFLINE && c

> > > > > < t)) {

> > > > >

> > > > > This assumes that a 64-bit counter won't overflow, which is

> > > > > close enough to true given current CPU clock frequencies.  ;-)

> > > > >

> > > > > > +				/* This thread is not in quiescent

> state */

> > > > > > +				if (!wait)

> > > > > > +					return 0;

> > > > > > +

> > > > > > +				rte_pause();

> > > > > > +				/* This thread might have

> unregistered.

> > > > > > +				 * Re-read the bitmap.

> > > > > > +				 */

> > > > > > +				bmap =

> __atomic_load_n(reg_thread_id,

> > > > > > +						__ATOMIC_ACQUIRE);

> > > > > > +

> > > > > > +				continue;

> > > > > > +			}

> > > > > > +

> > > > > > +			bmap &= ~(1UL << j);

> > > > > > +		}

> > > > > > +	}

> > > > > > +

> > > > > > +	return 1;

> > > > > > +}

> > > > > > +

> > > > > > +/* Check the quiescent state counter for all threads,

> > > > > > +assuming that

> > > > > > + * all the threads have registered.

> > > > > > + */

> > > > > > +static __rte_always_inline int __rcu_qsbr_check_all(struct

> > > > > > +rte_rcu_qsbr *v, uint64_t t, bool

> > > > > > +wait)

> > > > >

> > > > > Does checking the bitmap really take long enough to make this

> > > > > worthwhile as a separate function?  I would think that the

> > > > > bitmap-checking time would be lost in the noise of cache misses

> > > > > from

> > > the ->cnt loads.

> > > >

> > > > It avoids accessing one cache line. I think this is where the

> > > > savings are

> > > (may be in theory). This is the most probable use case.

> > > > On the other hand, __rcu_qsbr_check_selective() will result in

> > > > savings

> > > (depending on how many threads are currently registered) by avoiding

> > > accessing unwanted counters.

> > >

> > > Do you really expect to be calling this function on any kind of fastpath?

> >

> > Yes. For some of the libraries (rte_hash), the writer is on the fast path.

> >

> > >

> > > > > Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop

> > > > > in the absence of readers, you might see __rcu_qsbr_check_all()

> > > > > being a bit faster.  But is that really what DPDK does?

> > > > I see improvements in the synthetic test case (similar to the one

> > > > you

> > > have described, around 27%). However, in the more practical test

> > > cases I do not see any difference.

> > >

> > > If the performance improvement only occurs in a synthetic test case,

> > > does it really make sense to optimize for it?

> > I had to fix few issues in the performance test cases and added more to

> do the comparison. These changes are in v5.

> > There are 4 performance tests involving this API.

> > 1) 1 Writer, 'N' readers

> >      Writer: qsbr_start, qsbr_check(wait = true)

> >      Readers: qsbr_quiescent

> > 2) 'N' writers

> >      Writers: qsbr_start, qsbr_check(wait == false)

> > 3) 1 Writer, 'N' readers (this test uses the lock-free rte_hash data

> structure)

> >      Writer: hash_del, qsbr_start, qsbr_check(wait = true), validate that

> the reader was able to complete its work successfully

> >      Readers: thread_online, hash_lookup, access the pointer - do some

> > work on it, qsbr_quiescent, thread_offline

> > 4) Same as test 3) but qsbr_check (wait == false)

> >

> > There are 2 sets of these tests.

> > a) QS variable is created with number of threads same as number of

> > readers - this will exercise __rcu_qsbr_check_all

> > b) QS variable is created with 128 threads, number of registered

> > threads is same as in a) - this will exercise

> > __rcu_qsbr_check_selective

> >

> > Following are the results on x86 (E5-2660 v4 @ 2.00GHz) comparing from

> > a) to b) (on x86 in my setup, the results are not very stable between

> > runs)

> > 1) 25%

> > 2) -3%

> > 3) -0.4%

> > 4) 1.38%

> >

> > Following are the results on an Arm system comparing from a) to b)

> > (results are not pretty stable between runs)

                           ^^^
Correction, on the Arm system, the results *are* stable (copy-paste error)

> > 1) -3.45%

> > 2) 0%

> > 3) -0.03%

> > 4) -0.04%

> >

> > Konstantin, is it possible to run the tests on your setup and look at the

> results?

> 

> I did run V5 on my box (SKX 2.1 GHz) with 17 lcores (1 physical core per

> thread).

> Didn't notice any siginifcatn fluctuations between runs, output below.

> 

> >rcu_qsbr_perf_autotesESC[0Kt

> Number of cores provided = 17

> Perf test with all reader threads registered

> --------------------------------------------

> 

> Perf Test: 16 Readers/1 Writer('wait' in qsbr_check == true) Total RCU

> updates = 65707232899 Cycles per 1000 updates: 18482 Total RCU checks =

> 20000000 Cycles per 1000 checks: 3794991

> 

> Perf Test: 17 Readers

> Total RCU updates = 1700000000

> Cycles per 1000 updates: 2128

> 

> Perf test: 17 Writers ('wait' in qsbr_check == false) Total RCU checks =

> 340000000 Cycles per 1000 checks: 10030

> 

> Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Blocking

> QSBR Check Following numbers include calls to rte_hash functions Cycles

> per 1 update(online/update/offline): 1984696 Cycles per 1 check(start,

> check): 2619002

> 

> Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Non-

> Blocking QSBR check Following numbers include calls to rte_hash functions

> Cycles per 1 update(online/update/offline): 2028030 Cycles per 1

> check(start, check): 2876667

> 

> Perf test with some of reader threads registered

> ------------------------------------------------

> 

> Perf Test: 16 Readers/1 Writer('wait' in qsbr_check == true) Total RCU

> updates = 68850073055 Cycles per 1000 updates: 25490 Total RCU checks =

> 20000000 Cycles per 1000 checks: 5484403

> 

> Perf Test: 17 Readers

> Total RCU updates = 1700000000

> Cycles per 1000 updates: 2127

> 

> Perf test: 17 Writers ('wait' in qsbr_check == false) Total RCU checks =

> 340000000 Cycles per 1000 checks: 10034

> 

> Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Blocking

> QSBR Check Following numbers include calls to rte_hash functions Cycles

> per 1 update(online/update/offline): 3604489 Cycles per 1 check(start,

> check): 7077372

> 

> Perf test: 1 writer, 17 readers, 1 QSBR variable, 1 QSBR Query, Non-

> Blocking QSBR check Following numbers include calls to rte_hash functions

> Cycles per 1 update(online/update/offline): 3936831 Cycles per 1

> check(start, check): 7262738

> 

> 

> Test OK

Thanks for running the test. From the numbers, the comparison is as follows:
1) -44%
2) 0.03%
3) -170%
4) -152%

Trend is the same between x86 and Arm. However, x86 has drastic improvement with __rcu_qsbr_check_all function.

> 

> Konstantin
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 9774344dd..6e9766eed 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1267,6 +1267,11 @@  F: examples/bpf/
 F: app/test/test_bpf.c
 F: doc/guides/prog_guide/bpf_lib.rst
 
+RCU - EXPERIMENTAL
+M: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
+F: lib/librte_rcu/
+F: doc/guides/prog_guide/rcu_lib.rst
+
 
 Test Applications
 -----------------
diff --git a/config/common_base b/config/common_base
index 8da08105b..ad70c79e1 100644
--- a/config/common_base
+++ b/config/common_base
@@ -829,6 +829,12 @@  CONFIG_RTE_LIBRTE_LATENCY_STATS=y
 #
 CONFIG_RTE_LIBRTE_TELEMETRY=n
 
+#
+# Compile librte_rcu
+#
+CONFIG_RTE_LIBRTE_RCU=y
+CONFIG_RTE_LIBRTE_RCU_DEBUG=n
+
 #
 # Compile librte_lpm
 #
diff --git a/lib/Makefile b/lib/Makefile
index 26021d0c0..791e0d991 100644
--- a/lib/Makefile
+++ b/lib/Makefile
@@ -111,6 +111,8 @@  DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += librte_ipsec
 DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev librte_security
 DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry
 DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev
+DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu
+DEPDIRS-librte_rcu := librte_eal
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni
diff --git a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile
new file mode 100644
index 000000000..6aa677bd1
--- /dev/null
+++ b/lib/librte_rcu/Makefile
@@ -0,0 +1,23 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = librte_rcu.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
+LDLIBS += -lrte_eal
+
+EXPORT_MAP := rte_rcu_version.map
+
+LIBABIVER := 1
+
+# all source are stored in SRCS-y
+SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c
+
+# install includes
+SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
new file mode 100644
index 000000000..c009ae4b7
--- /dev/null
+++ b/lib/librte_rcu/meson.build
@@ -0,0 +1,5 @@ 
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Arm Limited
+
+sources = files('rte_rcu_qsbr.c')
+headers = files('rte_rcu_qsbr.h')
diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
new file mode 100644
index 000000000..53d08446a
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -0,0 +1,237 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <stdint.h>
+#include <errno.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_malloc.h>
+#include <rte_eal.h>
+#include <rte_eal_memconfig.h>
+#include <rte_atomic.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_errno.h>
+
+#include "rte_rcu_qsbr.h"
+
+/* Get the memory size of QSBR variable */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads)
+{
+	size_t sz;
+
+	if (max_threads == 0) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid max_threads %u\n",
+			__func__, max_threads);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	sz = sizeof(struct rte_rcu_qsbr);
+
+	/* Add the size of quiescent state counter array */
+	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
+
+	/* Add the size of the registered thread ID bitmap array */
+	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
+
+	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
+}
+
+/* Initialize a quiescent state variable */
+int __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
+{
+	size_t sz;
+
+	if (v == NULL) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	sz = rte_rcu_qsbr_get_memsize(max_threads);
+	if (sz == 1)
+		return 1;
+
+	/* Set all the threads to offline */
+	memset(v, 0, sz);
+	v->max_threads = max_threads;
+	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
+			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
+	v->token = RTE_QSBR_CNT_INIT;
+
+	return 0;
+}
+
+/* Register a reader thread to report its quiescent state
+ * on a QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id, success;
+	uint64_t old_bmap, new_bmap;
+
+	if (v == NULL || thread_id >= v->max_threads) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Make sure that the counter for registered threads does not
+	 * go out of sync. Hence, additional checks are required.
+	 */
+	/* Check if the thread is already registered */
+	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_RELAXED);
+	if (old_bmap & 1UL << id)
+		return 0;
+
+	do {
+		new_bmap = old_bmap | (1UL << id);
+		success = __atomic_compare_exchange(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					&old_bmap, &new_bmap, 0,
+					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+
+		if (success)
+			__atomic_fetch_add(&v->num_threads,
+						1, __ATOMIC_RELAXED);
+		else if (old_bmap & (1UL << id))
+			/* Someone else registered this thread.
+			 * Counter should not be incremented.
+			 */
+			return 0;
+	} while (success == 0);
+
+	return 0;
+}
+
+/* Remove a reader thread, from the list of threads reporting their
+ * quiescent state on a QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	unsigned int i, id, success;
+	uint64_t old_bmap, new_bmap;
+
+	if (v == NULL || thread_id >= v->max_threads) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	id = thread_id & RTE_QSBR_THRID_MASK;
+	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
+
+	/* Make sure that the counter for registered threads does not
+	 * go out of sync. Hence, additional checks are required.
+	 */
+	/* Check if the thread is already unregistered */
+	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_RELAXED);
+	if (old_bmap & ~(1UL << id))
+		return 0;
+
+	do {
+		new_bmap = old_bmap & ~(1UL << id);
+		/* Make sure any loads of the shared data structure are
+		 * completed before removal of the thread from the list of
+		 * reporting threads.
+		 */
+		success = __atomic_compare_exchange(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					&old_bmap, &new_bmap, 0,
+					__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+
+		if (success)
+			__atomic_fetch_sub(&v->num_threads,
+						1, __ATOMIC_RELAXED);
+		else if (old_bmap & ~(1UL << id))
+			/* Someone else unregistered this thread.
+			 * Counter should not be incremented.
+			 */
+			return 0;
+	} while (success == 0);
+
+	return 0;
+}
+
+/* Dump the details of a single quiescent state variable to a file. */
+int __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
+{
+	uint64_t bmap;
+	uint32_t i, t;
+
+	if (v == NULL || f == NULL) {
+		rte_log(RTE_LOG_ERR, rcu_log_type,
+			"%s(): Invalid input parameter\n", __func__);
+		rte_errno = EINVAL;
+
+		return 1;
+	}
+
+	fprintf(f, "\nQuiescent State Variable @%p\n", v);
+
+	fprintf(f, "  QS variable memory size = %lu\n",
+				rte_rcu_qsbr_get_memsize(v->max_threads));
+	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
+	fprintf(f, "  Current # threads = %u\n", v->num_threads);
+
+	fprintf(f, "  Registered thread ID mask = 0x");
+	for (i = 0; i < v->num_elems; i++)
+		fprintf(f, "%lx", __atomic_load_n(
+					RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_ACQUIRE));
+	fprintf(f, "\n");
+
+	fprintf(f, "  Token = %lu\n",
+			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
+
+	fprintf(f, "Quiescent State Counts for readers:\n");
+	for (i = 0; i < v->num_elems; i++) {
+		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
+					__ATOMIC_ACQUIRE);
+		while (bmap) {
+			t = __builtin_ctzl(bmap);
+			fprintf(f, "thread ID = %d, count = %lu\n", t,
+				__atomic_load_n(
+					&v->qsbr_cnt[i].cnt,
+					__ATOMIC_RELAXED));
+			bmap &= ~(1UL << t);
+		}
+	}
+
+	return 0;
+}
+
+int rcu_log_type;
+
+RTE_INIT(rte_rcu_register)
+{
+	rcu_log_type = rte_log_register("lib.rcu");
+	if (rcu_log_type >= 0)
+		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
+}
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
new file mode 100644
index 000000000..ff696aeab
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -0,0 +1,554 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#ifndef _RTE_RCU_QSBR_H_
+#define _RTE_RCU_QSBR_H_
+
+/**
+ * @file
+ * RTE Quiescent State Based Reclamation (QSBR)
+ *
+ * Quiescent State (QS) is any point in the thread execution
+ * where the thread does not hold a reference to a data structure
+ * in shared memory. While using lock-less data structures, the writer
+ * can safely free memory once all the reader threads have entered
+ * quiescent state.
+ *
+ * This library provides the ability for the readers to report quiescent
+ * state and for the writers to identify when all the readers have
+ * entered quiescent state.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_debug.h>
+#include <rte_atomic.h>
+
+extern int rcu_log_type;
+
+#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
+#define RCU_DP_LOG(level, fmt, args...) \
+	rte_log(RTE_LOG_ ## level, rcu_log_type, \
+		"%s(): " fmt "\n", __func__, ## args)
+#else
+#define RCU_DP_LOG(level, fmt, args...)
+#endif
+
+/* Registered thread IDs are stored as a bitmap of 64b element array.
+ * Given thread id needs to be converted to index into the array and
+ * the id within the array element.
+ */
+#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
+#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
+	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
+		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
+#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
+	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
+#define RTE_QSBR_THRID_INDEX_SHIFT 6
+#define RTE_QSBR_THRID_MASK 0x3f
+#define RTE_QSBR_THRID_INVALID 0xffffffff
+
+/* Worker thread counter */
+struct rte_rcu_qsbr_cnt {
+	uint64_t cnt;
+	/**< Quiescent state counter. Value 0 indicates the thread is offline */
+} __rte_cache_aligned;
+
+#define RTE_QSBR_CNT_THR_OFFLINE 0
+#define RTE_QSBR_CNT_INIT 1
+
+/* RTE Quiescent State variable structure.
+ * This structure has two elements that vary in size based on the
+ * 'max_threads' parameter.
+ * 1) Quiescent state counter array
+ * 2) Register thread ID array
+ */
+struct rte_rcu_qsbr {
+	uint64_t token __rte_cache_aligned;
+	/**< Counter to allow for multiple concurrent quiescent state queries */
+
+	uint32_t num_elems __rte_cache_aligned;
+	/**< Number of elements in the thread ID array */
+	uint32_t num_threads;
+	/**< Number of threads currently using this QS variable */
+	uint32_t max_threads;
+	/**< Maximum number of threads using this QS variable */
+
+	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
+	/**< Quiescent state counter array of 'max_threads' elements */
+
+	/**< Registered thread IDs are stored in a bitmap array,
+	 *   after the quiescent state counter array.
+	 */
+} __rte_cache_aligned;
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Return the size of the memory occupied by a Quiescent State variable.
+ *
+ * @param max_threads
+ *   Maximum number of threads reporting quiescent state on this variable.
+ * @return
+ *   On success - size of memory in bytes required for this QS variable.
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - max_threads is 0
+ */
+size_t __rte_experimental
+rte_rcu_qsbr_get_memsize(uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Initialize a Quiescent State (QS) variable.
+ *
+ * @param v
+ *   QS variable
+ * @param max_threads
+ *   Maximum number of threads reporting quiescent state on this variable.
+ *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - max_threads is 0 or 'v' is NULL.
+ *
+ */
+int __rte_experimental
+rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Register a reader thread to report its quiescent state
+ * on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ * Any reader thread that wants to report its quiescent state must
+ * call this API. This can be called during initialization or as part
+ * of the packet processing loop.
+ *
+ * Note that rte_rcu_qsbr_thread_online must be called before the
+ * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
+ *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a reader thread, from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be called from the reader threads during shutdown.
+ * Ongoing quiescent state queries will stop waiting for the status from this
+ * unregistered reader thread.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will stop reporting its quiescent
+ *   state on the QS variable.
+ */
+int __rte_experimental
+rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Add a registered reader thread, to the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * Any registered reader thread that wants to report its quiescent state must
+ * call this API before calling rte_rcu_qsbr_quiescent. This can be called
+ * during initialization or as part of the packet processing loop.
+ *
+ * The reader thread must call rte_rcu_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * The reader thread must call rte_rcu_thread_online API, after the blocking
+ * function call returns, to ensure that rte_rcu_qsbr_check API
+ * waits for the reader thread to update its quiescent state.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Reader thread with this thread ID will report its quiescent state on
+ *   the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	/* Copy the current value of token.
+	 * The fence at the end of the function will ensure that
+	 * the following will not move down after the load of any shared
+	 * data structure.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
+
+	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
+	 * 'cnt' (64b) is accessed atomically.
+	 */
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+		t, __ATOMIC_RELAXED);
+
+	/* The subsequent load of the data structure should not
+	 * move above the store. Hence a store-load barrier
+	 * is required.
+	 * If the load of the data structure moves above the store,
+	 * writer might not see that the reader is online, even though
+	 * the reader is referencing the shared data structure.
+	 */
+#ifdef RTE_ARCH_X86_64
+	/* rte_smp_mb() for x86 is lighter */
+	rte_smp_mb();
+#else
+	__atomic_thread_fence(__ATOMIC_SEQ_CST);
+#endif
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Remove a registered reader thread from the list of threads reporting their
+ * quiescent state on a QS variable.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe.
+ *
+ * This can be called during initialization or as part of the packet
+ * processing loop.
+ *
+ * The reader thread must call rte_rcu_thread_offline API, before
+ * calling any functions that block, to ensure that rte_rcu_qsbr_check
+ * API does not wait indefinitely for the reader thread to update its QS.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   rte_rcu_qsbr_check API will not wait for the reader thread with
+ *   this thread ID to report its quiescent state on the QS variable.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	/* The reader can go offline only after the load of the
+	 * data structure is completed. i.e. any load of the
+	 * data strcture can not move after this store.
+	 */
+
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Ask the reader threads to report the quiescent state
+ * status.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from worker threads.
+ *
+ * @param v
+ *   QS variable
+ * @return
+ *   - This is the token for this call of the API. This should be
+ *     passed to rte_rcu_qsbr_check API.
+ */
+static __rte_always_inline uint64_t __rte_experimental
+rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	/* Release the changes to the shared data structure.
+	 * This store release will ensure that changes to any data
+	 * structure are visible to the workers before the token
+	 * update is visible.
+	 */
+	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
+
+	return t;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Update quiescent state for a reader thread.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * All the reader threads registered to report their quiescent state
+ * on the QS variable must call this API.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Update the quiescent state for the reader with this thread ID.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
+
+	/* Acquire the changes to the shared data structure released
+	 * by rte_rcu_qsbr_start.
+	 * Later loads of the shared data structure should not move
+	 * above this load. Hence, use load-acquire.
+	 */
+	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
+
+	/* Inform the writer that updates are visible to this reader.
+	 * Prior loads of the shared data structure should not move
+	 * beyond this store. Hence use store-release.
+	 */
+	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+			 t, __ATOMIC_RELEASE);
+
+	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",
+		__func__, t, thread_id);
+}
+
+/* Check the quiescent state counter for registered threads only, assuming
+ * that not all threads have registered.
+ */
+static __rte_always_inline int
+__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	uint32_t i, j, id;
+	uint64_t bmap;
+	uint64_t c;
+	uint64_t *reg_thread_id;
+
+	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);
+		i < v->num_elems;
+		i++, reg_thread_id++) {
+		/* Load the current registered thread bit map before
+		 * loading the reader thread quiescent state counters.
+		 */
+		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
+		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
+
+		while (bmap) {
+			j = __builtin_ctzl(bmap);
+			RCU_DP_LOG(DEBUG,
+				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",
+				__func__, t, wait, bmap, id + j);
+			c = __atomic_load_n(
+					&v->qsbr_cnt[id + j].cnt,
+					__ATOMIC_ACQUIRE);
+			RCU_DP_LOG(DEBUG,
+				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
+				__func__, t, wait, c, id+j);
+			/* Counter is not checked for wrap-around condition
+			 * as it is a 64b counter.
+			 */
+			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
+				/* This thread is not in quiescent state */
+				if (!wait)
+					return 0;
+
+				rte_pause();
+				/* This thread might have unregistered.
+				 * Re-read the bitmap.
+				 */
+				bmap = __atomic_load_n(reg_thread_id,
+						__ATOMIC_ACQUIRE);
+
+				continue;
+			}
+
+			bmap &= ~(1UL << j);
+		}
+	}
+
+	return 1;
+}
+
+/* Check the quiescent state counter for all threads, assuming that
+ * all the threads have registered.
+ */
+static __rte_always_inline int
+__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	uint32_t i;
+	struct rte_rcu_qsbr_cnt *cnt;
+	uint64_t c;
+
+	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
+		RCU_DP_LOG(DEBUG,
+			"%s: check: token = %lu, wait = %d, Thread ID = %d",
+			__func__, t, wait, i);
+		while (1) {
+			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
+			RCU_DP_LOG(DEBUG,
+				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
+				__func__, t, wait, c, i);
+			/* Counter is not checked for wrap-around condition
+			 * as it is a 64b counter.
+			 */
+			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t))
+				break;
+
+			/* This thread is not in quiescent state */
+			if (!wait)
+				return 0;
+
+			rte_pause();
+		}
+	}
+
+	return 1;
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Checks if all the reader threads have entered the quiescent state
+ * referenced by token.
+ *
+ * This is implemented as a lock-free function. It is multi-thread
+ * safe and can be called from the worker threads as well.
+ *
+ * If this API is called with 'wait' set to true, the following
+ * factors must be considered:
+ *
+ * 1) If the calling thread is also reporting the status on the
+ * same QS variable, it must update the quiescent state status, before
+ * calling this API.
+ *
+ * 2) In addition, while calling from multiple threads, only
+ * one of those threads can be reporting the quiescent state status
+ * on a given QS variable.
+ *
+ * @param v
+ *   QS variable
+ * @param t
+ *   Token returned by rte_rcu_qsbr_start API
+ * @param wait
+ *   If true, block till all the reader threads have completed entering
+ *   the quiescent state referenced by token 't'.
+ * @return
+ *   - 0 if all reader threads have NOT passed through specified number
+ *     of quiescent states.
+ *   - 1 if all reader threads have passed through specified number
+ *     of quiescent states.
+ */
+static __rte_always_inline int __rte_experimental
+rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
+{
+	RTE_ASSERT(v != NULL);
+
+	if (likely(v->num_threads == v->max_threads))
+		return __rcu_qsbr_check_all(v, t, wait);
+	else
+		return __rcu_qsbr_check_selective(v, t, wait);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Wait till the reader threads have entered quiescent state.
+ *
+ * This is implemented as a lock-free function. It is multi-thread safe.
+ * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
+ * rte_rcu_qsbr_check APIs.
+ *
+ * If this API is called from multiple threads, only one of
+ * those threads can be reporting the quiescent state status on a
+ * given QS variable.
+ *
+ * @param v
+ *   QS variable
+ * @param thread_id
+ *   Thread ID of the caller if it is registered to report quiescent state
+ *   on this QS variable (i.e. the calling thread is also part of the
+ *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
+ */
+static __rte_always_inline void __rte_experimental
+rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
+{
+	uint64_t t;
+
+	RTE_ASSERT(v != NULL);
+
+	t = rte_rcu_qsbr_start(v);
+
+	/* If the current thread has readside critical section,
+	 * update its quiescent state status.
+	 */
+	if (thread_id != RTE_QSBR_THRID_INVALID)
+		rte_rcu_qsbr_quiescent(v, thread_id);
+
+	/* Wait for other readers to enter quiescent state */
+	rte_rcu_qsbr_check(v, t, true);
+}
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Dump the details of a single QS variables to a file.
+ *
+ * It is NOT multi-thread safe.
+ *
+ * @param f
+ *   A pointer to a file for output
+ * @param v
+ *   QS variable
+ * @return
+ *   On success - 0
+ *   On error - 1 with error code set in rte_errno.
+ *   Possible rte_errno codes are:
+ *   - EINVAL - NULL parameters are passed
+ */
+int __rte_experimental
+rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RCU_QSBR_H_ */
diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
new file mode 100644
index 000000000..ad8cb517c
--- /dev/null
+++ b/lib/librte_rcu/rte_rcu_version.map
@@ -0,0 +1,11 @@ 
+EXPERIMENTAL {
+	global:
+
+	rte_rcu_qsbr_get_memsize;
+	rte_rcu_qsbr_init;
+	rte_rcu_qsbr_thread_register;
+	rte_rcu_qsbr_thread_unregister;
+	rte_rcu_qsbr_dump;
+
+	local: *;
+};
diff --git a/lib/meson.build b/lib/meson.build
index 595314d7d..67be10659 100644
--- a/lib/meson.build
+++ b/lib/meson.build
@@ -22,7 +22,7 @@  libraries = [
 	'gro', 'gso', 'ip_frag', 'jobstats',
 	'kni', 'latencystats', 'lpm', 'member',
 	'power', 'pdump', 'rawdev',
-	'reorder', 'sched', 'security', 'stack', 'vhost',
+	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',
 	#ipsec lib depends on crypto and security
 	'ipsec',
 	# add pkt framework libs which use other libs from above
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index 7d994bece..e93cc366d 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -97,6 +97,7 @@  _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL)            += -lrte_eal
 _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE)        += -lrte_cmdline
 _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER)        += -lrte_reorder
 _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED)          += -lrte_sched
+_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU)            += -lrte_rcu
 
 ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI)            += -lrte_kni