diff mbox

[API-NEXT,PATCHv5,4/9] linux-generic: queue: implement ordered queues

Message ID 1438564655-12686-5-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 3, 2015, 1:17 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   5 +
 .../linux-generic/include/odp_queue_internal.h     |   4 +
 .../linux-generic/include/odp_schedule_internal.h  |   2 +-
 platform/linux-generic/odp_pool.c                  |   3 +
 platform/linux-generic/odp_queue.c                 | 144 ++++++++++++++++++++-
 platform/linux-generic/odp_schedule.c              |   2 -
 6 files changed, 151 insertions(+), 9 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ae799dd..c459fce 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,6 +103,8 @@  typedef union odp_buffer_bits_t {
 
 /* forward declaration */
 struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
@@ -131,6 +133,9 @@  typedef struct odp_buffer_hdr_t {
 	uint32_t                 segcount;   /* segment count */
 	uint32_t                 segsize;    /* segment size */
 	void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+	uint64_t                 order;      /* sequence for ordered queues */
+	queue_entry_t           *origin_qe;  /* ordered queue origin */
+	queue_entry_t           *target_qe;  /* ordered queue target */
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 61d0c43..9cca552 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -77,6 +77,10 @@  struct queue_entry_s {
 	odp_pktio_t       pktin;
 	odp_pktio_t       pktout;
 	char              name[ODP_QUEUE_NAME_LEN];
+	uint64_t          order_in;
+	uint64_t          order_out;
+	odp_buffer_hdr_t *reorder_head;
+	odp_buffer_hdr_t *reorder_tail;
 };
 
 typedef union queue_entry_u {
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4c6577d..6ea90fb 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -15,6 +15,7 @@  extern "C" {
 
 
 #include <odp/buffer.h>
+#include <odp_buffer_internal.h>
 #include <odp/queue.h>
 #include <odp/packet_io.h>
 #include <odp_queue_internal.h>
@@ -28,7 +29,6 @@  static inline int schedule_queue(const queue_entry_t *qe)
 	return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
 }
 
-
 int schedule_pktio_start(odp_pktio_t pktio, int prio);
 
 
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 14221fd..30d4b2b 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -514,6 +514,9 @@  odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
 	/* By default, buffers inherit their pool's zeroization setting */
 	buf->buf.flags.zeroized = pool->s.flags.zeroized;
 
+	/* By default, buffers are not associated with an ordered queue */
+	buf->buf.origin_qe = NULL;
+
 	if (buf->buf.type == ODP_EVENT_PACKET)
 		packet_init(pool, &buf->pkt, size);
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4a0df11..4d0e1b4 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -27,11 +27,13 @@ 
 #define LOCK(a)      odp_ticketlock_lock(a)
 #define UNLOCK(a)    odp_ticketlock_unlock(a)
 #define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a)  odp_ticketlock_trylock(a)
 #else
 #include <odp/spinlock.h>
 #define LOCK(a)      odp_spinlock_lock(a)
 #define UNLOCK(a)    odp_spinlock_unlock(a)
 #define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a)  odp_spinlock_trylock(a)
 #endif
 
 #include <string.h>
@@ -89,6 +91,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 	queue->s.head = NULL;
 	queue->s.tail = NULL;
 
+	queue->s.reorder_head = NULL;
+	queue->s.reorder_tail = NULL;
+
 	queue->s.pri_queue = ODP_QUEUE_INVALID;
 	queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -329,14 +334,76 @@  odp_queue_t odp_queue_lookup(const char *name)
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
 	int sched = 0;
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Need two locks for enq operations from ordered queues */
+	if (origin_qe) {
+		LOCK(&origin_qe->s.lock);
+		while (!LOCK_TRY(&queue->s.lock)) {
+			UNLOCK(&origin_qe->s.lock);
+			LOCK(&origin_qe->s.lock);
+		}
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	} else {
+		LOCK(&queue->s.lock);
+	}
 
-	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
+		if (origin_qe)
+			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
+	/* We can only complete the enq if we're in order */
+	if (origin_qe) {
+		if (buf_hdr->order > origin_qe->s.order_out) {
+			odp_buffer_hdr_t *reorder_buf =
+				origin_qe->s.reorder_head;
+
+			if (!reorder_buf) {
+				buf_hdr->next = NULL;
+				origin_qe->s.reorder_head = buf_hdr;
+				origin_qe->s.reorder_tail = buf_hdr;
+			} else {
+				odp_buffer_hdr_t *reorder_prev = NULL;
+
+				while (buf_hdr->order > reorder_buf->order) {
+					reorder_prev = reorder_buf;
+					reorder_buf  = reorder_buf->next;
+					if (!reorder_buf)
+						break;
+				}
+
+				buf_hdr->next = reorder_buf;
+				if (reorder_prev)
+					reorder_prev->next = buf_hdr;
+				else
+					origin_qe->s.reorder_head = buf_hdr;
+
+				if (!reorder_buf)
+					origin_qe->s.reorder_tail = buf_hdr;
+			}
+
+			buf_hdr->target_qe = queue;
+
+			/* This enq can't complete until order is restored, so
+			 * we're done here.
+			 */
+			UNLOCK(&queue->s.lock);
+			UNLOCK(&origin_qe->s.lock);
+			return 0;
+		}
+
+		origin_qe->s.order_out++;
+	}
+
 	if (queue->s.head == NULL) {
 		/* Empty queue */
 		queue->s.head = buf_hdr;
@@ -352,7 +419,48 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		queue->s.status = QUEUE_STATUS_SCHED;
 		sched = 1; /* retval: schedule queue */
 	}
-	UNLOCK(&queue->s.lock);
+
+	/*
+	 * If we came from an ordered queue, check to see if our successful
+	 * enq has unblocked other buffers in the origin's reorder queue.
+	 */
+	if (origin_qe) {
+		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+		odp_buffer_hdr_t *reorder_prev;
+		uint32_t          release_count = 0;
+
+		while (reorder_buf &&
+		       reorder_buf->target_qe == queue &&
+		       reorder_buf->order <=
+		       origin_qe->s.order_out + release_count) {
+			release_count++;
+			reorder_prev = reorder_buf;
+			reorder_buf  = reorder_buf->next;
+		}
+
+		/* Add released buffers to the queue as well */
+		if (release_count > 0) {
+			queue->s.tail->next       = origin_qe->s.reorder_head;
+			queue->s.tail             = reorder_prev;
+			origin_qe->s.reorder_head = reorder_prev->next;
+			reorder_prev->next        = NULL;
+			origin_qe->s.order_out   += release_count;
+		}
+
+		/* Now handle unblocked buffers destined for other queues */
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out) {
+			UNLOCK(&origin_qe->s.lock);
+			UNLOCK(&queue->s.lock);
+			if (schedule_enq(reorder_buf->target_qe, origin_qe))
+				ODP_ABORT("schedule_enq failed\n");
+		} else {
+			UNLOCK(&origin_qe->s.lock);
+			UNLOCK(&queue->s.lock);
+		}
+	} else {
+		UNLOCK(&queue->s.lock);
+	}
 
 	/* Add queue to scheduling */
 	if (sched && schedule_queue(queue))
@@ -364,14 +472,26 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
 	int sched = 0;
-	int i;
+	int i, j;
 	odp_buffer_hdr_t *tail;
 
-	for (i = 0; i < num - 1; i++)
-		buf_hdr[i]->next = buf_hdr[i+1];
+	for (i = 0; i < num; i++) {
+		/* If any buffer is coming from an ordered queue, enqueue them
+		 * individually since in the general case each might originate
+		 * from a different ordered queue.  If any of these fail, the
+		 * return code tells the caller how many succeeded.
+		 */
+		if (buf_hdr[i]->origin_qe) {
+			for (j = 0; j < num; j++) {
+				if (queue_enq(queue, buf_hdr[j]))
+					return j;
+			}
+			return num;
+		}
+		buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+	}
 
 	tail = buf_hdr[num-1];
-	buf_hdr[num-1]->next = NULL;
 
 	LOCK(&queue->s.lock);
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -449,6 +569,12 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 	buf_hdr       = queue->s.head;
 	queue->s.head = buf_hdr->next;
 	buf_hdr->next = NULL;
+	if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+		buf_hdr->origin_qe = queue;
+		buf_hdr->order     = queue->s.order_in++;
+	} else {
+		buf_hdr->origin_qe = NULL;
+	}
 
 	if (queue->s.head == NULL) {
 		/* Queue is now empty */
@@ -489,6 +615,12 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 		buf_hdr[i]       = hdr;
 		hdr              = hdr->next;
 		buf_hdr[i]->next = NULL;
+		if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+			buf_hdr[i]->origin_qe = queue;
+			buf_hdr[i]->order     = queue->s.order_in++;
+		} else {
+			buf_hdr[i]->origin_qe = NULL;
+		}
 	}
 
 	queue->s.head = hdr;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 20dd850..df90df3 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -411,8 +411,6 @@  static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 
 /*
  * Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
  */
 static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		    unsigned int max_num, unsigned int max_deq)