[v4,2/3] test/rcu_qsbr: add API and functional tests

Message ID 20190410112006.21644-3-honnappa.nagarahalli@arm.com
State New
Headers show
Series
  • lib/rcu: add RCU library supporting QSBR mechanism
Related show

Commit Message

Honnappa Nagarahalli April 10, 2019, 11:20 a.m.
From: Dharmik Thakkar <dharmik.thakkar@arm.com>


Add API positive/negative test cases, functional tests and
performance tests.

Signed-off-by: Malvika Gupta <malvika.gupta@arm.com>

Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

---
 app/test/Makefile             |    2 +
 app/test/autotest_data.py     |   12 +
 app/test/meson.build          |    5 +
 app/test/test_rcu_qsbr.c      | 1004 +++++++++++++++++++++++++++++++++
 app/test/test_rcu_qsbr_perf.c |  615 ++++++++++++++++++++
 5 files changed, 1638 insertions(+)
 create mode 100644 app/test/test_rcu_qsbr.c
 create mode 100644 app/test/test_rcu_qsbr_perf.c

-- 
2.17.1

Comments

Stephen Hemminger April 10, 2019, 3:26 p.m. | #1
On Wed, 10 Apr 2019 06:20:05 -0500
Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> wrote:

> From: Dharmik Thakkar <dharmik.thakkar@arm.com>

> 

> Add API positive/negative test cases, functional tests and

> performance tests.

> 

> Signed-off-by: Malvika Gupta <malvika.gupta@arm.com>

> Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>

> Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>


Could you add (or modify existing) l2/l3 fwd examples to demonstrate how
this would be used. Having just documentation and test code is probably not
enough to spur adoption.
Honnappa Nagarahalli April 10, 2019, 4:15 p.m. | #2
> Subject: Re: [PATCH v4 2/3] test/rcu_qsbr: add API and functional tests

> 

> On Wed, 10 Apr 2019 06:20:05 -0500

> Honnappa Nagarahalli <honnappa.nagarahalli@arm.com> wrote:

> 

> > From: Dharmik Thakkar <dharmik.thakkar@arm.com>

> >

> > Add API positive/negative test cases, functional tests and performance

> > tests.

> >

> > Signed-off-by: Malvika Gupta <malvika.gupta@arm.com>

> > Signed-off-by: Dharmik Thakkar <dharmik.thakkar@arm.com>

> > Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

> > Reviewed-by: Gavin Hu <gavin.hu@arm.com>

> > Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>

> 

> Could you add (or modify existing) l2/l3 fwd examples to demonstrate

> how this would be used. Having just documentation and test code is

> probably not enough to spur adoption.

The existing examples have static configuration. They do not delete (or add) any flows dynamically. I can show how the code looks like from the data plane perspective, but memory reclamation part cannot be demonstrated.

But, if we are ok to add more code to these applications, dynamic flow add/delete can be done.

Any thoughts on adding a new sample application?

Patch

diff --git a/app/test/Makefile b/app/test/Makefile
index b28bed2d4..10f551ecb 100644
--- a/app/test/Makefile
+++ b/app/test/Makefile
@@ -217,6 +217,8 @@  SRCS-$(CONFIG_RTE_LIBRTE_KVARGS) += test_kvargs.c
 
 SRCS-$(CONFIG_RTE_LIBRTE_BPF) += test_bpf.c
 
+SRCS-$(CONFIG_RTE_LIBRTE_RCU) += test_rcu_qsbr.c test_rcu_qsbr_perf.c
+
 SRCS-$(CONFIG_RTE_LIBRTE_IPSEC) += test_ipsec.c
 ifeq ($(CONFIG_RTE_LIBRTE_IPSEC),y)
 LDLIBS += -lrte_ipsec
diff --git a/app/test/autotest_data.py b/app/test/autotest_data.py
index db2527489..5f259e838 100644
--- a/app/test/autotest_data.py
+++ b/app/test/autotest_data.py
@@ -700,6 +700,18 @@ 
         "Func":    default_autotest,
         "Report":  None,
     },
+    {
+        "Name":    "RCU QSBR autotest",
+        "Command": "rcu_qsbr_autotest",
+        "Func":    default_autotest,
+        "Report":  None,
+    },
+    {
+        "Name":    "RCU QSBR performance autotest",
+        "Command": "rcu_qsbr_perf_autotest",
+        "Func":    default_autotest,
+        "Report":  None,
+    },
     #
     # Please always make sure that ring_perf is the last test!
     #
diff --git a/app/test/meson.build b/app/test/meson.build
index 867cc5863..1a2ee18a5 100644
--- a/app/test/meson.build
+++ b/app/test/meson.build
@@ -110,6 +110,8 @@  test_sources = files('commands.c',
 	'test_timer_perf.c',
 	'test_timer_racecond.c',
 	'test_ticketlock.c',
+	'test_rcu_qsbr.c',
+	'test_rcu_qsbr_perf.c',
 	'test_version.c',
 	'virtual_pmd.c'
 )
@@ -137,6 +139,7 @@  test_deps = ['acl',
 	'ring',
 	'stack',
 	'timer'
+	'rcu'
 ]
 
 # All test cases in fast_parallel_test_names list are parallel
