diff mbox series

[v1,4/6] linux-gen: sched: remove queue_destroy_finalize callback

Message ID 1536674416-8465-5-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v1,1/6] linux-gen: queue: remove extra checks | expand

Commit Message

Github ODP bot Sept. 11, 2018, 2 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Scheduled queue dequeue function calls directly the scheduler
queue destroy callback. Sched_queue_deq() usage is simpler
when the extra round of callbacks is removed.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 699 (psavol:master-sched-optim-clean-ups)
 ** https://github.com/Linaro/odp/pull/699
 ** Patch: https://github.com/Linaro/odp/pull/699.patch
 ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508
 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f
 **/
 .../include/odp_queue_basic_internal.h        |  1 -
 platform/linux-generic/odp_queue_basic.c      | 20 +++------
 platform/linux-generic/odp_schedule_basic.c   | 11 +----
 platform/linux-generic/odp_schedule_iquery.c  | 35 +++++++--------
 platform/linux-generic/odp_schedule_sp.c      | 43 ++++++++++---------
 5 files changed, 47 insertions(+), 63 deletions(-)
diff mbox series

Patch

diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h
index 46b747955..41ca424c7 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -113,7 +113,6 @@  static inline queue_entry_t *qentry_from_handle(odp_queue_t handle)
 void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size);
 
 /* Functions for schedulers */
-void sched_queue_destroy_finalize(uint32_t queue_index);
 void sched_queue_set_status(uint32_t queue_index, int status);
 int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num,
 		    int update_status);
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index 61cf8a56c..3f00cc118 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -353,19 +353,6 @@  static odp_queue_t queue_create(const char *name,
 	return handle;
 }
 
-void sched_queue_destroy_finalize(uint32_t queue_index)
-{
-	queue_entry_t *queue = qentry_from_index(queue_index);
-
-	LOCK(queue);
-
-	if (queue->s.status == QUEUE_STATUS_DESTROYED) {
-		queue->s.status = QUEUE_STATUS_FREE;
-		sched_fn->destroy_queue(queue_index);
-	}
-	UNLOCK(queue);
-}
-
 void sched_queue_set_status(uint32_t queue_index, int status)
 {
 	queue_entry_t *queue = qentry_from_index(queue_index);
@@ -720,7 +707,12 @@  int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
 
 	if (odp_unlikely(status < QUEUE_STATUS_READY)) {
 		/* Bad queue, or queue has been destroyed.
-		 * Scheduler finalizes queue destroy after this. */
+		 * Inform scheduler about a destroyed queue. */
+		if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+			queue->s.status = QUEUE_STATUS_FREE;
+			sched_fn->destroy_queue(queue_index);
+		}
+
 		UNLOCK(queue);
 		return -1;
 	}
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 46ae7f1c1..6ed1f8b49 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -402,11 +402,6 @@  static int schedule_init_global(void)
 	return 0;
 }
 
-static inline void queue_destroy_finalize(uint32_t qi)
-{
-	sched_queue_destroy_finalize(qi);
-}
-
 static int schedule_term_global(void)
 {
 	int ret = 0;
@@ -427,9 +422,6 @@  static int schedule_term_global(void)
 
 					num = sched_queue_deq(qi, events, 1, 1);
 
-					if (num < 0)
-						queue_destroy_finalize(qi);
-
 					if (num > 0)
 						ODP_ERR("Queue not empty\n");
 				}
@@ -944,10 +936,9 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 			num = sched_queue_deq(qi, ev_tbl, max_deq, !pktin);
 
-			if (num < 0) {
+			if (odp_unlikely(num < 0)) {
 				/* Destroyed queue. Continue scheduling the same
 				 * priority queue. */
-				sched_queue_destroy_finalize(qi);
 				continue;
 			}
 
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
index 7dde77844..f76942ff3 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -209,6 +209,7 @@  struct sched_thread_local {
 	 * in the same priority level.
 	 */
 	odp_rwlock_t lock;
+	int r_locked;
 	queue_index_sparse_t indexes[NUM_SCHED_PRIO];
 	sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
 
@@ -292,9 +293,7 @@  static int schedule_term_global(void)
 		if (sched->availables[i])
 			count = sched_queue_deq(i, events, 1, 1);
 
-		if (count < 0)
-			sched_queue_destroy_finalize(i);
-		else if (count > 0)
+		if (count > 0)
 			ODP_ERR("Queue (%d) not empty\n", i);
 	}
 
@@ -526,7 +525,14 @@  static void destroy_sched_queue(uint32_t queue_index)
 		return;
 	}
 
