diff mbox

[API-NEXT,PATCHv4,4/8] linux-generic: queue: add ordered_queue_enq() routine - part 2

Message ID 1447166821-24585-5-git-send-email-bill.fischofer@linaro.org
State Accepted
Commit a665dc76b37c78a85d69412fcf686d4ecdd3425b
Headers show

Commit Message

Bill Fischofer Nov. 10, 2015, 2:46 p.m. UTC
Restructure queue_enq() to streamline it while using the new
ordered_queue_enq() routine to handle ordered queues. This change is
made in two parts to make the diffs easier to follow.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_queue.c | 134 ++++---------------------------------
 1 file changed, 13 insertions(+), 121 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a545927..d9be4b3 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -386,146 +386,37 @@  odp_queue_t odp_queue_lookup(const char *name)
 	return ODP_QUEUE_INVALID;
 }
 
-
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 {
-	int sched = 0;
 	queue_entry_t *origin_qe;
 	uint64_t order;
-	odp_buffer_hdr_t *buf_tail;
 
 	get_queue_order(&origin_qe, &order, buf_hdr);
 
-	/* Need two locks for enq operations from ordered queues */
-	if (origin_qe) {
-		get_qe_locks(origin_qe, queue);
-		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
-			UNLOCK(&queue->s.lock);
-			if (origin_qe != queue)
-				UNLOCK(&origin_qe->s.lock);
-			ODP_ERR("Bad origin queue status\n");
-			ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
-				queue->s.name, origin_qe->s.name, buf_hdr);
-			return -1;
-		}
-	} else {
-		LOCK(&queue->s.lock);
-	}
+	/* Handle enqueues from ordered queues separately */
+	if (origin_qe)
+		return ordered_queue_enq(queue, buf_hdr, sustain,
+					 origin_qe, order);
+
+	LOCK(&queue->s.lock);
 
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
-		if (origin_qe && origin_qe != queue)
-			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
-	/* We can only complete the enq if we're in order */
-	if (origin_qe) {
-		sched_enq_called();
-		if (order > origin_qe->s.order_out) {
-			reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
-
-			/* This enq can't complete until order is restored, so
-			 * we're done here.
-			 */
-			UNLOCK(&queue->s.lock);
-			if (origin_qe != queue)
-				UNLOCK(&origin_qe->s.lock);
-			return 0;
-		}
-
-		/* We're in order, so account for this and proceed with enq */
-		if (!sustain) {
-			order_release(origin_qe, 1);
-			sched_order_resolved(buf_hdr);
-		}
-
-		/* if this element is linked, restore the linked chain */
-		buf_tail = buf_hdr->link;
-
-		if (buf_tail) {
-			buf_hdr->next = buf_tail;
-			buf_hdr->link = NULL;
-
-			/* find end of the chain */
-			while (buf_tail->next)
-				buf_tail = buf_tail->next;
-		} else {
-			buf_tail = buf_hdr;
-		}
-	} else {
-		buf_tail = buf_hdr;
-	}
-
-	if (!queue->s.head) {
-		/* Empty queue */
-		queue->s.head = buf_hdr;
-		queue->s.tail = buf_tail;
-		buf_tail->next = NULL;
-	} else {
-		queue->s.tail->next = buf_hdr;
-		queue->s.tail = buf_tail;
-		buf_tail->next = NULL;
-	}
+	queue_add(queue, buf_hdr);
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
-	}
-
-	/*
-	 * If we came from an ordered queue, check to see if our successful
-	 * enq has unblocked other buffers in the origin's reorder queue.
-	 */
-	if (origin_qe) {
-		odp_buffer_hdr_t *reorder_buf;
-		odp_buffer_hdr_t *next_buf;
-		odp_buffer_hdr_t *reorder_prev;
-		odp_buffer_hdr_t *placeholder_buf;
-		int               deq_count, release_count, placeholder_count;
-
-		deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
-					&reorder_prev, &placeholder_buf,
-					&release_count, &placeholder_count);
-
-		/* Add released buffers to the queue as well */
-		if (deq_count > 0) {
-			queue->s.tail->next       = origin_qe->s.reorder_head;
-			queue->s.tail             = reorder_prev;
-			origin_qe->s.reorder_head = reorder_prev->next;
-			reorder_prev->next        = NULL;
-		}
-
-		/* Reflect resolved orders in the output sequence */
-		order_release(origin_qe, release_count + placeholder_count);
-
-		/* Now handle any unblocked complete buffers destined for
-		 * other queues, appending placeholder bufs as needed.
-		 */
-		if (origin_qe != queue)
-			UNLOCK(&queue->s.lock);
-		reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
-				 1, 0);
-		UNLOCK(&origin_qe->s.lock);
-
-		if (reorder_buf)
-			queue_enq_internal(reorder_buf);
-
-		/* Free all placeholder bufs that are now released */
-		while (placeholder_buf) {
-			next_buf = placeholder_buf->next;
-			odp_buffer_free(placeholder_buf->handle.handle);
-			placeholder_buf = next_buf;
-		}
-	} else {
 		UNLOCK(&queue->s.lock);
+		if (schedule_queue(queue))
+			ODP_ABORT("schedule_queue failed\n");
+		return 0;
 	}
 
-	/* Add queue to scheduling */
-	if (sched && schedule_queue(queue))
-		ODP_ABORT("schedule_queue failed\n");
-
+	UNLOCK(&queue->s.lock);
 	return 0;
 }
 
@@ -667,7 +558,8 @@  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 	get_queue_order(&origin_qe, &order, buf_hdr[0]);
 	if (origin_qe) {
 		buf_hdr[0]->link = buf_hdr[0]->next;
-		rc = queue_enq(queue, buf_hdr[0], sustain);
+		rc = ordered_queue_enq(queue, buf_hdr[0], sustain,
+				       origin_qe, order);
 		return rc == 0 ? num : rc;
 	}