
[API-NEXT,2/2] validation: scheduler: test ordered queue reorder processing

Message ID 1446085955-14366-2-git-send-email-bill.fischofer@linaro.org
State Accepted
Commit f6b6a09723ed1291dffb235089bcc903606c895f

Commit Message

Bill Fischofer Oct. 29, 2015, 2:32 a.m. UTC
Extend the scheduler CUnit test with explicit tests verifying that ordered
queues perform reorder processing properly.

This fixes Bug https://bugs.linaro.org/show_bug.cgi?id=1824

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 test/validation/scheduler/scheduler.c | 138 ++++++++++++++++++++++++++++++++--
 1 file changed, 130 insertions(+), 8 deletions(-)
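
The mechanism the patch uses to exercise reorder processing is as follows:
while still inside the scheduler's ordered context, each worker copies every
event it receives from an ordered queue into a fresh buffer (recording the
original sequence number) and enqueues the copy to a plain poll queue attached
to the ordered queue's context. Once scheduling completes, a single thread
drains each poll queue and checks that the copies arrive in strict sequence
order, so any reorder failure in the implementation shows up as an
out-of-order sequence number. The sketch below is a condensed paraphrase of
that logic, not the full test; helper names such as mirror_to_poll_queue()
and verify_order() are illustrative only and do not appear in the patch.

/*
 * Condensed paraphrase of the check added by this patch. The real test
 * integrates this into schedule_common_() and handles odp_schedule_multi()
 * bursts as well; buf_contents is simplified here (the lock_sequence
 * array is omitted).
 */
#include <odp.h>
#include <CUnit/CUnit.h>
#include <string.h>

typedef struct {
	uint64_t sequence;        /* assigned when the queues were filled */
	uint64_t output_sequence; /* recorded at output (dispatch) time   */
} buf_contents;

/* Called for each event received via odp_schedule() from an ordered queue.
 * 'pq' is the poll queue stored in that ordered queue's context and 'pool'
 * is the buffer pool used for the copies. Returns 0 on success. */
static int mirror_to_poll_queue(odp_event_t ev, odp_queue_t pq, odp_pool_t pool)
{
	buf_contents *src = odp_buffer_addr(odp_buffer_from_event(ev));
	odp_buffer_t cpy = odp_buffer_alloc(pool);
	buf_contents *dst;

	if (cpy == ODP_BUFFER_INVALID)
		return -1;

	dst = odp_buffer_addr(cpy);
	memcpy(dst, src, sizeof(buf_contents));
	dst->output_sequence = dst->sequence;

	/* Enqueued while still holding the ordered context, so the
	 * scheduler must restore the original order on 'pq'. */
	return odp_queue_enq(pq, odp_buffer_to_event(cpy));
}

/* After scheduling completes, one thread drains the poll queue and
 * verifies that sequence numbers come out as 0, 1, 2, ... */
static void verify_order(odp_queue_t pq, uint64_t num_bufs)
{
	uint64_t seq = 0;
	odp_event_t ev;

	while ((ev = odp_queue_deq(pq)) != ODP_EVENT_INVALID) {
		buf_contents *c = odp_buffer_addr(odp_buffer_from_event(ev));

		CU_ASSERT(c->sequence == seq);
		seq++;
		odp_event_free(ev);
	}
	CU_ASSERT(seq == num_bufs);
}

In the actual patch the drain loop walks every poll_<prio>_<index>_o queue
created in create_queues(), and the poll queue handle is carried in the
ordered queue's context as qctx->pq_handle.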

Patch

diff --git a/test/validation/scheduler/scheduler.c b/test/validation/scheduler/scheduler.c
index 89c1099..7090e75 100644
--- a/test/validation/scheduler/scheduler.c
+++ b/test/validation/scheduler/scheduler.c
@@ -44,6 +44,7 @@  typedef struct {
 	int num_workers;
 	odp_barrier_t barrier;
 	int buf_count;
+	int buf_count_cpy;
 	odp_ticketlock_t lock;
 	odp_spinlock_t atomic_lock;
 } test_globals_t;
@@ -63,10 +64,12 @@  typedef struct {
 typedef struct {
 	uint64_t sequence;
 	uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
+	uint64_t output_sequence;
 } buf_contents;
 
 typedef struct {
 	odp_buffer_t ctx_handle;
+	odp_queue_t pq_handle;
 	uint64_t sequence;
 	uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 } queue_context;
@@ -384,17 +387,21 @@  static void *schedule_common_(void *arg)
 	odp_schedule_sync_t sync;
 	test_globals_t *globals;
 	queue_context *qctx;
-	buf_contents *bctx;
+	buf_contents *bctx, *bctx_cpy;
+	odp_pool_t pool;
 
 	globals = args->globals;
 	sync = args->sync;
 
+	pool = odp_pool_lookup(MSG_POOL_NAME);
+	CU_ASSERT_FATAL(pool != ODP_POOL_INVALID);
+
 	if (args->num_workers > 1)
 		odp_barrier_wait(&globals->barrier);
 
 	while (1) {
 		odp_event_t ev;
-		odp_buffer_t buf;
+		odp_buffer_t buf, buf_cpy;
 		odp_queue_t from = ODP_QUEUE_INVALID;
 		int num = 0;
 		int locked;
@@ -407,7 +414,9 @@  static void *schedule_common_(void *arg)
 		odp_ticketlock_unlock(&globals->lock);
 
 		if (args->enable_schd_multi) {
-			odp_event_t events[BURST_BUF_SIZE];
+			odp_event_t events[BURST_BUF_SIZE],
+				ev_cpy[BURST_BUF_SIZE];
+			odp_buffer_t buf_cpy[BURST_BUF_SIZE];
 			int j;
 			num = odp_schedule_multi(&from, ODP_SCHED_NO_WAIT,
 						 events, BURST_BUF_SIZE);
@@ -419,11 +428,33 @@  static void *schedule_common_(void *arg)
 			if (sync == ODP_SCHED_SYNC_ORDERED) {
 				uint32_t ndx;
 				uint32_t ndx_max = odp_queue_lock_count(from);
+				int rc;
 
 				qctx = odp_queue_context(from);
+
+				for (j = 0; j < num; j++) {
+					bctx = odp_buffer_addr(
+						odp_buffer_from_event
+						(events[j]));
+
+					buf_cpy[j] = odp_buffer_alloc(pool);
+					CU_ASSERT_FATAL(buf_cpy[j] !=
+							ODP_BUFFER_INVALID);
+					bctx_cpy = odp_buffer_addr(buf_cpy[j]);
+					memcpy(bctx_cpy, bctx,
+					       sizeof(buf_contents));
+					bctx_cpy->output_sequence =
+						bctx_cpy->sequence;
+					ev_cpy[j] =
+						odp_buffer_to_event(buf_cpy[j]);
+				}
+
+				rc = odp_queue_enq_multi(qctx->pq_handle,
+							 ev_cpy, num);
+				CU_ASSERT(rc == num);
+
 				bctx = odp_buffer_addr(
 					odp_buffer_from_event(events[0]));
-
 				for (ndx = 0; ndx < ndx_max; ndx++) {
 					odp_schedule_order_lock(ndx);
 					CU_ASSERT(bctx->sequence ==
@@ -444,9 +475,20 @@  static void *schedule_common_(void *arg)
 			if (sync == ODP_SCHED_SYNC_ORDERED) {
 				uint32_t ndx;
 				uint32_t ndx_max = odp_queue_lock_count(from);
+				int rc;
 
 				qctx = odp_queue_context(from);
 				bctx = odp_buffer_addr(buf);
+				buf_cpy = odp_buffer_alloc(pool);
+				CU_ASSERT_FATAL(buf_cpy != ODP_BUFFER_INVALID);
+				bctx_cpy = odp_buffer_addr(buf_cpy);
+				memcpy(bctx_cpy, bctx, sizeof(buf_contents));
+				bctx_cpy->output_sequence = bctx_cpy->sequence;
+
+				rc = odp_queue_enq(qctx->pq_handle,
+						   odp_buffer_to_event
+						   (buf_cpy));
+				CU_ASSERT(rc == 0);
 
 				for (ndx = 0; ndx < ndx_max; ndx++) {
 					odp_schedule_order_lock(ndx);
@@ -456,6 +498,7 @@  static void *schedule_common_(void *arg)
 					odp_schedule_order_unlock(ndx);
 				}
 			}
+
 			odp_buffer_free(buf);
 		}
 
@@ -491,6 +534,52 @@  static void *schedule_common_(void *arg)
 		odp_ticketlock_unlock(&globals->lock);
 	}
 
+	if (args->num_workers > 1)
+		odp_barrier_wait(&globals->barrier);
+
+	if (sync == ODP_SCHED_SYNC_ORDERED &&
+	    odp_ticketlock_trylock(&globals->lock) &&
+	    globals->buf_count_cpy > 0) {
+		odp_event_t ev;
+		odp_queue_t pq;
+		uint64_t seq;
+		uint64_t bcount = 0;
+		int i, j;
+		char name[32];
+		uint64_t num_bufs = args->num_bufs;
+		uint64_t buf_count = globals->buf_count_cpy;
+
+		for (i = 0; i < args->num_prio; i++) {
+			for (j = 0; j < args->num_queues; j++) {
+				snprintf(name, sizeof(name),
+					 "poll_%d_%d_o", i, j);
+				pq = odp_queue_lookup(name);
+				CU_ASSERT_FATAL(pq != ODP_QUEUE_INVALID);
+
+				seq = 0;
+				while (1) {
+					ev = odp_queue_deq(pq);
+
+					if (ev == ODP_EVENT_INVALID) {
+						CU_ASSERT(seq == num_bufs);
+						break;
+					}
+
+					bctx = odp_buffer_addr(
+						odp_buffer_from_event(ev));
+
+					CU_ASSERT(bctx->sequence == seq);
+					seq++;
+					bcount++;
+					odp_event_free(ev);
+				}
+			}
+		}
+		CU_ASSERT(bcount == buf_count);
+		globals->buf_count_cpy = 0;
+		odp_ticketlock_unlock(&globals->lock);
+	}
+
 	return NULL;
 }
 
@@ -559,6 +648,7 @@  static void fill_queues(thread_args_t *args)
 	}
 
 	globals->buf_count = buf_count;
+	globals->buf_count_cpy = buf_count;
 }
 
 static void reset_queues(thread_args_t *args)
@@ -915,13 +1005,13 @@  static int create_queues(void)
 	int i, j, prios, rc;
 	odp_pool_param_t params;
 	odp_buffer_t queue_ctx_buf;
-	queue_context *qctx;
+	queue_context *qctx, *pqctx;
 	uint32_t ndx;
 
 	prios = odp_schedule_num_prio();
 	odp_pool_param_init(&params);
 	params.buf.size = sizeof(queue_context);
-	params.buf.num  = prios * QUEUES_PER_PRIO;
+	params.buf.num  = prios * QUEUES_PER_PRIO * 2;
 	params.type     = ODP_POOL_BUFFER;
 
 	queue_ctx_pool = odp_pool_create(QUEUE_CTX_POOL_NAME, &params);
@@ -939,7 +1029,7 @@  static int create_queues(void)
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
 			/* Per sched sync type */
 			char name[32];
-			odp_queue_t q;
+			odp_queue_t q, pq;
 
 			snprintf(name, sizeof(name), "sched_%d_%d_n", i, j);
 			p.sched.sync = ODP_SCHED_SYNC_NONE;
@@ -959,6 +1049,31 @@  static int create_queues(void)
 				return -1;
 			}
 
+			snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+			pq = odp_queue_create(name, ODP_QUEUE_TYPE_POLL, NULL);
+			if (pq == ODP_QUEUE_INVALID) {
+				printf("Poll queue create failed.\n");
+				return -1;
+			}
+
+			queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool);
+
+			if (queue_ctx_buf == ODP_BUFFER_INVALID) {
+				printf("Cannot allocate poll queue ctx buf\n");
+				return -1;
+			}
+
+			pqctx = odp_buffer_addr(queue_ctx_buf);
+			pqctx->ctx_handle = queue_ctx_buf;
+			pqctx->sequence = 0;
+
+			rc = odp_queue_context_set(pq, pqctx);
+
+			if (rc != 0) {
+				printf("Cannot set poll queue context\n");
+				return -1;
+			}
+
 			snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
 			p.sched.sync = ODP_SCHED_SYNC_ORDERED;
 			p.sched.lock_count =
@@ -988,6 +1103,7 @@  static int create_queues(void)
 
 			qctx = odp_buffer_addr(queue_ctx_buf);
 			qctx->ctx_handle = queue_ctx_buf;
+			qctx->pq_handle = pq;
 			qctx->sequence = 0;
 
 			for (ndx = 0;
@@ -1105,11 +1221,17 @@  static int destroy_queues(void)
 			snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
 			if (destroy_queue(name) != 0)
 				return -1;
+
+			snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+			if (destroy_queue(name) != 0)
+				return -1;
 		}
 	}
 
-	if (odp_pool_destroy(queue_ctx_pool) != 0)
+	if (odp_pool_destroy(queue_ctx_pool) != 0) {
+		fprintf(stderr, "error: failed to destroy queue ctx pool\n");
 		return -1;
+	}
 
 	return 0;
 }