diff mbox

[API-NEXT,PATCHv10,14/14] linux-generic: queue: add ordered support for pktout queues

Message ID 1439002992-29285-15-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Aug. 8, 2015, 3:03 a.m. UTC
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   2 +-
 .../linux-generic/include/odp_queue_internal.h     | 116 ++++++++
 platform/linux-generic/odp_queue.c                 | 308 ++++++++++++++-------
 3 files changed, 320 insertions(+), 106 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ddd2642..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -108,7 +108,7 @@  typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
-	struct odp_buffer_hdr_t *next;       /* next buf in a list */
+	struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
 	union {                              /* Multi-use secondary link */
 		struct odp_buffer_hdr_t *prev;
 		struct odp_buffer_hdr_t *link;
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index aa36df5..66aa887 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@  extern "C" {
 #include <odp_align_internal.h>
 #include <odp/packet_io.h>
 #include <odp/align.h>
+#include <odp/hints.h>
 
 
 #define USE_TICKETLOCK
@@ -99,6 +100,10 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num);
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+			   odp_buffer_hdr_t *buf_hdr[], int num);
+
 int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
 int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			  int num);
@@ -143,6 +148,117 @@  static inline int queue_prio(queue_entry_t *qe)
 	return qe->s.param.sched.prio;
 }
 
