@@ -113,7 +113,6 @@ static inline queue_entry_t *qentry_from_handle(odp_queue_t handle)
void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size);
/* Functions for schedulers */
-void sched_queue_destroy_finalize(uint32_t queue_index);
void sched_queue_set_status(uint32_t queue_index, int status);
int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num,
int update_status);
@@ -353,19 +353,6 @@ static odp_queue_t queue_create(const char *name,
return handle;
}
-void sched_queue_destroy_finalize(uint32_t queue_index)
-{
- queue_entry_t *queue = qentry_from_index(queue_index);
-
- LOCK(queue);
-
- if (queue->s.status == QUEUE_STATUS_DESTROYED) {
- queue->s.status = QUEUE_STATUS_FREE;
- sched_fn->destroy_queue(queue_index);
- }
- UNLOCK(queue);
-}
-
void sched_queue_set_status(uint32_t queue_index, int status)
{
queue_entry_t *queue = qentry_from_index(queue_index);
@@ -720,7 +707,12 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num,
if (odp_unlikely(status < QUEUE_STATUS_READY)) {
/* Bad queue, or queue has been destroyed.
- * Scheduler finalizes queue destroy after this. */
+ * Inform scheduler about a destroyed queue. */
+ if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+ queue->s.status = QUEUE_STATUS_FREE;
+ sched_fn->destroy_queue(queue_index);
+ }
+
UNLOCK(queue);
return -1;
}
@@ -402,11 +402,6 @@ static int schedule_init_global(void)
return 0;
}
-static inline void queue_destroy_finalize(uint32_t qi)
-{
- sched_queue_destroy_finalize(qi);
-}
-
static int schedule_term_global(void)
{
int ret = 0;
@@ -427,9 +422,6 @@ static int schedule_term_global(void)
num = sched_queue_deq(qi, events, 1, 1);
- if (num < 0)
- queue_destroy_finalize(qi);
-
if (num > 0)
ODP_ERR("Queue not empty\n");
}
@@ -944,10 +936,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
num = sched_queue_deq(qi, ev_tbl, max_deq, !pktin);
- if (num < 0) {
+ if (odp_unlikely(num < 0)) {
/* Destroyed queue. Continue scheduling the same
* priority queue. */
- sched_queue_destroy_finalize(qi);
continue;
}
@@ -209,6 +209,7 @@ struct sched_thread_local {
* in the same priority level.
*/
odp_rwlock_t lock;
+ int r_locked;
queue_index_sparse_t indexes[NUM_SCHED_PRIO];
sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
@@ -292,9 +293,7 @@ static int schedule_term_global(void)
if (sched->availables[i])
count = sched_queue_deq(i, events, 1, 1);
- if (count < 0)
- sched_queue_destroy_finalize(i);
- else if (count > 0)
+ if (count > 0)
ODP_ERR("Queue (%d) not empty\n", i);
}
@@ -526,7 +525,14 @@ static void destroy_sched_queue(uint32_t queue_index)
return;
}
+ if (thread_local.r_locked)
+ odp_rwlock_read_unlock(&thread_local.lock);
+
__destroy_sched_queue(G, queue_index);
+
+ if (thread_local.r_locked)
+ odp_rwlock_read_lock(&thread_local.lock);
+
odp_rwlock_write_unlock(&G->lock);
if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
@@ -614,9 +620,6 @@ static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)
return remains;
}
-#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
-#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
-
static inline bool do_schedule_prio(int prio);
static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
@@ -720,7 +723,9 @@ static int do_schedule(odp_queue_t *out_queue,
if (odp_unlikely(thread_local.pause))
return count;
- DO_SCHED_LOCK();
+ odp_rwlock_read_lock(&thread_local.lock);
+ thread_local.r_locked = 1;
+
/* Schedule events */
for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
/* Round robin iterate the interested queue
@@ -732,11 +737,14 @@ static int do_schedule(odp_queue_t *out_queue,
count = pop_cache_events(out_ev, max_num);
assign_queue_handle(out_queue);
- DO_SCHED_UNLOCK();
+
+ odp_rwlock_read_unlock(&thread_local.lock);
+ thread_local.r_locked = 0;
return count;
}
- DO_SCHED_UNLOCK();
+ odp_rwlock_read_unlock(&thread_local.lock);
+ thread_local.r_locked = 0;
/* Poll packet input when there are no events */
pktio_poll_input();
@@ -1536,14 +1544,7 @@ static inline int consume_queue(int prio, unsigned int queue_index)
count = sched_queue_deq(queue_index, cache->stash, max, 1);
- if (count < 0) {
- DO_SCHED_UNLOCK();
- sched_queue_destroy_finalize(queue_index);
- DO_SCHED_LOCK();
- return 0;
- }
-
- if (count == 0)
+ if (count <= 0)
return 0;
cache->top = &cache->stash[0];
@@ -223,12 +223,21 @@ static int init_local(void)
static int term_global(void)
{
+ odp_event_t event;
int qi, ret = 0;
for (qi = 0; qi < NUM_QUEUE; qi++) {
+ int report = 1;
+
if (sched_global->queue_cmd[qi].s.init) {
- /* todo: dequeue until empty ? */
- sched_queue_destroy_finalize(qi);
+ while (sched_queue_deq(qi, &event, 1, 1) > 0) {
+ if (report) {
+ ODP_ERR("Queue not empty\n");
+ report = 0;
+ }
+ odp_event_free(event);
+ }
+
}
}
@@ -564,28 +573,20 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
qi = cmd->s.index;
num = sched_queue_deq(qi, events, 1, 1);
- if (num > 0) {
- sched_local.cmd = cmd;
-
- if (from)
- *from = queue_from_index(qi);
-
- return num;
- }
-
- if (num < 0) {
- /* Destroyed queue */
- sched_queue_destroy_finalize(qi);
+ if (num <= 0) {
+		/* Destroyed or empty queue. Remove empty queue from
+		 * scheduling. A dequeue operation on an already
+		 * empty queue moves it to NOTSCHED state and
+		 * sched_queue() will be called on next enqueue. */
continue;
}
- if (num == 0) {
- /* Remove empty queue from scheduling. A dequeue
- * operation to on an already empty queue moves
- * it to NOTSCHED state and sched_queue() will
- * be called on next enqueue. */
- continue;
- }
+ sched_local.cmd = cmd;
+
+ if (from)
+ *from = queue_from_index(qi);
+
+ return num;
}
}