diff mbox series

[API-NEXT,v4,3/4] validation: queue: multi-thread plain queue test

Message ID 1515682819-12495-4-git-send-email-odpbot@yandex.ru
State Superseded
Headers show
Series [API-NEXT,v4,1/4] api: queue: block-free capabilities | expand

Commit Message

Github ODP bot Jan. 11, 2018, 3 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Test plain queue enqueue and dequeue with multiple concurrent
threads. Test blocking and non-blocking lock-free
implementations.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 353 (psavol:next-lockfree-queue-impl2)
 ** https://github.com/Linaro/odp/pull/353
 ** Patch: https://github.com/Linaro/odp/pull/353.patch
 ** Base sha: 6303c7d0e98fafe0f14c8c4dd9989b3b7633ebf4
 ** Merge commit sha: 065c75576263a97f76d1a47df24ee73cd18f54c5
 **/
 test/validation/api/queue/queue.c | 258 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 257 insertions(+), 1 deletion(-)
diff mbox series

Patch

diff --git a/test/validation/api/queue/queue.c b/test/validation/api/queue/queue.c
index 1ff029176..59a917c08 100644
--- a/test/validation/api/queue/queue.c
+++ b/test/validation/api/queue/queue.c
@@ -14,6 +14,22 @@ 
 #define MAX_NUM_EVENT           (1 * 1024)
 #define MAX_ITERATION           (100)
 #define MAX_QUEUES              (64 * 1024)
+#define GLOBALS_NAME		"queue_test_globals"
+#define DEQ_RETRIES             100
+#define ENQ_RETRIES             100
+
+typedef struct {
+	pthrd_arg        cu_thr;
+	int              num_workers;
+	odp_barrier_t    barrier;
+	odp_queue_t      queue;
+	odp_atomic_u32_t num_event;
+
+	struct {
+		uint32_t num_event;
+	} thread[ODP_THREAD_COUNT_MAX];
+
+} test_globals_t;
 
 static int queue_context = 0xff;
 static odp_pool_t pool;
