diff mbox

[API-NEXT] linux-generic: queue: use locking hierarchy for ordered queueing

Message ID 1446480495-21778-1-git-send-email-bill.fischofer@linaro.org
State Superseded
Headers show

Commit Message

Bill Fischofer Nov. 2, 2015, 4:08 p.m. UTC
Enqueueing to ordered queues requires two locks (source and target
queues). Since any queue can be either source or target, queues do not
have a natural locking hierarchy. Create one by using the address of
the qentry as the lock order.

This addresses the aspect of Bug
https://bugs.linaro.org/show_bug.cgi?id=1879
relating to deadlock in unicore systems.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_queue.c | 39 +++++++++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 9 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a27af0b..1c15e17 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -48,6 +48,28 @@  typedef struct queue_table_t {
 static queue_table_t *queue_tbl;
 
 
+static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2)
+{
+	/* Special case: enq to self */
+	if (qe1 == qe2) {
+		LOCK(&qe1->s.lock);
+		return;
+	}
+
+       /* Since any queue can be either a source or target, queues do not have
+	* a natural locking hierarchy.  Create one by using the qentry address
+	* as the ordering mechanism.
+	*/
+
+	if (qe1 < qe2) {
+		LOCK(&qe1->s.lock);
+		LOCK(&qe2->s.lock);
+	} else {
+		LOCK(&qe2->s.lock);
+		LOCK(&qe1->s.lock);
+	}
+}
+
 queue_entry_t *get_qentry(uint32_t queue_id)
 {
 	return &queue_tbl->queue[queue_id];
@@ -370,14 +392,11 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 
 	/* Need two locks for enq operations from ordered queues */
 	if (origin_qe) {
-		LOCK(&origin_qe->s.lock);
-		while (!LOCK_TRY(&queue->s.lock)) {
-			UNLOCK(&origin_qe->s.lock);
-			LOCK(&origin_qe->s.lock);
-		}
+		get_qe_locks(origin_qe, queue);
 		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
 			UNLOCK(&queue->s.lock);
-			UNLOCK(&origin_qe->s.lock);
+			if (origin_qe != queue)
+				UNLOCK(&origin_qe->s.lock);
 			ODP_ERR("Bad origin queue status\n");
 			ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
 				queue->s.name, origin_qe->s.name, buf_hdr);
@@ -389,7 +408,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
-		if (origin_qe)
+		if (origin_qe && origin_qe != queue)
 			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
@@ -405,7 +424,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 			 * we're done here.
 			 */
 			UNLOCK(&queue->s.lock);
-			UNLOCK(&origin_qe->s.lock);
+			if (origin_qe != queue)
+				UNLOCK(&origin_qe->s.lock);
 			return 0;
 		}
 
@@ -477,7 +497,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 		/* Now handle any unblocked complete buffers destined for
 		 * other queues, appending placeholder bufs as needed.
 		 */
-		UNLOCK(&queue->s.lock);
+		if (origin_qe != queue)
+			UNLOCK(&queue->s.lock);
 		reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
 				 1, 0);
 		UNLOCK(&origin_qe->s.lock);