
[RFC,2/2] linux-gen: add interests query (bitmap-based) scheduler

Message ID 1473822711-6866-3-git-send-email-yi.he@linaro.org
State New

Commit Message

Yi He Sept. 14, 2016, 3:11 a.m. UTC
Add an interests query (bitmap-based) scheduler to optimize
performance. This initial version supports atomic and parallel
queues and can run the odp_scheduling test program; support for
pktio polling and ordered queues will be added later.
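
The core idea, in a minimal illustrative sketch (not the patch code;
NUM_QUEUES, avail[], mark_available() and try_win_queue() are
hypothetical names): the enqueue path raises a per-queue availability
flag, and each scheduling thread polls only the queue indexes it is
interested in, using a compare-and-swap on that flag to win exclusive
access to an atomic queue:

    #include <stdbool.h>
    #include <stdint.h>

    #define NUM_QUEUES 1024

    /* one availability flag per queue index, raised by enqueue */
    static bool avail[NUM_QUEUES];

    /* enqueue side: indicate that the queue now has events */
    static void mark_available(uint32_t qi)
    {
            avail[qi] = true;
    }

    /* schedule side: compete for an available atomic queue */
    static bool try_win_queue(uint32_t qi)
    {
            bool expected = avail[qi];

            /* only the thread whose CAS succeeds may dequeue; the
             * flag is raised again when the atomic context is
             * released or the queue turns out to be empty */
            return expected &&
                   __atomic_compare_exchange_n(&avail[qi], &expected,
                                               false, 0 /* strong */,
                                               __ATOMIC_RELEASE,
                                               __ATOMIC_RELAXED);
    }

In the patch this corresponds to schedule_sched_queue() and
schedule_unsched_queue() on the queue side and compete_atomic_queue()
on the scheduling side; parallel queues only check the flag and skip
the CAS.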

Signed-off-by: Yi He <yi.he@linaro.org>

---
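With the m4 change below, the new scheduler is selected at build time
in the same way as the sp scheduler, presumably via something like
./configure --enable-schedule-iquery, which defines
ODP_SCHEDULE_IQUERY and makes odp_schedule_if.c point sched_fn and
sched_api at the iquery implementation.
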
 platform/linux-generic/Makefile.am               |    1 +
 platform/linux-generic/include/odp_schedule_if.h |    2 +
 platform/linux-generic/m4/odp_schedule.m4        |    7 +
 platform/linux-generic/odp_queue.c               |   25 +-
 platform/linux-generic/odp_schedule.c            |    6 +
 platform/linux-generic/odp_schedule_if.c         |    6 +
 platform/linux-generic/odp_schedule_iquery.c     | 1059 ++++++++++++++++++++++
 platform/linux-generic/odp_schedule_sp.c         |    6 +
 8 files changed, 1109 insertions(+), 3 deletions(-)
 create mode 100644 platform/linux-generic/odp_schedule_iquery.c

-- 
2.7.4

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 3f79d46..6fed3d2 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -178,6 +178,7 @@  __LIB__libodp_linux_la_SOURCES = \
 			   odp_schedule_if.c \
 			   odp_schedule_ordered.c \
 			   odp_schedule_sp.c \
+			   odp_schedule_iquery.c \
 			   odp_shared_memory.c \
 			   odp_sorted_list.c \
 			   odp_spinlock.c \
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index 13cdfb3..c0423bf 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -30,6 +30,7 @@  typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
 				       );
 typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
 typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
+typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);
 typedef int (*schedule_ord_enq_fn_t)(uint32_t queue_index, void *buf_hdr,
 				     int sustain, int *ret);
 typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index,
@@ -48,6 +49,7 @@  typedef struct schedule_fn_t {
 	schedule_init_queue_fn_t    init_queue;
 	schedule_destroy_queue_fn_t destroy_queue;
 	schedule_sched_queue_fn_t   sched_queue;
+	schedule_unsched_queue_fn_t unsched_queue;
 	schedule_ord_enq_fn_t       ord_enq;
 	schedule_ord_enq_multi_fn_t ord_enq_multi;
 	schedule_init_global_fn_t   init_global;
diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4
index bc70c1f..2dcc9a7 100644
--- a/platform/linux-generic/m4/odp_schedule.m4
+++ b/platform/linux-generic/m4/odp_schedule.m4
@@ -4,3 +4,10 @@  AC_ARG_ENABLE([schedule-sp],
 	schedule-sp=yes
 	ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
     fi])
+
+AC_ARG_ENABLE([schedule-iquery],
+    [  --enable-schedule-iquery    enable interests query (sparse bitmap) scheduler],
+    [if test x$enableval = xyes; then
+	schedule-iquery=yes
+	ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
+    fi])
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index bec1e51..274ee09 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -415,9 +415,15 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
+#if defined(ODP_SCHEDULE_IQUERY)
+		if (sched_fn->sched_queue(queue->s.index))
+			ODP_ABORT("schedule_queue failed\n");
+		UNLOCK(&queue->s.lock);
+#else
 		UNLOCK(&queue->s.lock);
 		if (sched_fn->sched_queue(queue->s.index))
 			ODP_ABORT("schedule_queue failed\n");
+#endif
 		return 0;
 	}
 
@@ -428,7 +434,9 @@  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		    int num, int sustain)
 {
+#if !defined(ODP_SCHEDULE_IQUERY)
 	int sched = 0;
+#endif
 	int i, ret;
 	odp_buffer_hdr_t *tail;
 
@@ -461,14 +469,21 @@  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
+#if !defined(ODP_SCHEDULE_IQUERY)
 		sched = 1; /* retval: schedule queue */
+#else
+		/* Add queue to scheduling */
+		if (sched_fn->sched_queue(queue->s.index))
+			ODP_ABORT("schedule_queue failed\n");
+#endif
 	}
 	UNLOCK(&queue->s.lock);
 
+#if !defined(ODP_SCHEDULE_IQUERY)
 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");
-
+#endif
 	return num; /* All events enqueued */
 }
 
@@ -513,8 +528,10 @@  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 
 	if (queue->s.head == NULL) {
 		/* Already empty queue */
-		if (queue->s.status == QUEUE_STATUS_SCHED)
+		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
+			sched_fn->unsched_queue(queue->s.index);
+		}
 
 		UNLOCK(&queue->s.lock);
 		return NULL;
@@ -569,8 +586,10 @@  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 
 	if (hdr == NULL) {
 		/* Already empty queue */
-		if (queue->s.status == QUEUE_STATUS_SCHED)
+		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
+			sched_fn->unsched_queue(queue->s.index);
+		}
 
 		UNLOCK(&queue->s.lock);
 		return 0;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 78982d9..04c7d5a 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -1049,6 +1049,11 @@  static int schedule_sched_queue(uint32_t queue_index)
 	return 0;
 }
 
+static int schedule_unsched_queue(uint32_t queue_index ODP_UNUSED)
+{
+	return 0;
+}
+
 static int schedule_num_grps(void)
 {
 	return NUM_SCHED_GRPS;
@@ -1063,6 +1068,7 @@  const schedule_fn_t schedule_default_fn = {
 	.init_queue = schedule_init_queue,
 	.destroy_queue = schedule_destroy_queue,
 	.sched_queue = schedule_sched_queue,
+	.unsched_queue = schedule_unsched_queue,
 	.ord_enq = schedule_ordered_queue_enq,
 	.ord_enq_multi = schedule_ordered_queue_enq_multi,
 	.init_global = schedule_init_global,
diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c
index daf6c98..a9ede98 100644
--- a/platform/linux-generic/odp_schedule_if.c
+++ b/platform/linux-generic/odp_schedule_if.c
@@ -12,9 +12,15 @@  extern const schedule_api_t schedule_sp_api;
 extern const schedule_fn_t schedule_default_fn;
 extern const schedule_api_t schedule_default_api;
 
+extern const schedule_fn_t schedule_iquery_fn;
+extern const schedule_api_t schedule_iquery_api;
+
 #ifdef ODP_SCHEDULE_SP
 const schedule_fn_t *sched_fn   = &schedule_sp_fn;
 const schedule_api_t *sched_api = &schedule_sp_api;
+#elif defined(ODP_SCHEDULE_IQUERY)
+const schedule_fn_t *sched_fn   = &schedule_iquery_fn;
+const schedule_api_t *sched_api = &schedule_iquery_api;
 #else
 const schedule_fn_t  *sched_fn  = &schedule_default_fn;
 const schedule_api_t *sched_api = &schedule_default_api;
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
new file mode 100644
index 0000000..8a45793
--- /dev/null
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -0,0 +1,1059 @@ 
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/api/schedule.h>
+#include <odp_schedule_if.h>
+#include <odp/api/align.h>
+#include <odp/api/queue.h>
+#include <odp/api/shared_memory.h>
+#include <odp_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_bitmap_internal.h>
+#include <odp/api/thread.h>
+#include <odp/api/time.h>
+#include <odp/api/rwlock.h>
+#include <odp/api/hints.h>
+#include <odp/api/cpu.h>
+#include <odp/api/thrmask.h>
+#include <odp_config_internal.h>
+#include <odp_schedule_internal.h>
+#include <odp_schedule_ordered_internal.h>
+
+/* Number of priority levels */
+#define NUM_SCHED_PRIO 8
+
+ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (NUM_SCHED_PRIO - 1),
+		  "lowest_prio_does_not_match_with_num_prios");
+
+ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
+		  (ODP_SCHED_PRIO_NORMAL < (NUM_SCHED_PRIO - 1)),
+		  "normal_prio_is_not_between_highest_and_lowest");
+
+/* Number of scheduling groups */
+#define NUM_SCHED_GRPS 256
+
+/* Start of named groups in group mask arrays */
+#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
+
+/* Instantiate a WAPL bitmap to be used as queue index bitmap */
+typedef WAPL_BITMAP(ODP_CONFIG_QUEUES) queue_index_bitmap_t;
+
+typedef struct {
+	odp_rwlock_t lock;
+	queue_index_bitmap_t queues; /* queues in this priority level */
+} sched_prio_t;
+
+typedef struct {
+	odp_rwlock_t lock;
+	odp_thrmask_t threads; /* threads subscribed to this group */
+	queue_index_bitmap_t queues; /* queues in this group */
+	char name[ODP_SCHED_GROUP_NAME_LEN];
+} sched_group_t;
+
+/* Forward declaration */
+typedef struct sched_thread_local sched_thread_local_t;
+
+typedef struct {
+	odp_shm_t selfie;
+
+	/* Schedule priorities */
+	sched_prio_t prios[NUM_SCHED_PRIO];
+
+	/* Schedule groups */
+	sched_group_t groups[NUM_SCHED_GRPS];
+
+	/* Cache queue parameters for easy reference */
+	odp_schedule_param_t queues[ODP_CONFIG_QUEUES];
+
+	/* Queues set or withdraw their availability indications
+	 * for scheduling; the bool value also serves as the focal
+	 * point for atomic competition. */
+	bool availables[ODP_CONFIG_QUEUES];
+
+	/* Quick reference to per thread context */
+	sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX];
+} sched_global_t;
+
+/* Per thread events cache */
+typedef struct {
+	int count;
+	odp_queue_t queue;
+	odp_event_t stash[MAX_DEQ], *top;
+} event_cache_t;
+
+/* Instantiate a sparse bitmap to store a thread's interested
+ * queue indexes per priority.
+ */
+typedef SPARSE_BITMAP(ODP_CONFIG_QUEUES) queue_index_sparse_t;
+
+typedef struct sched_thread_local {
+	int thread;
+	bool pause;
+
+	/* Cache events only for atomic queue */
+	event_cache_t cache;
+
+	/* Saved atomic context */
+	bool *atomic;
+
+	/* Interested queue indexes to be checked by this thread
+	 * at each priority level for scheduling, and a round-robin
+	 * iterator to improve fairness between queues in the same
+	 * priority level.
+	 */
+	odp_rwlock_t lock;
+	queue_index_sparse_t indexes[NUM_SCHED_PRIO];
+	sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
+} sched_thread_local_t;
+
+/* Global scheduler context */
+static sched_global_t *sched;
+
+/* Thread local scheduler context */
+__thread sched_thread_local_t thread_local;
+
+static int schedule_init_global(void)
+{
+	int prio, group;
+	odp_shm_t shm;
+
+	ODP_DBG("Schedule[iquery] init ... ");
+
+	shm = odp_shm_reserve("odp_scheduler_iquery",
+			      sizeof(sched_global_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	sched = odp_shm_addr(shm);
+
+	if (sched == NULL) {
+		ODP_ERR("Schedule[iquery] "
+			"init: shm reserve.\n");
+		return -1;
+	}
+
+	memset(sched, 0, sizeof(sched_global_t));
+
+	sched->selfie = shm;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++)
+		odp_rwlock_init(&sched->prios[prio].lock);
+
+	for (group = 0; group < NUM_SCHED_GRPS; group++)
+		odp_rwlock_init(&sched->groups[group].lock);
+
+	ODP_DBG("done\n");
+	return 0;
+}
+
+static int schedule_term_global(void)
+{
+	odp_shm_t shm = sched->selfie;
+
+	memset(sched, 0, sizeof(sched_global_t));
+
+	if (odp_shm_free(shm) < 0) {
+		ODP_ERR("Schedule[iquery] "
+			"term: shm release.\n");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * These APIs are used to manipulate thread's interests.
+ */
+static void thread_set_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio);
+
+static void thread_clear_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio);
+
+static void thread_set_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *set);
+
+static void thread_clear_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *clear);
+
+static void sched_thread_local_reset(void)
+{
+	int prio;
+	queue_index_sparse_t *index;
+	sparse_bitmap_iterator_t *iterator;
+
+	memset(&thread_local, 0, sizeof(sched_thread_local_t));
+
+	thread_local.thread = odp_thread_id();
+	thread_local.cache.queue = ODP_QUEUE_INVALID;
+
+	odp_rwlock_init(&thread_local.lock);
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		index = &thread_local.indexes[prio];
+		iterator = &thread_local.iterators[prio];
+
+		sparse_bitmap_zero(index);
+		sparse_bitmap_iterator(iterator, index);
+	}
+}
+
+static int schedule_init_local(void)
+{
+	int group;
+	sched_group_t *G;
+	queue_index_bitmap_t collect;
+
+	wapl_bitmap_zero(&collect);
+	sched_thread_local_reset();
+
+	/* Collect all queue indexes of the schedule groups
+	 * to which this thread has subscribed
+	 */
+	for (group = 0; group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+		odp_rwlock_read_lock(&G->lock);
+
+		if ((group < SCHED_GROUP_NAMED || G->name[0] != 0) &&
+		    odp_thrmask_isset(&G->threads, thread_local.thread))
+			wapl_bitmap_or(&collect, &collect, &G->queues);
+
+		odp_rwlock_read_unlock(&G->lock);
+	}
+
+	/* Distribute the above collected queue indexes into
+	 * thread local interests per priority level.
+	 */
+	thread_set_interests(&thread_local, &collect);
+
+	/* "Night gathers, and now my watch begins..." */
+	sched->threads[thread_local.thread] = &thread_local;
+	return 0;
+}
+
+static inline void schedule_release_context(void);
+
+static int schedule_term_local(void)
+{
+	int group;
+	sched_group_t *G;
+
+	if (thread_local.cache.count) {
+		ODP_ERR("Locally pre-scheduled events exist.\n");
+		return -1;
+	}
+
+	schedule_release_context();
+
+	/* Unsubscribe all named schedule groups */
+	for (group = SCHED_GROUP_NAMED;
+		group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+		odp_rwlock_write_lock(&G->lock);
+
+		if (G->name[0] != 0 && odp_thrmask_isset(
+			&G->threads, thread_local.thread))
+			odp_thrmask_clr(&G->threads, thread_local.thread);
+
+		odp_rwlock_write_unlock(&G->lock);
+	}
+
+	/* "...for this night and all the nights to come." */
+	sched->threads[thread_local.thread] = NULL;
+	sched_thread_local_reset();
+	return 0;
+}
+
+static int init_sched_queue(uint32_t queue_index,
+			    const odp_schedule_param_t *sched_param)
+{
+	int prio, group, thread;
+	sched_prio_t *P;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	prio = sched_param->prio;
+	group = sched_param->group;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	/* A named schedule group must be created prior
+	 * to creating queues in this group.
+	 */
+	if (group >= SCHED_GROUP_NAMED && G->name[0] == 0) {
+		odp_rwlock_write_unlock(&G->lock);
+		return -1;
+	}
+
+	/* Record the queue in its priority level globally */
+	P = &sched->prios[prio];
+
+	odp_rwlock_write_lock(&P->lock);
+	wapl_bitmap_set(&P->queues, queue_index);
+	odp_rwlock_write_unlock(&P->lock);
+
+	/* Record the queue in its schedule group */
+	wapl_bitmap_set(&G->queues, queue_index);
+
+	/* Cache queue parameters for easy reference */
+	memcpy(&sched->queues[queue_index],
+		sched_param, sizeof(odp_schedule_param_t));
+
+	/* Update all threads in this schedule group to
+	 * start checking this queue index upon scheduling.
+	 */
+	thread = odp_thrmask_first(&G->threads);
+	while (thread >= 0) {
+		local = sched->threads[thread];
+		thread_set_interest(local, queue_index, prio);
+		thread = odp_thrmask_next(&G->threads, thread);
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+/*
+ * Must be called with the schedule group's rwlock held.
+ * It is also used by schedule_group_destroy() to destroy
+ * all queues in a group and leave no orphan queues when
+ * the whole schedule group is destroyed.
+ */
+static void __destroy_sched_queue(
+	sched_group_t *G, uint32_t queue_index)
+{
+	int prio, thread;
+	sched_prio_t *P;
+	sched_thread_local_t *local;
+
+	prio = sched->queues[queue_index].prio;
+
+	/* Forget the queue in its schedule group */
+	wapl_bitmap_clear(&G->queues, queue_index);
+
+	/* Forget queue schedule parameters */
+	memset(&sched->queues[queue_index],
+		0, sizeof(odp_schedule_param_t));
+
+	/* Update all threads in this schedule group to
+	 * stop checking this queue index upon scheduling.
+	 */
+	thread = odp_thrmask_first(&G->threads);
+	while (thread >= 0) {
+		local = sched->threads[thread];
+		thread_clear_interest(local, queue_index, prio);
+		thread = odp_thrmask_next(&G->threads, thread);
+	}
+
+	/* Forget the queue in its priority level globally */
+	P = &sched->prios[prio];
+
+	odp_rwlock_write_lock(&P->lock);
+	wapl_bitmap_clear(&P->queues, queue_index);
+	odp_rwlock_write_unlock(&P->lock);
+}
+
+static void destroy_sched_queue(uint32_t queue_index)
+{
+	int group;
+	sched_group_t *G;
+
+	group = sched->queues[queue_index].group;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	/* Named schedule group could have been destroyed
+	 * earlier and left these orphan queues.
+	 */
+	if (group >= SCHED_GROUP_NAMED && G->name[0] == 0) {
+		odp_rwlock_write_unlock(&G->lock);
+		return;
+	}
+
+	__destroy_sched_queue(G, queue_index);
+	odp_rwlock_write_unlock(&G->lock);
+}
+
+static void schedule_pktio_start(int pktio_index ODP_UNUSED,
+	int num_in_queue ODP_UNUSED, int in_queue_idx[] ODP_UNUSED)
+{
+}
+
+#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
+#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
+
+static inline bool do_schedule_prio(int prio);
+
+static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
+{
+	int k = 0;
+	event_cache_t *cache;
+
+	cache = &thread_local.cache;
+	while (cache->count && max) {
+		ev[k] = *cache->top++;
+		k++;
+		max--;
+		cache->count--;
+	}
+
+	return k;
+}
+
+static inline void assign_queue_handle(odp_queue_t *handle)
+{
+	if (handle)
+		*handle = thread_local.cache.queue;
+}
+
+/*
+ * Schedule queues
+ */
+static int do_schedule(odp_queue_t *out_queue,
+	odp_event_t out_ev[], unsigned int max_num)
+{
+	int prio, count;
+
+	/* Consume locally cached events */
+	count = pop_cache_events(out_ev, max_num);
+	if (count > 0) {
+		assign_queue_handle(out_queue);
+		return count;
+	}
+
+	schedule_release_context();
+
+	if (odp_unlikely(thread_local.pause))
+		return count;
+
+	DO_SCHED_LOCK();
+	/* Schedule events */
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		/* Round-robin iterate over the interested queue
+		 * indexes in this priority level to compete for
+		 * and consume available queues
+		 */
+		if (!do_schedule_prio(prio))
+			continue;
+
+		count = pop_cache_events(out_ev, max_num);
+		assign_queue_handle(out_queue);
+		DO_SCHED_UNLOCK();
+		return count;
+	}
+
+	DO_SCHED_UNLOCK();
+	return 0;
+}
+
+static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
+			 odp_event_t out_ev[], unsigned int max_num)
+{
+	int count, first = 1;
+	odp_time_t next, wtime;
+
+	while (1) {
+		count = do_schedule(out_queue, out_ev, max_num);
+
+		if (count)
+			break;
+
+		if (wait == ODP_SCHED_WAIT)
+			continue;
+
+		if (wait == ODP_SCHED_NO_WAIT)
+			break;
+
+		if (first) {
+			wtime = odp_time_local_from_ns(wait);
+			next = odp_time_sum(odp_time_local(), wtime);
+			first = 0;
+			continue;
+		}
+
+		if (odp_time_cmp(next, odp_time_local()) < 0)
+			break;
+	}
+
+	return count;
+}
+
+static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)
+{
+	odp_event_t ev;
+
+	ev = ODP_EVENT_INVALID;
+
+	schedule_loop(out_queue, wait, &ev, 1);
+
+	return ev;
+}
+
+static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
+			  odp_event_t events[], int num)
+{
+	return schedule_loop(out_queue, wait, events, num);
+}
+
+static void schedule_pause(void)
+{
+	thread_local.pause = 1;
+}
+
+static void schedule_resume(void)
+{
+	thread_local.pause = 0;
+}
+
+static uint64_t schedule_wait_time(uint64_t ns)
+{
+	return ns;
+}
+
+static int number_of_priorites(void)
+{
+	return NUM_SCHED_PRIO;
+}
+
+/*
+ * Create a named schedule group with a pre-defined
+ * set of subscriber threads.
+ *
+ * Scheduled queues belonging to this group must be
+ * created after the group itself. Upon creation the
+ * group holds no queues.
+ */
+static odp_schedule_group_t schedule_group_create(
+	const char *name, const odp_thrmask_t *mask)
+{
+	int group;
+	sched_group_t *G;
+
+	for (group = SCHED_GROUP_NAMED;
+		group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+
+		odp_rwlock_write_lock(&G->lock);
+		if (G->name[0] == 0) {
+			strncpy(G->name, name,
+				ODP_SCHED_GROUP_NAME_LEN - 1);
+			odp_thrmask_copy(&G->threads, mask);
+			wapl_bitmap_zero(&G->queues);
+
+			odp_rwlock_write_unlock(&G->lock);
+			return (odp_schedule_group_t)group;
+		}
+		odp_rwlock_write_unlock(&G->lock);
+	}
+
+	return ODP_SCHED_GROUP_INVALID;
+}
+
+static inline void __destroy_group_queues(sched_group_t *group)
+{
+	unsigned int index;
+	wapl_bitmap_iterator_t it;
+
+	/* Constructor */
+	wapl_bitmap_iterator(&it, &group->queues);
+
+	/* Walk through the queue index bitmap */
+	for (it.start(&it); it.has_next(&it);) {
+		index = it.next(&it);
+		__destroy_sched_queue(group, index);
+	}
+}
+
+/*
+ * Destroy a named schedule group.
+ */
+static int schedule_group_destroy(odp_schedule_group_t group)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	if (group < SCHED_GROUP_NAMED ||
+	    group >= NUM_SCHED_GRPS)
+		return -1;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->name[0] != 0) {
+		/* Destroy all queues in this schedule group
+		 * and leave no orphan queues.
+		 */
+		__destroy_group_queues(G);
+
+		done = 0;
+		wapl_bitmap_zero(&G->queues);
+		odp_thrmask_zero(&G->threads);
+		memset(G->name, 0, ODP_SCHED_GROUP_NAME_LEN);
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static odp_schedule_group_t schedule_group_lookup(const char *name)
+{
+	int group;
+	sched_group_t *G;
+
+	for (group = SCHED_GROUP_NAMED;
+	     group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+
+		odp_rwlock_read_lock(&G->lock);
+		if (strcmp(name, G->name) == 0) {
+			odp_rwlock_read_unlock(&G->lock);
+			return (odp_schedule_group_t)group;
+		}
+		odp_rwlock_read_unlock(&G->lock);
+	}
+
+	return ODP_SCHED_GROUP_INVALID;
+}
+
+static int schedule_group_join(odp_schedule_group_t group,
+			       const odp_thrmask_t *mask)
+{
+	int done = -1, thread;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->name[0] != 0) {
+		/* Make newly joined threads start checking
+		 * queue indexes in this schedule group
+		 */
+		thread = odp_thrmask_first(mask);
+		while (thread >= 0) {
+			local = sched->threads[thread];
+			thread_set_interests(local, &G->queues);
+
+			odp_thrmask_set(&G->threads, thread);
+			thread = odp_thrmask_next(mask, thread);
+		}
+		done = 0;
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_leave(odp_schedule_group_t group,
+				const odp_thrmask_t *mask)
+{
+	int done = -1, thread;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->name[0] != 0) {
+		/* Make leaving threads stop checking
+		 * queue indexes in this schedule group
+		 */
+		thread = odp_thrmask_first(mask);
+		while (thread >= 0) {
+			local = sched->threads[thread];
+			thread_clear_interests(local, &G->queues);
+
+			odp_thrmask_clr(&G->threads, thread);
+			thread = odp_thrmask_next(mask, thread);
+		}
+		done = 0;
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_thrmask(odp_schedule_group_t group,
+				  odp_thrmask_t *thrmask)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_read_lock(&G->lock);
+
+	if (G->name[0] != 0 && thrmask != NULL) {
+		done = 0;
+		odp_thrmask_copy(thrmask, &G->threads);
+	}
+
+	odp_rwlock_read_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_info(odp_schedule_group_t group,
+			       odp_schedule_group_info_t *info)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_read_lock(&G->lock);
+
+	if (G->name[0] != 0 && info != NULL) {
+		done = 0;
+		info->name = G->name;
+		odp_thrmask_copy(&info->thrmask, &G->threads);
+	}
+
+	odp_rwlock_read_unlock(&G->lock);
+	return done;
+}
+
+/* This function is a no-op */
+static void schedule_prefetch(int num ODP_UNUSED)
+{
+}
+
+/*
+ * Limited to joining and leaving pre-defined schedule groups
+ * before and after thread-local initialization or termination.
+ */
+static int group_add_thread(odp_schedule_group_t group, int thread)
+{
+	sched_group_t *G;
+
+	if (group < 0 || group >= SCHED_GROUP_NAMED)
+		return -1;
+
+	G = &sched->groups[group];
+
+	odp_rwlock_write_lock(&G->lock);
+	odp_thrmask_set(&G->threads, thread);
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+static int group_remove_thread(odp_schedule_group_t group, int thread)
+{
+	sched_group_t *G;
+
+	if (group < 0 || group >= SCHED_GROUP_NAMED)
+		return -1;
+
+	G = &sched->groups[group];
+
+	odp_rwlock_write_lock(&G->lock);
+	odp_thrmask_clr(&G->threads, thread);
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+static int schedule_sched_queue(uint32_t queue_index)
+{
+	/* Set available indications globally */
+	sched->availables[queue_index] = true;
+	return 0;
+}
+
+static int schedule_unsched_queue(uint32_t queue_index)
+{
+	/* Clear available indications globally */
+	sched->availables[queue_index] = false;
+	return 0;
+}
+
+static void schedule_release_atomic(void)
+{
+	unsigned int queue_index;
+
+	if (thread_local.atomic != NULL) {
+		queue_index = thread_local.atomic - sched->availables;
+		thread_local.atomic = NULL;
+		sched->availables[queue_index] = true;
+	}
+}
+
+static void schedule_release_ordered(void)
+{
+}
+
+static inline void schedule_release_context(void)
+{
+	schedule_release_atomic();
+}
+
+static int number_of_groups(void)
+{
+	return NUM_SCHED_GRPS;
+}
+
+/* Fill in scheduler interface */
+const schedule_fn_t schedule_iquery_fn = {
+	.pktio_start   = schedule_pktio_start,
+	.thr_add       = group_add_thread,
+	.thr_rem       = group_remove_thread,
+	.num_grps      = number_of_groups,
+	.init_queue    = init_sched_queue,
+	.destroy_queue = destroy_sched_queue,
+	.sched_queue   = schedule_sched_queue,
+	.unsched_queue = schedule_unsched_queue,
+	.ord_enq       = schedule_ordered_queue_enq,
+	.ord_enq_multi = schedule_ordered_queue_enq_multi,
+	.init_global   = schedule_init_global,
+	.term_global   = schedule_term_global,
+	.init_local    = schedule_init_local,
+	.term_local    = schedule_term_local
+};
+
+/* Fill in scheduler API calls */
+const schedule_api_t schedule_iquery_api = {
+	.schedule_wait_time       = schedule_wait_time,
+	.schedule                 = schedule,
+	.schedule_multi           = schedule_multi,
+	.schedule_pause           = schedule_pause,
+	.schedule_resume          = schedule_resume,
+	.schedule_release_atomic  = schedule_release_atomic,
+	.schedule_release_ordered = schedule_release_ordered,
+	.schedule_prefetch        = schedule_prefetch,
+	.schedule_num_prio        = number_of_priorites,
+	.schedule_group_create    = schedule_group_create,
+	.schedule_group_destroy   = schedule_group_destroy,
+	.schedule_group_lookup    = schedule_group_lookup,
+	.schedule_group_join      = schedule_group_join,
+	.schedule_group_leave     = schedule_group_leave,
+	.schedule_group_thrmask   = schedule_group_thrmask,
+	.schedule_group_info      = schedule_group_info,
+	.schedule_order_lock      = schedule_order_lock,
+	.schedule_order_unlock    = schedule_order_unlock
+};
+
+static void thread_set_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio)
+{
+	queue_index_sparse_t *index;
+
+	if (thread == NULL)
+		return;
+
+	if (prio >= NUM_SCHED_PRIO)
+		return;
+
+	index = &thread->indexes[prio];
+
+	odp_rwlock_write_lock(&thread->lock);
+	sparse_bitmap_set(index, queue_index);
+	odp_rwlock_write_unlock(&thread->lock);
+}
+
+static void thread_clear_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio)
+{
+	queue_index_sparse_t *index;
+
+	if (thread == NULL)
+		return;
+
+	if (prio >= NUM_SCHED_PRIO)
+		return;
+
+	index = &thread->indexes[prio];
+
+	odp_rwlock_write_lock(&thread->lock);
+	sparse_bitmap_clear(index, queue_index);
+	odp_rwlock_write_unlock(&thread->lock);
+}
+
+static void thread_set_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *set)
+{
+	int prio;
+	sched_prio_t *P;
+	unsigned int queue_index;
+	queue_index_bitmap_t subset;
+	wapl_bitmap_iterator_t it;
+
+	if (thread == NULL || set == NULL)
+		return;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		P = &sched->prios[prio];
+		odp_rwlock_read_lock(&P->lock);
+
+		/* The collection of queue indexes in 'set'
+		 * may belong to several priority levels.
+		 */
+		wapl_bitmap_zero(&subset);
+		wapl_bitmap_and(&subset, &P->queues, set);
+
+		odp_rwlock_read_unlock(&P->lock);
+
+		/* Add the subset to local indexes */
+		wapl_bitmap_iterator(&it, &subset);
+		for (it.start(&it); it.has_next(&it);) {
+			queue_index = it.next(&it);
+			thread_set_interest(thread, queue_index, prio);
+		}
+	}
+}
+
+static void thread_clear_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *clear)
+{
+	int prio;
+	sched_prio_t *P;
+	unsigned int queue_index;
+	queue_index_bitmap_t subset;
+	wapl_bitmap_iterator_t it;
+
+	if (thread == NULL || clear == NULL)
+		return;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		P = &sched->prios[prio];
+		odp_rwlock_read_lock(&P->lock);
+
+		/* The collection of queue indexes in 'clear'
+		 * may belong to several priority levels.
+		 */
+		wapl_bitmap_zero(&subset);
+		wapl_bitmap_and(&subset, &P->queues, clear);
+
+		odp_rwlock_read_unlock(&P->lock);
+
+		/* Remove the subset from local indexes */
+		wapl_bitmap_iterator(&it, &subset);
+		for (it.start(&it); it.has_next(&it);) {
+			queue_index = it.next(&it);
+			thread_clear_interest(thread, queue_index, prio);
+		}
+	}
+}
+
+static inline bool is_atomic_queue(unsigned int queue_index)
+{
+	return (sched->queues[queue_index].sync
+			== ODP_SCHED_SYNC_ATOMIC);
+}
+
+static inline bool is_ordered_queue(unsigned int queue_index)
+{
+	return (sched->queues[queue_index].sync
+			== ODP_SCHED_SYNC_ORDERED);
+}
+
+static inline bool compete_atomic_queue(unsigned int queue_index)
+{
+	bool expected = sched->availables[queue_index];
+
+	if (expected && is_atomic_queue(queue_index)) {
+		expected = __atomic_compare_exchange_n(
+			&sched->availables[queue_index],
+			&expected, false, 0,
+			__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+	}
+
+	return expected;
+}
+
+static inline void save_atomic_context(unsigned int queue_index)
+{
+	if (is_atomic_queue(queue_index))
+		thread_local.atomic = &sched->availables[queue_index];
+}
+
+static inline int consume_queue(int prio, unsigned int queue_index)
+{
+	int count;
+	unsigned int max = MAX_DEQ;
+	event_cache_t *cache = &thread_local.cache;
+
+	/* For ordered queues we want consecutive events to
+	 * be dispatched to separate threads, so do not cache
+	 * them locally.
+	 */
+	if (is_ordered_queue(queue_index))
+		max = 1;
+
+	/* Low priorities have a smaller batch size to limit
+	 * head-of-line blocking latency.
+	 */
+	if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
+		max = MAX_DEQ / 2;
+
+	count = sched_cb_queue_deq_multi(
+		queue_index, cache->stash, max);
+
+	if (count < 0) {
+		DO_SCHED_UNLOCK();
+		destroy_sched_queue(queue_index);
+		DO_SCHED_LOCK();
+		return 0;
+	}
+
+	if (count == 0)
+		return 0;
+
+	cache->top = &cache->stash[0];
+	cache->count = count;
+	cache->queue = sched_cb_queue_handle(queue_index);
+	return count;
+}
+
+static inline bool do_schedule_prio(int prio)
+{
+	int nbits, next, end;
+	unsigned int queue_index;
+	sparse_bitmap_iterator_t *it;
+
+	it = &thread_local.iterators[prio];
+	nbits = (int) *(it->_base.last);
+
+	/* No interests at all! */
+	if (nbits <= 0)
+		return false;
+
+	/* In the critical path we cannot afford iterator calls,
+	 * so do it manually with internal knowledge
+	 */
+	it->_start = (it->_start + 1) % nbits;
+	end = it->_start + nbits;
+
+	for (next = it->_start; next < end; next++) {
+		queue_index = it->_base.il[next % nbits];
+
+		if (!compete_atomic_queue(queue_index))
+			continue;
+
+		if (!consume_queue(prio, queue_index))
+			continue;
+
+		save_atomic_context(queue_index);
+		return true;
+	}
+
+	return false;
+}
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 2e28aa4..16f8aca 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -298,6 +298,11 @@  static int sched_queue(uint32_t qi)
 	return 0;
 }
 
+static int unsched_queue(uint32_t qi ODP_UNUSED)
+{
+	return 0;
+}
+
 static int ord_enq(uint32_t queue_index, void *buf_hdr, int sustain, int *ret)
 {
 	(void)queue_index;
@@ -673,6 +678,7 @@  const schedule_fn_t schedule_sp_fn = {
 	.init_queue    = init_queue,
 	.destroy_queue = destroy_queue,
 	.sched_queue   = sched_queue,
+	.unsched_queue = unsched_queue,
 	.ord_enq       = ord_enq,
 	.ord_enq_multi = ord_enq_multi,
 	.init_global   = init_global,