[API-NEXT,PATCHv3,5/5] api: schedule: revise definition of ordered locks

Message ID 1442404320-5130-6-git-send-email-bill.fischofer@linaro.org
State Accepted
Commit 8f07daf0195e7d0d11bff935ac4bb4733c4d7121

Commit Message

Bill Fischofer Sept. 16, 2015, 11:52 a.m. UTC
Revise the definitions of odp_schedule_order_lock() and
odp_schedule_order_unlock(). These APIs now take the index of the
ordered lock within the current ordered context that is to be locked
or unlocked. Since this is an API change, this patch also updates the
linux-generic implementation of these APIs as well as the scheduler
validation test that uses them.
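
To illustrate the revised API, a minimal usage sketch follows (the
queue name, lock indices, and protected state are hypothetical, and
priority/group setup and error handling are omitted):

    #include <string.h>
    #include <odp.h>

    static void ordered_lock_example(void)
    {
        odp_queue_param_t p;
        odp_queue_t q;

        memset(&p, 0, sizeof(p));
        p.sched.sync       = ODP_SCHED_SYNC_ORDERED;
        p.sched.lock_count = 2; /* two ordered locks in this context */
        q = odp_queue_create("ordered_q", ODP_QUEUE_TYPE_SCHED, &p);
        (void)q;

        /* Later, in a worker holding the ordered context of q: */
        odp_schedule_order_lock(0);   /* enter section 0 in queue order */
        /* ... first ordered critical section ... */
        odp_schedule_order_unlock(0);

        odp_schedule_order_lock(1);   /* an independent second section */
        /* ... second ordered critical section ... */
        odp_schedule_order_unlock(1);
    }

Using a distinct lock index per critical section lets events proceed
through the two sections independently, which is the parallelism the
doc text below describes.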

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 include/odp/api/schedule.h                         | 44 ++++++++++-----
 .../include/odp/plat/schedule_types.h              |  2 -
 .../linux-generic/include/odp_buffer_internal.h    |  2 +-
 .../linux-generic/include/odp_queue_internal.h     | 19 ++++---
 platform/linux-generic/odp_queue.c                 | 66 ++++++++++++++--------
 platform/linux-generic/odp_schedule.c              | 15 +++--
 test/validation/scheduler/scheduler.c              | 61 +++++++++++++++-----
 7 files changed, 141 insertions(+), 68 deletions(-)

Patch

diff --git a/include/odp/api/schedule.h b/include/odp/api/schedule.h
index 11f85ad..55191f9 100644
--- a/include/odp/api/schedule.h
+++ b/include/odp/api/schedule.h
@@ -31,11 +31,6 @@  extern "C" {
  */
 
 /**
- * @typedef odp_schedule_order_lock_t
- * Scheduler ordered context lock
- */
-
-/**
  * @def ODP_SCHED_WAIT
  * Wait infinitely
  */
@@ -299,15 +294,31 @@  int odp_schedule_group_thrmask(odp_schedule_group_t group,
 /**
  * Acquire ordered context lock
  *
- * This call is valid only when holding an ordered synchronization context. The
- * lock is used to protect a critical section that is executed within an
- * ordered context. Threads enter the critical section in the order determined
- * by the context (source queue). Lock ordering is automatically skipped for
- * threads that release the context instead of calling the lock.
- *
- * @param lock    Ordered context lock
+ * This call is valid only when holding an ordered synchronization context.
+ * Ordered locks are used to protect critical sections that are executed
+ * within an ordered context. Threads enter the critical section in the order
+ * determined by the context (source queue). Lock ordering is automatically
+ * skipped for threads that release the context instead of using the lock.
+ *
+ * The number of ordered locks available is set by the lock_count parameter of
+ * the schedule parameters passed to odp_queue_create(), which must be less
+ * than or equal to the ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE configuration
+ * option. If this routine is called outside of an ordered context, or with
+ * a lock_index that exceeds the number of ordered locks available in this
+ * context, results are undefined. The number of ordered locks associated
+ * with a given ordered queue may be queried via odp_queue_lock_count().
+ *
+ * Each ordered lock may be used only once per ordered context. If events
+ * are to be processed with multiple ordered critical sections, each should
+ * be protected by its own ordered lock. This promotes maximum parallelism by
+ * allowing order to be maintained on a more granular basis. If an ordered
+ * lock is used multiple times in the same context, results are undefined.
+ *
+ * @param lock_index Index of the ordered lock in the current context to be
+ *                   acquired. Must be in the range
+ *                   0..odp_queue_lock_count() - 1
  */
-void odp_schedule_order_lock(odp_schedule_order_lock_t *lock);
+void odp_schedule_order_lock(unsigned lock_index);
 
 /**
  * Release ordered context lock
@@ -315,9 +326,12 @@  void odp_schedule_order_lock(odp_schedule_order_lock_t *lock);
  * This call is valid only when holding an ordered synchronization context.
  * Release a previously locked ordered context lock.
  *
- * @param lock    Ordered context lock
+ * @param lock_index Index of the ordered lock in the current context to be
+ *                   released. Results are undefined if the caller does not
+ *                   hold this lock. Must be in the range
+ *                   0..odp_queue_lock_count() - 1
  */
-void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock);
+void odp_schedule_order_unlock(unsigned lock_index);
 
 /**
  * @}
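
As the parameter documentation above states, valid indices run from 0
through odp_queue_lock_count() - 1, and odp_queue_lock_count() returns
-1 for non-ordered queues. A defensive caller might therefore check the
count first; a sketch, where from and ndx are placeholder names for the
scheduled queue and chosen lock index:

    int nlocks = odp_queue_lock_count(from); /* -1 if from not ordered */

    if (nlocks > 0 && (int)ndx < nlocks) {
        odp_schedule_order_lock(ndx);
        /* ordered critical section, entered in source-queue order */
        odp_schedule_order_unlock(ndx);
    }
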
diff --git a/platform/linux-generic/include/odp/plat/schedule_types.h b/platform/linux-generic/include/odp/plat/schedule_types.h
index c48b652..2189cbb 100644
--- a/platform/linux-generic/include/odp/plat/schedule_types.h
+++ b/platform/linux-generic/include/odp/plat/schedule_types.h
@@ -51,8 +51,6 @@  typedef int odp_schedule_group_t;
 
 #define ODP_SCHED_GROUP_NAME_LEN 32
 
-typedef int odp_schedule_order_lock_t;
-
 /**
  * @}
  */
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index 4cacca1..f063c4c 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -137,7 +137,7 @@  struct odp_buffer_hdr_t {
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
 	union {
 		queue_entry_t   *target_qe;  /* ordered queue target */
-		uint64_t         sync;       /* for ordered synchronization */
+		uint64_t         sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 	};
 };
 
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 19a0f07..cc7120d 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -83,8 +83,8 @@  struct queue_entry_s {
 	uint64_t          order_out;
 	odp_buffer_hdr_t *reorder_head;
 	odp_buffer_hdr_t *reorder_tail;
-	odp_atomic_u64_t  sync_in;
-	odp_atomic_u64_t  sync_out;
+	odp_atomic_u64_t  sync_in[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
+	odp_atomic_u64_t  sync_out[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 };
 
 union queue_entry_u {
@@ -123,7 +123,7 @@  int queue_sched_atomic(odp_queue_t handle);
 int release_order(queue_entry_t *origin_qe, uint64_t order,
 		  odp_pool_t pool, int enq_called);
 void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
-void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx);
 void sched_enq_called(void);
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
 
@@ -194,12 +194,17 @@  static inline void reorder_enq(queue_entry_t *queue,
 
 static inline void order_release(queue_entry_t *origin_qe, int count)
 {
-	uint64_t sync = odp_atomic_load_u64(&origin_qe->s.sync_out);
+	uint64_t sync;
+	uint32_t i;
 
 	origin_qe->s.order_out += count;
-	if (sync < origin_qe->s.order_out)
-		odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
-					 origin_qe->s.order_out - sync);
+
+	for (i = 0; i < origin_qe->s.param.sched.lock_count; i++) {
+		sync = odp_atomic_load_u64(&origin_qe->s.sync_out[i]);
+		if (sync < origin_qe->s.order_out)
+			odp_atomic_fetch_add_u64(&origin_qe->s.sync_out[i],
+						 origin_qe->s.order_out - sync);
+	}
 }
 
 static inline int reorder_deq(queue_entry_t *queue,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index ac933da..383cf87 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -53,14 +53,17 @@  queue_entry_t *get_qentry(uint32_t queue_id)
 	return &queue_tbl->queue[queue_id];
 }
 
-static void queue_init(queue_entry_t *queue, const char *name,
-		       odp_queue_type_t type, odp_queue_param_t *param)
+static int queue_init(queue_entry_t *queue, const char *name,
+		      odp_queue_type_t type, odp_queue_param_t *param)
 {
 	strncpy(queue->s.name, name, ODP_QUEUE_NAME_LEN - 1);
 	queue->s.type = type;
 
 	if (param) {
 		memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
+		if (queue->s.param.sched.lock_count >
+		    ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE)
+			return -1;
 	} else {
 		/* Defaults */
 		memset(&queue->s.param, 0, sizeof(odp_queue_param_t));
@@ -98,12 +101,13 @@  static void queue_init(queue_entry_t *queue, const char *name,
 
 	queue->s.pri_queue = ODP_QUEUE_INVALID;
 	queue->s.cmd_ev    = ODP_EVENT_INVALID;
+	return 0;
 }
 
 
 int odp_queue_init_global(void)
 {
-	uint32_t i;
+	uint32_t i, j;
 	odp_shm_t shm;
 
 	ODP_DBG("Queue init ... ");
@@ -123,8 +127,10 @@  int odp_queue_init_global(void)
 		/* init locks */
 		queue_entry_t *queue = get_qentry(i);
 		LOCK_INIT(&queue->s.lock);
-		odp_atomic_init_u64(&queue->s.sync_in, 0);
-		odp_atomic_init_u64(&queue->s.sync_out, 0);
+		for (j = 0; j < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE; j++) {
+			odp_atomic_init_u64(&queue->s.sync_in[j], 0);
+			odp_atomic_init_u64(&queue->s.sync_out[j], 0);
+		}
 		queue->s.handle = queue_from_id(i);
 	}
 
@@ -201,6 +207,14 @@  odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle)
 	return queue->s.param.sched.group;
 }
 
+int odp_queue_lock_count(odp_queue_t handle)
+{
+	queue_entry_t *queue = queue_to_qentry(handle);
+
+	return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ?
+		(int)queue->s.param.sched.lock_count : -1;
+}
+
 odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
 			     odp_queue_param_t *param)
 {
@@ -216,7 +230,10 @@  odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
 
 		LOCK(&queue->s.lock);
 		if (queue->s.status == QUEUE_STATUS_FREE) {
-			queue_init(queue, name, type, param);
+			if (queue_init(queue, name, type, param)) {
+				UNLOCK(&queue->s.lock);
+				return handle;
+			}
 
 			if (type == ODP_QUEUE_TYPE_SCHED ||
 			    type == ODP_QUEUE_TYPE_PKTIN)
@@ -577,6 +594,7 @@  int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
 	odp_buffer_hdr_t *buf_hdr;
+	uint32_t i;
 
 	LOCK(&queue->s.lock);
 
@@ -600,7 +618,10 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	if (queue_is_ordered(queue)) {
 		buf_hdr->origin_qe = queue;
 		buf_hdr->order = queue->s.order_in++;
-		buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
+		for (i = 0; i < queue->s.param.sched.lock_count; i++) {
+			buf_hdr->sync[i] =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
+		}
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
@@ -621,6 +642,7 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	odp_buffer_hdr_t *hdr;
 	int i;
+	uint32_t j;
 
 	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -648,8 +670,11 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue_is_ordered(queue)) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order     = queue->s.order_in++;
-			buf_hdr[i]->sync =
-				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
+			for (j = 0; j < queue->s.param.sched.lock_count; j++) {
+				buf_hdr[i]->sync[j] =
+					odp_atomic_fetch_inc_u64
+					(&queue->s.sync_in[j]);
+			}
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -966,39 +991,32 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 	return 0;
 }
 
-/* This routine is a no-op in linux-generic */
-int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED,
-				 odp_queue_t queue ODP_UNUSED)
-{
-	return 0;
-}
-
-void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+void odp_schedule_order_lock(unsigned lock_index)
 {
 	queue_entry_t *origin_qe;
 	uint64_t *sync;
 
-	get_sched_sync(&origin_qe, &sync);
-	if (!origin_qe)
+	get_sched_sync(&origin_qe, &sync, lock_index);
+	if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count)
 		return;
 
 	/* Wait until we are in order. Note that sync_out will be incremented
 	 * both by unlocks as well as order resolution, so we're OK if only
 	 * some events in the ordered flow need to lock.
 	 */
-	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]))
 		odp_spin();
 }
 
-void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+void odp_schedule_order_unlock(unsigned lock_index)
 {
 	queue_entry_t *origin_qe;
 	uint64_t *sync;
 
-	get_sched_sync(&origin_qe, &sync);
-	if (!origin_qe)
+	get_sched_sync(&origin_qe, &sync, lock_index);
+	if (!origin_qe || lock_index >= origin_qe->s.param.sched.lock_count)
 		return;
 
 	/* Release the ordered lock */
-	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out[lock_index]);
 }
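
The mechanism above amounts to one ticket lock per lock index:
queue_deq() hands each event a ticket from sync_in,
odp_schedule_order_lock() spins until sync_out reaches that ticket, and
both odp_schedule_order_unlock() and order resolution advance sync_out,
which is how lock ordering is skipped for events that never take the
lock. A standalone C11 model of the idea (not the ODP code itself):

    #include <stdatomic.h>
    #include <stdint.h>

    typedef struct {
        atomic_uint_fast64_t sync_in;   /* next ticket to hand out */
        atomic_uint_fast64_t sync_out;  /* tickets already served  */
    } ord_lock_model;

    /* At dequeue time: each event takes a ticket in queue order. */
    static uint64_t take_ticket(ord_lock_model *l)
    {
        return atomic_fetch_add(&l->sync_in, 1);
    }

    /* odp_schedule_order_lock(): spin until this event's turn. */
    static void model_lock(ord_lock_model *l, uint64_t ticket)
    {
        while (atomic_load(&l->sync_out) < ticket)
            ;   /* odp_spin() in the real implementation */
    }

    /* odp_schedule_order_unlock(), or order resolution for an event
     * that skipped the lock: let the next ticket proceed. */
    static void model_unlock(ord_lock_model *l)
    {
        atomic_fetch_add(&l->sync_out, 1);
    }
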
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index c6619e5..a08d995 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -86,7 +86,7 @@  typedef struct {
 	queue_entry_t *qe;
 	queue_entry_t *origin_qe;
 	uint64_t order;
-	uint64_t sync;
+	uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 	odp_pool_t pool;
 	int enq_called;
 	int num;
@@ -440,6 +440,7 @@  static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 	int i, j;
 	int thr;
 	int ret;
+	uint32_t k;
 
 	if (sched_local.num) {
 		ret = copy_events(out_ev, max_num);
@@ -551,8 +552,12 @@  static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				sched_local.origin_qe = qe;
 				sched_local.order =
 					sched_local.buf_hdr[0]->order;
-				sched_local.sync =
-					sched_local.buf_hdr[0]->sync;
+				for (k = 0;
+				     k < qe->s.param.sched.lock_count;
+				     k++) {
+					sched_local.sync[k] =
+						sched_local.buf_hdr[0]->sync[k];
+				}
 				sched_local.enq_called = 0;
 			} else if (queue_is_atomic(qe)) {
 				/* Hold queue during atomic access */
@@ -802,10 +807,10 @@  void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
 	*order     = sched_local.order;
 }
 
-void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync, uint32_t ndx)
 {
 	*origin_qe = sched_local.origin_qe;
-	*sync      = &sched_local.sync;
+	*sync      = &sched_local.sync[ndx];
 }
 
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
diff --git a/test/validation/scheduler/scheduler.c b/test/validation/scheduler/scheduler.c
index 8cde985..a28d60c 100644
--- a/test/validation/scheduler/scheduler.c
+++ b/test/validation/scheduler/scheduler.c
@@ -62,13 +62,13 @@  typedef struct {
 
 typedef struct {
 	uint64_t sequence;
+	uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 } buf_contents;
 
 typedef struct {
 	odp_buffer_t ctx_handle;
 	uint64_t sequence;
-	uint64_t lock_sequence;
-	odp_schedule_order_lock_t order_lock;
+	uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 } queue_context;
 
 odp_pool_t pool;
@@ -419,14 +419,20 @@  static void *schedule_common_(void *arg)
 				continue;
 
 			if (sync == ODP_SCHED_SYNC_ORDERED) {
+				uint32_t ndx;
+				uint32_t ndx_max = odp_queue_lock_count(from);
+
 				qctx = odp_queue_context(from);
 				bctx = odp_buffer_addr(
 					odp_buffer_from_event(events[0]));
-				odp_schedule_order_lock(&qctx->order_lock);
-				CU_ASSERT(bctx->sequence ==
-					  qctx->lock_sequence);
-				qctx->lock_sequence += num;
-				odp_schedule_order_unlock(&qctx->order_lock);
+
+				for (ndx = 0; ndx < ndx_max; ndx++) {
+					odp_schedule_order_lock(ndx);
+					CU_ASSERT(bctx->sequence ==
+						  qctx->lock_sequence[ndx]);
+					qctx->lock_sequence[ndx] += num;
+					odp_schedule_order_unlock(ndx);
+				}
 			}
 
 			for (j = 0; j < num; j++)
@@ -438,13 +444,19 @@  static void *schedule_common_(void *arg)
 				continue;
 			num = 1;
 			if (sync == ODP_SCHED_SYNC_ORDERED) {
+				uint32_t ndx;
+				uint32_t ndx_max = odp_queue_lock_count(from);
+
 				qctx = odp_queue_context(from);
 				bctx = odp_buffer_addr(buf);
-				odp_schedule_order_lock(&qctx->order_lock);
-				CU_ASSERT(bctx->sequence ==
-					  qctx->lock_sequence);
-				qctx->lock_sequence += num;
-				odp_schedule_order_unlock(&qctx->order_lock);
+
+				for (ndx = 0; ndx < ndx_max; ndx++) {
+					odp_schedule_order_lock(ndx);
+					CU_ASSERT(bctx->sequence ==
+						  qctx->lock_sequence[ndx]);
+					qctx->lock_sequence[ndx] += num;
+					odp_schedule_order_unlock(ndx);
+				}
 			}
 			odp_buffer_free(buf);
 		}
@@ -570,8 +582,12 @@  static void reset_queues(thread_args_t *args)
 			for (k = 0; k < args->num_bufs; k++) {
 				queue_context *qctx =
 					odp_queue_context(queue);
+				uint32_t ndx;
+				uint32_t ndx_max =
+					odp_queue_lock_count(queue);
 				qctx->sequence = 0;
-				qctx->lock_sequence = 0;
+				for (ndx = 0; ndx < ndx_max; ndx++)
+					qctx->lock_sequence[ndx] = 0;
 			}
 		}
 	}
@@ -902,6 +918,7 @@  static int create_queues(void)
 	odp_pool_param_t params;
 	odp_buffer_t queue_ctx_buf;
 	queue_context *qctx;
+	uint32_t ndx;
 
 	prios = odp_schedule_num_prio();
 	odp_pool_param_init(&params);
@@ -946,12 +963,23 @@  static int create_queues(void)
 
 			snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
 			p.sched.sync = ODP_SCHED_SYNC_ORDERED;
+			p.sched.lock_count =
+				ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE;
 			q = odp_queue_create(name, ODP_QUEUE_TYPE_SCHED, &p);
 
 			if (q == ODP_QUEUE_INVALID) {
 				printf("Schedule queue create failed.\n");
 				return -1;
 			}
+			if (odp_queue_lock_count(q) !=
+			    ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE) {
+				printf("Queue %" PRIu64 " created with "
+				       "%d locks instead of expected %d\n",
+				       odp_queue_to_u64(q),
+				       odp_queue_lock_count(q),
+				       ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE);
+				return -1;
+			}
 
 			queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool);
 
@@ -963,7 +991,12 @@  static int create_queues(void)
 			qctx = odp_buffer_addr(queue_ctx_buf);
 			qctx->ctx_handle = queue_ctx_buf;
 			qctx->sequence = 0;
-			qctx->lock_sequence = 0;
+
+			for (ndx = 0;
+			     ndx < ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE;
+			     ndx++) {
+				qctx->lock_sequence[ndx] = 0;
+			}
 
 			rc = odp_queue_context_set(q, qctx);