diff mbox

[API-NEXT,PATCHv5,6/9] linux-generic: schedule: implement odp_schedule_release_ordered()

Message ID 1438564655-12686-7-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 3, 2015, 1:17 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_queue.c | 169 +++++++++++++++++++++++++++++++++----
 1 file changed, 152 insertions(+), 17 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4d0e1b4..a2460e7 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@ 
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -401,6 +402,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			return 0;
 		}
 
+		/* We're in order, so account for this and proceed with enq */
 		origin_qe->s.order_out++;
 	}
 
@@ -426,16 +428,56 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	 */
 	if (origin_qe) {
 		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
-		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *reorder_prev = NULL;
+		odp_buffer_hdr_t *placeholder_buf = NULL;
+		odp_buffer_hdr_t *next_buf;
 		uint32_t          release_count = 0;
+		uint32_t          placeholder_count = 0;
 
 		while (reorder_buf &&
-		       reorder_buf->target_qe == queue &&
-		       reorder_buf->order <=
-		       origin_qe->s.order_out + release_count) {
-			release_count++;
-			reorder_prev = reorder_buf;
-			reorder_buf  = reorder_buf->next;
+		       reorder_buf->order <= origin_qe->s.order_out +
+		       release_count + placeholder_count) {
+			/*
+			 * Elements on the reorder list fall into one of
+			 * three categories:
+			 *
+			 * 1. Those destined for the same queue.  These
+			 *    can be enq'd now if they were waiting to
+			 *    be unblocked by this enq.
+			 *
+			 * 2. Those representing placeholders for events
+			 *    whose ordering was released by a prior
+			 *    odp_schedule_release_ordered() call.  These
+			 *    can now just be freed.
+			 *
+			 * 3. Those representing events destined for another
+			 *    queue. These cannot be consolidated with this
+			 *    enq since they have a different target.
+			 *
+			 * Detecting an element with an order sequence gap, an
+			 * element in category 3, or running out of elements
+			 * stops the scan.
+			 */
+			next_buf = reorder_buf->next;
+
+			if (odp_likely(reorder_buf->target_qe == queue)) {
+				reorder_prev = reorder_buf;
+				reorder_buf  = next_buf;
+				release_count++;
+			} else if (!reorder_buf->target_qe) {
+				if (reorder_prev)
+					reorder_prev->next = next_buf;
+				else
+					origin_qe->s.reorder_head = next_buf;
+
+				reorder_buf->next = placeholder_buf;
+				placeholder_buf = reorder_buf;
+
+				reorder_buf = next_buf;
+				placeholder_count++;
+			} else {
+				break;
+			}
 		}
 
 		/* Add released buffers to the queue as well */
@@ -444,19 +486,28 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 			queue->s.tail             = reorder_prev;
 			origin_qe->s.reorder_head = reorder_prev->next;
 			reorder_prev->next        = NULL;
-			origin_qe->s.order_out   += release_count;
 		}
 
-		/* Now handle unblocked buffers destined for other queues */
+		/* Reflect the above two in the output sequence */
+		origin_qe->s.order_out += release_count + placeholder_count;
+
+		/* Now handle any unblocked buffers destined for other queues */
+		UNLOCK(&queue->s.lock);
 		if (reorder_buf &&
-		    reorder_buf->order <= origin_qe->s.order_out) {
-			UNLOCK(&origin_qe->s.lock);
-			UNLOCK(&queue->s.lock);
-			if (schedule_enq(reorder_buf->target_qe, origin_qe))
-				ODP_ABORT("schedule_enq failed\n");
-		} else {
-			UNLOCK(&origin_qe->s.lock);
-			UNLOCK(&queue->s.lock);
+		    reorder_buf->order <= origin_qe->s.order_out)
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+		if (reorder_buf)
+			odp_queue_enq(reorder_buf->target_qe->s.handle,
+				      (odp_event_t)reorder_buf->handle.handle);
+
+		/* Free all placeholder bufs that are now released */
+		while (placeholder_buf) {
+			next_buf = placeholder_buf->next;
+			odp_buffer_free(buf_hdr->handle.handle);
+			placeholder_buf = next_buf;
 		}
 	} else {
 		UNLOCK(&queue->s.lock);
@@ -685,3 +736,87 @@  void odp_queue_param_init(odp_queue_param_t *params)
 {
 	memset(params, 0, sizeof(odp_queue_param_t));
 }
+
+/* This routine exists here rather than in odp_schedule
+ * because it operates on queue interenal structures
+ */
+int odp_schedule_release_ordered(odp_event_t ev)
+{
+	odp_buffer_t placeholder_buf;
+	odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *reorder_prev;
+	odp_buffer_hdr_t *buf_hdr =
+		odp_buf_to_hdr(odp_buffer_from_event(ev));
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+	/* Can't release if we didn't originate from an ordered queue */
+	if (!origin_qe)
+		return -1;
+
+	LOCK(&origin_qe->s.lock);
+
+	/* If we are the first or second element beyond the next in order,
+	 * we can release immediately since there can be no confusion about
+	 * intermediate elements
+	 */
+	if (buf_hdr->order <= origin_qe->s.order_out + 1) {
+		buf_hdr->origin_qe = NULL;
+		origin_qe->s.order_out++;
+
+		/* check if this release allows us to unblock waiters */
+		reorder_buf = origin_qe->s.reorder_head;
+		if (reorder_buf &&
+		    reorder_buf->order <= origin_qe->s.order_out)
+			origin_qe->s.reorder_head = reorder_buf->next;
+		else
+			reorder_buf = NULL;
+		UNLOCK(&origin_qe->s.lock);
+		if (reorder_buf)
+			odp_queue_enq(reorder_buf->target_qe->s.handle,
+				      (odp_event_t)reorder_buf->handle.handle);
+		return 0;
+	}
+
+	/* If we are beyond the second element in the expected order, we need
+	 * a placeholder to represent our "place in line". Just use an element
+	 * from the same pool the buffer being released is from.
+	 */
+	placeholder_buf = odp_buffer_alloc(buf_hdr->pool_hdl);
+
+	/* Can't release if no placeholder is available */
+	if (placeholder_buf == ODP_BUFFER_INVALID) {
+		UNLOCK(&origin_qe->s.lock);
+		return -1;
+	}
+
+	placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+	reorder_buf = origin_qe->s.reorder_head;
+
+	if (!reorder_buf) {
+		placeholder_buf_hdr->next = NULL;
+		origin_qe->s.reorder_head = placeholder_buf_hdr;
+		origin_qe->s.reorder_tail = placeholder_buf_hdr;
+	} else {
+		reorder_prev = NULL;
+
+		while (buf_hdr->order > reorder_buf->order) {
+			reorder_prev = reorder_buf;
+			reorder_buf  = reorder_buf->next;
+			if (!reorder_buf)
+				break;
+		}
+
+		placeholder_buf_hdr->next = reorder_buf;
+		if (reorder_prev)
+			reorder_prev->next = placeholder_buf_hdr;
+		else
+			origin_qe->s.reorder_head = placeholder_buf_hdr;
+
+		if (!reorder_buf)
+			origin_qe->s.reorder_tail = placeholder_buf_hdr;
+	}
+
+	placeholder_buf_hdr->target_qe = NULL;
+
+	UNLOCK(&origin_qe->s.lock);
+	return 0;
+}