+static inline void reorder_enq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t *buf_hdr)
+{
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev =
+		(odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+	while (reorder_buf && buf_hdr->order >= reorder_buf->order) {
+		reorder_prev = reorder_buf;
+		reorder_buf  = reorder_buf->next;
+	}
+
+	buf_hdr->next = reorder_buf;
+	reorder_prev->next = buf_hdr;
+
+	if (!reorder_buf)
+		origin_qe->s.reorder_tail = buf_hdr;
+
+	buf_hdr->target_qe = queue;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+	origin_qe->s.order_out += count;
+	odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
+static inline void reorder_deq(queue_entry_t *queue,
+			       queue_entry_t *origin_qe,
+			       odp_buffer_hdr_t **reorder_buf_return,
+			       odp_buffer_hdr_t **reorder_prev_return,
+			       odp_buffer_hdr_t **placeholder_buf_return,
+			       uint32_t *release_count_return,
+			       uint32_t *placeholder_count_return)
+{
+	odp_buffer_hdr_t *reorder_buf     = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *reorder_prev    = NULL;
+	odp_buffer_hdr_t *placeholder_buf = NULL;
+	odp_buffer_hdr_t *next_buf;
+	uint32_t          release_count = 0;
+	uint32_t          placeholder_count = 0;
+
+	while (reorder_buf &&
+	       reorder_buf->order <= origin_qe->s.order_out +
+	       release_count + placeholder_count) {
+		/*
+		 * Elements on the reorder list fall into one of
+		 * three categories:
+		 *
+		 * 1. Those destined for the same queue.  These
+		 *    can be enq'd now if they were waiting to
+		 *    be unblocked by this enq.
+		 *
+		 * 2. Those representing placeholders for events
+		 *    whose ordering was released by a prior
+		 *    odp_schedule_release_ordered() call.  These
+		 *    can now just be freed.
+		 *
+		 * 3. Those representing events destined for another
+		 *    queue. These cannot be consolidated with this
+		 *    enq since they have a different target.
+		 *
+		 * Detecting an element with an order sequence gap, an
+		 * element in category 3, or running out of elements
+		 * stops the scan.
+		 */
+		next_buf = reorder_buf->next;
+
+		if (odp_likely(reorder_buf->target_qe == queue)) {
+			/* promote any chain */
+			odp_buffer_hdr_t *reorder_link =
+				reorder_buf->link;
+
+			if (reorder_link) {
+				reorder_buf->next = reorder_link;
+				reorder_buf->link = NULL;
+				while (reorder_link->next)
+					reorder_link = reorder_link->next;
+				reorder_link->next = next_buf;
+				reorder_prev = reorder_link;
+			} else {
+				reorder_prev = reorder_buf;
+			}
+
+			if (!reorder_buf->flags.sustain)
+				release_count++;
+			reorder_buf = next_buf;
+		} else if (!reorder_buf->target_qe) {
+			if (reorder_prev)
+				reorder_prev->next = next_buf;
+			else
+				origin_qe->s.reorder_head = next_buf;
+
+			reorder_buf->next = placeholder_buf;
+			placeholder_buf = reorder_buf;
+
+			reorder_buf = next_buf;
+			placeholder_count++;
+		} else {
+			break;
+		}
+	}
+
+	*reorder_buf_return = reorder_buf;
+	*reorder_prev_return = reorder_prev;
+	*placeholder_buf_return = placeholder_buf;
+	*release_count_return = release_count;
+	*placeholder_count_return = placeholder_count;
+}
+
 void queue_destroy_finalize(queue_entry_t *qe);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 2d999aa..674717a 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -77,9 +77,9 @@  static void queue_init(queue_entry_t *queue, const char *name,
 		queue->s.dequeue_multi = pktin_deq_multi;
 		break;
 	case ODP_QUEUE_TYPE_PKTOUT:
-		queue->s.enqueue = pktout_enqueue;
+		queue->s.enqueue = queue_pktout_enq;
 		queue->s.dequeue = pktout_dequeue;
-		queue->s.enqueue_multi = pktout_enq_multi;
+		queue->s.enqueue_multi = queue_pktout_enq_multi;
 		queue->s.dequeue_multi = pktout_deq_multi;
 		break;
 	default:
@@ -369,34 +369,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	/* We can only complete the enq if we're in order */
 	if (origin_qe) {
 		if (buf_hdr->order > origin_qe->s.order_out) {
-			odp_buffer_hdr_t *reorder_buf =
-				origin_qe->s.reorder_head;
-
-			if (!reorder_buf) {
-				buf_hdr->next = NULL;
-				origin_qe->s.reorder_head = buf_hdr;
-				origin_qe->s.reorder_tail = buf_hdr;
-			} else {
-				odp_buffer_hdr_t *reorder_prev = NULL;
-
-				while (buf_hdr->order >= reorder_buf->order) {
-					reorder_prev = reorder_buf;
-					reorder_buf  = reorder_buf->next;
-					if (!reorder_buf)
-						break;
-				}
-
-				buf_hdr->next = reorder_buf;
-				if (reorder_prev)
-					reorder_prev->next = buf_hdr;
-				else
-					origin_qe->s.reorder_head = buf_hdr;
-
-				if (!reorder_buf)
-					origin_qe->s.reorder_tail = buf_hdr;
-			}
-
-			buf_hdr->target_qe = queue;
+			reorder_enq(queue, origin_qe, buf_hdr);
 
 			/* This enq can't complete until order is restored, so
 			 * we're done here.
@@ -407,10 +380,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		}
 
 		/* We're in order, so account for this and proceed with enq */
-		if (!buf_hdr->flags.sustain) {
-			origin_qe->s.order_out++;
-			odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
-		}
+		if (!buf_hdr->flags.sustain)
+			order_release(origin_qe, 1);
 
 		/* if this element is linked, restore the linked chain */
 		buf_tail = buf_hdr->link;
@@ -450,74 +421,16 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 	 * enq has unblocked other buffers in the origin's reorder queue.
 	 */
 	if (origin_qe) {
-		odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
-		odp_buffer_hdr_t *reorder_prev = NULL;
-		odp_buffer_hdr_t *placeholder_buf = NULL;
+		odp_buffer_hdr_t *reorder_buf;
 		odp_buffer_hdr_t *next_buf;
-		uint32_t          release_count = 0;
-		uint32_t          placeholder_count = 0;
-
-		while (reorder_buf &&
-		       reorder_buf->order <= origin_qe->s.order_out +
-		       release_count + placeholder_count) {
-			/*
-			 * Elements on the reorder list fall into one of
-			 * three categories:
-			 *
-			 * 1. Those destined for the same queue.  These
-			 *    can be enq'd now if they were waiting to
-			 *    be unblocked by this enq.
-			 *
-			 * 2. Those representing placeholders for events
-			 *    whose ordering was released by a prior
-			 *    odp_schedule_release_ordered() call.  These
-			 *    can now just be freed.
-			 *
-			 * 3. Those representing events destined for another
-			 *    queue. These cannot be consolidated with this
-			 *    enq since they have a different target.
-			 *
-			 * Detecting an element with an order sequence gap, an
-			 * element in category 3, or running out of elements
-			 * stops the scan.
-			 */
-			next_buf = reorder_buf->next;
-
-			if (odp_likely(reorder_buf->target_qe == queue)) {
-				/* promote any chain */
-				odp_buffer_hdr_t *reorder_link =
-					reorder_buf->link;
-
-				if (reorder_link) {
-					reorder_buf->next = reorder_link;
-					reorder_buf->link = NULL;
-					while (reorder_link->next)
-						reorder_link =
-							reorder_link->next;
-					reorder_link->next = next_buf;
-					reorder_prev = reorder_link;
-				} else {
-					reorder_prev = reorder_buf;
-				}
-
-				if (!reorder_buf->flags.sustain)
-					release_count++;
-				reorder_buf = next_buf;
-			} else if (!reorder_buf->target_qe) {
-				if (reorder_prev)
-					reorder_prev->next = next_buf;
-				else
-					origin_qe->s.reorder_head = next_buf;
-
-				reorder_buf->next = placeholder_buf;
-				placeholder_buf = reorder_buf;
-
-				reorder_buf = next_buf;
-				placeholder_count++;
-			} else {
-				break;
-			}
-		}
+		odp_buffer_hdr_t *reorder_prev;
+		odp_buffer_hdr_t *placeholder_buf;
+		uint32_t          release_count;
+		uint32_t          placeholder_count;
+
+		reorder_deq(queue, origin_qe,
+			    &reorder_buf, &reorder_prev, &placeholder_buf,
+			    &release_count, &placeholder_count);
 
 		/* Add released buffers to the queue as well */
 		if (release_count > 0) {
@@ -528,18 +441,18 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		}
 
 		/* Reflect the above two in the output sequence */
-		origin_qe->s.order_out += release_count + placeholder_count;
-		odp_atomic_fetch_add_u64(&origin_qe->s.sync_out,
-					 release_count + placeholder_count);
+		order_release(origin_qe, release_count + placeholder_count);
 
 		/* Now handle any unblocked buffers destined for other queues */
 		UNLOCK(&queue->s.lock);
+
 		if (reorder_buf &&
 		    reorder_buf->order <= origin_qe->s.order_out)
 			origin_qe->s.reorder_head = reorder_buf->next;
 		else
 			reorder_buf = NULL;
 		UNLOCK(&origin_qe->s.lock);
+
 		if (reorder_buf)
 			odp_queue_enq(reorder_buf->target_qe->s.handle,
 				      (odp_event_t)reorder_buf->handle.handle);
@@ -547,7 +460,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 		/* Free all placeholder bufs that are now released */
 		while (placeholder_buf) {
 			next_buf = placeholder_buf->next;
-			odp_buffer_free(buf_hdr->handle.handle);
+			odp_buffer_free(placeholder_buf->handle.handle);
 			placeholder_buf = next_buf;
 		}
 	} else {
@@ -799,6 +712,191 @@  odp_event_t odp_queue_deq(odp_queue_t handle)
 	return ODP_EVENT_INVALID;
 }
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *origin_qe = buf_hdr->origin_qe;
+	int rc, sustain;
+
+	/* Special processing needed only if we came from an ordered queue */
+	if (!origin_qe)
+		return pktout_enqueue(queue, buf_hdr);
+
+	/* Must lock origin_qe for ordered processing */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* We can only complete the enq if we're in order */
+	if (buf_hdr->order > origin_qe->s.order_out) {
+		reorder_enq(queue, origin_qe, buf_hdr);
+
+		/* This enq can't complete until order is restored, so
+		 * we're done here.
+		 */
+		UNLOCK(&origin_qe->s.lock);
+		return 0;
+	}
+
+	/* Perform our enq since we're in order.
+	 * Note: Don't hold the origin_qe lock across an I/O operation!
+	 * Note that we also cache the sustain flag since the buffer may
+	 * be freed by the I/O operation so we can't reference it afterwards.
+	 */
+	UNLOCK(&origin_qe->s.lock);
+	sustain = buf_hdr->flags.sustain;
+
+	/* Handle any chained buffers (internal calls) */
+	if (buf_hdr->link) {
+		odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+		odp_buffer_hdr_t *next_buf;
+		int num = 0;
+
+		next_buf = buf_hdr->link;
+		buf_hdr->link = NULL;
+
+		while (next_buf) {
+			buf_hdrs[num++] = next_buf;
+			next_buf = next_buf->next;
+		}
+
+		rc = pktout_enq_multi(queue, buf_hdrs, num);
+		if (rc < num)
+			return -1;
+	} else {
+		rc = pktout_enqueue(queue, buf_hdr);
+		if (!rc)
+			return rc;
+	}
+
+	/* Reacquire the lock following the I/O send. Note that we're still
+	 * guaranteed to be in order here since we haven't released
+	 * order yet.
+	 */
+	LOCK(&origin_qe->s.lock);
+	if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+		UNLOCK(&origin_qe->s.lock);
+		ODP_ERR("Bad origin queue status\n");
+		return -1;
+	}
+
+	/* Account for this ordered enq */
+	if (!sustain)
+		order_release(origin_qe, 1);
+
+	/* Now check to see if our successful enq has unblocked other buffers
+	 * in the origin's reorder queue.
+	 */
+	odp_buffer_hdr_t *reorder_buf;
+	odp_buffer_hdr_t *next_buf;
+	odp_buffer_hdr_t *reorder_prev;
+	odp_buffer_hdr_t *xmit_buf;
+	odp_buffer_hdr_t *placeholder_buf;
+	uint32_t          release_count;
+	uint32_t          placeholder_count;
+
+	reorder_deq(queue, origin_qe,
+		    &reorder_buf, &reorder_prev, &placeholder_buf,
+		    &release_count, &placeholder_count);
+
+	/* Send released buffers as well */
+	if (release_count > 0) {
+		xmit_buf = origin_qe->s.reorder_head;
+		origin_qe->s.reorder_head = reorder_prev->next;
+		reorder_prev->next = NULL;
+		UNLOCK(&origin_qe->s.lock);
+
+		do {
+			next_buf = xmit_buf->next;
+			pktout_enqueue(queue, xmit_buf);
+			xmit_buf = next_buf;
+		} while (xmit_buf);
+
+		/* Reacquire the origin_qe lock to continue */
+		LOCK(&origin_qe->s.lock);
+		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+			UNLOCK(&origin_qe->s.lock);
+			ODP_ERR("Bad origin queue status\n");
+			return -1;
+		}
+	}
+
+	/* Update the order sequence to reflect the deq'd elements */
+	order_release(origin_qe, release_count + placeholder_count);
+
+	/* Now handle sends to other queues that are ready to go */
+	if (reorder_buf && reorder_buf->order <= origin_qe->s.order_out)
+		origin_qe->s.reorder_head = reorder_buf->next;
+	else
+		reorder_buf = NULL;
+
+	/* We're fully done with the origin_qe at last */
+	UNLOCK(&origin_qe->s.lock);
+
+	/* Now send the next buffer to its target queue */
+	if (reorder_buf)
+		odp_queue_enq(reorder_buf->target_qe->s.handle,
+			      (odp_event_t)reorder_buf->handle.handle);
+
+	/* Free all placeholder bufs that are now released */
+	while (placeholder_buf) {
+		next_buf = placeholder_buf->next;
+		odp_buffer_free(placeholder_buf->handle.handle);
+		placeholder_buf = next_buf;
+	}
+
+	return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+			   int num)
+{
+	int i, rc, ret_count = 0;
+	int ordered_head[num];
+	int ordered_count = 0;
+
+	/* Identify ordered chains in the input buffer list */
+	for (i = 0; i < num; i++) {
+		if (buf_hdr[i]->origin_qe)
+			ordered_head[ordered_count++] = i;
+
+		buf_hdr[i]->next = i < num - 1 ? buf_hdr[i + 1] : NULL;
+	}
+
+	ret_count = ordered_count ? ordered_head[0] : num;
+
+	/* Handle regular enq's at start of list */
+	if (ret_count) {
+		rc = pktout_enq_multi(queue, buf_hdr, ret_count);
+		if (rc < ret_count)
+			return rc;
+	}
+
+	/* Handle ordered chains in the list */
+	for (i = 0; i < ordered_count; i++) {
+		int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+		int list_count = eol - i;
+
+		if (i < ordered_count - 1)
+			buf_hdr[eol - 1]->next = NULL;
+
+		buf_hdr[ordered_head[i]]->link =
+			list_count > 1 ? buf_hdr[ordered_head[i] + 1] : NULL;
+
+		rc = queue_pktout_enq(queue, buf_hdr[ordered_head[i]]);
+		if (rc < 0)
+			return ret_count;
+
+		if (rc < list_count)
+			return ret_count + rc;
+
+		ret_count += rc;
+	}
+
+	return ret_count;
+}
 
 void queue_lock(queue_entry_t *queue)
 {