diff mbox

[API-NEXT,PATCHv9,13/13] linux-generic: schedule: implement odp_schedule_order_lock/unlock

Message ID 1438948036-31868-14-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 7, 2015, 11:47 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../include/odp/plat/schedule_types.h              |  2 -
 .../linux-generic/include/odp_buffer_internal.h    |  5 ++-
 .../linux-generic/include/odp_queue_internal.h     |  2 +
 platform/linux-generic/odp_queue.c                 | 48 ++++++++++++++++++++--
 4 files changed, 51 insertions(+), 6 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp/plat/schedule_types.h b/platform/linux-generic/include/odp/plat/schedule_types.h
index f13bfab..3665fec 100644
--- a/platform/linux-generic/include/odp/plat/schedule_types.h
+++ b/platform/linux-generic/include/odp/plat/schedule_types.h
@@ -52,8 +52,6 @@  typedef int odp_schedule_group_t;
 
 #define ODP_SCHED_GROUP_NAME_LEN 32
 
-typedef int odp_schedule_olock_t;
-
 /**
  * @}
  */
diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index c9b8409..ddd2642 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -140,7 +140,10 @@  typedef struct odp_buffer_hdr_t {
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 	uint64_t                 order;      /* sequence for ordered queues */
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
-	queue_entry_t           *target_qe;  /* ordered queue target */
+	union {
+		queue_entry_t   *target_qe;  /* ordered queue target */
+		uint64_t         sync;       /* for ordered synchronization */
+	};
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 9cca552..aa36df5 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -81,6 +81,8 @@  struct queue_entry_s {
 	uint64_t          order_out;
 	odp_buffer_hdr_t *reorder_head;
 	odp_buffer_hdr_t *reorder_tail;
+	odp_atomic_u64_t  sync_in;
+	odp_atomic_u64_t  sync_out;
 };
 
 typedef union queue_entry_u {
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index ec1f797..2d999aa 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -22,6 +22,7 @@ 
 #include <odp_debug_internal.h>
 #include <odp/hints.h>
 #include <odp/sync.h>
+#include <odp_spin_internal.h>
 
 #ifdef USE_TICKETLOCK
 #include <odp/ticketlock.h>
@@ -122,6 +123,8 @@  int odp_queue_init_global(void)
 		/* init locks */
 		queue_entry_t *queue = get_qentry(i);
 		LOCK_INIT(&queue->s.lock);
+		odp_atomic_init_u64(&queue->s.sync_in, 0);
+		odp_atomic_init_u64(&queue->s.sync_out, 0);
 		queue->s.handle = queue_from_id(i);
 	}
 
@@ -404,8 +407,10 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		}
 
 		/* We're in order, so account for this and proceed with enq */
-		if (!buf_hdr->flags.sustain)
+		if (!buf_hdr->flags.sustain) {
 			origin_qe->s.order_out++;
+			odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+		}
 
 		/* if this element is linked, restore the linked chain */
 		buf_tail = buf_hdr->link;
@@ -524,6 +529,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 
 		/* Reflect the above two in the output sequence */
 		origin_qe->s.order_out += release_count + placeholder_count;
+		odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
+					 release_count + placeholder_count);
 
 		/* Now handle any unblocked buffers destined for other queues */
 		UNLOCK(&queue->s.lock);
@@ -689,7 +696,8 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	buf_hdr->next = NULL;
 	if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
 		buf_hdr->origin_qe = queue;
-		buf_hdr->order     = queue->s.order_in++;
+		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
@@ -737,6 +745,8 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->sync =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -829,8 +839,10 @@  int odp_schedule_release_ordered(odp_event_t ev)
 	 */
 	if (buf_hdr->order <= origin_qe->s.order_out + 1) {
 		buf_hdr->origin_qe = NULL;
-		if (!buf_hdr->flags.sustain)
+		if (!buf_hdr->flags.sustain) {
 			origin_qe->s.order_out++;
+			odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+		}
 
 		/* check if this release allows us to unblock waiters */
 		reorder_buf = origin_qe->s.reorder_head;
@@ -911,8 +923,38 @@  int odp_schedule_order_copy(odp_event_t src_event, odp_event_t dst_event)
 
 	dst->origin_qe = origin_qe;
 	dst->order     = src->order;
+	dst->sync      = src->sync;
 	src->flags.sustain = 1;
 
 	UNLOCK(&origin_qe->s.lock);
 	return 0;
 }
+
+void odp_schedule_order_lock(odp_event_t ev)
+{
+	odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Wait until we are in order. Note that sync_out will be incremented
+	 * both by unlocks as well as order resolution, so we're OK if only
+	 * some events in the ordered flow need to lock.
+	 */
+	while (buf_hdr->sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+		odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_event_t ev)
+{
+	odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Get a new sync order for reusability, and release the lock. Note
+	 * that this must be done in this sequence to prevent race conditions
+	 * where the next waiter could lock and unlock before we're able to
+	 * get a new sync order since that would cause order inversion on
+	 * subsequent locks we may perform on this event in this ordered
+	 * context.
+	 */
+	buf_hdr->sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}