diff mbox

[API-NEXT,PATCHv2] linux-generic: queue: yield trying to obtain multiple locks

Message ID 27228BE6FE51F54FBE6396D376D2BD711155263D@DEMUMBX012.nsn-intra.net
State New
Headers show

Commit Message

Wallen, Carl (Nokia - FI/Espoo) Nov. 2, 2015, 1:43 p.m. UTC
Reviewed-by: Carl Wallen <carl.wallen@nokia.com>


-----Original Message-----
From: EXT Bill Fischofer [mailto:bill.fischofer@linaro.org] 
Sent: Friday, October 30, 2015 10:33 PM
To: lng-odp@lists.linaro.org; Wallen, Carl (Nokia - FI/Espoo) <carl.wallen@nokia.com>
Cc: Bill Fischofer <bill.fischofer@linaro.org>
Subject: [API-NEXT PATCHv2] linux-generic: queue: yield trying to obtain multiple locks

To avoid deadlock, especially on a single core, force an explicit
yield while not holding either lock when attempting to acquire multiple
locks for ordered queue processing. Also handle enqueues to self as in
this case the origin and target queues share a single lock.

This addresses the aspect of Bug
https://bugs.linaro.org/show_bug.cgi?id=1879
relating to deadlock in unicore systems.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/odp_queue.c | 47 ++++++++++++++++++++++++++++++--------
 1 file changed, 38 insertions(+), 9 deletions(-)
diff mbox

Patch

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a27af0b..4366683 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -48,6 +48,36 @@  typedef struct queue_table_t {
 static queue_table_t *queue_tbl;
 
 
+static inline void get_qe_locks(queue_entry_t *qe1, queue_entry_t *qe2)
+{
+	int i;
+
+	/* Special case: enq to self */
+	if (qe1 == qe2) {
+		LOCK(&qe1->s.lock);
+		return;
+	}
+
+       /* Enq to another queue. Issue is that since any queue can be either
+	* origin or target we can't have a static lock hierarchy. Strategy is
+	* to get one lock then attempt to get the other. If the second lock
+	* attempt fails, release the first and try again.  Note that in single
+	* CPU mode we require the explicit yield since otherwise we may never
+	* resolve unless the scheduler happens to timeslice exactly when we
+	* hold no lock.
+	*/
+	while (1) {
+		for (i = 0; i < 10; i++) {
+			LOCK(&qe1->s.lock);
+			if (LOCK_TRY(&qe2->s.lock))
+				return;
+			UNLOCK(&qe1->s.lock);
+			odp_sync_stores();
+		}
+		sched_yield();
+	}
+}
+
 queue_entry_t *get_qentry(uint32_t queue_id)
 {
 	return &queue_tbl->queue[queue_id];
@@ -370,14 +400,11 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 
 	/* Need two locks for enq operations from ordered queues */
 	if (origin_qe) {
-		LOCK(&origin_qe->s.lock);
-		while (!LOCK_TRY(&queue->s.lock)) {
-			UNLOCK(&origin_qe->s.lock);
-			LOCK(&origin_qe->s.lock);
-		}
+		get_qe_locks(origin_qe, queue);
 		if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
 			UNLOCK(&queue->s.lock);
-			UNLOCK(&origin_qe->s.lock);
+			if (origin_qe != queue)
+				UNLOCK(&origin_qe->s.lock);
 			ODP_ERR("Bad origin queue status\n");
 			ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
 				queue->s.name, origin_qe->s.name, buf_hdr);
@@ -389,7 +416,7 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
-		if (origin_qe)
+		if (origin_qe && origin_qe != queue)
 			UNLOCK(&origin_qe->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
@@ -405,7 +432,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 			 * we're done here.
 			 */
 			UNLOCK(&queue->s.lock);
-			UNLOCK(&origin_qe->s.lock);
+			if (origin_qe != queue)
+				UNLOCK(&origin_qe->s.lock);
 			return 0;
 		}
 
@@ -477,7 +505,8 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 		/* Now handle any unblocked complete buffers destined for
 		 * other queues, appending placeholder bufs as needed.
 		 */
-		UNLOCK(&queue->s.lock);
+		if (origin_qe != queue)
+			UNLOCK(&queue->s.lock);
 		reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
 				 1, 0);
 		UNLOCK(&origin_qe->s.lock);