@@ -31,7 +47,30 @@  static void generate_name(char *name, uint32_t index)
 
 int queue_suite_init(void)
 {
+	odp_shm_t shm;
+	test_globals_t *globals;
 	odp_pool_param_t params;
+	int num_workers;
+	odp_cpumask_t mask;
+
+	shm = odp_shm_reserve(GLOBALS_NAME, sizeof(test_globals_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	if (shm == ODP_SHM_INVALID) {
+		printf("Shared memory reserve failed\n");
+		return -1;
+	}
+
+	globals = odp_shm_addr(shm);
+	memset(globals, 0, sizeof(test_globals_t));
+
+	num_workers = odp_cpumask_default_worker(&mask, 0);
+
+	if (num_workers > MAX_WORKERS)
+		num_workers = MAX_WORKERS;
+
+	globals->num_workers = num_workers;
+	odp_barrier_init(&globals->barrier, num_workers);
 
 	odp_pool_param_init(&params);
 
@@ -51,7 +90,25 @@  int queue_suite_init(void)
 
 int queue_suite_term(void)
 {
-	return odp_pool_destroy(pool);
+	odp_shm_t shm;
+
+	shm = odp_shm_lookup(GLOBALS_NAME);
+	if (shm == ODP_SHM_INVALID) {
+		printf("SHM lookup failed.\n");
+		return -1;
+	}
+
+	if (odp_shm_free(shm)) {
+		printf("SHM free failed.\n");
+		return -1;
+	}
+
+	if (odp_pool_destroy(pool)) {
+		printf("Pool destroy failed.\n");
+		return -1;
+	}
+
+	return 0;
 }
 
 void queue_test_capa(void)
@@ -411,12 +468,211 @@  void queue_test_info(void)
 	CU_ASSERT(odp_queue_destroy(q_order) == 0);
 }
 
+static uint32_t alloc_and_enqueue(odp_queue_t queue, odp_pool_t pool,
+				  uint32_t num)
+{
+	uint32_t i, ret;
+	odp_buffer_t buf;
+	odp_event_t ev;
+
+	for (i = 0; i < num; i++) {
+		buf = odp_buffer_alloc(pool);
+
+		CU_ASSERT(buf != ODP_BUFFER_INVALID);
+
+		ev = odp_buffer_to_event(buf);
+
+		ret = odp_queue_enq(queue, ev);
+
+		CU_ASSERT(ret == 0);
+
+		if (ret)
+			break;
+	}
+
+	return i;
+}
+
+static uint32_t dequeue_and_free_all(odp_queue_t queue)
+{
+	odp_event_t ev;
+	uint32_t num, retries;
+
+	num = 0;
+	retries = 0;
+
+	while (1) {
+		ev = odp_queue_deq(queue);
+
+		if (ev == ODP_EVENT_INVALID) {
+			if (retries >= DEQ_RETRIES)
+				return num;
+
+			retries++;
+			continue;
+		}
+
+		retries = 0;
+		num++;
+
+		odp_event_free(ev);
+	}
+
+	return num;
+}
+
+static int enqueue_with_retry(odp_queue_t queue, odp_event_t ev)
+{
+	int i;
+
+	for (i = 0; i < ENQ_RETRIES; i++)
+		if (odp_queue_enq(queue, ev) == 0)
+			return 0;
+
+	return -1;
+}
+
+static int queue_test_worker(void *arg)
+{
+	uint32_t num, retries, num_workers;
+	int thr_id, ret;
+	odp_event_t ev;
+	odp_queue_t queue;
+	test_globals_t *globals = arg;
+
+	thr_id      = odp_thread_id();
+	queue       = globals->queue;
+	num_workers = globals->num_workers;
+
+	if (num_workers > 1)
+		odp_barrier_wait(&globals->barrier);
+
+	retries = 0;
+	num     = odp_atomic_fetch_inc_u32(&globals->num_event);
+
+	/* On average, each worker deq-enq each event once */
+	while (num < (num_workers * MAX_NUM_EVENT)) {
+		ev = odp_queue_deq(queue);
+
+		if (ev == ODP_EVENT_INVALID) {
+			if (retries < DEQ_RETRIES) {
+				retries++;
+				continue;
+			}
+
+			/* Prevent thread to starve */
+			num = odp_atomic_fetch_inc_u32(&globals->num_event);
+			retries = 0;
+			continue;
+		}
+
+		globals->thread[thr_id].num_event++;
+
+		ret = enqueue_with_retry(queue, ev);
+
+		CU_ASSERT(ret == 0);
+
+		num = odp_atomic_fetch_inc_u32(&globals->num_event);
+	}
+
+	return 0;
+}
+
+static void reset_thread_stat(test_globals_t *globals)
+{
+	int i;
+
+	odp_atomic_init_u32(&globals->num_event, 0);
+
+	for (i = 0; i < ODP_THREAD_COUNT_MAX; i++)
+		globals->thread[i].num_event = 0;
+}
+
+static void multithread_test(odp_nonblocking_t nonblocking)
+{
+	odp_shm_t shm;
+	test_globals_t *globals;
+	odp_queue_t queue;
+	odp_queue_param_t qparams;
+	odp_queue_capability_t capa;
+	uint32_t queue_size, max_size;
+	uint32_t num, sum, num_free, i;
+
+	CU_ASSERT(odp_queue_capability(&capa) == 0);
+
+	queue_size = 2 * MAX_NUM_EVENT;
+
+	max_size = capa.plain.max_size;
+
+	if (nonblocking == ODP_NONBLOCKING_LF) {
+		if (capa.plain.lockfree.max_num == 0)
+			return;
+
+		max_size = capa.plain.lockfree.max_size;
+	}
+
+	if (max_size && queue_size > max_size)
+		queue_size = max_size;
+
+	num = MAX_NUM_EVENT;
+
+	if (num > queue_size)
+		num = queue_size / 2;
+
+	shm = odp_shm_lookup(GLOBALS_NAME);
+	CU_ASSERT_FATAL(shm != ODP_SHM_INVALID);
+
+	globals = odp_shm_addr(shm);
+	globals->cu_thr.numthrds = globals->num_workers;
+
+	odp_queue_param_init(&qparams);
+	qparams.type = ODP_QUEUE_TYPE_PLAIN;
+	qparams.size = queue_size;
+	qparams.nonblocking = nonblocking;
+
+	queue = odp_queue_create("queue_test_mt", &qparams);
+	CU_ASSERT_FATAL(queue != ODP_QUEUE_INVALID);
+
+	globals->queue = queue;
+	reset_thread_stat(globals);
+
+	CU_ASSERT(alloc_and_enqueue(queue, pool, num) == num);
+
+	odp_cunit_thread_create(queue_test_worker, (pthrd_arg *)globals);
+
+	/* Wait for worker threads to terminate */
+	odp_cunit_thread_exit((pthrd_arg *)globals);
+
+	sum = 0;
+	for (i = 0; i < ODP_THREAD_COUNT_MAX; i++)
+		sum += globals->thread[i].num_event;
+
+	CU_ASSERT(sum != 0);
+
+	num_free = dequeue_and_free_all(queue);
+
+	CU_ASSERT(num_free == num);
+	CU_ASSERT(odp_queue_destroy(queue) == 0);
+}
+
+static void queue_test_mt_plain_block(void)
+{
+	multithread_test(ODP_BLOCKING);
+}
+
+static void queue_test_mt_plain_nonblock_lf(void)
+{
+	multithread_test(ODP_NONBLOCKING_LF);
+}
+
 odp_testinfo_t queue_suite[] = {
 	ODP_TEST_INFO(queue_test_capa),
 	ODP_TEST_INFO(queue_test_mode),
 	ODP_TEST_INFO(queue_test_lockfree),
 	ODP_TEST_INFO(queue_test_param),
 	ODP_TEST_INFO(queue_test_info),
+	ODP_TEST_INFO(queue_test_mt_plain_block),
+	ODP_TEST_INFO(queue_test_mt_plain_nonblock_lf),
 	ODP_TEST_INFO_NULL,
 };