+	if (thread_local.r_locked)
+		odp_rwlock_read_unlock(&thread_local.lock);
+
 	__destroy_sched_queue(G, queue_index);
+
+	if (thread_local.r_locked)
+		odp_rwlock_read_lock(&thread_local.lock);
+
 	odp_rwlock_write_unlock(&G->lock);
 
 	if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
@@ -614,9 +620,6 @@  static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)
 	return remains;
 }
 
-#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
-#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
-
 static inline bool do_schedule_prio(int prio);
 
 static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
@@ -720,7 +723,9 @@  static int do_schedule(odp_queue_t *out_queue,
 	if (odp_unlikely(thread_local.pause))
 		return count;
 
-	DO_SCHED_LOCK();
+	odp_rwlock_read_lock(&thread_local.lock);
+	thread_local.r_locked = 1;
+
 	/* Schedule events */
 	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
 		/* Round robin iterate the interested queue
@@ -732,11 +737,14 @@  static int do_schedule(odp_queue_t *out_queue,
 
 		count = pop_cache_events(out_ev, max_num);
 		assign_queue_handle(out_queue);
-		DO_SCHED_UNLOCK();
+
+		odp_rwlock_read_unlock(&thread_local.lock);
+		thread_local.r_locked = 0;
 		return count;
 	}
 
-	DO_SCHED_UNLOCK();
+	odp_rwlock_read_unlock(&thread_local.lock);
+	thread_local.r_locked = 0;
 
 	/* Poll packet input when there are no events */
 	pktio_poll_input();
@@ -1536,14 +1544,7 @@  static inline int consume_queue(int prio, unsigned int queue_index)
 
 	count = sched_queue_deq(queue_index, cache->stash, max, 1);
 
-	if (count < 0) {
-		DO_SCHED_UNLOCK();
-		sched_queue_destroy_finalize(queue_index);
-		DO_SCHED_LOCK();
-		return 0;
-	}
-
-	if (count == 0)
+	if (count <= 0)
 		return 0;
 
 	cache->top = &cache->stash[0];
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 7932e1860..8ddd1e94e 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -223,12 +223,21 @@  static int init_local(void)
 
 static int term_global(void)
 {
+	odp_event_t event;
 	int qi, ret = 0;
 
 	for (qi = 0; qi < NUM_QUEUE; qi++) {
+		int report = 1;
+
 		if (sched_global->queue_cmd[qi].s.init) {
-			/* todo: dequeue until empty ? */
-			sched_queue_destroy_finalize(qi);
+			while (sched_queue_deq(qi, &event, 1, 1) > 0) {
+				if (report) {
+					ODP_ERR("Queue not empty\n");
+					report = 0;
+				}
+				odp_event_free(event);
+			}
+
 		}
 	}
 
@@ -564,28 +573,20 @@  static int schedule_multi(odp_queue_t *from, uint64_t wait,
 		qi  = cmd->s.index;
 		num = sched_queue_deq(qi, events, 1, 1);
 
-		if (num > 0) {
-			sched_local.cmd = cmd;
-
-			if (from)
-				*from = queue_from_index(qi);
-
-			return num;
-		}
-
-		if (num < 0) {
-			/* Destroyed queue */
-			sched_queue_destroy_finalize(qi);
+		if (num <= 0) {
+			/* Destroyed or empty queue. Remove empty queue from
+			 * scheduling. A dequeue operation to on an already
+			 * empty queue moves it to NOTSCHED state and
+			 * sched_queue() will be called on next enqueue. */
 			continue;
 		}
 
-		if (num == 0) {
-			/* Remove empty queue from scheduling. A dequeue
-			 * operation to on an already empty queue moves
-			 * it to NOTSCHED state and sched_queue() will
-			 * be called on next enqueue. */
-			continue;
-		}
+		sched_local.cmd = cmd;
+
+		if (from)
+			*from = queue_from_index(qi);
+
+		return num;
 	}
 }