[API-NEXT,3/3] linux-gen: add interests query (iquery) scheduler

Message ID 1480312009-32464-4-git-send-email-yi.he@linaro.org
State New

Commit Message

Yi He Nov. 28, 2016, 5:46 a.m. UTC
Add an interests query (iquery) scheduler as an
alternative ODP-linux scheduler component,
optimized for performance especially in low
queue count use cases.

It introduces a new core algorithm, but adopts
the ring-based pktio poll algorithm from the
default scheduler and still uses the old
ordered queue implementation.

Signed-off-by: Yi He <yi.he@linaro.org>

---
 platform/linux-generic/Makefile.am                 |    1 +
 .../linux-generic/include/odp_bitmap_internal.h    |    8 +-
 platform/linux-generic/include/odp_schedule_if.h   |    2 +
 platform/linux-generic/m4/odp_schedule.m4          |    7 +
 platform/linux-generic/odp_queue.c                 |   15 +-
 platform/linux-generic/odp_schedule.c              |    6 +
 platform/linux-generic/odp_schedule_if.c           |    6 +
 platform/linux-generic/odp_schedule_iquery.c       | 1306 ++++++++++++++++++++
 platform/linux-generic/odp_schedule_sp.c           |    6 +
 9 files changed, 1354 insertions(+), 3 deletions(-)
 create mode 100644 platform/linux-generic/odp_schedule_iquery.c

-- 
2.7.4

Comments

Bill Fischofer Nov. 29, 2016, 2:59 a.m. UTC | #1
Attempting to compile this with clang I get:

odp_schedule_iquery.c:170:3: error: redefinition of typedef
      'sched_thread_local_t' is a C11 feature [-Werror,-Wtypedef-redefinition]
} sched_thread_local_t;
  ^
odp_schedule_iquery.c:111:35: note: previous definition is here
typedef struct sched_thread_local sched_thread_local_t;
                                  ^
1 error generated.
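
For reference, a minimal standalone sketch of the diagnostic and one
conventional fix (illustrative only; struct body shortened):

    /* Repeating a typedef name is valid C11, but in older C modes
     * clang warns, and -Werror turns the warning into this error.
     */
    typedef struct sched_thread_local sched_thread_local_t; /* forward */

    typedef struct sched_thread_local {
            int thread;
    } sched_thread_local_t;    /* error: typedef named a second time */

    /* One fix: let the forward declaration own the typedef name and
     * close the full definition with the struct tag alone:
     *
     *     typedef struct sched_thread_local sched_thread_local_t;
     *     struct sched_thread_local { int thread; };
     */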

On Sun, Nov 27, 2016 at 11:46 PM, Yi He <yi.he@linaro.org> wrote:
> Add an interests query (iquery) scheduler as an

> alternative ODP-linux scheduler component,

> optimized for performance especially in low

> queue count use cases.

>

> It introduces a new core algorithm, but adopts

> the ring-based pktio poll algorithm from the

> default scheduler and still uses the old

> ordered queue implementation.

>

> Signed-off-by: Yi He <yi.he@linaro.org>

> ---

>  platform/linux-generic/Makefile.am                 |    1 +

>  .../linux-generic/include/odp_bitmap_internal.h    |    8 +-

>  platform/linux-generic/include/odp_schedule_if.h   |    2 +

>  platform/linux-generic/m4/odp_schedule.m4          |    7 +

>  platform/linux-generic/odp_queue.c                 |   15 +-

>  platform/linux-generic/odp_schedule.c              |    6 +

>  platform/linux-generic/odp_schedule_if.c           |    6 +

>  platform/linux-generic/odp_schedule_iquery.c       | 1306 ++++++++++++++++++++

>  platform/linux-generic/odp_schedule_sp.c           |    6 +

>  9 files changed, 1354 insertions(+), 3 deletions(-)

>  create mode 100644 platform/linux-generic/odp_schedule_iquery.c

>

> diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am

> index 0245e37..37f51fb 100644

> --- a/platform/linux-generic/Makefile.am

> +++ b/platform/linux-generic/Makefile.am

> @@ -212,6 +212,7 @@ __LIB__libodp_linux_la_SOURCES = \

>                            odp_schedule_if.c \

>                            odp_schedule_ordered.c \

>                            odp_schedule_sp.c \

> +                          odp_schedule_iquery.c \

>                            odp_shared_memory.c \

>                            odp_sorted_list.c \

>                            odp_spinlock.c \

> diff --git a/platform/linux-generic/include/odp_bitmap_internal.h b/platform/linux-generic/include/odp_bitmap_internal.h

> index 7e028fd..192c6f9 100644

> --- a/platform/linux-generic/include/odp_bitmap_internal.h

> +++ b/platform/linux-generic/include/odp_bitmap_internal.h

> @@ -21,7 +21,13 @@ extern "C" {

>  #include <stdbool.h>

>  #include <string.h>

>  #include <odp/api/hints.h>

> -#include <odp_ring_internal.h> /* TOKENIZE and ARRAY_SIZE */

> +

> +/* Generate unique identifier for instantiated class */

> +#define TOKENIZE(template, line) \

> +       template ## _ ## line ## _ ## __COUNTER__

> +

> +/* Array size in general */

> +#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))

>

>  #define BITS_PER_BYTE  (8)

>  #define BITS_PER_LONG  __WORDSIZE
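
A note on the TOKENIZE macro now defined locally: operands of the ##
paste operator are not macro-expanded first, so macros such as __LINE__
or __COUNTER__ contribute their numeric values only when routed through
an expanding helper. A generic sketch of that idiom (illustrative; not
the patch's exact call sites):

    #include <stdio.h>

    #define PASTE_(a, b)   a ## b        /* pastes raw tokens */
    #define PASTE(a, b)    PASTE_(a, b)  /* expands args, then pastes */
    #define UNIQUE(prefix) PASTE(prefix, __COUNTER__) /* GCC/clang ext. */

    int main(void)
    {
            int UNIQUE(tmp_) = 1;          /* expands to: int tmp_0 = 1; */
            int UNIQUE(tmp_) = 2;          /* expands to: int tmp_1 = 2; */

            printf("%d\n", tmp_0 + tmp_1); /* prints 3 */
            return 0;
    }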

> diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h

> index df73e70..6fe2536 100644

> --- a/platform/linux-generic/include/odp_schedule_if.h

> +++ b/platform/linux-generic/include/odp_schedule_if.h

> @@ -30,6 +30,7 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,

>                                        );

>  typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);

>  typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);

> +typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);

>  typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index,

>                                            void *buf_hdr[], int num,

>                                            int sustain, int *ret);

> @@ -46,6 +47,7 @@ typedef struct schedule_fn_t {

>         schedule_init_queue_fn_t    init_queue;

>         schedule_destroy_queue_fn_t destroy_queue;

>         schedule_sched_queue_fn_t   sched_queue;

> +       schedule_unsched_queue_fn_t unsched_queue;

>         schedule_ord_enq_multi_fn_t ord_enq_multi;

>         schedule_init_global_fn_t   init_global;

>         schedule_term_global_fn_t   term_global;

> diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4

> index bc70c1f..2dcc9a7 100644

> --- a/platform/linux-generic/m4/odp_schedule.m4

> +++ b/platform/linux-generic/m4/odp_schedule.m4

> @@ -4,3 +4,10 @@ AC_ARG_ENABLE([schedule-sp],

>         schedule-sp=yes

>         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"

>      fi])

> +

> +AC_ARG_ENABLE([schedule-iquery],

> +    [  --enable-schedule-iquery    enable interests query (sparse bitmap) scheduler],

> +    [if test x$enableval = xyes; then

> +       schedule-iquery=yes

> +       ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"

> +    fi])
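
With this hunk in place the new component is selected at build time:
"./configure --enable-schedule-iquery" adds -DODP_SCHEDULE_IQUERY to
ODP_CFLAGS, which drives the conditional code in the odp_queue.c and
odp_schedule_if.c hunks below. Like the sp scheduler, the file itself
is always compiled; the flag only decides which scheduler is bound.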

> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c

> index 43e212a..53a5def 100644

> --- a/platform/linux-generic/odp_queue.c

> +++ b/platform/linux-generic/odp_queue.c

> @@ -381,7 +381,9 @@ odp_queue_t odp_queue_lookup(const char *name)

>  static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],

>                             int num, int sustain)

>  {

> +#if !defined(ODP_SCHEDULE_IQUERY)

>         int sched = 0;

> +#endif

>         int i, ret;

>         odp_buffer_hdr_t *hdr, *tail, *next_hdr;

>

> @@ -442,14 +444,21 @@ static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],

>

>         if (queue->s.status == QUEUE_STATUS_NOTSCHED) {

>                 queue->s.status = QUEUE_STATUS_SCHED;

> +#if !defined(ODP_SCHEDULE_IQUERY)

>                 sched = 1; /* retval: schedule queue */

> +#else

> +               /* Add queue to scheduling */

> +               if (sched_fn->sched_queue(queue->s.index))

> +                       ODP_ABORT("schedule_queue failed\n");

> +#endif

>         }

>         UNLOCK(&queue->s.lock);

>

> +#if !defined(ODP_SCHEDULE_IQUERY)

>         /* Add queue to scheduling */

>         if (sched && sched_fn->sched_queue(queue->s.index))

>                 ODP_ABORT("schedule_queue failed\n");

> -

> +#endif

>         return num; /* All events enqueued */

>  }

>

> @@ -522,8 +531,10 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],

>

>         if (hdr == NULL) {

>                 /* Already empty queue */

> -               if (queue->s.status == QUEUE_STATUS_SCHED)

> +               if (queue->s.status == QUEUE_STATUS_SCHED) {

>                         queue->s.status = QUEUE_STATUS_NOTSCHED;

> +                       sched_fn->unsched_queue(queue->s.index);

> +               }

>

>                 UNLOCK(&queue->s.lock);

>                 return 0;
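
Taken together, the enq/deq changes replace the default scheduler's
deferred "sched" flag with a per-queue availability flag: published on
the first enqueue into an empty queue, withdrawn when the queue drains.
A condensed, hypothetical sketch of the resulting protocol (names
simplified; it mirrors compete_atomic_queue() further down in the new
file):

    #include <stdbool.h>

    #define NQUEUES 1024  /* stand-in for ODP_CONFIG_QUEUES */

    bool availables[NQUEUES];

    /* enqueue side: first event into an empty queue publishes it */
    void publish(int qi)  { availables[qi] = true; }

    /* dequeue side: draining the queue withdraws it */
    void withdraw(int qi) { availables[qi] = false; }

    /* scheduler side: claim an atomic queue exclusively, so only one
     * thread holds its context at a time
     */
    bool try_claim(int qi)
    {
            bool expected = true;

            return __atomic_compare_exchange_n(&availables[qi],
                                               &expected, false, 0,
                                               __ATOMIC_RELEASE,
                                               __ATOMIC_RELAXED);
    }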

> diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c

> index 1ef85ac..73c267e 100644

> --- a/platform/linux-generic/odp_schedule.c

> +++ b/platform/linux-generic/odp_schedule.c

> @@ -973,6 +973,11 @@ static int schedule_sched_queue(uint32_t queue_index)

>         return 0;

>  }

>

> +static int schedule_unsched_queue(uint32_t queue_index ODP_UNUSED)

> +{

> +       return 0;

> +}

> +

>  static int schedule_num_grps(void)

>  {

>         return NUM_SCHED_GRPS;

> @@ -987,6 +992,7 @@ const schedule_fn_t schedule_default_fn = {

>         .init_queue = schedule_init_queue,

>         .destroy_queue = schedule_destroy_queue,

>         .sched_queue = schedule_sched_queue,

> +       .unsched_queue = schedule_unsched_queue,

>         .ord_enq_multi = schedule_ordered_queue_enq_multi,

>         .init_global = schedule_init_global,

>         .term_global = schedule_term_global,

> diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c

> index daf6c98..a9ede98 100644

> --- a/platform/linux-generic/odp_schedule_if.c

> +++ b/platform/linux-generic/odp_schedule_if.c

> @@ -12,9 +12,15 @@ extern const schedule_api_t schedule_sp_api;

>  extern const schedule_fn_t schedule_default_fn;

>  extern const schedule_api_t schedule_default_api;

>

> +extern const schedule_fn_t schedule_iquery_fn;

> +extern const schedule_api_t schedule_iquery_api;

> +

>  #ifdef ODP_SCHEDULE_SP

>  const schedule_fn_t *sched_fn   = &schedule_sp_fn;

>  const schedule_api_t *sched_api = &schedule_sp_api;

> +#elif defined(ODP_SCHEDULE_IQUERY)

> +const schedule_fn_t *sched_fn   = &schedule_iquery_fn;

> +const schedule_api_t *sched_api = &schedule_iquery_api;

>  #else

>  const schedule_fn_t  *sched_fn  = &schedule_default_fn;

>  const schedule_api_t *sched_api = &schedule_default_api;
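
Note that the selection stays a compile-time decision: the extern
declarations are unconditional, but the #ifdef chain binds exactly one
fn/api table pair to sched_fn/sched_api for the whole build, so
switching schedulers means reconfiguring rather than a runtime option.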

> diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c

> new file mode 100644

> index 0000000..f232e23

> --- /dev/null

> +++ b/platform/linux-generic/odp_schedule_iquery.c

> @@ -0,0 +1,1306 @@

> +/* Copyright (c) 2016, Linaro Limited

> + * All rights reserved.

> + *

> + * SPDX-License-Identifier:     BSD-3-Clause

> + */

> +

> +#include <odp/api/schedule.h>

> +#include <odp_schedule_if.h>

> +#include <odp/api/align.h>

> +#include <odp/api/queue.h>

> +#include <odp/api/shared_memory.h>

> +#include <odp_internal.h>

> +#include <odp_debug_internal.h>

> +#include <odp_ring_internal.h>

> +#include <odp_bitmap_internal.h>

> +#include <odp/api/thread.h>

> +#include <odp/api/time.h>

> +#include <odp/api/rwlock.h>

> +#include <odp/api/hints.h>

> +#include <odp/api/cpu.h>

> +#include <odp/api/thrmask.h>

> +#include <odp_config_internal.h>

> +#include <odp_schedule_internal.h>

> +#include <odp_schedule_ordered_internal.h>

> +

> +/* Number of priority levels */

> +#define NUM_SCHED_PRIO 8

> +

> +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (NUM_SCHED_PRIO - 1),

> +                 "lowest_prio_does_not_match_with_num_prios");

> +

> +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&

> +                 (ODP_SCHED_PRIO_NORMAL < (NUM_SCHED_PRIO - 1)),

> +                 "normal_prio_is_not_between_highest_and_lowest");

> +

> +/* Number of scheduling groups */

> +#define NUM_SCHED_GRPS 256

> +

> +/* Start of named groups in group mask arrays */

> +#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)

> +

> +/* Instantiate a WAPL bitmap to be used as queue index bitmap */

> +typedef WAPL_BITMAP(ODP_CONFIG_QUEUES) queue_index_bitmap_t;

> +

> +typedef struct {

> +       odp_rwlock_t lock;

> +       queue_index_bitmap_t queues; /* queues in this priority level */

> +} sched_prio_t;

> +

> +typedef struct {

> +       odp_rwlock_t lock;

> +       bool allocated;

> +       odp_thrmask_t threads; /* threads subscribed to this group */

> +       queue_index_bitmap_t queues; /* queues in this group */

> +       char name[ODP_SCHED_GROUP_NAME_LEN];

> +} sched_group_t;

> +

> +/* Packet input poll command queues */

> +#define PKTIO_CMD_QUEUES 4

> +

> +/* Maximum number of packet input queues per command */

> +#define MAX_PKTIN 16

> +

> +/* Maximum number of packet IO interfaces */

> +#define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES

> +

> +/* Maximum number of pktio poll commands */

> +#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)

> +

> +/* Pktio command is free */

> +#define PKTIO_CMD_FREE ((uint32_t)-1)

> +

> +/* Packet IO poll queue ring size. In the worst case, all pktios

> + * have all pktins enabled and one poll command is created per

> + * pktin queue. The ring size must be larger than or equal to

> + * NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can hold all

> + * poll commands in the worst case.

> + */

> +#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)

> +

> +/* Mask for wrapping around pktio poll command index */

> +#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
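
Working the constants through: NUM_PKTIO_CMD = MAX_PKTIN * NUM_PKTIO
= 16 * ODP_CONFIG_PKTIO_ENTRIES, so each of the four command queues
gets a ring of 4 * ODP_CONFIG_PKTIO_ENTRIES entries. The RING_SIZE - 1
mask form assumes that value works out to a power of two.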

> +

> +/* Instantiate a RING data structure as pktio command queue */

> +typedef struct {

> +       /* Ring header */

> +       ring_t ring;

> +

> +       /* Ring data: pktio poll command indexes */

> +       uint32_t cmd_index[PKTIO_RING_SIZE];

> +} pktio_cmd_queue_t ODP_ALIGNED_CACHE;

> +

> +/* Packet IO poll command */

> +typedef struct {

> +       int pktio;

> +       int count;

> +       int pktin[MAX_PKTIN];

> +       uint32_t index;

> +} pktio_cmd_t;

> +

> +/* Collect the pktio poll resources */

> +typedef struct {

> +       odp_rwlock_t lock;

> +       /* count active commands per pktio interface */

> +       int actives[NUM_PKTIO];

> +       pktio_cmd_t commands[NUM_PKTIO_CMD];

> +       pktio_cmd_queue_t queues[PKTIO_CMD_QUEUES];

> +} pktio_poll_t;

> +

> +/* Forward declaration */

> +typedef struct sched_thread_local sched_thread_local_t;

> +

> +typedef struct {

> +       odp_shm_t selfie;

> +

> +       /* Schedule priorities */

> +       sched_prio_t prios[NUM_SCHED_PRIO];

> +

> +       /* Schedule groups */

> +       sched_group_t groups[NUM_SCHED_GRPS];

> +

> +       /* Cache queue parameters for easy reference */

> +       odp_schedule_param_t queues[ODP_CONFIG_QUEUES];

> +

> +       /* Poll pktio inputs in spare time */

> +       pktio_poll_t pktio_poll;

> +

> +       /* Queues send or withdraw their availability indications

> +        * for scheduling; each bool value also serves as a focal

> +        * point for atomic competition. */

> +       bool availables[ODP_CONFIG_QUEUES];

> +

> +       /* Quick reference to per thread context */

> +       sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX];

> +} sched_global_t;

> +

> +/* Per thread events cache */

> +typedef struct {

> +       int count;

> +       odp_queue_t queue;

> +       odp_event_t stash[MAX_DEQ], *top;

> +} event_cache_t;

> +

> +/* Instantiate a sparse bitmap to store the queue indexes a

> + * thread is interested in, per priority level.

> + */

> +typedef SPARSE_BITMAP(ODP_CONFIG_QUEUES) queue_index_sparse_t;

> +

> +typedef struct sched_thread_local {

> +       int thread;

> +       bool pause;

> +

> +       /* Cache events only for atomic queue */

> +       event_cache_t cache;

> +

> +       /* Saved atomic context */

> +       bool *atomic;

> +

> +       /* Count how many pktio polls this thread has done */

> +       uint16_t pktin_polls;

> +

> +       /* Queue indexes of interest, checked by the thread at

> +        * each priority level when scheduling, plus a round-

> +        * robin iterator to improve fairness between queues at

> +        * the same priority level.

> +        */

> +       odp_rwlock_t lock;

> +       queue_index_sparse_t indexes[NUM_SCHED_PRIO];

> +       sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];

> +} sched_thread_local_t;

> +

> +/* Global scheduler context */

> +static sched_global_t *sched;

> +

> +/* Thread local scheduler context */

> +__thread sched_thread_local_t thread_local;

> +

> +static int schedule_init_global(void)

> +{

> +       odp_shm_t shm;

> +       int i, k, prio, group;

> +

> +       ODP_DBG("Schedule[iquery] init ... ");

> +

> +       shm = odp_shm_reserve("odp_scheduler_iquery",

> +                             sizeof(sched_global_t),

> +                             ODP_CACHE_LINE_SIZE, 0);

> +

> +       sched = odp_shm_addr(shm);

> +

> +       if (sched == NULL) {

> +               ODP_ERR("Schedule[iquery] "

> +                       "init: shm reserve.\n");

> +               return -1;

> +       }

> +

> +       memset(sched, 0, sizeof(sched_global_t));

> +

> +       sched->selfie = shm;

> +

> +       for (prio = 0; prio < NUM_SCHED_PRIO; prio++)

> +               odp_rwlock_init(&sched->prios[prio].lock);

> +

> +       for (group = 0; group < NUM_SCHED_GRPS; group++) {

> +               sched->groups[group].allocated = false;

> +               odp_rwlock_init(&sched->groups[group].lock);

> +       }

> +

> +       odp_rwlock_init(&sched->pktio_poll.lock);

> +

> +       for (i = 0; i < PKTIO_CMD_QUEUES; i++) {

> +               pktio_cmd_queue_t *queue =

> +                       &sched->pktio_poll.queues[i];

> +

> +               ring_init(&queue->ring);

> +

> +               for (k = 0; k < PKTIO_RING_SIZE; k++)

> +                       queue->cmd_index[k] = RING_EMPTY;

> +       }

> +

> +       for (i = 0; i < NUM_PKTIO_CMD; i++)

> +               sched->pktio_poll.commands[i].index = PKTIO_CMD_FREE;

> +

> +       ODP_DBG("done\n");

> +       return 0;

> +}

> +

> +static int schedule_term_global(void)

> +{

> +       uint32_t i;

> +       odp_shm_t shm = sched->selfie;

> +

> +       for (i = 0; i < ODP_CONFIG_QUEUES; i++) {

> +               int count = 0;

> +               odp_event_t events[1];

> +

> +               if (sched->availables[i])

> +                       count = sched_cb_queue_deq_multi(i, events, 1);

> +

> +               if (count < 0)

> +                       sched_cb_queue_destroy_finalize(i);

> +               else if (count > 0)

> +                       ODP_ERR("Queue (%d) not empty\n", i);

> +       }

> +

> +       memset(sched, 0, sizeof(sched_global_t));

> +

> +       if (odp_shm_free(shm) < 0) {

> +               ODP_ERR("Schedule[iquery] "

> +                       "term: shm release.\n");

> +               return -1;

> +       }

> +       return 0;

> +}

> +

> +/*

> + * These APIs are used to manipulate thread's interests.

> + */

> +static void thread_set_interest(sched_thread_local_t *thread,

> +       unsigned int queue_index, int prio);

> +

> +static void thread_clear_interest(sched_thread_local_t *thread,

> +       unsigned int queue_index, int prio);

> +

> +static void thread_set_interests(sched_thread_local_t *thread,

> +       queue_index_bitmap_t *set);

> +

> +static void thread_clear_interests(sched_thread_local_t *thread,

> +       queue_index_bitmap_t *clear);

> +

> +static void sched_thread_local_reset(void)

> +{

> +       int prio;

> +       queue_index_sparse_t *index;

> +       sparse_bitmap_iterator_t *iterator;

> +

> +       memset(&sched_local, 0, sizeof(sched_local_t));

> +       memset(&thread_local, 0, sizeof(sched_thread_local_t));

> +

> +       thread_local.thread = odp_thread_id();

> +       thread_local.cache.queue = ODP_QUEUE_INVALID;

> +

> +       odp_rwlock_init(&thread_local.lock);

> +

> +       for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {

> +               index = &thread_local.indexes[prio];

> +               iterator = &thread_local.iterators[prio];

> +

> +               sparse_bitmap_zero(index);

> +               sparse_bitmap_iterator(iterator, index);

> +       }

> +}

> +

> +static int schedule_init_local(void)

> +{

> +       int group;

> +       sched_group_t *G;

> +       queue_index_bitmap_t collect;

> +

> +       wapl_bitmap_zero(&collect);

> +       sched_thread_local_reset();

> +

> +       /* Collect all queue indexes of the schedule groups

> +        * which this thread has subscribed to

> +        */

> +       for (group = 0; group < NUM_SCHED_GRPS; group++) {

> +               G = &sched->groups[group];

> +               odp_rwlock_read_lock(&G->lock);

> +

> +               if ((group < SCHED_GROUP_NAMED || G->allocated == true) &&

> +                   odp_thrmask_isset(&G->threads, thread_local.thread))

> +                       wapl_bitmap_or(&collect, &collect, &G->queues);

> +

> +               odp_rwlock_read_unlock(&G->lock);

> +       }

> +

> +       /* Distribute the above collected queue indexes into

> +        * thread local interests per priority level.

> +        */

> +       thread_set_interests(&thread_local, &collect);

> +

> +       /* "Night gathers, and now my watch begins..." */

> +       sched->threads[thread_local.thread] = &thread_local;

> +       return 0;

> +}

> +

> +static inline void schedule_release_context(void);

> +

> +static int schedule_term_local(void)

> +{

> +       int group;

> +       sched_group_t *G;

> +

> +       if (thread_local.cache.count) {

> +               ODP_ERR("Locally pre-scheduled events exist.\n");

> +               return -1;

> +       }

> +

> +       schedule_release_context();

> +

> +       /* Unsubscribe all named schedule groups */

> +       for (group = SCHED_GROUP_NAMED;

> +               group < NUM_SCHED_GRPS; group++) {

> +               G = &sched->groups[group];

> +               odp_rwlock_write_lock(&G->lock);

> +

> +               if (G->allocated == true && odp_thrmask_isset(

> +                       &G->threads, thread_local.thread))

> +                       odp_thrmask_clr(&G->threads, thread_local.thread);

> +

> +               odp_rwlock_write_unlock(&G->lock);

> +       }

> +

> +       /* "...for this night and all the nights to come." */

> +       sched->threads[thread_local.thread] = NULL;

> +       sched_thread_local_reset();

> +       return 0;

> +}

> +

> +static int init_sched_queue(uint32_t queue_index,

> +                           const odp_schedule_param_t *sched_param)

> +{

> +       int prio, group, thread;

> +       sched_prio_t *P;

> +       sched_group_t *G;

> +       sched_thread_local_t *local;

> +

> +       prio = sched_param->prio;

> +       group = sched_param->group;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_write_lock(&G->lock);

> +

> +       /* A named schedule group must be created before

> +        * queues can be created in it.

> +        */

> +       if (group >= SCHED_GROUP_NAMED && G->allocated == false) {

> +               odp_rwlock_write_unlock(&G->lock);

> +               return -1;

> +       }

> +

> +       /* Record the queue in its priority level globally */

> +       P = &sched->prios[prio];

> +

> +       odp_rwlock_write_lock(&P->lock);

> +       wapl_bitmap_set(&P->queues, queue_index);

> +       odp_rwlock_write_unlock(&P->lock);

> +

> +       /* Record the queue in its schedule group */

> +       wapl_bitmap_set(&G->queues, queue_index);

> +

> +       /* Cache queue parameters for easy reference */

> +       memcpy(&sched->queues[queue_index],

> +               sched_param, sizeof(odp_schedule_param_t));

> +

> +       /* Update all threads in this schedule group to

> +        * start checking this queue index when scheduling.

> +        */

> +       thread = odp_thrmask_first(&G->threads);

> +       while (thread >= 0) {

> +               local = sched->threads[thread];

> +               thread_set_interest(local, queue_index, prio);

> +               thread = odp_thrmask_next(&G->threads, thread);

> +       }

> +

> +       odp_rwlock_write_unlock(&G->lock);

> +       return 0;

> +}

> +

> +/*

> + * Must be called with schedule group's rwlock held.

> + * This is also being used in destroy_schedule_group()

> + * to destroy all orphan queues while destroying a whole

> + * schedule group.

> + */

> +static void __destroy_sched_queue(

> +       sched_group_t *G, uint32_t queue_index)

> +{

> +       int prio, thread;

> +       sched_prio_t *P;

> +       sched_thread_local_t *local;

> +

> +       prio = sched->queues[queue_index].prio;

> +

> +       /* Forget the queue in its schedule group */

> +       wapl_bitmap_clear(&G->queues, queue_index);

> +

> +       /* Forget queue schedule parameters */

> +       memset(&sched->queues[queue_index],

> +               0, sizeof(odp_schedule_param_t));

> +

> +       /* Update all threads in this schedule group to

> +        * stop checking this queue index when scheduling.

> +        */

> +       thread = odp_thrmask_first(&G->threads);

> +       while (thread >= 0) {

> +               local = sched->threads[thread];

> +               thread_clear_interest(local, queue_index, prio);

> +               thread = odp_thrmask_next(&G->threads, thread);

> +       }

> +

> +       /* Forget the queue in its priority level globally */

> +       P = &sched->prios[prio];

> +

> +       odp_rwlock_write_lock(&P->lock);

> +       wapl_bitmap_clear(&P->queues, queue_index);

> +       odp_rwlock_write_unlock(&P->lock);

> +}

> +

> +static void destroy_sched_queue(uint32_t queue_index)

> +{

> +       int group;

> +       sched_group_t *G;

> +

> +       group = sched->queues[queue_index].group;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_write_lock(&G->lock);

> +

> +       /* The named schedule group could have been destroyed

> +        * earlier, leaving these orphan queues.

> +        */

> +       if (group >= SCHED_GROUP_NAMED && G->allocated == false) {

> +               odp_rwlock_write_unlock(&G->lock);

> +               return;

> +       }

> +

> +       __destroy_sched_queue(G, queue_index);

> +       odp_rwlock_write_unlock(&G->lock);

> +}

> +

> +static int pktio_cmd_queue_hash(int pktio, int pktin)

> +{

> +       return (pktio ^ pktin) % PKTIO_CMD_QUEUES;

> +}

> +

> +static inline pktio_cmd_t *alloc_pktio_cmd(void)

> +{

> +       int i;

> +       pktio_cmd_t *cmd = NULL;

> +

> +       odp_rwlock_write_lock(&sched->pktio_poll.lock);

> +

> +       /* Find next free command */

> +       for (i = 0; i < NUM_PKTIO_CMD; i++) {

> +               if (sched->pktio_poll.commands[i].index

> +                               == PKTIO_CMD_FREE) {

> +                       cmd = &sched->pktio_poll.commands[i];

> +                       cmd->index = i;

> +                       break;

> +               }

> +       }

> +

> +       odp_rwlock_write_unlock(&sched->pktio_poll.lock);

> +       return cmd;

> +}

> +

> +static inline void free_pktio_cmd(pktio_cmd_t *cmd)

> +{

> +       odp_rwlock_write_lock(&sched->pktio_poll.lock);

> +

> +       cmd->index = PKTIO_CMD_FREE;

> +

> +       odp_rwlock_write_unlock(&sched->pktio_poll.lock);

> +}

> +

> +static void schedule_pktio_start(int pktio, int count, int pktin[])

> +{

> +       int i, index;

> +       pktio_cmd_t *cmd;

> +

> +       if (count > MAX_PKTIN)

> +               ODP_ABORT("Too many input queues for scheduler\n");

> +

> +       /* Record the active commands count per pktio interface */

> +       sched->pktio_poll.actives[pktio] = count;

> +

> +       /* Create a pktio poll command per pktin */

> +       for (i = 0; i < count; i++) {

> +

> +               cmd = alloc_pktio_cmd();

> +

> +               if (cmd == NULL)

> +                       ODP_ABORT("Scheduler out of pktio commands\n");

> +

> +               index = pktio_cmd_queue_hash(pktio, pktin[i]);

> +

> +               cmd->pktio = pktio;

> +               cmd->count = 1;

> +               cmd->pktin[0] = pktin[i];

> +               ring_enq(&sched->pktio_poll.queues[index].ring,

> +                       PKTIO_RING_MASK, cmd->index);

> +       }

> +}

> +

> +static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)

> +{

> +       int remains;

> +

> +       odp_rwlock_write_lock(&sched->pktio_poll.lock);

> +

> +       sched->pktio_poll.actives[pktio]--;

> +       remains = sched->pktio_poll.actives[pktio];

> +

> +       odp_rwlock_write_unlock(&sched->pktio_poll.lock);

> +       return remains;

> +}

> +

> +#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)

> +#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)

> +

> +static inline bool do_schedule_prio(int prio);

> +

> +static inline int pop_cache_events(odp_event_t ev[], unsigned int max)

> +{

> +       int k = 0;

> +       event_cache_t *cache;

> +

> +       cache = &thread_local.cache;

> +       while (cache->count && max) {

> +               ev[k] = *cache->top++;

> +               k++;

> +               max--;

> +               cache->count--;

> +       }

> +

> +       return k;

> +}

> +

> +static inline void assign_queue_handle(odp_queue_t *handle)

> +{

> +       if (handle)

> +               *handle = thread_local.cache.queue;

> +}

> +

> +static inline void pktio_poll_input(void)

> +{

> +       int i, hash;

> +       uint32_t index;

> +

> +       ring_t *ring;

> +       pktio_cmd_t *cmd;

> +

> +       /*

> +        * Each thread starts the search for a poll command

> +        * from the hash(threadID) queue to mitigate contentions.

> +        * If the queue is empty, it moves to other queues.

> +        *

> +        * Most of the time, the search stops at the first

> +        * command found, to optimize multi-threaded performance.

> +        * A small portion of polls have to do a full iteration to

> +        * avoid packet input starvation when there are fewer

> +        * threads than command queues.

> +        */

> +       hash = thread_local.thread % PKTIO_CMD_QUEUES;

> +

> +       for (i = 0; i < PKTIO_CMD_QUEUES; i++,

> +               hash = (hash + 1) % PKTIO_CMD_QUEUES) {

> +

> +               ring = &sched->pktio_poll.queues[hash].ring;

> +               index = ring_deq(ring, PKTIO_RING_MASK);

> +

> +               if (odp_unlikely(index == RING_EMPTY))

> +                       continue;

> +

> +               cmd = &sched->pktio_poll.commands[index];

> +

> +               /* Poll packet input */

> +               if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio,

> +                                                    cmd->count,

> +                                                    cmd->pktin))) {

> +                       /* Pktio stopped or closed. Remove poll

> +                        * command and call stop_finalize when all

> +                        * commands of the pktio have been removed.

> +                        */

> +                       if (schedule_pktio_stop(cmd->pktio,

> +                                               cmd->pktin[0]) == 0)

> +                               sched_cb_pktio_stop_finalize(cmd->pktio);

> +

> +                       free_pktio_cmd(cmd);

> +               } else {

> +                       /* Continue scheduling the pktio */

> +                       ring_enq(ring, PKTIO_RING_MASK, index);

> +

> +                       /* Do not iterate through all pktin poll

> +                        * command queues every time.

> +                        */

> +                       if (odp_likely(thread_local.pktin_polls & 0xF))

> +                               break;

> +               }

> +       }

> +

> +       thread_local.pktin_polls++;

> +}
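
The "& 0xF" test means fifteen of every sixteen polls stop at the
first live command, while one in sixteen falls through and scans the
remaining command queues as well; that bounds how long a command
sitting in an otherwise unvisited queue can be starved.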

> +

> +/*

> + * Schedule queues

> + */

> +static int do_schedule(odp_queue_t *out_queue,

> +       odp_event_t out_ev[], unsigned int max_num)

> +{

> +       int prio, count;

> +

> +       /* Consume locally cached events */

> +       count = pop_cache_events(out_ev, max_num);

> +       if (count > 0) {

> +               assign_queue_handle(out_queue);

> +               return count;

> +       }

> +

> +       schedule_release_context();

> +

> +       if (odp_unlikely(thread_local.pause))

> +               return count;

> +

> +       DO_SCHED_LOCK();

> +       /* Schedule events */

> +       for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {

> +               /* Round-robin iterate over the queue indexes of

> +                * interest at this priority level, competing for

> +                * and consuming the available queues

> +                */

> +               if (!do_schedule_prio(prio))

> +                       continue;

> +

> +               count = pop_cache_events(out_ev, max_num);

> +               assign_queue_handle(out_queue);

> +               DO_SCHED_UNLOCK();

> +               return count;

> +       }

> +

> +       DO_SCHED_UNLOCK();

> +

> +       /* Poll packet input when there are no events */

> +       pktio_poll_input();

> +       return 0;

> +}

> +

> +static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,

> +                        odp_event_t out_ev[], unsigned int max_num)

> +{

> +       int count, first = 1;

> +       odp_time_t next, wtime;

> +

> +       while (1) {

> +               count = do_schedule(out_queue, out_ev, max_num);

> +

> +               if (count)

> +                       break;

> +

> +               if (wait == ODP_SCHED_WAIT)

> +                       continue;

> +

> +               if (wait == ODP_SCHED_NO_WAIT)

> +                       break;

> +

> +               if (first) {

> +                       wtime = odp_time_local_from_ns(wait);

> +                       next = odp_time_sum(odp_time_local(), wtime);

> +                       first = 0;

> +                       continue;

> +               }

> +

> +               if (odp_time_cmp(next, odp_time_local()) < 0)

> +                       break;

> +       }

> +

> +       return count;

> +}

> +

> +static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)

> +{

> +       odp_event_t ev;

> +

> +       ev = ODP_EVENT_INVALID;

> +

> +       schedule_loop(out_queue, wait, &ev, 1);

> +

> +       return ev;

> +}

> +

> +static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,

> +                         odp_event_t events[], int num)

> +{

> +       return schedule_loop(out_queue, wait, events, num);

> +}

> +

> +static void schedule_pause(void)

> +{

> +       thread_local.pause = 1;

> +}

> +

> +static void schedule_resume(void)

> +{

> +       thread_local.pause = 0;

> +}

> +

> +static uint64_t schedule_wait_time(uint64_t ns)

> +{

> +       return ns;

> +}

> +

> +static int number_of_priorites(void)

> +{

> +       return NUM_SCHED_PRIO;

> +}

> +

> +/*

> + * Create a named schedule group with pre-defined

> + * set of subscription threads.

> + *

> + * Sched queues belonging to this group must be

> + * created after the group creation. Upon creation

> + * the group holds 0 sched queues.

> + */

> +static odp_schedule_group_t schedule_group_create(

> +       const char *name, const odp_thrmask_t *mask)

> +{

> +       int group;

> +       sched_group_t *G;

> +

> +       for (group = SCHED_GROUP_NAMED;

> +               group < NUM_SCHED_GRPS; group++) {

> +               G = &sched->groups[group];

> +

> +               odp_rwlock_write_lock(&G->lock);

> +               if (G->allocated == false) {

> +                       strncpy(G->name, name ? name : "",

> +                               ODP_SCHED_GROUP_NAME_LEN - 1);

> +                       odp_thrmask_copy(&G->threads, mask);

> +                       wapl_bitmap_zero(&G->queues);

> +

> +                       G->allocated = true;

> +                       odp_rwlock_write_unlock(&G->lock);

> +                       return (odp_schedule_group_t)group;

> +               }

> +               odp_rwlock_write_unlock(&G->lock);

> +       }

> +

> +       return ODP_SCHED_GROUP_INVALID;

> +}

> +

> +static inline void __destroy_group_queues(sched_group_t *group)

> +{

> +       unsigned int index;

> +       wapl_bitmap_iterator_t it;

> +

> +       /* Constructor */

> +       wapl_bitmap_iterator(&it, &group->queues);

> +

> +       /* Walk through the queue index bitmap */

> +       for (it.start(&it); it.has_next(&it);) {

> +               index = it.next(&it);

> +               __destroy_sched_queue(group, index);

> +       }

> +}

> +

> +/*

> + * Destroy a named schedule group.

> + */

> +static int schedule_group_destroy(odp_schedule_group_t group)

> +{

> +       int done = -1;

> +       sched_group_t *G;

> +

> +       if (group < SCHED_GROUP_NAMED ||

> +           group >= NUM_SCHED_GRPS)

> +               return -1;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_write_lock(&G->lock);

> +

> +       if (G->allocated == true) {

> +               /* Destroy all queues in this schedule group

> +                * and leave no orphan queues.

> +                */

> +               __destroy_group_queues(G);

> +

> +               done = 0;

> +               G->allocated = false;

> +               wapl_bitmap_zero(&G->queues);

> +               odp_thrmask_zero(&G->threads);

> +               memset(G->name, 0, ODP_SCHED_GROUP_NAME_LEN);

> +       }

> +

> +       odp_rwlock_write_unlock(&G->lock);

> +       return done;

> +}

> +

> +static odp_schedule_group_t schedule_group_lookup(const char *name)

> +{

> +       int group;

> +       sched_group_t *G;

> +

> +       for (group = SCHED_GROUP_NAMED;

> +            group < NUM_SCHED_GRPS; group++) {

> +               G = &sched->groups[group];

> +

> +               odp_rwlock_read_lock(&G->lock);

> +               if (strcmp(name, G->name) == 0) {

> +                       odp_rwlock_read_unlock(&G->lock);

> +                       return (odp_schedule_group_t)group;

> +               }

> +               odp_rwlock_read_unlock(&G->lock);

> +       }

> +

> +       return ODP_SCHED_GROUP_INVALID;

> +}

> +

> +static int schedule_group_join(odp_schedule_group_t group,

> +                              const odp_thrmask_t *mask)

> +{

> +       int done = -1, thread;

> +       sched_group_t *G;

> +       sched_thread_local_t *local;

> +

> +       /* Named schedule group only */

> +       if (group < SCHED_GROUP_NAMED ||

> +               group >= NUM_SCHED_GRPS)

> +               return done;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_write_lock(&G->lock);

> +

> +       if (G->allocated == true) {

> +               /* Make newly joined threads start checking

> +                * queue indexes in this schedule group

> +                */

> +               thread = odp_thrmask_first(mask);

> +               while (thread >= 0) {

> +                       local = sched->threads[thread];

> +                       thread_set_interests(local, &G->queues);

> +

> +                       odp_thrmask_set(&G->threads, thread);

> +                       thread = odp_thrmask_next(mask, thread);

> +               }

> +               done = 0;

> +       }

> +

> +       odp_rwlock_write_unlock(&G->lock);

> +       return done;

> +}

> +

> +static int schedule_group_leave(odp_schedule_group_t group,

> +                               const odp_thrmask_t *mask)

> +{

> +       int done = -1, thread;

> +       sched_group_t *G;

> +       sched_thread_local_t *local;

> +

> +       /* Named schedule group only */

> +       if (group < SCHED_GROUP_NAMED ||

> +               group >= NUM_SCHED_GRPS)

> +               return done;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_write_lock(&G->lock);

> +

> +       if (G->allocated == true) {

> +               /* Make leaving threads stop checking

> +                * queue indexes in this schedule group

> +                */

> +               thread = odp_thrmask_first(mask);

> +               while (thread >= 0) {

> +                       local = sched->threads[thread];

> +                       thread_clear_interests(local, &G->queues);

> +

> +                       odp_thrmask_clr(&G->threads, thread);

> +                       thread = odp_thrmask_next(mask, thread);

> +               }

> +               done = 0;

> +       }

> +

> +       odp_rwlock_write_unlock(&G->lock);

> +       return done;

> +}

> +

> +static int schedule_group_thrmask(odp_schedule_group_t group,

> +                                 odp_thrmask_t *thrmask)

> +{

> +       int done = -1;

> +       sched_group_t *G;

> +

> +       /* Named schedule group only */

> +       if (group < SCHED_GROUP_NAMED ||

> +               group >= NUM_SCHED_GRPS)

> +               return done;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_read_lock(&G->lock);

> +

> +       if (G->allocated == true && thrmask != NULL) {

> +               done = 0;

> +               odp_thrmask_copy(thrmask, &G->threads);

> +       }

> +

> +       odp_rwlock_read_unlock(&G->lock);

> +       return done;

> +}

> +

> +static int schedule_group_info(odp_schedule_group_t group,

> +                              odp_schedule_group_info_t *info)

> +{

> +       int done = -1;

> +       sched_group_t *G;

> +

> +       /* Named schedule group only */

> +       if (group < SCHED_GROUP_NAMED ||

> +               group >= NUM_SCHED_GRPS)

> +               return done;

> +

> +       G = &sched->groups[group];

> +       odp_rwlock_read_lock(&G->lock);

> +

> +       if (G->allocated == true && info != NULL) {

> +               done = 0;

> +               info->name = G->name;

> +               odp_thrmask_copy(&info->thrmask, &G->threads);

> +       }

> +

> +       odp_rwlock_read_unlock(&G->lock);

> +       return done;

> +}

> +

> +/* This function is a no-op */

> +static void schedule_prefetch(int num ODP_UNUSED)

> +{

> +}

> +

> +/*

> + * Limited to join and leave pre-defined schedule groups

> + * before and after thread local initialization or termination.

> + */

> +static int group_add_thread(odp_schedule_group_t group, int thread)

> +{

> +       sched_group_t *G;

> +

> +       if (group < 0 || group >= SCHED_GROUP_NAMED)

> +               return -1;

> +

> +       G = &sched->groups[group];

> +

> +       odp_rwlock_write_lock(&G->lock);

> +       odp_thrmask_set(&G->threads, thread);

> +       odp_rwlock_write_unlock(&G->lock);

> +       return 0;

> +}

> +

> +static int group_remove_thread(odp_schedule_group_t group, int thread)

> +{

> +       sched_group_t *G;

> +

> +       if (group < 0 || group >= SCHED_GROUP_NAMED)

> +               return -1;

> +

> +       G = &sched->groups[group];

> +

> +       odp_rwlock_write_lock(&G->lock);

> +       odp_thrmask_clr(&G->threads, thread);

> +       odp_rwlock_write_unlock(&G->lock);

> +       return 0;

> +}

> +

> +static int schedule_sched_queue(uint32_t queue_index)

> +{

> +       sched_local.ignore_ordered_context = 1;

> +

> +       /* Set available indications globally */

> +       sched->availables[queue_index] = true;

> +       return 0;

> +}

> +

> +static int schedule_unsched_queue(uint32_t queue_index)

> +{

> +       /* Clear available indications globally */

> +       sched->availables[queue_index] = false;

> +       return 0;

> +}

> +

> +static void schedule_release_atomic(void)

> +{

> +       unsigned int queue_index;

> +

> +       if ((thread_local.atomic != NULL) &&

> +               (thread_local.cache.count == 0)) {

> +               queue_index = thread_local.atomic - sched->availables;

> +               thread_local.atomic = NULL;

> +               sched->availables[queue_index] = true;

> +       }

> +}
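
The saved atomic context is literally a pointer into
sched->availables[]: subtracting the array base recovers the queue
index, and republishing the queue is a single store of true, so
releasing an atomic context costs one branch and one store.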

> +

> +static void schedule_release_ordered(void)

> +{

> +       if (sched_local.origin_qe) {

> +               int rc = release_order(sched_local.origin_qe,

> +                                      sched_local.order,

> +                                      sched_local.pool,

> +                                      sched_local.enq_called);

> +               if (rc == 0)

> +                       sched_local.origin_qe = NULL;

> +       }

> +}

> +

> +static inline void schedule_release_context(void)

> +{

> +       if (sched_local.origin_qe != NULL) {

> +               release_order(sched_local.origin_qe, sched_local.order,

> +                             sched_local.pool, sched_local.enq_called);

> +               sched_local.origin_qe = NULL;

> +       } else {

> +               schedule_release_atomic();

> +       }

> +}

> +

> +static int number_of_groups(void)

> +{

> +       return NUM_SCHED_GRPS;

> +}

> +

> +/* Fill in scheduler interface */

> +const schedule_fn_t schedule_iquery_fn = {

> +       .pktio_start   = schedule_pktio_start,

> +       .thr_add       = group_add_thread,

> +       .thr_rem       = group_remove_thread,

> +       .num_grps      = number_of_groups,

> +       .init_queue    = init_sched_queue,

> +       .destroy_queue = destroy_sched_queue,

> +       .sched_queue   = schedule_sched_queue,

> +       .unsched_queue = schedule_unsched_queue,

> +       .ord_enq_multi = schedule_ordered_queue_enq_multi,

> +       .init_global   = schedule_init_global,

> +       .term_global   = schedule_term_global,

> +       .init_local    = schedule_init_local,

> +       .term_local    = schedule_term_local

> +};

> +

> +/* Fill in scheduler API calls */

> +const schedule_api_t schedule_iquery_api = {

> +       .schedule_wait_time       = schedule_wait_time,

> +       .schedule                 = schedule,

> +       .schedule_multi           = schedule_multi,

> +       .schedule_pause           = schedule_pause,

> +       .schedule_resume          = schedule_resume,

> +       .schedule_release_atomic  = schedule_release_atomic,

> +       .schedule_release_ordered = schedule_release_ordered,

> +       .schedule_prefetch        = schedule_prefetch,

> +       .schedule_num_prio        = number_of_priorites,

> +       .schedule_group_create    = schedule_group_create,

> +       .schedule_group_destroy   = schedule_group_destroy,

> +       .schedule_group_lookup    = schedule_group_lookup,

> +       .schedule_group_join      = schedule_group_join,

> +       .schedule_group_leave     = schedule_group_leave,

> +       .schedule_group_thrmask   = schedule_group_thrmask,

> +       .schedule_group_info      = schedule_group_info,

> +       .schedule_order_lock      = schedule_order_lock,

> +       .schedule_order_unlock    = schedule_order_unlock

> +};

> +

> +static void thread_set_interest(sched_thread_local_t *thread,

> +       unsigned int queue_index, int prio)

> +{

> +       queue_index_sparse_t *index;

> +

> +       if (thread == NULL)

> +               return;

> +

> +       if (prio >= NUM_SCHED_PRIO)

> +               return;

> +

> +       index = &thread->indexes[prio];

> +

> +       odp_rwlock_write_lock(&thread->lock);

> +       sparse_bitmap_set(index, queue_index);

> +       odp_rwlock_write_unlock(&thread->lock);

> +}

> +

> +static void thread_clear_interest(sched_thread_local_t *thread,

> +       unsigned int queue_index, int prio)

> +{

> +       queue_index_sparse_t *index;

> +

> +       if (thread == NULL)

> +               return;

> +

> +       if (prio >= NUM_SCHED_PRIO)

> +               return;

> +

> +       index = &thread->indexes[prio];

> +

> +       odp_rwlock_write_lock(&thread->lock);

> +       sparse_bitmap_clear(index, queue_index);

> +       odp_rwlock_write_unlock(&thread->lock);

> +}

> +

> +static void thread_set_interests(sched_thread_local_t *thread,

> +       queue_index_bitmap_t *set)

> +{

> +       int prio;

> +       sched_prio_t *P;

> +       unsigned int queue_index;

> +       queue_index_bitmap_t subset;

> +       wapl_bitmap_iterator_t it;

> +

> +       if (thread == NULL || set == NULL)

> +               return;

> +

> +       for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {

> +               P = &sched->prios[prio];

> +               odp_rwlock_read_lock(&P->lock);

> +

> +               /* The collection of queue indexes in 'set'

> +                * may belong to several priority levels.

> +                */

> +               wapl_bitmap_zero(&subset);

> +               wapl_bitmap_and(&subset, &P->queues, set);

> +

> +               odp_rwlock_read_unlock(&P->lock);

> +

> +               /* Add the subset to local indexes */

> +               wapl_bitmap_iterator(&it, &subset);

> +               for (it.start(&it); it.has_next(&it);) {

> +                       queue_index = it.next(&it);

> +                       thread_set_interest(thread, queue_index, prio);

> +               }

> +       }

> +}

> +

> +static void thread_clear_interests(sched_thread_local_t *thread,

> +       queue_index_bitmap_t *clear)

> +{

> +       int prio;

> +       sched_prio_t *P;

> +       unsigned int queue_index;

> +       queue_index_bitmap_t subset;

> +       wapl_bitmap_iterator_t it;

> +

> +       if (thread == NULL || clear == NULL)

> +               return;

> +

> +       for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {

> +               P = &sched->prios[prio];

> +               odp_rwlock_read_lock(&P->lock);

> +

> +               /* The collection of queue indexes in 'clear'

> +                * may belong to several priority levels.

> +                */

> +               wapl_bitmap_zero(&subset);

> +               wapl_bitmap_and(&subset, &P->queues, clear);

> +

> +               odp_rwlock_read_unlock(&P->lock);

> +

> +               /* Remove the subset from local indexes */

> +               wapl_bitmap_iterator(&it, &subset);

> +               for (it.start(&it); it.has_next(&it);) {

> +                       queue_index = it.next(&it);

> +                       thread_clear_interest(thread, queue_index, prio);

> +               }

> +       }

> +}

> +

> +static inline bool is_atomic_queue(unsigned int queue_index)

> +{

> +       return (sched->queues[queue_index].sync

> +                       == ODP_SCHED_SYNC_ATOMIC);

> +}

> +

> +static inline bool is_ordered_queue(unsigned int queue_index)

> +{

> +       return (sched->queues[queue_index].sync

> +                       == ODP_SCHED_SYNC_ORDERED);

> +}

> +

> +static inline bool compete_atomic_queue(unsigned int queue_index)

> +{

> +       bool expected = sched->availables[queue_index];

> +

> +       if (expected && is_atomic_queue(queue_index)) {

> +               expected = __atomic_compare_exchange_n(

> +                       &sched->availables[queue_index],

> +                       &expected, false, 0,

> +                       __ATOMIC_RELEASE, __ATOMIC_RELAXED);

> +       }

> +

> +       return expected;

> +}
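
Note the asymmetry here: only atomic queues are claimed exclusively
through the compare-and-swap (one thread at a time holds the atomic
context), while for parallel and ordered queues the plain flag value
is returned, so several threads may poll the same queue concurrently.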

> +

> +static inline void save_schedule_context(unsigned int queue_index)

> +{

> +       if (is_atomic_queue(queue_index))

> +               thread_local.atomic = &sched->availables[queue_index];

> +       else if (is_ordered_queue(queue_index))

> +               cache_order_info(queue_index, thread_local.cache.stash[0]);

> +}

> +

> +static inline int consume_queue(int prio, unsigned int queue_index)

> +{

> +       int count;

> +       unsigned int max = MAX_DEQ;

> +       event_cache_t *cache = &thread_local.cache;

> +

> +       /* Low priorities use a smaller batch size to limit

> +        * head-of-line blocking latency.

> +        */

> +       if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))

> +               max = MAX_DEQ / 2;

> +

> +       /* For ordered queues we want consecutive events to

> +        * be dispatched to separate threads, so do not cache

> +        * them locally.

> +        */

> +       if (is_ordered_queue(queue_index))

> +               max = 1;

> +

> +       count = sched_cb_queue_deq_multi(

> +               queue_index, cache->stash, max);

> +

> +       if (count < 0) {

> +               DO_SCHED_UNLOCK();

> +               sched_cb_queue_destroy_finalize(queue_index);

> +               DO_SCHED_LOCK();

> +               return 0;

> +       }

> +

> +       if (count == 0)

> +               return 0;

> +

> +       cache->top = &cache->stash[0];

> +       cache->count = count;

> +       cache->queue = sched_cb_queue_handle(queue_index);

> +       return count;

> +}

> +

> +static inline bool do_schedule_prio(int prio)

> +{

> +       int nbits, next, end;

> +       unsigned int queue_index;

> +       sparse_bitmap_iterator_t *it;

> +

> +       it = &thread_local.iterators[prio];

> +       nbits = (int) *(it->_base.last);

> +

> +       /* No interests at all! */

> +       if (nbits <= 0)

> +               return false;

> +

> +       /* In the critical path we cannot afford iterator calls;

> +        * iterate manually using internal knowledge

> +        */

> +       it->_start = (it->_start + 1) % nbits;

> +       end = it->_start + nbits;

> +

> +       for (next = it->_start; next < end; next++) {

> +               queue_index = it->_base.il[next % nbits];

> +

> +               if (!compete_atomic_queue(queue_index))

> +                       continue;

> +

> +               if (!consume_queue(prio, queue_index))

> +                       continue;

> +

> +               save_schedule_context(queue_index);

> +               return true;

> +       }

> +

> +       return false;

> +}
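
The fast path reaches into the sparse bitmap internals:
*it->_base.last holds the number of set bits and it->_base.il[] the
packed list of queue indexes, while bumping it->_start on each call
rotates the scan origin; that rotation is what provides the
round-robin fairness between equal-priority queues mentioned in
do_schedule().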

> diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c

> index 8b355da..b798e81 100644

> --- a/platform/linux-generic/odp_schedule_sp.c

> +++ b/platform/linux-generic/odp_schedule_sp.c

> @@ -298,6 +298,11 @@ static int sched_queue(uint32_t qi)

>         return 0;

>  }

>

> +static int unsched_queue(uint32_t qi ODP_UNUSED)

> +{

> +       return 0;

> +}

> +

>  static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num,

>                          int sustain, int *ret)

>  {

> @@ -669,6 +674,7 @@ const schedule_fn_t schedule_sp_fn = {

>         .init_queue    = init_queue,

>         .destroy_queue = destroy_queue,

>         .sched_queue   = sched_queue,

> +       .unsched_queue = unsched_queue,

>         .ord_enq_multi = ord_enq_multi,

>         .init_global   = init_global,

>         .term_global   = term_global,

> --

> 2.7.4

>
Yi He Nov. 29, 2016, 9:53 a.m. UTC | #2
Thanks Bill

I'll check this once I'm back in the office on Thursday.

Best Regards, Yi

On 29 November 2016 at 10:59, Bill Fischofer <bill.fischofer@linaro.org>
wrote:

> Attempting to compile this with clang I get:

>

> odp_schedule_iquery.c:170:3: error: redefinition of typedef

>       'sched_thread_local_t' is a C11 feature [-Werror,-Wtypedef-

> redefinition]

> } sched_thread_local_t;

>   ^

> odp_schedule_iquery.c:111:35: note: previous definition is here

> typedef struct sched_thread_local sched_thread_local_t;

>                                   ^

> 1 error generated.

>


Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 0245e37..37f51fb 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -212,6 +212,7 @@  __LIB__libodp_linux_la_SOURCES = \
 			   odp_schedule_if.c \
 			   odp_schedule_ordered.c \
 			   odp_schedule_sp.c \
+			   odp_schedule_iquery.c \
 			   odp_shared_memory.c \
 			   odp_sorted_list.c \
 			   odp_spinlock.c \
diff --git a/platform/linux-generic/include/odp_bitmap_internal.h b/platform/linux-generic/include/odp_bitmap_internal.h
index 7e028fd..192c6f9 100644
--- a/platform/linux-generic/include/odp_bitmap_internal.h
+++ b/platform/linux-generic/include/odp_bitmap_internal.h
@@ -21,7 +21,13 @@  extern "C" {
 #include <stdbool.h>
 #include <string.h>
 #include <odp/api/hints.h>
-#include <odp_ring_internal.h> /* TOKENIZE and ARRAY_SIZE */
+
+/* Generate unique identifier for instantiated class */
+#define TOKENIZE(template, line) \
+	template ## _ ## line ## _ ## __COUNTER__
+
+/* Array size in general */
+#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
 
 #define BITS_PER_BYTE	(8)
 #define BITS_PER_LONG	__WORDSIZE
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index df73e70..6fe2536 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -30,6 +30,7 @@  typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
 				       );
 typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
 typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
+typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);
 typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index,
 					   void *buf_hdr[], int num,
 					   int sustain, int *ret);
@@ -46,6 +47,7 @@  typedef struct schedule_fn_t {
 	schedule_init_queue_fn_t    init_queue;
 	schedule_destroy_queue_fn_t destroy_queue;
 	schedule_sched_queue_fn_t   sched_queue;
+	schedule_unsched_queue_fn_t unsched_queue;
 	schedule_ord_enq_multi_fn_t ord_enq_multi;
 	schedule_init_global_fn_t   init_global;
 	schedule_term_global_fn_t   term_global;
diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4
index bc70c1f..2dcc9a7 100644
--- a/platform/linux-generic/m4/odp_schedule.m4
+++ b/platform/linux-generic/m4/odp_schedule.m4
@@ -4,3 +4,10 @@  AC_ARG_ENABLE([schedule-sp],
 	schedule-sp=yes
 	ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
     fi])
+
+AC_ARG_ENABLE([schedule-iquery],
+    [  --enable-schedule-iquery    enable interests query (sparse bitmap) scheduler],
+    [if test x$enableval = xyes; then
+	schedule-iquery=yes
+	ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
+    fi])
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 43e212a..53a5def 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -381,7 +381,9 @@  odp_queue_t odp_queue_lookup(const char *name)
 static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			    int num, int sustain)
 {
+#if !defined(ODP_SCHEDULE_IQUERY)
 	int sched = 0;
+#endif
 	int i, ret;
 	odp_buffer_hdr_t *hdr, *tail, *next_hdr;
 
@@ -442,14 +444,21 @@  static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
+#if !defined(ODP_SCHEDULE_IQUERY)
 		sched = 1; /* retval: schedule queue */
+#else
+		/* Add queue to scheduling */
+		if (sched_fn->sched_queue(queue->s.index))
+			ODP_ABORT("schedule_queue failed\n");
+#endif
 	}
 	UNLOCK(&queue->s.lock);
 
+#if !defined(ODP_SCHEDULE_IQUERY)
 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");
-
+#endif
 	return num; /* All events enqueued */
 }
 
@@ -522,8 +531,10 @@  static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 
 	if (hdr == NULL) {
 		/* Already empty queue */
-		if (queue->s.status == QUEUE_STATUS_SCHED)
+		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
+			sched_fn->unsched_queue(queue->s.index);
+		}
 
 		UNLOCK(&queue->s.lock);
 		return 0;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 1ef85ac..73c267e 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -973,6 +973,11 @@  static int schedule_sched_queue(uint32_t queue_index)
 	return 0;
 }
 
+static int schedule_unsched_queue(uint32_t queue_index ODP_UNUSED)
+{
+	return 0;
+}
+
 static int schedule_num_grps(void)
 {
 	return NUM_SCHED_GRPS;
@@ -987,6 +992,7 @@  const schedule_fn_t schedule_default_fn = {
 	.init_queue = schedule_init_queue,
 	.destroy_queue = schedule_destroy_queue,
 	.sched_queue = schedule_sched_queue,
+	.unsched_queue = schedule_unsched_queue,
 	.ord_enq_multi = schedule_ordered_queue_enq_multi,
 	.init_global = schedule_init_global,
 	.term_global = schedule_term_global,
diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c
index daf6c98..a9ede98 100644
--- a/platform/linux-generic/odp_schedule_if.c
+++ b/platform/linux-generic/odp_schedule_if.c
@@ -12,9 +12,15 @@  extern const schedule_api_t schedule_sp_api;
 extern const schedule_fn_t schedule_default_fn;
 extern const schedule_api_t schedule_default_api;
 
+extern const schedule_fn_t schedule_iquery_fn;
+extern const schedule_api_t schedule_iquery_api;
+
 #ifdef ODP_SCHEDULE_SP
 const schedule_fn_t *sched_fn   = &schedule_sp_fn;
 const schedule_api_t *sched_api = &schedule_sp_api;
+#elif defined(ODP_SCHEDULE_IQUERY)
+const schedule_fn_t *sched_fn   = &schedule_iquery_fn;
+const schedule_api_t *sched_api = &schedule_iquery_api;
 #else
 const schedule_fn_t  *sched_fn  = &schedule_default_fn;
 const schedule_api_t *sched_api = &schedule_default_api;
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
new file mode 100644
index 0000000..f232e23
--- /dev/null
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -0,0 +1,1306 @@ 
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#include <odp/api/schedule.h>
+#include <odp_schedule_if.h>
+#include <odp/api/align.h>
+#include <odp/api/queue.h>
+#include <odp/api/shared_memory.h>
+#include <odp_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_ring_internal.h>
+#include <odp_bitmap_internal.h>
+#include <odp/api/thread.h>
+#include <odp/api/time.h>
+#include <odp/api/rwlock.h>
+#include <odp/api/hints.h>
+#include <odp/api/cpu.h>
+#include <odp/api/thrmask.h>
+#include <odp_config_internal.h>
+#include <odp_schedule_internal.h>
+#include <odp_schedule_ordered_internal.h>
+
+/* Number of priority levels */
+#define NUM_SCHED_PRIO 8
+
+ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (NUM_SCHED_PRIO - 1),
+		  "lowest_prio_does_not_match_with_num_prios");
+
+ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
+		  (ODP_SCHED_PRIO_NORMAL < (NUM_SCHED_PRIO - 1)),
+		  "normal_prio_is_not_between_highest_and_lowest");
+
+/* Number of scheduling groups */
+#define NUM_SCHED_GRPS 256
+
+/* Start of named groups in group mask arrays */
+#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
+
+/* Instantiate a WAPL bitmap to be used as queue index bitmap */
+typedef WAPL_BITMAP(ODP_CONFIG_QUEUES) queue_index_bitmap_t;
+
+typedef struct {
+	odp_rwlock_t lock;
+	queue_index_bitmap_t queues; /* queues in this priority level */
+} sched_prio_t;
+
+typedef struct {
+	odp_rwlock_t lock;
+	bool allocated;
+	odp_thrmask_t threads; /* threads subscribed to this group */
+	queue_index_bitmap_t queues; /* queues in this group */
+	char name[ODP_SCHED_GROUP_NAME_LEN];
+} sched_group_t;
+
+/* Packet input poll command queues */
+#define PKTIO_CMD_QUEUES 4
+
+/* Maximum number of packet input queues per command */
+#define MAX_PKTIN 16
+
+/* Maximum number of packet IO interfaces */
+#define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
+
+/* Maximum number of pktio poll commands */
+#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+
+/* Pktio command is free */
+#define PKTIO_CMD_FREE ((uint32_t)-1)
+
+/* Packet IO poll queue ring size. In the worst case, all pktios
+ * have all pktins enabled and one poll command is created per
+ * pktin queue. The ring size must be larger than or equal to
+ * NUM_PKTIO_CMD / PKTIO_CMD_QUEUES so that it can hold all
+ * poll commands in the worst case.
+ */
+#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
+
+/* Mask for wrapping around pktio poll command index */
+#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
+
+/* Instantiate a RING data structure as pktio command queue */
+typedef struct {
+	/* Ring header */
+	ring_t ring;
+
+	/* Ring data: pktio poll command indexes */
+	uint32_t cmd_index[PKTIO_RING_SIZE];
+} pktio_cmd_queue_t ODP_ALIGNED_CACHE;
+
+/* Packet IO poll command */
+typedef struct {
+	int pktio;
+	int count;
+	int pktin[MAX_PKTIN];
+	uint32_t index;
+} pktio_cmd_t;
+
+/* Collect the pktio poll resources */
+typedef struct {
+	odp_rwlock_t lock;
+	/* count active commands per pktio interface */
+	int actives[NUM_PKTIO];
+	pktio_cmd_t commands[NUM_PKTIO_CMD];
+	pktio_cmd_queue_t queues[PKTIO_CMD_QUEUES];
+} pktio_poll_t;
+
+/* Forward declaration */
+typedef struct sched_thread_local sched_thread_local_t;
+
+typedef struct {
+	odp_shm_t selfie;
+
+	/* Schedule priorities */
+	sched_prio_t prios[NUM_SCHED_PRIO];
+
+	/* Schedule groups */
+	sched_group_t groups[NUM_SCHED_GRPS];
+
+	/* Cache queue parameters for easy reference */
+	odp_schedule_param_t queues[ODP_CONFIG_QUEUES];
+
+	/* Poll pktio inputs in spare time */
+	pktio_poll_t pktio_poll;
+
+	/* Queues post or withdraw their availability indications
+	 * for scheduling; each bool value also serves as the focal
+	 * point for atomic competition between threads. */
+	bool availables[ODP_CONFIG_QUEUES];
+
+	/* Quick reference to per thread context */
+	sched_thread_local_t *threads[ODP_THREAD_COUNT_MAX];
+} sched_global_t;
+
+/* Per thread events cache */
+typedef struct {
+	int count;
+	odp_queue_t queue;
+	odp_event_t stash[MAX_DEQ], *top;
+} event_cache_t;
+
+/* Instantiate a sparse bitmap to store thread's interested
+ * queue indexes per priority.
+ */
+typedef SPARSE_BITMAP(ODP_CONFIG_QUEUES) queue_index_sparse_t;
+
+typedef struct sched_thread_local {
+	int thread;
+	bool pause;
+
+	/* Cache events only for atomic queue */
+	event_cache_t cache;
+
+	/* Saved atomic context */
+	bool *atomic;
+
+	/* Record how many pktio polls have been done */
+	uint16_t pktin_polls;
+
+	/* Interested queue indexes to be checked by this
+	 * thread at each priority level when scheduling,
+	 * plus a round-robin iterator to improve fairness
+	 * between queues in the same priority level.
+	 */
+	odp_rwlock_t lock;
+	queue_index_sparse_t indexes[NUM_SCHED_PRIO];
+	sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
+};
+
+/* Global scheduler context */
+static sched_global_t *sched;
+
+/* Thread local scheduler context */
+__thread sched_thread_local_t thread_local;
+
+static int schedule_init_global(void)
+{
+	odp_shm_t shm;
+	int i, k, prio, group;
+
+	ODP_DBG("Schedule[iquery] init ... ");
+
+	shm = odp_shm_reserve("odp_scheduler_iquery",
+			      sizeof(sched_global_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	sched = odp_shm_addr(shm);
+
+	if (sched == NULL) {
+		ODP_ERR("Schedule[iquery] "
+			"init: shm reserve.\n");
+		return -1;
+	}
+
+	memset(sched, 0, sizeof(sched_global_t));
+
+	sched->selfie = shm;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++)
+		odp_rwlock_init(&sched->prios[prio].lock);
+
+	for (group = 0; group < NUM_SCHED_GRPS; group++) {
+		sched->groups[group].allocated = false;
+		odp_rwlock_init(&sched->groups[group].lock);
+	}
+
+	odp_rwlock_init(&sched->pktio_poll.lock);
+
+	for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
+		pktio_cmd_queue_t *queue =
+			&sched->pktio_poll.queues[i];
+
+		ring_init(&queue->ring);
+
+		for (k = 0; k < PKTIO_RING_SIZE; k++)
+			queue->cmd_index[k] = RING_EMPTY;
+	}
+
+	for (i = 0; i < NUM_PKTIO_CMD; i++)
+		sched->pktio_poll.commands[i].index = PKTIO_CMD_FREE;
+
+	ODP_DBG("done\n");
+	return 0;
+}
+
+static int schedule_term_global(void)
+{
+	uint32_t i;
+	odp_shm_t shm = sched->selfie;
+
+	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
+		int count = 0;
+		odp_event_t events[1];
+
+		if (sched->availables[i])
+			count = sched_cb_queue_deq_multi(i, events, 1);
+
+		if (count < 0)
+			sched_cb_queue_destroy_finalize(i);
+		else if (count > 0)
+			ODP_ERR("Queue (%d) not empty\n", i);
+	}
+
+	memset(sched, 0, sizeof(sched_global_t));
+
+	if (odp_shm_free(shm) < 0) {
+		ODP_ERR("Schedule[iquery] "
+			"term: shm release.\n");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * These APIs are used to manipulate thread's interests.
+ */
+static void thread_set_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio);
+
+static void thread_clear_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio);
+
+static void thread_set_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *set);
+
+static void thread_clear_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *clear);
+
+static void sched_thread_local_reset(void)
+{
+	int prio;
+	queue_index_sparse_t *index;
+	sparse_bitmap_iterator_t *iterator;
+
+	memset(&sched_local, 0, sizeof(sched_local_t));
+	memset(&thread_local, 0, sizeof(sched_thread_local_t));
+
+	thread_local.thread = odp_thread_id();
+	thread_local.cache.queue = ODP_QUEUE_INVALID;
+
+	odp_rwlock_init(&thread_local.lock);
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		index = &thread_local.indexes[prio];
+		iterator = &thread_local.iterators[prio];
+
+		sparse_bitmap_zero(index);
+		sparse_bitmap_iterator(iterator, index);
+	}
+}
+
+static int schedule_init_local(void)
+{
+	int group;
+	sched_group_t *G;
+	queue_index_bitmap_t collect;
+
+	wapl_bitmap_zero(&collect);
+	sched_thread_local_reset();
+
+	/* Collect all queue indexes of the schedule groups
+	 * to which this thread has subscribed.
+	 */
+	for (group = 0; group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+		odp_rwlock_read_lock(&G->lock);
+
+		if ((group < SCHED_GROUP_NAMED || G->allocated == true) &&
+		    odp_thrmask_isset(&G->threads, thread_local.thread))
+			wapl_bitmap_or(&collect, &collect, &G->queues);
+
+		odp_rwlock_read_unlock(&G->lock);
+	}
+
+	/* Distribute the above collected queue indexes into
+	 * thread local interests per priority level.
+	 */
+	thread_set_interests(&thread_local, &collect);
+
+	/* "Night gathers, and now my watch begins..." */
+	sched->threads[thread_local.thread] = &thread_local;
+	return 0;
+}
+
+static inline void schedule_release_context(void);
+
+static int schedule_term_local(void)
+{
+	int group;
+	sched_group_t *G;
+
+	if (thread_local.cache.count) {
+		ODP_ERR("Locally pre-scheduled events exist.\n");
+		return -1;
+	}
+
+	schedule_release_context();
+
+	/* Unsubscribe all named schedule groups */
+	for (group = SCHED_GROUP_NAMED;
+		group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+		odp_rwlock_write_lock(&G->lock);
+
+		if (G->allocated == true && odp_thrmask_isset(
+			&G->threads, thread_local.thread))
+			odp_thrmask_clr(&G->threads, thread_local.thread);
+
+		odp_rwlock_write_unlock(&G->lock);
+	}
+
+	/* "...for this night and all the nights to come." */
+	sched->threads[thread_local.thread] = NULL;
+	sched_thread_local_reset();
+	return 0;
+}
+
+static int init_sched_queue(uint32_t queue_index,
+			    const odp_schedule_param_t *sched_param)
+{
+	int prio, group, thread;
+	sched_prio_t *P;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	prio = sched_param->prio;
+	group = sched_param->group;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	/* A named schedule group must be created before
+	 * any queue can be created in it.
+	 */
+	if (group >= SCHED_GROUP_NAMED && G->allocated == false) {
+		odp_rwlock_write_unlock(&G->lock);
+		return -1;
+	}
+
+	/* Record the queue in its priority level globally */
+	P = &sched->prios[prio];
+
+	odp_rwlock_write_lock(&P->lock);
+	wapl_bitmap_set(&P->queues, queue_index);
+	odp_rwlock_write_unlock(&P->lock);
+
+	/* Record the queue in its schedule group */
+	wapl_bitmap_set(&G->queues, queue_index);
+
+	/* Cache queue parameters for easy reference */
+	memcpy(&sched->queues[queue_index],
+		sched_param, sizeof(odp_schedule_param_t));
+
+	/* Update all threads in this schedule group to
+	 * start checking this queue index when scheduling.
+	 */
+	thread = odp_thrmask_first(&G->threads);
+	while (thread >= 0) {
+		local = sched->threads[thread];
+		thread_set_interest(local, queue_index, prio);
+		thread = odp_thrmask_next(&G->threads, thread);
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+/*
+ * Must be called with schedule group's rwlock held.
+ * This is also being used in destroy_schedule_group()
+ * to destroy all orphan queues while destroying a whole
+ * schedule group.
+ */
+static void __destroy_sched_queue(
+	sched_group_t *G, uint32_t queue_index)
+{
+	int prio, thread;
+	sched_prio_t *P;
+	sched_thread_local_t *local;
+
+	prio = sched->queues[queue_index].prio;
+
+	/* Forget the queue in its schedule group */
+	wapl_bitmap_clear(&G->queues, queue_index);
+
+	/* Forget queue schedule parameters */
+	memset(&sched->queues[queue_index],
+		0, sizeof(odp_schedule_param_t));
+
+	/* Update all threads in this schedule group to
+	 * stop checking this queue index when scheduling.
+	 */
+	thread = odp_thrmask_first(&G->threads);
+	while (thread >= 0) {
+		local = sched->threads[thread];
+		thread_clear_interest(local, queue_index, prio);
+		thread = odp_thrmask_next(&G->threads, thread);
+	}
+
+	/* Forget the queue in its priority level globally */
+	P = &sched->prios[prio];
+
+	odp_rwlock_write_lock(&P->lock);
+	wapl_bitmap_clear(&P->queues, queue_index);
+	odp_rwlock_write_unlock(&P->lock);
+}
+
+static void destroy_sched_queue(uint32_t queue_index)
+{
+	int group;
+	sched_group_t *G;
+
+	group = sched->queues[queue_index].group;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	/* Named schedule group could have been destroyed
+	 * earlier and left these orphan queues.
+	 */
+	if (group >= SCHED_GROUP_NAMED && G->allocated == false) {
+		odp_rwlock_write_unlock(&G->lock);
+		return;
+	}
+
+	__destroy_sched_queue(G, queue_index);
+	odp_rwlock_write_unlock(&G->lock);
+}
+
+static int pktio_cmd_queue_hash(int pktio, int pktin)
+{
+	return (pktio ^ pktin) % PKTIO_CMD_QUEUES;
+}
+
+static inline pktio_cmd_t *alloc_pktio_cmd(void)
+{
+	int i;
+	pktio_cmd_t *cmd = NULL;
+
+	odp_rwlock_write_lock(&sched->pktio_poll.lock);
+
+	/* Find next free command */
+	for (i = 0; i < NUM_PKTIO_CMD; i++) {
+		if (sched->pktio_poll.commands[i].index
+				== PKTIO_CMD_FREE) {
+			cmd = &sched->pktio_poll.commands[i];
+			cmd->index = i;
+			break;
+		}
+	}
+
+	odp_rwlock_write_unlock(&sched->pktio_poll.lock);
+	return cmd;
+}
+
+static inline void free_pktio_cmd(pktio_cmd_t *cmd)
+{
+	odp_rwlock_write_lock(&sched->pktio_poll.lock);
+
+	cmd->index = PKTIO_CMD_FREE;
+
+	odp_rwlock_write_unlock(&sched->pktio_poll.lock);
+}
+
+static void schedule_pktio_start(int pktio, int count, int pktin[])
+{
+	int i, index;
+	pktio_cmd_t *cmd;
+
+	if (count > MAX_PKTIN)
+		ODP_ABORT("Too many input queues for scheduler\n");
+
+	/* Record the active commands count per pktio interface */
+	sched->pktio_poll.actives[pktio] = count;
+
+	/* Create a pktio poll command per pktin */
+	for (i = 0; i < count; i++) {
+
+		cmd = alloc_pktio_cmd();
+
+		if (cmd == NULL)
+			ODP_ABORT("Scheduler out of pktio commands\n");
+
+		index = pktio_cmd_queue_hash(pktio, pktin[i]);
+
+		cmd->pktio = pktio;
+		cmd->count = 1;
+		cmd->pktin[0] = pktin[i];
+		ring_enq(&sched->pktio_poll.queues[index].ring,
+			PKTIO_RING_MASK, cmd->index);
+	}
+}
+
+static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED)
+{
+	int remains;
+
+	odp_rwlock_write_lock(&sched->pktio_poll.lock);
+
+	sched->pktio_poll.actives[pktio]--;
+	remains = sched->pktio_poll.actives[pktio];
+
+	odp_rwlock_write_unlock(&sched->pktio_poll.lock);
+	return remains;
+}
+
+#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
+#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
+
+static inline bool do_schedule_prio(int prio);
+
+static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
+{
+	int k = 0;
+	event_cache_t *cache;
+
+	cache = &thread_local.cache;
+	while (cache->count && max) {
+		ev[k] = *cache->top++;
+		k++;
+		max--;
+		cache->count--;
+	}
+
+	return k;
+}
+
+static inline void assign_queue_handle(odp_queue_t *handle)
+{
+	if (handle)
+		*handle = thread_local.cache.queue;
+}
+
+static inline void pktio_poll_input(void)
+{
+	int i, hash;
+	uint32_t index;
+
+	ring_t *ring;
+	pktio_cmd_t *cmd;
+
+	/*
+	 * Each thread starts its search for a poll command
+	 * from the hash(threadID) queue to mitigate contention.
+	 * If that queue is empty, it moves on to other queues.
+	 *
+	 * Most of the time, the search stops on the first
+	 * command found, to optimize multi-threaded performance.
+	 * A small portion of polls must iterate through all
+	 * queues to avoid packet input starvation when there
+	 * are fewer threads than command queues.
+	 */
+	hash = thread_local.thread % PKTIO_CMD_QUEUES;
+
+	for (i = 0; i < PKTIO_CMD_QUEUES; i++,
+		hash = (hash + 1) % PKTIO_CMD_QUEUES) {
+
+		ring = &sched->pktio_poll.queues[hash].ring;
+		index = ring_deq(ring, PKTIO_RING_MASK);
+
+		if (odp_unlikely(index == RING_EMPTY))
+			continue;
+
+		cmd = &sched->pktio_poll.commands[index];
+
+		/* Poll packet input */
+		if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio,
+						     cmd->count,
+						     cmd->pktin))) {
+			/* Pktio stopped or closed. Remove the poll
+			 * command and call stop_finalize once all
+			 * commands of the pktio have been removed.
+			 */
+			if (schedule_pktio_stop(cmd->pktio,
+						cmd->pktin[0]) == 0)
+				sched_cb_pktio_stop_finalize(cmd->pktio);
+
+			free_pktio_cmd(cmd);
+		} else {
+			/* Continue scheduling the pktio */
+			ring_enq(ring, PKTIO_RING_MASK, index);
+
+			/* Do not iterate through all pktin poll
+			 * command queues every time.
+			 */
+			if (odp_likely(thread_local.pktin_polls & 0xF))
+				break;
+		}
+	}
+
+	thread_local.pktin_polls++;
+}
+
+/*
+ * Schedule queues
+ */
+static int do_schedule(odp_queue_t *out_queue,
+	odp_event_t out_ev[], unsigned int max_num)
+{
+	int prio, count;
+
+	/* Consume locally cached events */
+	count = pop_cache_events(out_ev, max_num);
+	if (count > 0) {
+		assign_queue_handle(out_queue);
+		return count;
+	}
+
+	schedule_release_context();
+
+	if (odp_unlikely(thread_local.pause))
+		return count;
+
+	DO_SCHED_LOCK();
+	/* Schedule events */
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		/* Round-robin over the interested queue
+		 * indexes at this priority level, competing
+		 * for and consuming available queues.
+		 */
+		if (!do_schedule_prio(prio))
+			continue;
+
+		count = pop_cache_events(out_ev, max_num);
+		assign_queue_handle(out_queue);
+		DO_SCHED_UNLOCK();
+		return count;
+	}
+
+	DO_SCHED_UNLOCK();
+
+	/* Poll packet input when there are no events */
+	pktio_poll_input();
+	return 0;
+}
+
+static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
+			 odp_event_t out_ev[], unsigned int max_num)
+{
+	int count, first = 1;
+	odp_time_t next, wtime;
+
+	while (1) {
+		count = do_schedule(out_queue, out_ev, max_num);
+
+		if (count)
+			break;
+
+		if (wait == ODP_SCHED_WAIT)
+			continue;
+
+		if (wait == ODP_SCHED_NO_WAIT)
+			break;
+
+		if (first) {
+			wtime = odp_time_local_from_ns(wait);
+			next = odp_time_sum(odp_time_local(), wtime);
+			first = 0;
+			continue;
+		}
+
+		if (odp_time_cmp(next, odp_time_local()) < 0)
+			break;
+	}
+
+	return count;
+}
+
+static odp_event_t schedule(odp_queue_t *out_queue, uint64_t wait)
+{
+	odp_event_t ev;
+
+	ev = ODP_EVENT_INVALID;
+
+	schedule_loop(out_queue, wait, &ev, 1);
+
+	return ev;
+}
+
+static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
+			  odp_event_t events[], int num)
+{
+	return schedule_loop(out_queue, wait, events, num);
+}
+
+static void schedule_pause(void)
+{
+	thread_local.pause = 1;
+}
+
+static void schedule_resume(void)
+{
+	thread_local.pause = 0;
+}
+
+static uint64_t schedule_wait_time(uint64_t ns)
+{
+	return ns;
+}
+
+static int number_of_priorities(void)
+{
+	return NUM_SCHED_PRIO;
+}
+
+/*
+ * Create a named schedule group with a pre-defined
+ * set of subscribing threads.
+ *
+ * Sched queues belonging to this group must be
+ * created after the group itself. A newly created
+ * group holds no sched queues.
+ */
+static odp_schedule_group_t schedule_group_create(
+	const char *name, const odp_thrmask_t *mask)
+{
+	int group;
+	sched_group_t *G;
+
+	for (group = SCHED_GROUP_NAMED;
+		group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+
+		odp_rwlock_write_lock(&G->lock);
+		if (G->allocated == false) {
+			strncpy(G->name, name ? name : "",
+				ODP_SCHED_GROUP_NAME_LEN - 1);
+			odp_thrmask_copy(&G->threads, mask);
+			wapl_bitmap_zero(&G->queues);
+
+			G->allocated = true;
+			odp_rwlock_write_unlock(&G->lock);
+			return (odp_schedule_group_t)group;
+		}
+		odp_rwlock_write_unlock(&G->lock);
+	}
+
+	return ODP_SCHED_GROUP_INVALID;
+}
+
+static inline void __destroy_group_queues(sched_group_t *group)
+{
+	unsigned int index;
+	wapl_bitmap_iterator_t it;
+
+	/* Constructor */
+	wapl_bitmap_iterator(&it, &group->queues);
+
+	/* Walk through the queue index bitmap */
+	for (it.start(&it); it.has_next(&it);) {
+		index = it.next(&it);
+		__destroy_sched_queue(group, index);
+	}
+}
+
+/*
+ * Destroy a named schedule group.
+ */
+static int schedule_group_destroy(odp_schedule_group_t group)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	if (group < SCHED_GROUP_NAMED ||
+	    group >= NUM_SCHED_GRPS)
+		return -1;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->allocated == true) {
+		/* Destroy all queues in this schedule group
+		 * and leave no orphan queues.
+		 */
+		__destroy_group_queues(G);
+
+		done = 0;
+		G->allocated = false;
+		wapl_bitmap_zero(&G->queues);
+		odp_thrmask_zero(&G->threads);
+		memset(G->name, 0, ODP_SCHED_GROUP_NAME_LEN);
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static odp_schedule_group_t schedule_group_lookup(const char *name)
+{
+	int group;
+	sched_group_t *G;
+
+	for (group = SCHED_GROUP_NAMED;
+	     group < NUM_SCHED_GRPS; group++) {
+		G = &sched->groups[group];
+
+		odp_rwlock_read_lock(&G->lock);
+		if (strcmp(name, G->name) == 0) {
+			odp_rwlock_read_unlock(&G->lock);
+			return (odp_schedule_group_t)group;
+		}
+		odp_rwlock_read_unlock(&G->lock);
+	}
+
+	return ODP_SCHED_GROUP_INVALID;
+}
+
+static int schedule_group_join(odp_schedule_group_t group,
+			       const odp_thrmask_t *mask)
+{
+	int done = -1, thread;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->allocated == true) {
+		/* Make newly joined threads start checking
+		 * queue indexes in this schedule group.
+		 */
+		thread = odp_thrmask_first(mask);
+		while (thread >= 0) {
+			local = sched->threads[thread];
+			thread_set_interests(local, &G->queues);
+
+			odp_thrmask_set(&G->threads, thread);
+			thread = odp_thrmask_next(mask, thread);
+		}
+		done = 0;
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_leave(odp_schedule_group_t group,
+				const odp_thrmask_t *mask)
+{
+	int done = -1, thread;
+	sched_group_t *G;
+	sched_thread_local_t *local;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_write_lock(&G->lock);
+
+	if (G->allocated == true) {
+		/* Make leaving threads stop checking
+		 * queue indexes in this schedule group.
+		 */
+		thread = odp_thrmask_first(mask);
+		while (thread >= 0) {
+			local = sched->threads[thread];
+			thread_clear_interests(local, &G->queues);
+
+			odp_thrmask_clr(&G->threads, thread);
+			thread = odp_thrmask_next(mask, thread);
+		}
+		done = 0;
+	}
+
+	odp_rwlock_write_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_thrmask(odp_schedule_group_t group,
+				  odp_thrmask_t *thrmask)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_read_lock(&G->lock);
+
+	if (G->allocated == true && thrmask != NULL) {
+		done = 0;
+		odp_thrmask_copy(thrmask, &G->threads);
+	}
+
+	odp_rwlock_read_unlock(&G->lock);
+	return done;
+}
+
+static int schedule_group_info(odp_schedule_group_t group,
+			       odp_schedule_group_info_t *info)
+{
+	int done = -1;
+	sched_group_t *G;
+
+	/* Named schedule group only */
+	if (group < SCHED_GROUP_NAMED ||
+		group >= NUM_SCHED_GRPS)
+		return done;
+
+	G = &sched->groups[group];
+	odp_rwlock_read_lock(&G->lock);
+
+	if (G->allocated == true && info != NULL) {
+		done = 0;
+		info->name = G->name;
+		odp_thrmask_copy(&info->thrmask, &G->threads);
+	}
+
+	odp_rwlock_read_unlock(&G->lock);
+	return done;
+}
+
+/* This function is a no-op */
+static void schedule_prefetch(int num ODP_UNUSED)
+{
+}
+
+/*
+ * Limited to joining and leaving pre-defined schedule groups
+ * before thread local initialization and after termination.
+ */
+static int group_add_thread(odp_schedule_group_t group, int thread)
+{
+	sched_group_t *G;
+
+	if (group < 0 || group >= SCHED_GROUP_NAMED)
+		return -1;
+
+	G = &sched->groups[group];
+
+	odp_rwlock_write_lock(&G->lock);
+	odp_thrmask_set(&G->threads, thread);
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+static int group_remove_thread(odp_schedule_group_t group, int thread)
+{
+	sched_group_t *G;
+
+	if (group < 0 || group >= SCHED_GROUP_NAMED)
+		return -1;
+
+	G = &sched->groups[group];
+
+	odp_rwlock_write_lock(&G->lock);
+	odp_thrmask_clr(&G->threads, thread);
+	odp_rwlock_write_unlock(&G->lock);
+	return 0;
+}
+
+static int schedule_sched_queue(uint32_t queue_index)
+{
+	sched_local.ignore_ordered_context = 1;
+
+	/* Set available indications globally */
+	sched->availables[queue_index] = true;
+	return 0;
+}
+
+static int schedule_unsched_queue(uint32_t queue_index)
+{
+	/* Clear available indications globally */
+	sched->availables[queue_index] = false;
+	return 0;
+}
+
+static void schedule_release_atomic(void)
+{
+	unsigned int queue_index;
+
+	if ((thread_local.atomic != NULL) &&
+		(thread_local.cache.count == 0)) {
+		queue_index = thread_local.atomic - sched->availables;
+		thread_local.atomic = NULL;
+		sched->availables[queue_index] = true;
+	}
+}
+
+static void schedule_release_ordered(void)
+{
+	if (sched_local.origin_qe) {
+		int rc = release_order(sched_local.origin_qe,
+				       sched_local.order,
+				       sched_local.pool,
+				       sched_local.enq_called);
+		if (rc == 0)
+			sched_local.origin_qe = NULL;
+	}
+}
+
+static inline void schedule_release_context(void)
+{
+	if (sched_local.origin_qe != NULL) {
+		release_order(sched_local.origin_qe, sched_local.order,
+			      sched_local.pool, sched_local.enq_called);
+		sched_local.origin_qe = NULL;
+	} else {
+		schedule_release_atomic();
+	}
+}
+
+static int number_of_groups(void)
+{
+	return NUM_SCHED_GRPS;
+}
+
+/* Fill in scheduler interface */
+const schedule_fn_t schedule_iquery_fn = {
+	.pktio_start   = schedule_pktio_start,
+	.thr_add       = group_add_thread,
+	.thr_rem       = group_remove_thread,
+	.num_grps      = number_of_groups,
+	.init_queue    = init_sched_queue,
+	.destroy_queue = destroy_sched_queue,
+	.sched_queue   = schedule_sched_queue,
+	.unsched_queue = schedule_unsched_queue,
+	.ord_enq_multi = schedule_ordered_queue_enq_multi,
+	.init_global   = schedule_init_global,
+	.term_global   = schedule_term_global,
+	.init_local    = schedule_init_local,
+	.term_local    = schedule_term_local
+};
+
+/* Fill in scheduler API calls */
+const schedule_api_t schedule_iquery_api = {
+	.schedule_wait_time       = schedule_wait_time,
+	.schedule                 = schedule,
+	.schedule_multi           = schedule_multi,
+	.schedule_pause           = schedule_pause,
+	.schedule_resume          = schedule_resume,
+	.schedule_release_atomic  = schedule_release_atomic,
+	.schedule_release_ordered = schedule_release_ordered,
+	.schedule_prefetch        = schedule_prefetch,
+	.schedule_num_prio        = number_of_priorities,
+	.schedule_group_create    = schedule_group_create,
+	.schedule_group_destroy   = schedule_group_destroy,
+	.schedule_group_lookup    = schedule_group_lookup,
+	.schedule_group_join      = schedule_group_join,
+	.schedule_group_leave     = schedule_group_leave,
+	.schedule_group_thrmask   = schedule_group_thrmask,
+	.schedule_group_info      = schedule_group_info,
+	.schedule_order_lock      = schedule_order_lock,
+	.schedule_order_unlock    = schedule_order_unlock
+};
+
+static void thread_set_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio)
+{
+	queue_index_sparse_t *index;
+
+	if (thread == NULL)
+		return;
+
+	if (prio >= NUM_SCHED_PRIO)
+		return;
+
+	index = &thread->indexes[prio];
+
+	odp_rwlock_write_lock(&thread->lock);
+	sparse_bitmap_set(index, queue_index);
+	odp_rwlock_write_unlock(&thread->lock);
+}
+
+static void thread_clear_interest(sched_thread_local_t *thread,
+	unsigned int queue_index, int prio)
+{
+	queue_index_sparse_t *index;
+
+	if (thread == NULL)
+		return;
+
+	if (prio >= NUM_SCHED_PRIO)
+		return;
+
+	index = &thread->indexes[prio];
+
+	odp_rwlock_write_lock(&thread->lock);
+	sparse_bitmap_clear(index, queue_index);
+	odp_rwlock_write_unlock(&thread->lock);
+}
+
+static void thread_set_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *set)
+{
+	int prio;
+	sched_prio_t *P;
+	unsigned int queue_index;
+	queue_index_bitmap_t subset;
+	wapl_bitmap_iterator_t it;
+
+	if (thread == NULL || set == NULL)
+		return;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		P = &sched->prios[prio];
+		odp_rwlock_read_lock(&P->lock);
+
+		/* The collection of queue indexes in 'set'
+		 * may belong to several priority levels.
+		 */
+		wapl_bitmap_zero(&subset);
+		wapl_bitmap_and(&subset, &P->queues, set);
+
+		odp_rwlock_read_unlock(&P->lock);
+
+		/* Add the subset to local indexes */
+		wapl_bitmap_iterator(&it, &subset);
+		for (it.start(&it); it.has_next(&it);) {
+			queue_index = it.next(&it);
+			thread_set_interest(thread, queue_index, prio);
+		}
+	}
+}
+
+static void thread_clear_interests(sched_thread_local_t *thread,
+	queue_index_bitmap_t *clear)
+{
+	int prio;
+	sched_prio_t *P;
+	unsigned int queue_index;
+	queue_index_bitmap_t subset;
+	wapl_bitmap_iterator_t it;
+
+	if (thread == NULL || clear == NULL)
+		return;
+
+	for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
+		P = &sched->prios[prio];
+		odp_rwlock_read_lock(&P->lock);
+
+		/* The collection of queue indexes in 'clear'
+		 * may belong to several priority levels.
+		 */
+		wapl_bitmap_zero(&subset);
+		wapl_bitmap_and(&subset, &P->queues, clear);
+
+		odp_rwlock_read_unlock(&P->lock);
+
+		/* Remove the subset from local indexes */
+		wapl_bitmap_iterator(&it, &subset);
+		for (it.start(&it); it.has_next(&it);) {
+			queue_index = it.next(&it);
+			thread_clear_interest(thread, queue_index, prio);
+		}
+	}
+}
+
+static inline bool is_atomic_queue(unsigned int queue_index)
+{
+	return (sched->queues[queue_index].sync
+			== ODP_SCHED_SYNC_ATOMIC);
+}
+
+static inline bool is_ordered_queue(unsigned int queue_index)
+{
+	return (sched->queues[queue_index].sync
+			== ODP_SCHED_SYNC_ORDERED);
+}
+
+static inline bool compete_atomic_queue(unsigned int queue_index)
+{
+	bool expected = sched->availables[queue_index];
+
+	if (expected && is_atomic_queue(queue_index)) {
+		expected = __atomic_compare_exchange_n(
+			&sched->availables[queue_index],
+			&expected, false, 0,
+			__ATOMIC_RELEASE, __ATOMIC_RELAXED);
+	}
+
+	return expected;
+}
+
+static inline void save_schedule_context(unsigned int queue_index)
+{
+	if (is_atomic_queue(queue_index))
+		thread_local.atomic = &sched->availables[queue_index];
+	else if (is_ordered_queue(queue_index))
+		cache_order_info(queue_index, thread_local.cache.stash[0]);
+}
+
+static inline int consume_queue(int prio, unsigned int queue_index)
+{
+	int count;
+	unsigned int max = MAX_DEQ;
+	event_cache_t *cache = &thread_local.cache;
+
+	/* Low priorities use a smaller batch size to limit
+	 * head-of-line blocking latency.
+	 */
+	if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
+		max = MAX_DEQ / 2;
+
+	/* For ordered queues we want consecutive events to
+	 * be dispatched to separate threads, so do not cache
+	 * them locally.
+	 */
+	if (is_ordered_queue(queue_index))
+		max = 1;
+
+	count = sched_cb_queue_deq_multi(
+		queue_index, cache->stash, max);
+
+	if (count < 0) {
+		DO_SCHED_UNLOCK();
+		sched_cb_queue_destroy_finalize(queue_index);
+		DO_SCHED_LOCK();
+		return 0;
+	}
+
+	if (count == 0)
+		return 0;
+
+	cache->top = &cache->stash[0];
+	cache->count = count;
+	cache->queue = sched_cb_queue_handle(queue_index);
+	return count;
+}
+
+static inline bool do_schedule_prio(int prio)
+{
+	int nbits, next, end;
+	unsigned int queue_index;
+	sparse_bitmap_iterator_t *it;
+
+	it = &thread_local.iterators[prio];
+	nbits = (int) *(it->_base.last);
+
+	/* No interests at all! */
+	if (nbits <= 0)
+		return false;
+
+	/* On the critical path we cannot afford iterator
+	 * calls; do it manually with internal knowledge.
+	 */
+	it->_start = (it->_start + 1) % nbits;
+	end = it->_start + nbits;
+
+	for (next = it->_start; next < end; next++) {
+		queue_index = it->_base.il[next % nbits];
+
+		if (!compete_atomic_queue(queue_index))
+			continue;
+
+		if (!consume_queue(prio, queue_index))
+			continue;
+
+		save_schedule_context(queue_index);
+		return true;
+	}
+
+	return false;
+}
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 8b355da..b798e81 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -298,6 +298,11 @@  static int sched_queue(uint32_t qi)
 	return 0;
 }
 
+static int unsched_queue(uint32_t qi ODP_UNUSED)
+{
+	return 0;
+}
+
 static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num,
 			 int sustain, int *ret)
 {
@@ -669,6 +674,7 @@  const schedule_fn_t schedule_sp_fn = {
 	.init_queue    = init_queue,
 	.destroy_queue = destroy_queue,
 	.sched_queue   = sched_queue,
+	.unsched_queue = unsched_queue,
 	.ord_enq_multi = ord_enq_multi,
 	.init_global   = init_global,
 	.term_global   = term_global,
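
For reference, below is a minimal sketch (not part of the patch) of how
an application drives any ODP scheduler, including this iquery one,
through the public API. Pool and queue setup are elided, and the 100 ms
timeout is an arbitrary illustrative choice.

#include <odp_api.h>

static void worker_loop(void)
{
	odp_queue_t from;
	odp_event_t ev;
	/* Convert a nanosecond timeout into scheduler wait time */
	uint64_t wait = odp_schedule_wait_time(100 * ODP_TIME_MSEC_IN_NS);

	while (1) {
		/* Ask the active scheduler for the next event,
		 * blocking for at most 'wait' */
		ev = odp_schedule(&from, wait);

		if (ev == ODP_EVENT_INVALID)
			continue; /* timed out, no events available */

		/* ... process the event dequeued from 'from' ... */
		odp_event_free(ev);
	}
}

The iquery scheduler itself is selected at build time: passing
--enable-schedule-iquery to configure defines ODP_SCHEDULE_IQUERY,
which makes odp_schedule_if.c point sched_fn and sched_api at
schedule_iquery_fn and schedule_iquery_api above.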