diff mbox

[API-NEXT,PATCHv4,10/10] linux-generic: schedule: implement ordered locks

Message ID 1440796737-8636-11-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 28, 2015, 9:18 p.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |  3 ++
 .../linux-generic/include/odp_queue_internal.h     |  3 ++
 platform/linux-generic/odp_queue.c                 | 48 ++++++++++++++++++++++
 platform/linux-generic/odp_schedule.c              |  7 ++++
 4 files changed, 61 insertions(+)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ca4d314..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -140,7 +140,10 @@  typedef struct odp_buffer_hdr_t {
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
 	uint64_t                 order;      /* sequence for ordered queues */
 	queue_entry_t           *origin_qe;  /* ordered queue origin */
+	union {
 		queue_entry_t   *target_qe;  /* ordered queue target */
+		uint64_t         sync;       /* for ordered synchronization */
+	};
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 163172c..4cee9b6 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -82,6 +82,8 @@  struct queue_entry_s {
 	uint64_t          order_out;
 	odp_buffer_hdr_t *reorder_head;
 	odp_buffer_hdr_t *reorder_tail;
+	odp_atomic_u64_t  sync_in;
+	odp_atomic_u64_t  sync_out;
 };
 
 typedef union queue_entry_u {
@@ -120,6 +122,7 @@  int queue_sched_atomic(odp_queue_t handle);
 int release_order(queue_entry_t *origin_qe, uint64_t order,
 		  odp_pool_t pool, int enq_called);
 void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
 void sched_enq_called(void);
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 09b0398..1bd0de6 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -123,6 +123,8 @@  int odp_queue_init_global(void)
 		/* init locks */
 		queue_entry_t *queue = get_qentry(i);
 		LOCK_INIT(&queue->s.lock);
+		odp_atomic_init_u64(&queue->s.sync_in, 0);
+		odp_atomic_init_u64(&queue->s.sync_out, 0);
 		queue->s.handle = queue_from_id(i);
 	}
 
@@ -599,6 +601,7 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	if (queue_is_ordered(queue)) {
 		buf_hdr->origin_qe = queue;
 		buf_hdr->order = queue->s.order_in++;
+		buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 		buf_hdr->flags.sustain = 0;
 	} else {
 		buf_hdr->origin_qe = NULL;
@@ -646,6 +649,8 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		if (queue_is_ordered(queue)) {
 			buf_hdr[i]->origin_qe = queue;
 			buf_hdr[i]->order     = queue->s.order_in++;
+			buf_hdr[i]->sync =
+				odp_atomic_fetch_inc_u64(&queue->s.sync_in);
 			buf_hdr[i]->flags.sustain = 0;
 		} else {
 			buf_hdr[i]->origin_qe = NULL;
@@ -960,3 +965,46 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 	UNLOCK(&origin_qe->s.lock);
 	return 0;
 }
+
+/* This routine is a no-op in linux-generic */
+int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED,
+				 odp_queue_t queue ODP_UNUSED)
+{
+	return 0;
+}
+
+void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+	queue_entry_t *origin_qe;
+	uint64_t *sync;
+
+	get_sched_sync(&origin_qe, &sync);
+	if (!origin_qe)
+		return;
+
+	/* Wait until we are in order. Note that sync_out will be incremented
+	 * both by unlocks as well as order resolution, so we're OK if only
+	 * some events in the ordered flow need to lock.
+	 */
+	while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+		odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+	queue_entry_t *origin_qe;
+	uint64_t *sync;
+
+	get_sched_sync(&origin_qe, &sync);
+	if (!origin_qe)
+		return;
+
+	/* Get a new sync order for reusability, and release the lock. Note
+	 * that this must be done in this sequence to prevent race conditions
+	 * where the next waiter could lock and unlock before we're able to
+	 * get a new sync order since that would cause order inversion on
+	 * subsequent locks we may perform in this ordered context.
+	 */
+	*sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+	odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 1c6cdbb..dc2d75f 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -86,6 +86,7 @@  typedef struct {
 	queue_entry_t *qe;
 	queue_entry_t *origin_qe;
 	uint64_t order;
+	uint64_t sync;
 	odp_pool_t pool;
 	int enq_called;
 	int num;
@@ -799,6 +800,12 @@  void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
 	*order     = sched_local.order;
 }
 
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+{
+	*origin_qe = sched_local.origin_qe;
+	*sync      = &sched_local.sync;
+}
+
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
 {
 	if (buf_hdr)