diff mbox

[6/7] linux-generic: queue: correct handling of reorder completion

Message ID 1441238841-25105-7-git-send-email-bill.fischofer@linaro.org
State Superseded
Headers show

Commit Message

Bill Fischofer Sept. 3, 2015, 12:07 a.m. UTC
Fix a race condition that could cause events to be stuck in an ordered
queue's reorder queue.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     | 36 ++++++++++++----
 platform/linux-generic/odp_queue.c                 | 48 +++++++++-------------
 2 files changed, 48 insertions(+), 36 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index f285ea3..48576bc 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -284,18 +284,38 @@  static inline int reorder_deq(queue_entry_t *queue,
 	return deq_count;
 }
 
-static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+static inline void reorder_complete(queue_entry_t *origin_qe,
+				    odp_buffer_hdr_t **reorder_buf_return,
+				    odp_buffer_hdr_t **placeholder_buf,
+				    int placeholder_append)
 {
-	odp_buffer_hdr_t *next_buf = reorder_buf->next;
-	uint64_t order = reorder_buf->order;
+	odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+	odp_buffer_hdr_t *next_buf;
+
+	*reorder_buf_return = NULL;
+	if (!placeholder_append)
+		*placeholder_buf = NULL;
 
-	while (reorder_buf->flags.sustain &&
-	       next_buf && next_buf->order == order) {
-		reorder_buf = next_buf;
+	while (reorder_buf &&
+	       reorder_buf->order <= origin_qe->s.order_out) {
 		next_buf = reorder_buf->next;
-	}
 
-	return !reorder_buf->flags.sustain;
+		if (!reorder_buf->target_qe) {
+			origin_qe->s.reorder_head = next_buf;
+			reorder_buf->next         = *placeholder_buf;
+			*placeholder_buf          = reorder_buf;
+
+			reorder_buf = next_buf;
+			order_release(origin_qe, 1);
+		} else if (reorder_buf->flags.sustain) {
+			reorder_buf = next_buf;
+		} else {
+			*reorder_buf_return = origin_qe->s.reorder_head;
+			origin_qe->s.reorder_head =
+				origin_qe->s.reorder_head->next;
+			break;
+		}
+	}
 }
 
 static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 7b7cb0a..17e02e1 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -457,18 +457,10 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 		order_release(origin_qe, release_count + placeholder_count);
 
 		/* Now handle any unblocked complete buffers destined for
-		 * other queues.  Note that these must be complete because
-		 * otherwise another thread is working on it and is
-		 * responsible for resolving order when it is complete.
+		 * other queues, appending placeholder bufs as needed.
 		 */
 		UNLOCK(&queue->s.lock);
-
-		if (reorder_buf &&
-		    reorder_buf->order <= origin_qe->s.order_out &&
-		    reorder_complete(reorder_buf))
-			origin_qe->s.reorder_head = reorder_buf->next;
-		else
-			reorder_buf = NULL;
+		reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
 		UNLOCK(&origin_qe->s.lock);
 
 		if (reorder_buf)
@@ -826,12 +818,7 @@  int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
 	order_release(origin_qe, release_count + placeholder_count);
 
 	/* Now handle sends to other queues that are ready to go */
-	if (reorder_buf &&
-	    reorder_buf->order <= origin_qe->s.order_out &&
-	    reorder_complete(reorder_buf))
-		origin_qe->s.reorder_head = reorder_buf->next;
-	else
-		reorder_buf = NULL;
+	reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1);
 
 	/* We're fully done with the origin_qe at last */
 	UNLOCK(&origin_qe->s.lock);
@@ -909,23 +896,28 @@  int release_order(queue_entry_t *origin_qe, uint64_t order,
 		order_release(origin_qe, 1);
 
 		/* Check if this release allows us to unblock waiters.
-		 * Note that we can only process complete waiters since
-		 * if the sustain bit is set for a buffer this means that
-		 * some other thread is working on it and will be
-		 * responsible for resolving order when it is complete.
+		 * At the point of this call, the reorder list may contain
+		 * zero or more placeholders that need to be freed, followed
+		 * by zero or one complete reorder buffer chain.
 		 */
-		reorder_buf = origin_qe->s.reorder_head;
-
-		if (reorder_buf &&
-		    reorder_buf->order <= origin_qe->s.order_out &&
-		    reorder_complete(reorder_buf))
-			origin_qe->s.reorder_head = reorder_buf->next;
-		else
-			reorder_buf = NULL;
+		reorder_complete(origin_qe, &reorder_buf,
+				 &placeholder_buf_hdr, 0);
 
+		/* Now safe to unlock */
 		UNLOCK(&origin_qe->s.lock);
+
+		/* If reorder_buf has a target, do the enq now */
 		if (reorder_buf)
 			queue_enq_internal(reorder_buf);
+
+		while (placeholder_buf_hdr) {
+			odp_buffer_hdr_t *placeholder_next =
+				placeholder_buf_hdr->next;
+
+			odp_buffer_free(placeholder_buf_hdr->handle.handle);
+			placeholder_buf_hdr = placeholder_next;
+		}
+
 		return 0;
 	}