@@ -175,6 +178,7 @@  fast_parallel_test_names = [
         'ring_autotest',
         'ring_pmd_autotest',
         'rwlock_autotest',
+	'rcu_qsbr_autotest',
         'sched_autotest',
         'spinlock_autotest',
         'stack_autotest',
@@ -242,6 +246,7 @@  perf_test_names = [
         'red_perf',
         'distributor_perf_autotest',
         'ring_pmd_perf_autotest',
+	'rcu_qsbr_perf_autotest',
         'pmd_perf_autotest',
         'stack_perf_autotest',
         'stack_nb_perf_autotest',
diff --git a/app/test/test_rcu_qsbr.c b/app/test/test_rcu_qsbr.c
new file mode 100644
index 000000000..8156aa56a
--- /dev/null
+++ b/app/test/test_rcu_qsbr.c
@@ -0,0 +1,1004 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#include <stdio.h>
+#include <stdbool.h>
+#include <rte_pause.h>
+#include <rte_rcu_qsbr.h>
+#include <rte_hash.h>
+#include <rte_hash_crc.h>
+#include <rte_malloc.h>
+#include <rte_cycles.h>
+#include <unistd.h>
+
+#include "test.h"
+
+/* Check condition and return an error if true. */
+#define TEST_RCU_QSBR_RETURN_IF_ERROR(cond, str, ...) do { \
+	if (cond) { \
+		printf("ERROR file %s, line %d: " str "\n", __FILE__, \
+			__LINE__, ##__VA_ARGS__); \
+		return -1; \
+	} \
+} while (0)
+
+#define TEST_RCU_MAX_LCORE 128
+uint16_t enabled_core_ids[TEST_RCU_MAX_LCORE];
+uint8_t num_cores;
+
+static uint32_t *keys;
+#define TOTAL_ENTRY (1024 * 8)
+#define COUNTER_VALUE 4096
+static uint32_t *hash_data[TEST_RCU_MAX_LCORE][TOTAL_ENTRY];
+static uint8_t writer_done;
+
+static struct rte_rcu_qsbr *t[TEST_RCU_MAX_LCORE];
+struct rte_hash *h[TEST_RCU_MAX_LCORE];
+char hash_name[TEST_RCU_MAX_LCORE][8];
+
+static inline int
+get_enabled_cores_mask(void)
+{
+	uint16_t core_id;
+	uint32_t max_cores = rte_lcore_count();
+	if (max_cores > TEST_RCU_MAX_LCORE) {
+		printf("Number of cores exceed %d\n", TEST_RCU_MAX_LCORE);
+		return -1;
+	}
+
+	core_id = 0;
+	num_cores = 0;
+	RTE_LCORE_FOREACH_SLAVE(core_id) {
+		enabled_core_ids[num_cores] = core_id;
+		num_cores++;
+	}
+
+	return 0;
+}
+
+static int
+alloc_rcu(void)
+{
+	uint32_t sz;
+
+	sz = rte_rcu_qsbr_get_memsize(TEST_RCU_MAX_LCORE);
+
+	t[0] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu0", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[1] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu1", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[2] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu2", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[3] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu3", sz,
+						RTE_CACHE_LINE_SIZE);
+
+	return 0;
+}
+
+static int
+free_rcu(void)
+{
+	int i;
+
+	for (i = 0; i < 4; i++)
+		rte_free(t[i]);
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_thread_register: Add a reader thread, to the list of threads
+ * reporting their quiescent state on a QS variable.
+ */
+static int
+test_rcu_qsbr_get_memsize(void)
+{
+	uint32_t sz;
+
+	printf("\nTest rte_rcu_qsbr_thread_register()\n");
+
+	sz = rte_rcu_qsbr_get_memsize(0);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((sz != 1), "Get Memsize for 0 threads");
+
+	sz = rte_rcu_qsbr_get_memsize(TEST_RCU_MAX_LCORE);
+	/* For 128 threads,
+	 * for machines with cache line size of 64B - 8384
+	 * for machines with cache line size of 128 - 16768
+	 */
+	TEST_RCU_QSBR_RETURN_IF_ERROR((sz != 8384 && sz != 16768),
+		"Get Memsize");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_init: Initialize a QSBR variable.
+ */
+static int
+test_rcu_qsbr_init(void)
+{
+	int r;
+
+	printf("\nTest rte_rcu_qsbr_init()\n");
+
+	r = rte_rcu_qsbr_init(NULL, TEST_RCU_MAX_LCORE);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((r != 1), "NULL variable");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_thread_register: Add a reader thread, to the list of threads
+ * reporting their quiescent state on a QS variable.
+ */
+static int
+test_rcu_qsbr_thread_register(void)
+{
+	int ret;
+
+	printf("\nTest rte_rcu_qsbr_thread_register()\n");
+
+	ret = rte_rcu_qsbr_thread_register(NULL, enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "NULL variable check");
+
+	ret = rte_rcu_qsbr_thread_register(NULL, 100000);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
+		"NULL variable, invalid thread id");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Register valid thread id */
+	ret = rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "Valid thread id");
+
+	/* Re-registering should not return error */
+	ret = rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
+		"Already registered thread id");
+
+	/* Register valid thread id - max allowed thread id */
+	ret = rte_rcu_qsbr_thread_register(t[0], TEST_RCU_MAX_LCORE - 1);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "Max thread id");
+
+	ret = rte_rcu_qsbr_thread_register(t[0], 100000);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
+		"NULL variable, invalid thread id");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_thread_unregister: Remove a reader thread, from the list of
+ * threads reporting their quiescent state on a QS variable.
+ */
+static int
+test_rcu_qsbr_thread_unregister(void)
+{
+	int i, j, ret;
+	uint64_t token;
+	uint8_t num_threads[3] = {1, TEST_RCU_MAX_LCORE, 1};
+
+	printf("\nTest rte_rcu_qsbr_thread_unregister()\n");
+
+	ret = rte_rcu_qsbr_thread_unregister(NULL, enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "NULL variable check");
+
+	ret = rte_rcu_qsbr_thread_unregister(NULL, 100000);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
+		"NULL variable, invalid thread id");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+
+	ret = rte_rcu_qsbr_thread_unregister(t[0], 100000);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
+		"NULL variable, invalid thread id");
+
+	/* Find first disabled core */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++) {
+		if (enabled_core_ids[i] == 0)
+			break;
+	}
+	/* Test with disabled lcore */
+	ret = rte_rcu_qsbr_thread_unregister(t[0], i);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
+		"disabled thread id");
+	/* Unregister already unregistered core */
+	ret = rte_rcu_qsbr_thread_unregister(t[0], i);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
+		"Already unregistered core");
+
+	/* Test with enabled lcore */
+	ret = rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
+		"enabled thread id");
+	/* Unregister already unregistered core */
+	ret = rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
+		"Already unregistered core");
+
+	/*
+	 * Test with different thread_ids:
+	 * 1 - thread_id = 0
+	 * 2 - All possible thread_ids, from 0 to TEST_RCU_MAX_LCORE
+	 * 3 - thread_id = TEST_RCU_MAX_LCORE - 1
+	 */
+	for (j = 0; j < 3; j++) {
+		rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+		for (i = 0; i < num_threads[j]; i++)
+			rte_rcu_qsbr_thread_register(t[0],
+				(j == 2) ? (TEST_RCU_MAX_LCORE - 1) : i);
+
+		token = rte_rcu_qsbr_start(t[0]);
+		TEST_RCU_QSBR_RETURN_IF_ERROR(
+			(token != (RTE_QSBR_CNT_INIT + 1)), "QSBR Start");
+		/* Update quiescent state counter */
+		for (i = 0; i < num_threads[j]; i++) {
+			/* Skip one update */
+			if (i == (TEST_RCU_MAX_LCORE - 10))
+				continue;
+			rte_rcu_qsbr_quiescent(t[0],
+				(j == 2) ? (TEST_RCU_MAX_LCORE - 1) : i);
+		}
+
+		if (j == 1) {
+			/* Validate the updates */
+			ret = rte_rcu_qsbr_check(t[0], token, false);
+			TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
+			/* Update the previously skipped thread */
+			rte_rcu_qsbr_quiescent(t[0], TEST_RCU_MAX_LCORE - 10);
+		}
+
+		/* Validate the updates */
+		ret = rte_rcu_qsbr_check(t[0], token, false);
+		TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
+
+		for (i = 0; i < num_threads[j]; i++)
+			rte_rcu_qsbr_thread_unregister(t[0],
+				(j == 2) ? (TEST_RCU_MAX_LCORE - 1) : i);
+
+		/* Check with no thread registered */
+		ret = rte_rcu_qsbr_check(t[0], token, true);
+		TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Blocking QSBR check");
+	}
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_start: Ask the worker threads to report the quiescent state
+ * status.
+ */
+static int
+test_rcu_qsbr_start(void)
+{
+	uint64_t token;
+	int i;
+
+	printf("\nTest rte_rcu_qsbr_start()\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	for (i = 0; i < 3; i++)
+		rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
+
+	token = rte_rcu_qsbr_start(t[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR(
+		(token != (RTE_QSBR_CNT_INIT + 1)), "QSBR Start");
+	return 0;
+}
+
+static int
+test_rcu_qsbr_check_reader(void *arg)
+{
+	struct rte_rcu_qsbr *temp;
+	uint8_t read_type = (uint8_t)((uintptr_t)arg);
+	temp = t[read_type];
+
+	/* Update quiescent state counter */
+	rte_rcu_qsbr_quiescent(temp, enabled_core_ids[0]);
+	rte_rcu_qsbr_quiescent(temp, enabled_core_ids[1]);
+	rte_rcu_qsbr_thread_unregister(temp, enabled_core_ids[2]);
+	rte_rcu_qsbr_quiescent(temp, enabled_core_ids[3]);
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_check: Checks if all the worker threads have entered the queis-
+ * cent state 'n' number of times. 'n' is provided in rte_rcu_qsbr_start API.
+ */
+static int
+test_rcu_qsbr_check(void)
+{
+	int i, ret;
+	uint64_t token;
+
+	printf("\nTest rte_rcu_qsbr_check()\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	token = rte_rcu_qsbr_start(t[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR(
+		(token != (RTE_QSBR_CNT_INIT + 1)), "QSBR Start");
+
+
+	ret = rte_rcu_qsbr_check(t[0], 0, false);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Token = 0");
+
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Blocking QSBR check");
+
+	for (i = 0; i < 3; i++)
+		rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
+
+	ret = rte_rcu_qsbr_check(t[0], token, false);
+	/* Threads are offline, hence this should pass */
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
+
+	token = rte_rcu_qsbr_start(t[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR(
+		(token != (RTE_QSBR_CNT_INIT + 2)), "QSBR Start");
+
+	ret = rte_rcu_qsbr_check(t[0], token, false);
+	/* Threads are offline, hence this should pass */
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
+
+	for (i = 0; i < 3; i++)
+		rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[i]);
+
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Blocking QSBR check");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	for (i = 0; i < 4; i++)
+		rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
+
+	token = rte_rcu_qsbr_start(t[0]);
+	TEST_RCU_QSBR_RETURN_IF_ERROR(
+		(token != (RTE_QSBR_CNT_INIT + 1)), "QSBR Start");
+
+	rte_eal_remote_launch(test_rcu_qsbr_check_reader, NULL,
+							enabled_core_ids[0]);
+
+	rte_eal_mp_wait_lcore();
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "Blocking QSBR check");
+
+	return 0;
+}
+
+static int
+test_rcu_qsbr_synchronize_reader(void *arg)
+{
+	uint32_t lcore_id = rte_lcore_id();
+	(void)arg;
+
+	/* Register and become online */
+	rte_rcu_qsbr_thread_register(t[0], lcore_id);
+	rte_rcu_qsbr_thread_online(t[0], lcore_id);
+
+	while (!writer_done)
+		rte_rcu_qsbr_quiescent(t[0], lcore_id);
+
+	rte_rcu_qsbr_thread_offline(t[0], lcore_id);
+	rte_rcu_qsbr_thread_unregister(t[0], lcore_id);
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_synchronize: Wait till all the reader threads have entered
+ * the queiscent state.
+ */
+static int
+test_rcu_qsbr_synchronize(void)
+{
+	int i;
+
+	printf("\nTest rte_rcu_qsbr_synchronize()\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Test if the API returns when there are no threads reporting
+	 * QS on the variable.
+	 */
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+
+	/* Test if the API returns when there are threads registered
+	 * but not online.
+	 */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_thread_register(t[0], i);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+
+	/* Test if the API returns when the caller is also
+	 * reporting the QS status.
+	 */
+	rte_rcu_qsbr_thread_online(t[0], 0);
+	rte_rcu_qsbr_synchronize(t[0], 0);
+	rte_rcu_qsbr_thread_offline(t[0], 0);
+
+	/* Check the other boundary */
+	rte_rcu_qsbr_thread_online(t[0], TEST_RCU_MAX_LCORE - 1);
+	rte_rcu_qsbr_synchronize(t[0], TEST_RCU_MAX_LCORE - 1);
+	rte_rcu_qsbr_thread_offline(t[0], TEST_RCU_MAX_LCORE - 1);
+
+	/* Test if the API returns after unregisterng all the threads */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_thread_unregister(t[0], i);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+
+	/* Test if the API returns with the live threads */
+	writer_done = 0;
+	for (i = 0; i < num_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_synchronize_reader,
+			NULL, enabled_core_ids[i]);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+	rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
+
+	writer_done = 1;
+	rte_eal_mp_wait_lcore();
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_thread_online: Add a registered reader thread, to
+ * the list of threads reporting their quiescent state on a QS variable.
+ */
+static int
+test_rcu_qsbr_thread_online(void)
+{
+	int i, ret;
+	uint64_t token;
+
+	printf("Test rte_rcu_qsbr_thread_online()\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Register 2 threads to validate that only the
+	 * online thread is waited upon.
+	 */
+	rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+	rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[1]);
+
+	/* Use qsbr_start to verify that the thread_online API
+	 * succeeded.
+	 */
+	token = rte_rcu_qsbr_start(t[0]);
+
+	/* Make the thread online */
+	rte_rcu_qsbr_thread_online(t[0], enabled_core_ids[0]);
+
+	/* Check if the thread is online */
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
+
+	/* Check if the online thread, can report QS */
+	token = rte_rcu_qsbr_start(t[0]);
+	rte_rcu_qsbr_quiescent(t[0], enabled_core_ids[0]);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread update");
+
+	/* Make all the threads online */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+	token = rte_rcu_qsbr_start(t[0]);
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++) {
+		rte_rcu_qsbr_thread_register(t[0], i);
+		rte_rcu_qsbr_thread_online(t[0], i);
+	}
+	/* Check if all the threads are online */
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
+	/* Check if all the online threads can report QS */
+	token = rte_rcu_qsbr_start(t[0]);
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_quiescent(t[0], i);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread update");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_thread_offline: Remove a registered reader thread, from
+ * the list of threads reporting their quiescent state on a QS variable.
+ */
+static int
+test_rcu_qsbr_thread_offline(void)
+{
+	int i, ret;
+	uint64_t token;
+
+	printf("\nTest rte_rcu_qsbr_thread_offline()\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+
+	/* Make the thread offline */
+	rte_rcu_qsbr_thread_offline(t[0], enabled_core_ids[0]);
+
+	/* Use qsbr_start to verify that the thread_offline API
+	 * succeeded.
+	 */
+	token = rte_rcu_qsbr_start(t[0]);
+	/* Check if the thread is offline */
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread offline");
+
+	/* Bring an offline thread online and check if it can
+	 * report QS.
+	 */
+	rte_rcu_qsbr_thread_online(t[0], enabled_core_ids[0]);
+	/* Check if the online thread, can report QS */
+	token = rte_rcu_qsbr_start(t[0]);
+	rte_rcu_qsbr_quiescent(t[0], enabled_core_ids[0]);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "offline to online");
+
+	/*
+	 * Check a sequence of online/status/offline/status/online/status
+	 */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+	token = rte_rcu_qsbr_start(t[0]);
+	/* Make the threads online */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++) {
+		rte_rcu_qsbr_thread_register(t[0], i);
+		rte_rcu_qsbr_thread_online(t[0], i);
+	}
+
+	/* Check if all the threads are online */
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
+
+	/* Check if all the online threads can report QS */
+	token = rte_rcu_qsbr_start(t[0]);
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_quiescent(t[0], i);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "report QS");
+
+	/* Make all the threads offline */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_thread_offline(t[0], i);
+	/* Make sure these threads are not being waited on */
+	token = rte_rcu_qsbr_start(t[0]);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "offline QS");
+
+	/* Make the threads online */
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_thread_online(t[0], i);
+	/* Check if all the online threads can report QS */
+	token = rte_rcu_qsbr_start(t[0]);
+	for (i = 0; i < TEST_RCU_MAX_LCORE; i++)
+		rte_rcu_qsbr_quiescent(t[0], i);
+	ret = rte_rcu_qsbr_check(t[0], token, true);
+	TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "online again");
+
+	return 0;
+}
+
+/*
+ * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
+ */
+static int
+test_rcu_qsbr_dump(void)
+{
+	int i;
+
+	printf("\nTest rte_rcu_qsbr_dump()\n");
+
+	/* Negative tests */
+	rte_rcu_qsbr_dump(NULL, t[0]);
+	rte_rcu_qsbr_dump(stdout, NULL);
+	rte_rcu_qsbr_dump(NULL, NULL);
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+	rte_rcu_qsbr_init(t[1], TEST_RCU_MAX_LCORE);
+
+	/* QS variable with 0 core mask */
+	rte_rcu_qsbr_dump(stdout, t[0]);
+
+	rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
+
+	for (i = 1; i < 3; i++)
+		rte_rcu_qsbr_thread_register(t[1], enabled_core_ids[i]);
+
+	rte_rcu_qsbr_dump(stdout, t[0]);
+	rte_rcu_qsbr_dump(stdout, t[1]);
+	printf("\n");
+	return 0;
+}
+
+static int
+test_rcu_qsbr_reader(void *arg)
+{
+	struct rte_rcu_qsbr *temp;
+	struct rte_hash *hash = NULL;
+	int i;
+	uint32_t lcore_id = rte_lcore_id();
+	uint8_t read_type = (uint8_t)((uintptr_t)arg);
+	uint32_t *pdata;
+
+	temp = t[read_type];
+	hash = h[read_type];
+
+	do {
+		rte_rcu_qsbr_thread_register(temp, lcore_id);
+		rte_rcu_qsbr_thread_online(temp, lcore_id);
+		for (i = 0; i < TOTAL_ENTRY; i++) {
+			if (rte_hash_lookup_data(hash, keys+i,
+					(void **)&pdata) != -ENOENT) {
+				*pdata = 0;
+				while (*pdata < COUNTER_VALUE)
+					++*pdata;
+			}
+		}
+		/* Update quiescent state counter */
+		rte_rcu_qsbr_quiescent(temp, lcore_id);
+		rte_rcu_qsbr_thread_offline(temp, lcore_id);
+		rte_rcu_qsbr_thread_unregister(temp, lcore_id);
+	} while (!writer_done);
+
+	return 0;
+}
+
+static int
+test_rcu_qsbr_writer(void *arg)
+{
+	uint64_t token;
+	int32_t pos;
+	struct rte_rcu_qsbr *temp;
+	struct rte_hash *hash = NULL;
+	uint8_t writer_type = (uint8_t)((uintptr_t)arg);
+
+	temp = t[(writer_type/2) % TEST_RCU_MAX_LCORE];
+	hash = h[(writer_type/2) % TEST_RCU_MAX_LCORE];
+
+	/* Delete element from the shared data structure */
+	pos = rte_hash_del_key(hash, keys + (writer_type % TOTAL_ENTRY));
+	if (pos < 0) {
+		printf("Delete key failed #%d\n",
+		       keys[writer_type % TOTAL_ENTRY]);
+		return -1;
+	}
+	/* Start the quiescent state query process */
+	token = rte_rcu_qsbr_start(temp);
+	/* Check the quiescent state status */
+	rte_rcu_qsbr_check(temp, token, true);
+	if (*hash_data[(writer_type/2) % TEST_RCU_MAX_LCORE]
+	    [writer_type % TOTAL_ENTRY] != COUNTER_VALUE &&
+	    *hash_data[(writer_type/2) % TEST_RCU_MAX_LCORE]
+	    [writer_type % TOTAL_ENTRY] != 0) {
+		printf("Reader did not complete #%d = %d\t", writer_type,
+			*hash_data[(writer_type/2) % TEST_RCU_MAX_LCORE]
+				[writer_type % TOTAL_ENTRY]);
+		return -1;
+	}
+
+	if (rte_hash_free_key_with_position(hash, pos) < 0) {
+		printf("Failed to free the key #%d\n",
+		       keys[writer_type % TOTAL_ENTRY]);
+		return -1;
+	}
+	rte_free(hash_data[(writer_type/2) % TEST_RCU_MAX_LCORE]
+				[writer_type % TOTAL_ENTRY]);
+	hash_data[(writer_type/2) % TEST_RCU_MAX_LCORE]
+			[writer_type % TOTAL_ENTRY] = NULL;
+
+	return 0;
+}
+
+static struct rte_hash *
+init_hash(int hash_id)
+{
+	int i;
+	struct rte_hash *h = NULL;
+	sprintf(hash_name[hash_id], "hash%d", hash_id);
+	struct rte_hash_parameters hash_params = {
+		.entries = TOTAL_ENTRY,
+		.key_len = sizeof(uint32_t),
+		.hash_func_init_val = 0,
+		.socket_id = rte_socket_id(),
+		.hash_func = rte_hash_crc,
+		.extra_flag =
+			RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
+		.name = hash_name[hash_id],
+	};
+
+	h = rte_hash_create(&hash_params);
+	if (h == NULL) {
+		printf("Hash create Failed\n");
+		return NULL;
+	}
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		hash_data[hash_id][i] = rte_zmalloc(NULL, sizeof(uint32_t), 0);
+		if (hash_data[hash_id][i] == NULL) {
+			printf("No memory\n");
+			return NULL;
+		}
+	}
+	keys = rte_malloc(NULL, sizeof(uint32_t) * TOTAL_ENTRY, 0);
+	if (keys == NULL) {
+		printf("No memory\n");
+		return NULL;
+	}
+
+	for (i = 0; i < TOTAL_ENTRY; i++)
+		keys[i] = i;
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		if (rte_hash_add_key_data(h, keys + i,
+				(void *)((uintptr_t)hash_data[hash_id][i]))
+				< 0) {
+			printf("Hash key add Failed #%d\n", i);
+			return NULL;
+		}
+	}
+	return h;
+}
+
+/*
+ * Functional test:
+ * Single writer, Single QS variable, simultaneous QSBR Queries
+ */
+static int
+test_rcu_qsbr_sw_sv_3qs(void)
+{
+	uint64_t token[3];
+	int i;
+	int32_t pos[3];
+	writer_done = 0;
+
+	printf("Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries\n");
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Shared data structure created */
+	h[0] = init_hash(0);
+	if (h[0] == NULL) {
+		printf("Hash init failed\n");
+		goto error;
+	}
+
+	/* Reader threads are launched */
+	for (i = 0; i < 4; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_reader, NULL,
+					enabled_core_ids[i]);
+
+	/* Delete element from the shared data structure */
+	pos[0] = rte_hash_del_key(h[0], keys + 0);
+	if (pos[0] < 0) {
+		printf("Delete key failed #%d\n", keys[0]);
+		goto error;
+	}
+	/* Start the quiescent state query process */
+	token[0] = rte_rcu_qsbr_start(t[0]);
+
+	/* Delete element from the shared data structure */
+	pos[1] = rte_hash_del_key(h[0], keys + 3);
+	if (pos[1] < 0) {
+		printf("Delete key failed #%d\n", keys[3]);
+		goto error;
+	}
+	/* Start the quiescent state query process */
+	token[1] = rte_rcu_qsbr_start(t[0]);
+
+	/* Delete element from the shared data structure */
+	pos[2] = rte_hash_del_key(h[0], keys + 6);
+	if (pos[2] < 0) {
+		printf("Delete key failed #%d\n", keys[6]);
+		goto error;
+	}
+	/* Start the quiescent state query process */
+	token[2] = rte_rcu_qsbr_start(t[0]);
+
+	/* Check the quiescent state status */
+	rte_rcu_qsbr_check(t[0], token[0], true);
+	if (*hash_data[0][0] != COUNTER_VALUE && *hash_data[0][0] != 0) {
+		printf("Reader did not complete #0 = %d\n", *hash_data[0][0]);
+		goto error;
+	}
+
+	if (rte_hash_free_key_with_position(h[0], pos[0]) < 0) {
+		printf("Failed to free the key #%d\n", keys[0]);
+		goto error;
+	}
+	rte_free(hash_data[0][0]);
+	hash_data[0][0] = NULL;
+
+	/* Check the quiescent state status */
+	rte_rcu_qsbr_check(t[0], token[1], true);
+	if (*hash_data[0][3] != COUNTER_VALUE && *hash_data[0][3] != 0) {
+		printf("Reader did not complete #3 = %d\n", *hash_data[0][3]);
+		goto error;
+	}
+
+	if (rte_hash_free_key_with_position(h[0], pos[1]) < 0) {
+		printf("Failed to free the key #%d\n", keys[3]);
+		goto error;
+	}
+	rte_free(hash_data[0][3]);
+	hash_data[0][3] = NULL;
+
+	/* Check the quiescent state status */
+	rte_rcu_qsbr_check(t[0], token[2], true);
+	if (*hash_data[0][6] != COUNTER_VALUE && *hash_data[0][6] != 0) {
+		printf("Reader did not complete #6 = %d\n", *hash_data[0][6]);
+		goto error;
+	}
+
+	if (rte_hash_free_key_with_position(h[0], pos[2]) < 0) {
+		printf("Failed to free the key #%d\n", keys[6]);
+		goto error;
+	}
+	rte_free(hash_data[0][6]);
+	hash_data[0][6] = NULL;
+
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+	/* Check return value from threads */
+	for (i = 0; i < 4; i++)
+		if (lcore_config[enabled_core_ids[i]].ret < 0)
+			goto error;
+	rte_hash_free(h[0]);
+	rte_free(keys);
+
+	return 0;
+
+error:
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	rte_hash_free(h[0]);
+	rte_free(keys);
+	for (i = 0; i < TOTAL_ENTRY; i++)
+		rte_free(hash_data[0][i]);
+
+	return -1;
+}
+
+/*
+ * Multi writer, Multiple QS variable, simultaneous QSBR queries
+ */
+static int
+test_rcu_qsbr_mw_mv_mqs(void)
+{
+	int i, j;
+	writer_done = 0;
+	uint8_t test_cores;
+	test_cores = num_cores / 4;
+	test_cores = test_cores * 4;
+
+	printf("Test: %d writers, %d QSBR variable, simultaneous QSBR queries\n"
+	       , test_cores / 2, test_cores / 4);
+
+	for (i = 0; i < num_cores / 4; i++) {
+		rte_rcu_qsbr_init(t[i], TEST_RCU_MAX_LCORE);
+		h[i] = init_hash(i);
+		if (h[i] == NULL) {
+			printf("Hash init failed\n");
+			goto error;
+		}
+	}
+
+	/* Reader threads are launched */
+	for (i = 0; i < test_cores / 2; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_reader,
+				      (void *)(uintptr_t)(i / 2),
+					enabled_core_ids[i]);
+
+	/* Writer threads are launched */
+	for (; i < test_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_writer,
+				      (void *)(uintptr_t)(i - (test_cores / 2)),
+					enabled_core_ids[i]);
+	/* Wait for writers to complete */
+	for (i = test_cores / 2; i < test_cores;  i++)
+		rte_eal_wait_lcore(enabled_core_ids[i]);
+
+	writer_done = 1;
+	/* Wait for readers to complete */
+	rte_eal_mp_wait_lcore();
+
+	/* Check return value from threads */
+	for (i = 0; i < test_cores; i++)
+		if (lcore_config[enabled_core_ids[i]].ret < 0)
+			goto error;
+
+	for (i = 0; i < num_cores / 4; i++)
+		rte_hash_free(h[i]);
+
+	rte_free(keys);
+
+	return 0;
+
+error:
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	for (i = 0; i < num_cores / 4; i++)
+		rte_hash_free(h[i]);
+	rte_free(keys);
+	for (j = 0; j < TEST_RCU_MAX_LCORE; j++)
+		for (i = 0; i < TOTAL_ENTRY; i++)
+			rte_free(hash_data[j][i]);
+
+	return -1;
+}
+
+static int
+test_rcu_qsbr_main(void)
+{
+	if (get_enabled_cores_mask() != 0)
+		return -1;
+
+	/* Error-checking test cases */
+	if (test_rcu_qsbr_get_memsize() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_init() < 0)
+		goto test_fail;
+
+	alloc_rcu();
+
+	if (test_rcu_qsbr_thread_register() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_thread_unregister() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_start() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_check() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_synchronize() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_dump() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_thread_online() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_thread_offline() < 0)
+		goto test_fail;
+
+	printf("\nFunctional tests\n");
+	if (num_cores < 4) {
+		printf("Test failed! Need 4 or more cores\n");
+		goto test_fail;
+	}
+
+	if (test_rcu_qsbr_sw_sv_3qs() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_mw_mv_mqs() < 0)
+		goto test_fail;
+
+	free_rcu();
+
+	printf("\n");
+	return 0;
+
+test_fail:
+	free_rcu();
+
+	return -1;
+}
+
+REGISTER_TEST_COMMAND(rcu_qsbr_autotest, test_rcu_qsbr_main);
diff --git a/app/test/test_rcu_qsbr_perf.c b/app/test/test_rcu_qsbr_perf.c
new file mode 100644
index 000000000..a69f827d8
--- /dev/null
+++ b/app/test/test_rcu_qsbr_perf.c
@@ -0,0 +1,615 @@ 
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2018 Arm Limited
+ */
+
+#include <stdio.h>
+#include <stdbool.h>
+#include <rte_pause.h>
+#include <rte_rcu_qsbr.h>
+#include <rte_hash.h>
+#include <rte_hash_crc.h>
+#include <rte_malloc.h>
+#include <rte_cycles.h>
+#include <unistd.h>
+
+#include "test.h"
+
+/* Check condition and return an error if true. */
+#define TEST_RCU_MAX_LCORE 128
+static uint16_t enabled_core_ids[TEST_RCU_MAX_LCORE];
+static uint8_t num_cores;
+
+static uint32_t *keys;
+#define TOTAL_ENTRY (1024 * 8)
+#define COUNTER_VALUE 4096
+static uint32_t *hash_data[TEST_RCU_MAX_LCORE][TOTAL_ENTRY];
+static volatile uint8_t writer_done;
+
+static struct rte_rcu_qsbr *t[TEST_RCU_MAX_LCORE];
+static struct rte_hash *h[TEST_RCU_MAX_LCORE];
+static char hash_name[TEST_RCU_MAX_LCORE][8];
+static rte_atomic64_t updates, checks;
+static rte_atomic64_t update_cycles, check_cycles;
+
+/* Scale down results to 1000 operations to support lower
+ * granularity clocks.
+ */
+#define RCU_SCALE_DOWN 1000
+
+static inline int
+get_enabled_cores_mask(void)
+{
+	uint16_t core_id;
+	uint32_t max_cores = rte_lcore_count();
+	if (max_cores > TEST_RCU_MAX_LCORE) {
+		printf("Number of cores exceed %d\n", TEST_RCU_MAX_LCORE);
+		return -1;
+	}
+
+	core_id = 0;
+	num_cores = 0;
+	RTE_LCORE_FOREACH_SLAVE(core_id) {
+		enabled_core_ids[num_cores] = core_id;
+		num_cores++;
+	}
+
+	return 0;
+}
+
+static int
+alloc_rcu(void)
+{
+	uint32_t sz;
+
+	sz = rte_rcu_qsbr_get_memsize(TEST_RCU_MAX_LCORE);
+
+	t[0] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu0", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[1] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu1", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[2] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu2", sz,
+						RTE_CACHE_LINE_SIZE);
+	t[3] = (struct rte_rcu_qsbr *)rte_zmalloc("rcu3", sz,
+						RTE_CACHE_LINE_SIZE);
+
+	return 0;
+}
+
+static int
+free_rcu(void)
+{
+	int i;
+
+	for (i = 0; i < 4; i++)
+		rte_free(t[i]);
+
+	return 0;
+}
+
+static int
+test_rcu_qsbr_reader_perf(void *arg)
+{
+	bool writer_present = (bool)arg;
+	uint32_t lcore_id = rte_lcore_id();
+	uint64_t loop_cnt = 0;
+	uint64_t begin, cycles;
+
+	/* Register for report QS */
+	rte_rcu_qsbr_thread_register(t[0], lcore_id);
+	/* Make the thread online */
+	rte_rcu_qsbr_thread_online(t[0], lcore_id);
+
+	begin = rte_rdtsc_precise();
+
+	if (writer_present) {
+		while (!writer_done) {
+			/* Update quiescent state counter */
+			rte_rcu_qsbr_quiescent(t[0], lcore_id);
+			loop_cnt++;
+		}
+	} else {
+		while (loop_cnt < 100000000) {
+			/* Update quiescent state counter */
+			rte_rcu_qsbr_quiescent(t[0], lcore_id);
+			loop_cnt++;
+		}
+	}
+
+	cycles = rte_rdtsc_precise() - begin;
+	rte_atomic64_add(&update_cycles, cycles);
+	rte_atomic64_add(&updates, loop_cnt);
+
+	/* Make the thread offline */
+	rte_rcu_qsbr_thread_offline(t[0], lcore_id);
+	/* Unregister before exiting to avoid writer from waiting */
+	rte_rcu_qsbr_thread_unregister(t[0], lcore_id);
+
+	return 0;
+}
+
+static int
+test_rcu_qsbr_writer_perf(void *arg)
+{
+	bool wait = (bool)arg;
+	uint64_t token = 0;
+	uint64_t loop_cnt = 0;
+	uint64_t begin, cycles;
+
+	begin = rte_rdtsc_precise();
+
+	do {
+		/* Start the quiescent state query process */
+		if (wait)
+			token = rte_rcu_qsbr_start(t[0]);
+
+		/* Check quiescent state status */
+		rte_rcu_qsbr_check(t[0], token, wait);
+		loop_cnt++;
+	} while (loop_cnt < 20000000);
+
+	cycles = rte_rdtsc_precise() - begin;
+	rte_atomic64_add(&check_cycles, cycles);
+	rte_atomic64_add(&checks, loop_cnt);
+	return 0;
+}
+
+/*
+ * Perf test: Reader/writer
+ * Single writer, Multiple Readers, Single QS var, Non-Blocking rcu_qsbr_check
+ */
+static int
+test_rcu_qsbr_perf(void)
+{
+	int i;
+	writer_done = 0;
+
+	rte_atomic64_clear(&updates);
+	rte_atomic64_clear(&update_cycles);
+	rte_atomic64_clear(&checks);
+	rte_atomic64_clear(&check_cycles);
+
+	printf("\nPerf Test: %d Readers/1 Writer('wait' in qsbr_check == true)\n",
+		num_cores - 1);
+
+	/* QS variable is initialized */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Reader threads are launched */
+	for (i = 0; i < num_cores - 1; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_reader_perf, (void *)1,
+					enabled_core_ids[i]);
+
+	/* Writer thread is launched */
+	rte_eal_remote_launch(test_rcu_qsbr_writer_perf,
+			      (void *)1, enabled_core_ids[i]);
+
+	/* Wait for the writer thread */
+	rte_eal_wait_lcore(enabled_core_ids[i]);
+	writer_done = 1;
+
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	printf("Total RCU updates = %ld\n", rte_atomic64_read(&updates));
+	printf("Cycles per %d updates: %lu\n", RCU_SCALE_DOWN,
+		rte_atomic64_read(&update_cycles) /
+		(rte_atomic64_read(&updates) / RCU_SCALE_DOWN));
+	printf("Total RCU checks = %ld\n", rte_atomic64_read(&checks));
+	printf("Cycles per %d checks: %lu\n", RCU_SCALE_DOWN,
+		rte_atomic64_read(&check_cycles) /
+		(rte_atomic64_read(&checks) / RCU_SCALE_DOWN));
+
+	return 0;
+}
+
+/*
+ * Perf test: Readers
+ * Single writer, Multiple readers, Single QS variable
+ */
+static int
+test_rcu_qsbr_rperf(void)
+{
+	int i;
+
+	rte_atomic64_clear(&updates);
+	rte_atomic64_clear(&update_cycles);
+
+	printf("\nPerf Test: %d Readers\n", num_cores);
+
+	/* QS variable is initialized */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Reader threads are launched */
+	for (i = 0; i < num_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_reader_perf, NULL,
+					enabled_core_ids[i]);
+
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	printf("Total RCU updates = %ld\n", rte_atomic64_read(&updates));
+	printf("Cycles per %d updates: %lu\n", RCU_SCALE_DOWN,
+		rte_atomic64_read(&update_cycles) /
+		(rte_atomic64_read(&updates) / RCU_SCALE_DOWN));
+
+	return 0;
+}
+
+/*
+ * Perf test:
+ * Multiple writer, Single QS variable, Non-blocking rcu_qsbr_check
+ */
+static int
+test_rcu_qsbr_wperf(void)
+{
+	int i;
+
+	rte_atomic64_clear(&checks);
+	rte_atomic64_clear(&check_cycles);
+
+	printf("\nPerf test: %d Writers ('wait' in qsbr_check == false)\n",
+		num_cores);
+
+	/* QS variable is initialized */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Writer threads are launched */
+	for (i = 0; i < num_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_writer_perf,
+				(void *)0, enabled_core_ids[i]);
+
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	printf("Total RCU checks = %ld\n", rte_atomic64_read(&checks));
+	printf("Cycles per %d checks: %lu\n", RCU_SCALE_DOWN,
+		rte_atomic64_read(&check_cycles) /
+		(rte_atomic64_read(&checks) / RCU_SCALE_DOWN));
+
+	return 0;
+}
+
+/*
+ * RCU test cases using rte_hash data structure.
+ */
+static int
+test_rcu_qsbr_hash_reader(void *arg)
+{
+	struct rte_rcu_qsbr *temp;
+	struct rte_hash *hash = NULL;
+	int i;
+	uint64_t loop_cnt = 0;
+	uint64_t begin, cycles;
+	uint32_t lcore_id = rte_lcore_id();
+	uint8_t read_type = (uint8_t)((uintptr_t)arg);
+	uint32_t *pdata;
+
+	temp = t[read_type];
+	hash = h[read_type];
+
+	rte_rcu_qsbr_thread_register(temp, lcore_id);
+
+	begin = rte_rdtsc_precise();
+
+	do {
+		rte_rcu_qsbr_thread_online(temp, lcore_id);
+		for (i = 0; i < TOTAL_ENTRY; i++) {
+			if (rte_hash_lookup_data(hash, keys+i,
+					(void **)&pdata) != -ENOENT) {
+				*pdata = 0;
+				while (*pdata < COUNTER_VALUE)
+					++*pdata;
+			}
+		}
+		/* Update quiescent state counter */
+		rte_rcu_qsbr_quiescent(temp, lcore_id);
+		rte_rcu_qsbr_thread_offline(temp, lcore_id);
+		loop_cnt++;
+	} while (!writer_done);
+
+	cycles = rte_rdtsc_precise() - begin;
+	rte_atomic64_add(&update_cycles, cycles);
+	rte_atomic64_add(&updates, loop_cnt);
+
+	rte_rcu_qsbr_thread_unregister(temp, lcore_id);
+
+	return 0;
+}
+
+static struct rte_hash *
+init_hash(int hash_id)
+{
+	int i;
+	struct rte_hash *h = NULL;
+	sprintf(hash_name[hash_id], "hash%d", hash_id);
+	struct rte_hash_parameters hash_params = {
+		.entries = TOTAL_ENTRY,
+		.key_len = sizeof(uint32_t),
+		.hash_func_init_val = 0,
+		.socket_id = rte_socket_id(),
+		.hash_func = rte_hash_crc,
+		.extra_flag =
+			RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
+		.name = hash_name[hash_id],
+	};
+
+	h = rte_hash_create(&hash_params);
+	if (h == NULL) {
+		printf("Hash create Failed\n");
+		return NULL;
+	}
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		hash_data[hash_id][i] = rte_zmalloc(NULL, sizeof(uint32_t), 0);
+		if (hash_data[hash_id][i] == NULL) {
+			printf("No memory\n");
+			return NULL;
+		}
+	}
+	keys = rte_malloc(NULL, sizeof(uint32_t) * TOTAL_ENTRY, 0);
+	if (keys == NULL) {
+		printf("No memory\n");
+		return NULL;
+	}
+
+	for (i = 0; i < TOTAL_ENTRY; i++)
+		keys[i] = i;
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		if (rte_hash_add_key_data(h, keys + i,
+				(void *)((uintptr_t)hash_data[hash_id][i]))
+				< 0) {
+			printf("Hash key add Failed #%d\n", i);
+			return NULL;
+		}
+	}
+	return h;
+}
+
+/*
+ * Functional test:
+ * Single writer, Single QS variable Single QSBR query, Blocking rcu_qsbr_check
+ */
+static int
+test_rcu_qsbr_sw_sv_1qs(void)
+{
+	uint64_t token, begin, cycles;
+	int i;
+	int32_t pos;
+	writer_done = 0;
+
+	rte_atomic64_clear(&updates);
+	rte_atomic64_clear(&update_cycles);
+	rte_atomic64_clear(&checks);
+	rte_atomic64_clear(&check_cycles);
+
+	printf("\nPerf test: 1 writer, %d readers, 1 QSBR variable, 1 QSBR Query, "
+	       "Blocking QSBR Check\n", num_cores);
+
+	/* QS variable is initialized */
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Shared data structure created */
+	h[0] = init_hash(0);
+	if (h[0] == NULL) {
+		printf("Hash init failed\n");
+		goto error;
+	}
+
+	/* Reader threads are launched */
+	for (i = 0; i < num_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_hash_reader, NULL,
+					enabled_core_ids[i]);
+
+	begin = rte_rdtsc_precise();
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		/* Delete elements from the shared data structure */
+		pos = rte_hash_del_key(h[0], keys + i);
+		if (pos < 0) {
+			printf("Delete key failed #%d\n", keys[i]);
+			goto error;
+		}
+		/* Start the quiescent state query process */
+		token = rte_rcu_qsbr_start(t[0]);
+
+		/* Check the quiescent state status */
+		rte_rcu_qsbr_check(t[0], token, true);
+		if (*hash_data[0][i] != COUNTER_VALUE &&
+			*hash_data[0][i] != 0) {
+			printf("Reader did not complete #%d =  %d\n", i,
+							*hash_data[0][i]);
+			goto error;
+		}
+
+		if (rte_hash_free_key_with_position(h[0], pos) < 0) {
+			printf("Failed to free the key #%d\n", keys[i]);
+			goto error;
+		}
+		rte_free(hash_data[0][i]);
+		hash_data[0][i] = NULL;
+	}
+
+	cycles = rte_rdtsc_precise() - begin;
+	rte_atomic64_add(&check_cycles, cycles);
+	rte_atomic64_add(&checks, i);
+
+	writer_done = 1;
+
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+	/* Check return value from threads */
+	for (i = 0; i < 4; i++)
+		if (lcore_config[enabled_core_ids[i]].ret < 0)
+			goto error;
+	rte_hash_free(h[0]);
+	rte_free(keys);
+
+	printf("Following numbers include calls to rte_hash functions\n");
+	printf("Cycles per 1 update(register/update/unregister): %lu\n",
+		rte_atomic64_read(&update_cycles) /
+		rte_atomic64_read(&updates));
+
+	printf("Cycles per 1 check(start, check): %lu\n\n",
+		rte_atomic64_read(&check_cycles) /
+		rte_atomic64_read(&checks));
+
+	return 0;
+
+error:
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	rte_hash_free(h[0]);
+	rte_free(keys);
+	for (i = 0; i < TOTAL_ENTRY; i++)
+		rte_free(hash_data[0][i]);
+
+	return -1;
+}
+
+/*
+ * Functional test:
+ * Single writer, Single QS variable, Single QSBR query,
+ * Non-blocking rcu_qsbr_check
+ */
+static int
+test_rcu_qsbr_sw_sv_1qs_non_blocking(void)
+{
+	uint64_t token, begin, cycles;
+	int i, ret;
+	int32_t pos;
+	writer_done = 0;
+
+	printf("Perf test: 1 writer, %d readers, 1 QSBR variable, 1 QSBR Query, "
+	       "Non-Blocking QSBR check\n", num_cores);
+
+	rte_rcu_qsbr_init(t[0], TEST_RCU_MAX_LCORE);
+
+	/* Shared data structure created */
+	h[0] = init_hash(0);
+	if (h[0] == NULL) {
+		printf("Hash init failed\n");
+		goto error;
+	}
+
+	/* Reader threads are launched */
+	for (i = 0; i < num_cores; i++)
+		rte_eal_remote_launch(test_rcu_qsbr_hash_reader, NULL,
+					enabled_core_ids[i]);
+
+	begin = rte_rdtsc_precise();
+
+	for (i = 0; i < TOTAL_ENTRY; i++) {
+		/* Delete elements from the shared data structure */
+		pos = rte_hash_del_key(h[0], keys + i);
+		if (pos < 0) {
+			printf("Delete key failed #%d\n", keys[i]);
+			goto error;
+		}
+		/* Start the quiescent state query process */
+		token = rte_rcu_qsbr_start(t[0]);
+
+		/* Check the quiescent state status */
+		do {
+			ret = rte_rcu_qsbr_check(t[0], token, false);
+		} while (ret == 0);
+		if (*hash_data[0][i] != COUNTER_VALUE &&
+			*hash_data[0][i] != 0) {
+			printf("Reader did not complete  #%d = %d\n", i,
+							*hash_data[0][i]);
+			goto error;
+		}
+
+		if (rte_hash_free_key_with_position(h[0], pos) < 0) {
+			printf("Failed to free the key #%d\n", keys[i]);
+			goto error;
+		}
+		rte_free(hash_data[0][i]);
+		hash_data[0][i] = NULL;
+	}
+
+	cycles = rte_rdtsc_precise() - begin;
+	rte_atomic64_add(&check_cycles, cycles);
+	rte_atomic64_add(&checks, i);
+
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+	/* Check return value from threads */
+	for (i = 0; i < num_cores; i++)
+		if (lcore_config[enabled_core_ids[i]].ret < 0)
+			goto error;
+	rte_hash_free(h[0]);
+	rte_free(keys);
+
+	printf("Following numbers include calls to rte_hash functions\n");
+	printf("Cycles per 1 update(register/update/unregister): %lu\n",
+		rte_atomic64_read(&update_cycles) /
+		rte_atomic64_read(&updates));
+
+	printf("Cycles per 1 check(start, check): %lu\n\n",
+		rte_atomic64_read(&check_cycles) /
+		rte_atomic64_read(&checks));
+
+	return 0;
+
+error:
+	writer_done = 1;
+	/* Wait until all readers have exited */
+	rte_eal_mp_wait_lcore();
+
+	rte_hash_free(h[0]);
+	rte_free(keys);
+	for (i = 0; i < TOTAL_ENTRY; i++)
+		rte_free(hash_data[0][i]);
+
+	return -1;
+}
+
+static int
+test_rcu_qsbr_main(void)
+{
+	rte_atomic64_init(&updates);
+	rte_atomic64_init(&update_cycles);
+	rte_atomic64_init(&checks);
+	rte_atomic64_init(&check_cycles);
+
+	if (get_enabled_cores_mask() != 0)
+		return -1;
+
+	if (num_cores < 2) {
+		printf("Test failed! Need 2 or more cores\n");
+		goto test_fail;
+	}
+
+	alloc_rcu();
+
+	if (test_rcu_qsbr_perf() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_rperf() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_wperf() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_sw_sv_1qs() < 0)
+		goto test_fail;
+
+	if (test_rcu_qsbr_sw_sv_1qs_non_blocking() < 0)
+		goto test_fail;
+
+	printf("\n");
+
+	free_rcu();
+
+	return 0;
+
+test_fail:
+	free_rcu();
+	return -1;
+}
+
+REGISTER_TEST_COMMAND(rcu_qsbr_perf_autotest, test_rcu_qsbr_main);