
[v2,1/2] test: perf: add new scheduling latency test

Message ID 1473843187-25588-1-git-send-email-matias.elo@nokia.com
State Accepted
Commit bc65897481d8ce89a55257b22bf93d05abf74f70

Commit Message

Elo, Matias (Nokia - FI/Espoo) Sept. 14, 2016, 8:53 a.m. UTC
Add new scheduling latency benchmark application. The application
measures delays (avg, min, max) for high and low priority events.

The test has a configurable number of TRAFFIC events and a few SAMPLE events
(one common or one per priority). The scheduling latency is only measured
from the SAMPLE events to minimize measurement overhead.

The application's command line arguments enable configuring:
- Number of processing threads
- Number of high/low priority queues
- Number of high/low priority events
- Use separate SAMPLE events for each priority
- Scheduled queue type (PARALLEL, ATOMIC, ORDERED)
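For example, a run using four worker threads and atomic queues (flag names
taken from the usage text in the patch) could be started as:

  ./odp_sched_latency -c 4 -s 1 --hi-prio-queues 16 --lo-prio-queues 64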

Signed-off-by: Matias Elo <matias.elo@nokia.com>

---

V2:
- Remove unnecessary 'num_workers' initialization (Maxim)

 test/common_plat/performance/.gitignore          |   1 +
 test/common_plat/performance/Makefile.am         |   4 +
 test/common_plat/performance/odp_sched_latency.c | 767 +++++++++++++++++++++++
 3 files changed, 772 insertions(+)
 create mode 100644 test/common_plat/performance/odp_sched_latency.c

-- 
2.7.4

Comments

Brian Brooks Sept. 16, 2016, 10:05 p.m. UTC | #1
On 09/14 11:53:06, Matias Elo wrote:
> +

> +	/* Clear possible locally stored buffers */

> +	odp_schedule_pause();

> +

> +	while (1) {

> +		ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);

> +

> +		if (ev == ODP_EVENT_INVALID)

> +			break;

> +

> +		if (odp_queue_enq(src_queue, ev)) {

> +			LOG_ERR("[%i] Queue enqueue failed.\n", thr);

> +			odp_event_free(ev);

> +			return -1;

> +		}

> +	}

> +

> +	odp_schedule_resume();


Is it possible to skip this and go straight to draining the queues?

Locally pre-scheduled work is an implementation detail that should be hidden
by the scheduling APIs.

A hardware scheduler may not pre-schedule work to cores the way the current
software implementation does. The ODP implementation for that environment
would have to turn the scheduling call into a nop for that core if it is
paused by use of these APIs. Another way to implement it would be to remove
this core from all queue scheduling groups and leave the schedule call as-is.
If implemented by the first method, the application writer could simply
not call the API to schedule work. If implemented by the second method, there
are already scheduling group APIs to do this.
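A minimal sketch of the first method might look like this (illustrative only;
'thread_paused' and 'do_schedule' are hypothetical implementation internals,
not actual ODP code):

static __thread int thread_paused; /* hypothetical flag, set by
                                      odp_schedule_pause() and cleared by
                                      odp_schedule_resume() */

odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait)
{
	if (odp_unlikely(thread_paused))
		return ODP_EVENT_INVALID; /* nop for a paused core */

	return do_schedule(from, wait);   /* normal scheduling path */
}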

Are odp_schedule_pause() and odp_schedule_resume() deprecated?

> +	odp_barrier_wait(&globals->barrier);

> +

> +	clear_sched_queues();
Brian Brooks Sept. 19, 2016, 7:11 p.m. UTC | #2
On 09/19 07:55:22, Elo, Matias (Nokia - FI/Espoo) wrote:
> > 

> > On 09/14 11:53:06, Matias Elo wrote:

> > > +

> > > +	/* Clear possible locally stored buffers */

> > > +	odp_schedule_pause();

> > > +

> > > +	while (1) {

> > > +		ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);

> > > +

> > > +		if (ev == ODP_EVENT_INVALID)

> > > +			break;

> > > +

> > > +		if (odp_queue_enq(src_queue, ev)) {

> > > +			LOG_ERR("[%i] Queue enqueue failed.\n", thr);

> > > +			odp_event_free(ev);

> > > +			return -1;

> > > +		}

> > > +	}

> > > +

> > > +	odp_schedule_resume();

> > 

> > Is it possible to skip this and go straight to draining the queues?

> > 

> > Locally pre-scheduled work is an implementation detail that should be hidden

> > by the scheduling APIs.

> > 

> > A hardware scheduler may not pre-schedule work to cores the way the current

> > software implementation does.

> 

> Also some HW schedulers may operate in push mode and do local caching. Calling

> odp_schedule_pause() is the only ODP method to signal the scheduler to stop this.

> So to keep the application platform agnostic (and follow the API documentation),

> this step cannot be skipped.

> 

> -Matias


Thinking in the general sense..

Should applications have to reason about _and_ code around pre-scheduled
and non-scheduled events? If the event hasn't crossed the API boundary to be
delivered to the application according to the scheduling group policies for
that core, what is the difference to the application?

If a scheduler implementation uses TLS to pre-schedule events it also seems
like it should be able to support work-stealing of those pre-scheduled events
by other threads in the runtime case where odp_schedule() is not called from
that thread or the thread id is removed from scheduling group masks. From
the application perspective these are all implementation details.

This pause state may also cause some confusion for application writers because
it is now possible to write two different event loops for the same core
depending on how a particular scheduler implementation behaves. The semantics
seem to blur a bit with scheduling groups. The level of abstraction can be raised
by deprecating the scheduler pause state and APIs.
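For illustration, the two loop shapes being compared might look roughly like
this (sketch only; 'running' and 'process()' are hypothetical application
code, error handling omitted):

/* Loop A: application simply stops calling the scheduler. */
while (running) {
	ev = odp_schedule(NULL, ODP_SCHED_WAIT);
	process(ev);
}

/* Loop B: signal the scheduler and drain locally stored events before exit. */
while (running) {
	ev = odp_schedule(NULL, ODP_SCHED_WAIT);
	process(ev);
}
odp_schedule_pause();
while ((ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT)) != ODP_EVENT_INVALID)
	odp_event_free(ev); /* or enqueue back, as the test above does */
odp_schedule_resume();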

> > The ODP implementation for that environment

> > would have to turn the scheduling call into a nop for that core if it is

> > paused by use of these APIs. Another way to implement it would be to remove

> > this core from all queue scheduling groups and leave the schedule call as-is.

> > If implemented by the first method, the application writer could simply just

> > not call the API to schedule work. If implemented by the second method, there

> > are already scheduling group APIs to do this.

> 

> The ODP implementation is free to choose how it implements these calls. For

> example adding a single 'if (odp_unlikely(x))' to odp_schedule() to make it a NOP

> after odp_schedule_pause() has been called shouldn't cause a significant overhead.

> 

> > 

> > Are odp_schedule_pause() and odp_schedule_resume() deprecated?

> 

> Nope.

> 

> > 

> > > +	odp_barrier_wait(&globals->barrier);

> > > +

> > > +	clear_sched_queues();
Bill Fischofer Sept. 19, 2016, 8:41 p.m. UTC | #3
On Mon, Sep 19, 2016 at 2:11 PM, Brian Brooks <brian.brooks@linaro.org>
wrote:

> On 09/19 07:55:22, Elo, Matias (Nokia - FI/Espoo) wrote:

> > >

> > > On 09/14 11:53:06, Matias Elo wrote:

> > > > +

> > > > + /* Clear possible locally stored buffers */

> > > > + odp_schedule_pause();

> > > > +

> > > > + while (1) {

> > > > +         ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);

> > > > +

> > > > +         if (ev == ODP_EVENT_INVALID)

> > > > +                 break;

> > > > +

> > > > +         if (odp_queue_enq(src_queue, ev)) {

> > > > +                 LOG_ERR("[%i] Queue enqueue failed.\n", thr);

> > > > +                 odp_event_free(ev);

> > > > +                 return -1;

> > > > +         }

> > > > + }

> > > > +

> > > > + odp_schedule_resume();

> > >

> > > Is it possible to skip this and go straight to draining the queues?

> > >

> > > Locally pre-scheduled work is an implementation detail that should be

> hidden

> > > by the scheduling APIs.

> > >

> > > A hardware scheduler may not pre-schedule work to cores the way the

> current

> > > software implementation does.

> >

> > Also some HW schedulers may operate in push mode and do local caching.

> Calling

> > odp_schedule_pause() is the only ODP method to signal the scheduler to

> stop this.

> > So to keep the application platform agnostic (and follow the API

> documentation),

> > this step cannot be skipped.

> >

> > -Matias

>

> Thinking in the general sense..

>

> Should applications have to reason about _and_ code around pre-scheduled

> and non-scheduled events? If the event hasn't crossed the API boundary to

> be

> delivered to the application according to the scheduling group policies for

> that core, what is the difference to the application?

>

> If a scheduler implementation uses TLS to pre-schedule events it also seems

> like it should be able to support work-stealing of those pre-scheduled

> events

> by other threads in the runtime case where odp_schedule() is not called

> from

> that thread or the thread id is removed from scheduling group masks. From

> the application perspective these are all implementation details.

>


You're making an argument I made some time back. :)  As I recall, the
rationale for pause/resume was to make life easier for existing code that
is introducing ODP on a more gradual basis. Presumably Nokia has examples
of such code in house.

From a design standpoint worker threads shouldn't "change their minds" and
go off to do something else for a while. Whatever else they might want to do
would seem better served by a separate thread that wakes up periodically to
handle those other tasks.


>

> This pause state may also cause some confusion for application writers

> because

> it is now possible to write two different event loops for the same core

> depending on how a particular scheduler implementation behaves. The

> semantics

> seem to blur a bit with scheduling groups. Level of abstraction can be

> raised

> by deprecating the scheduler pause state and APIs.

>


This is a worthwhile discussion to have. I'll add it to the agenda for
tomorrow's ODP call and we can include it in the wider scheduler
discussions scheduled for next week. The other rationale for not wanting
this behavior (another argument I advanced earlier) is that it greatly
complicates recovery processing. A robustly designed application should be
able to recover from the failure of an individual thread (this is
especially true if the ODP thread is in fact a separate process). If the
implementation has prescheduled events to a failed thread then how are they
recovered gracefully? Conversely, if the implementation can recover from
such a scenario then it would seem it could equally "unschedule" prestaged
events as needed due to thread termination (normal or abnormal) or for load
balancing purposes.

We may not be able to fully deprecate these APIs, but perhaps we can make
it clearer how they are intended to be used and classify them as
"discouraged" for new code.


>

> > > The ODP implementation for that environment

> > > would have to turn the scheduling call into a nop for that core if it

> is

> > > paused by use of these APIs. Another way to implement it would be to

> remove

> > > this core from all queue scheduling groups and leave the schedule call

> as-is.

> > > If implemented by the first method, the application writer could

> simply just

> > > not call the API to schedule work. If implemented by the second

> method, there

> > > are already scheduling group APIs to do this.

> >

> > The ODP implementation is free to choose how it implements these calls.

> For

> > example adding a single 'if (odp_unlikely(x))' to odp_schedule() to make

> it a NOP

> > after odp_schedule_pause() has been called shouldn't cause a significant

> overhead.

> >

> > >

> > > Are odp_schedule_pause() and odp_schedule_resume() deprecated?

> >

> > Nope.

> >

> > >

> > > > + odp_barrier_wait(&globals->barrier);

> > > > +

> > > > + clear_sched_queues();

>
Brian Brooks Sept. 21, 2016, 10:11 p.m. UTC | #4
For series:

Reviewed-and-tested-by: Brian Brooks <brian.brooks@linaro.org>

On 09/14 11:53:06, Matias Elo wrote:
> Add new scheduling latency benchmark application. The application

> measures delays (avg, min, max) for high and low priority events.

> 

> The test has a configurable number of TRAFFIC events and few SAMPLE events

> (one common or one per priority). The scheduling latency is only measured

> from the SAMPLE events to minimize measurement overhead.

> 

> The application's command line arguments enable configuring:

> - Number of processing threads

> - Number of high/low priority queues

> - Number of high/low priority events

> - Use separate SAMPLE events for each priority

> - Scheduled queue type (PARALLEL, ATOMIC, ORDERED)

> 

> Signed-off-by: Matias Elo <matias.elo@nokia.com>

> ---

> 

> V2:

> - Remove unnecessary 'num_workers' initialization (Maxim)

> 

>  test/common_plat/performance/.gitignore          |   1 +

>  test/common_plat/performance/Makefile.am         |   4 +

>  test/common_plat/performance/odp_sched_latency.c | 767 +++++++++++++++++++++++

>  3 files changed, 772 insertions(+)

>  create mode 100644 test/common_plat/performance/odp_sched_latency.c

> 

> diff --git a/test/common_plat/performance/.gitignore b/test/common_plat/performance/.gitignore

> index edcc832..1527d25 100644

> --- a/test/common_plat/performance/.gitignore

> +++ b/test/common_plat/performance/.gitignore

> @@ -4,4 +4,5 @@ odp_atomic

>  odp_crypto

>  odp_l2fwd

>  odp_pktio_perf

> +odp_sched_latency

>  odp_scheduling

> diff --git a/test/common_plat/performance/Makefile.am b/test/common_plat/performance/Makefile.am

> index d23bb3e..f5dd8dd 100644

> --- a/test/common_plat/performance/Makefile.am

> +++ b/test/common_plat/performance/Makefile.am

> @@ -5,6 +5,7 @@ TESTS_ENVIRONMENT += TEST_DIR=${builddir}

>  EXECUTABLES = odp_crypto$(EXEEXT) odp_pktio_perf$(EXEEXT)

>  

>  COMPILE_ONLY = odp_l2fwd$(EXEEXT) \

> +	       odp_sched_latency$(EXEEXT) \

>  	       odp_scheduling$(EXEEXT)

>  

>  TESTSCRIPTS = odp_l2fwd_run.sh \

> @@ -20,6 +21,8 @@ bin_PROGRAMS = $(EXECUTABLES) $(COMPILE_ONLY)

>  

>  odp_crypto_LDFLAGS = $(AM_LDFLAGS) -static

>  odp_crypto_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

> +odp_sched_latency_LDFLAGS = $(AM_LDFLAGS) -static

> +odp_sched_latency_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

>  odp_scheduling_LDFLAGS = $(AM_LDFLAGS) -static

>  odp_scheduling_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

>  

> @@ -27,6 +30,7 @@ noinst_HEADERS = \

>  		  $(top_srcdir)/test/test_debug.h

>  

>  dist_odp_crypto_SOURCES = odp_crypto.c

> +dist_odp_sched_latency_SOURCES = odp_sched_latency.c

>  dist_odp_scheduling_SOURCES = odp_scheduling.c

>  dist_odp_pktio_perf_SOURCES = odp_pktio_perf.c

>  

> diff --git a/test/common_plat/performance/odp_sched_latency.c b/test/common_plat/performance/odp_sched_latency.c

> new file mode 100644

> index 0000000..063fb21

> --- /dev/null

> +++ b/test/common_plat/performance/odp_sched_latency.c

> @@ -0,0 +1,767 @@

> +/* Copyright (c) 2016, Linaro Limited

> + * All rights reserved.

> + *

> + * SPDX-License-Identifier:     BSD-3-Clause

> + */

> +

> +/**

> + * @file

> + *

> + * @example odp_sched_latency.c  ODP scheduling latency benchmark application

> + */

> +

> +#include <string.h>

> +#include <stdlib.h>

> +#include <inttypes.h>

> +

> +#include <test_debug.h>

> +

> +/* ODP main header */

> +#include <odp_api.h>

> +

> +/* ODP helper for Linux apps */

> +#include <odp/helper/linux.h>

> +

> +/* GNU lib C */

> +#include <getopt.h>

> +

> +#define MAX_WORKERS	  64		/**< Maximum number of worker threads */

> +#define MAX_QUEUES	  4096		/**< Maximum number of queues */

> +#define EVENT_POOL_SIZE	  (1024 * 1024) /**< Event pool size */

> +#define TEST_ROUNDS (4 * 1024 * 1024)	/**< Test rounds for each thread */

> +#define MAIN_THREAD	   1 /**< Thread ID performing maintenance tasks */

> +

> +/* Default values for command line arguments */

> +#define SAMPLE_EVENT_PER_PRIO	  0 /**< Allocate a separate sample event for

> +					 each priority */

> +#define HI_PRIO_EVENTS		  0 /**< Number of high priority events */

> +#define LO_PRIO_EVENTS		 32 /**< Number of low priority events */

> +#define HI_PRIO_QUEUES		 16 /**< Number of high priority queues */

> +#define LO_PRIO_QUEUES		 64 /**< Number of low priority queues */

> +

> +#define EVENTS_PER_HI_PRIO_QUEUE 0  /**< Alloc HI_PRIO_QUEUES x HI_PRIO_EVENTS

> +					 events */

> +#define EVENTS_PER_LO_PRIO_QUEUE 1  /**< Alloc LO_PRIO_QUEUES x LO_PRIO_EVENTS

> +					 events */

> +ODP_STATIC_ASSERT(HI_PRIO_QUEUES <= MAX_QUEUES, "Too many HI priority queues");

> +ODP_STATIC_ASSERT(LO_PRIO_QUEUES <= MAX_QUEUES, "Too many LO priority queues");

> +

> +#define CACHE_ALIGN_ROUNDUP(x)\

> +	((ODP_CACHE_LINE_SIZE) * \

> +	 (((x) + ODP_CACHE_LINE_SIZE - 1) / (ODP_CACHE_LINE_SIZE)))

> +

> +/* Test priorities */

> +#define NUM_PRIOS 2 /**< Number of tested priorities */

> +#define HI_PRIO	  0

> +#define LO_PRIO	  1

> +

> +/** Test event types */

> +typedef enum {

> +	WARM_UP, /**< Warm up event */

> +	TRAFFIC, /**< Event used only as traffic load */

> +	SAMPLE	 /**< Event used to measure latency */

> +} event_type_t;

> +

> +/** Test event */

> +typedef struct {

> +	uint64_t ts;		/**< Send timestamp */

> +	event_type_t type;	/**< Message type */

> +	int src_idx[NUM_PRIOS]; /**< Source ODP queue */

> +	int prio;		/**< Source queue priority */

> +} test_event_t;

> +

> +/** Test arguments */

> +typedef struct {

> +	int cpu_count;			/**< CPU count */

> +	odp_schedule_sync_t sync_type;	/**< Scheduler sync type */

> +	struct {

> +		int queues;	/**< Number of scheduling queues */

> +		int events;	/**< Number of events */

> +		odp_bool_t events_per_queue; /**< Allocate 'queues' x 'events'

> +						  test events */

> +	} prio[NUM_PRIOS];

> +	odp_bool_t sample_per_prio; /**< Allocate a separate sample event for

> +					 each priority */

> +} test_args_t;

> +

> +/** Latency measurements statistics */

> +typedef struct {

> +	uint64_t events;   /**< Total number of received events */

> +	uint64_t sample_events;  /**< Number of received sample events */

> +	uint64_t tot;	   /**< Total event latency. Sum of all events. */

> +	uint64_t min;	   /**< Minimum event latency */

> +	uint64_t max;	   /**< Maximum event latency */

> +} test_stat_t;

> +

> +/** Performance test statistics (per core) */

> +typedef union {

> +	test_stat_t prio[NUM_PRIOS]; /**< Test statistics per priority */

> +

> +	uint8_t pad[CACHE_ALIGN_ROUNDUP(NUM_PRIOS * sizeof(test_stat_t))];

> +} core_stat_t ODP_ALIGNED_CACHE;

> +

> +/** Test global variables */

> +typedef struct {

> +	core_stat_t	 core_stat[MAX_WORKERS]; /**< Core specific stats */

> +	odp_barrier_t    barrier; /**< Barrier for thread synchronization */

> +	odp_pool_t       pool;	  /**< Pool for allocating test events */

> +	test_args_t      args;	  /**< Parsed command line arguments */

> +	odp_queue_t      queue[NUM_PRIOS][MAX_QUEUES]; /**< Scheduled queues */

> +} test_globals_t;

> +

> +/**

> + * Clear all scheduled queues.

> + *

> + * Retry to be sure that all buffers have been scheduled.

> + */

> +static void clear_sched_queues(void)

> +{

> +	odp_event_t ev;

> +

> +	while (1) {

> +		ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT);

> +

> +		if (ev == ODP_EVENT_INVALID)

> +			break;

> +

> +		odp_event_free(ev);

> +	}

> +}

> +

> +/**

> + * Enqueue events into queues

> + *

> + * @param prio        Queue priority (HI_PRIO/LO_PRIO)

> + * @param num_queues  Number of queues

> + * @param num_events  Number of 'TRAFFIC' events

> + * @param num_samples Number of 'SAMPLE' events

> + * @param div_events  If true, divide 'num_events' between 'num_queues'. if

> + *		      false, enqueue 'num_events' to each queue.

> + * @param globals     Test shared data

> + *

> + * @retval 0 on success

> + * @retval -1 on failure

> + */

> +static int enqueue_events(int prio, int num_queues, int num_events,

> +			  int num_samples, odp_bool_t div_events,

> +			  test_globals_t *globals)

> +{

> +	odp_buffer_t buf[num_events + num_samples];

> +	odp_event_t ev[num_events + num_samples];

> +	odp_queue_t queue;

> +	test_event_t *event;

> +	int i, j, ret;

> +	int enq_events;

> +	int events_per_queue;

> +	int tot_events;

> +	int rdy_events = 0;

> +

> +	tot_events = num_events + num_samples;

> +

> +	if (!num_queues || !tot_events)

> +		return 0;

> +

> +	events_per_queue = tot_events;

> +	if (div_events)

> +		events_per_queue = (tot_events + num_queues - 1) / num_queues;

> +

> +	for (i = 0; i < num_queues; i++) {

> +		queue = globals->queue[prio][i];

> +

> +		ret = odp_buffer_alloc_multi(globals->pool, buf,

> +					     events_per_queue);

> +		if (ret != events_per_queue) {

> +			LOG_ERR("Buffer alloc failed. Try increasing EVENT_POOL_SIZE.\n");

> +			ret = ret < 0 ? 0 : ret;

> +			odp_buffer_free_multi(buf, ret);

> +			return -1;

> +		}

> +		for (j = 0; j < events_per_queue; j++) {

> +			if (!odp_buffer_is_valid(buf[j])) {

> +				LOG_ERR("Buffer alloc failed\n");

> +				odp_buffer_free_multi(buf, events_per_queue);

> +				return -1;

> +			}

> +

> +			event = odp_buffer_addr(buf[j]);

> +			memset(event, 0, sizeof(test_event_t));

> +

> +			/* Latency isn't measured from the first processing

> +			 * round. */

> +			if (num_samples > 0) {

> +				event->type = WARM_UP;

> +				num_samples--;

> +			} else {

> +				event->type = TRAFFIC;

> +			}

> +			event->src_idx[prio] = i;

> +			event->prio = prio;

> +			ev[j] = odp_buffer_to_event(buf[j]);

> +		}

> +

> +		enq_events = 0;

> +		do {

> +			ret = odp_queue_enq_multi(queue, &ev[enq_events],

> +						  events_per_queue -

> +						  enq_events);

> +			if (ret < 0) {

> +				LOG_ERR("Queue enqueue failed.\n");

> +				return -1;

> +			}

> +			enq_events += ret;

> +		} while (enq_events < events_per_queue);

> +

> +		rdy_events += events_per_queue;

> +		if (div_events && rdy_events >= tot_events)

> +			return 0;

> +	}

> +	return 0;

> +}

> +

> +/**

> + * Print latency measurement results

> + *

> + * @param globals  Test shared data

> + */

> +static void print_results(test_globals_t *globals)

> +{

> +	test_stat_t *lat;

> +	odp_schedule_sync_t stype;

> +	test_stat_t total;

> +	test_args_t *args;

> +	uint64_t avg;

> +	int i, j;

> +

> +	args = &globals->args;

> +	stype = globals->args.sync_type;

> +

> +	printf("\n%s queue scheduling latency\n",

> +	       (stype == ODP_SCHED_SYNC_ATOMIC) ? "ATOMIC" :

> +	       ((stype == ODP_SCHED_SYNC_ORDERED) ? "ORDERED" : "PARALLEL"));

> +

> +	printf("  LO_PRIO queues: %i\n", args->prio[LO_PRIO].queues);

> +	if (args->prio[LO_PRIO].events_per_queue)

> +		printf("  LO_PRIO event per queue: %i\n",

> +		       args->prio[LO_PRIO].events);

> +	else

> +		printf("  LO_PRIO events: %i\n", args->prio[LO_PRIO].events);

> +

> +	printf("  HI_PRIO queues: %i\n", args->prio[HI_PRIO].queues);

> +	if (args->prio[HI_PRIO].events_per_queue)

> +		printf("  HI_PRIO event per queue: %i\n\n",

> +		       args->prio[HI_PRIO].events);

> +	else

> +		printf("  HI_PRIO events: %i\n\n", args->prio[HI_PRIO].events);

> +

> +	for (i = 0; i < NUM_PRIOS; i++) {

> +		memset(&total, 0, sizeof(test_stat_t));

> +		total.min = UINT64_MAX;

> +

> +		printf("%s priority\n"

> +		       "Thread   Avg[ns]    Min[ns]    Max[ns]    Samples    Total\n"

> +		       "---------------------------------------------------------------\n",

> +		       i == HI_PRIO ? "HIGH" : "LOW");

> +		for (j = 1; j <= args->cpu_count; j++) {

> +			lat = &globals->core_stat[j].prio[i];

> +

> +			if (lat->sample_events == 0) {

> +				printf("%-8d N/A\n", j);

> +				continue;

> +			}

> +

> +			if (lat->max > total.max)

> +				total.max = lat->max;

> +			if (lat->min < total.min)

> +				total.min = lat->min;

> +			total.tot += lat->tot;

> +			total.sample_events += lat->sample_events;

> +			total.events += lat->events;

> +

> +			avg = lat->events ? lat->tot / lat->sample_events : 0;

> +			printf("%-8d %-10" PRIu64 " %-10" PRIu64 " "

> +			       "%-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 "\n",

> +			       j, avg, lat->min, lat->max, lat->sample_events,

> +			       lat->events);

> +		}

> +		printf("---------------------------------------------------------------\n");

> +		if (total.sample_events == 0) {

> +			printf("Total    N/A\n\n");

> +			continue;

> +		}

> +		avg = total.events ? total.tot / total.sample_events : 0;

> +		printf("Total    %-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 " "

> +		       "%-10" PRIu64 " %-10" PRIu64 "\n\n", avg, total.min,

> +		       total.max, total.sample_events, total.events);

> +	}

> +}

> +

> +/**

> + * Measure latency of scheduled ODP events

> + *

> + * Schedule and enqueue events until 'TEST_ROUNDS' events have been processed.

> + * Scheduling latency is measured only from type 'SAMPLE' events. Other events

> + * are simply enqueued back to the scheduling queues.

> + *

> + * For 'TRAFFIC' type events the destination queue is selected from the same

> + * priority class as source queue. 'SAMPLE' type event may change priority

> + * depending on the command line arguments.

> + *

> + * @param thr      Thread ID

> + * @param globals  Test shared data

> + *

> + * @retval 0 on success

> + * @retval -1 on failure

> + */

> +static int test_schedule(int thr, test_globals_t *globals)

> +{

> +	odp_event_t ev;

> +	odp_buffer_t buf;

> +	odp_queue_t src_queue;

> +	odp_queue_t dst_queue;

> +	uint64_t latency;

> +	uint32_t i;

> +	test_event_t *event;

> +	test_stat_t *stats;

> +	int dst_idx;

> +

> +	memset(&globals->core_stat[thr], 0, sizeof(core_stat_t));

> +	globals->core_stat[thr].prio[HI_PRIO].min = UINT64_MAX;

> +	globals->core_stat[thr].prio[LO_PRIO].min = UINT64_MAX;

> +

> +	for (i = 0; i < TEST_ROUNDS; i++) {

> +		ev = odp_schedule(&src_queue, ODP_SCHED_WAIT);

> +

> +		buf = odp_buffer_from_event(ev);

> +		event = odp_buffer_addr(buf);

> +

> +		stats = &globals->core_stat[thr].prio[event->prio];

> +

> +		if (event->type == SAMPLE) {

> +			latency = odp_time_to_ns(odp_time_global()) - event->ts;

> +

> +			if (latency > stats->max)

> +				stats->max = latency;

> +			if (latency < stats->min)

> +				stats->min = latency;

> +			stats->tot += latency;

> +			stats->sample_events++;

> +

> +			/* Move sample event to a different priority */

> +			if (!globals->args.sample_per_prio &&

> +			    globals->args.prio[!event->prio].queues)

> +				event->prio = !event->prio;

> +		}

> +

> +		if (odp_unlikely(event->type == WARM_UP))

> +			event->type = SAMPLE;

> +		else

> +			stats->events++;

> +

> +		/* Move event to next queue */

> +		dst_idx = event->src_idx[event->prio] + 1;

> +		if (dst_idx >= globals->args.prio[event->prio].queues)

> +			dst_idx = 0;

> +		event->src_idx[event->prio] = dst_idx;

> +		dst_queue = globals->queue[event->prio][dst_idx];

> +

> +		if (event->type == SAMPLE)

> +			event->ts = odp_time_to_ns(odp_time_global());

> +

> +		if (odp_queue_enq(dst_queue, ev)) {

> +			LOG_ERR("[%i] Queue enqueue failed.\n", thr);

> +			odp_event_free(ev);

> +			return -1;

> +		}

> +	}

> +

> +	/* Clear possible locally stored buffers */

> +	odp_schedule_pause();

> +

> +	while (1) {

> +		ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);

> +

> +		if (ev == ODP_EVENT_INVALID)

> +			break;

> +

> +		if (odp_queue_enq(src_queue, ev)) {

> +			LOG_ERR("[%i] Queue enqueue failed.\n", thr);

> +			odp_event_free(ev);

> +			return -1;

> +		}

> +	}

> +

> +	odp_schedule_resume();

> +

> +	odp_barrier_wait(&globals->barrier);

> +

> +	clear_sched_queues();

> +

> +	if (thr == MAIN_THREAD)

> +		print_results(globals);

> +

> +	return 0;

> +}

> +

> +/**

> + * Worker thread

> + *

> + * @param arg  Arguments

> + *

> + * @retval 0 on success

> + * @retval -1 on failure

> + */

> +static int run_thread(void *arg ODP_UNUSED)

> +{

> +	odp_shm_t shm;

> +	test_globals_t *globals;

> +	test_args_t *args;

> +	int thr;

> +	int sample_events = 0;

> +

> +	thr = odp_thread_id();

> +

> +	shm     = odp_shm_lookup("test_globals");

> +	globals = odp_shm_addr(shm);

> +

> +	if (globals == NULL) {

> +		LOG_ERR("Shared mem lookup failed\n");

> +		return -1;

> +	}

> +

> +	if (thr == MAIN_THREAD) {

> +		args = &globals->args;

> +

> +		if (enqueue_events(HI_PRIO, args->prio[HI_PRIO].queues,

> +				   args->prio[HI_PRIO].events, 1,

> +				   !args->prio[HI_PRIO].events_per_queue,

> +				   globals))

> +			return -1;

> +

> +		if (!args->prio[HI_PRIO].queues || args->sample_per_prio)

> +			sample_events = 1;

> +

> +		if (enqueue_events(LO_PRIO, args->prio[LO_PRIO].queues,

> +				   args->prio[LO_PRIO].events, sample_events,

> +				   !args->prio[LO_PRIO].events_per_queue,

> +				   globals))

> +			return -1;

> +	}

> +

> +	odp_barrier_wait(&globals->barrier);

> +

> +	if (test_schedule(thr, globals))

> +		return -1;

> +

> +	return 0;

> +}

> +

> +/**

> + * Print usage information

> + */

> +static void usage(void)

> +{

> +	printf("\n"

> +	       "OpenDataPlane scheduler latency benchmark application.\n"

> +	       "\n"

> +	       "Usage: ./odp_sched_latency [options]\n"

> +	       "Optional OPTIONS:\n"

> +	       "  -c, --count <number> CPU count\n"

> +	       "  -l, --lo-prio-queues <number> Number of low priority scheduled queues\n"

> +	       "  -t, --hi-prio-queues <number> Number of high priority scheduled queues\n"

> +	       "  -m, --lo-prio-events-per-queue <number> Number of events per low priority queue\n"

> +	       "  -n, --hi-prio-events-per-queue <number> Number of events per high priority queues\n"

> +	       "  -o, --lo-prio-events <number> Total number of low priority events (overrides the\n"

> +	       "				number of events per queue)\n"

> +	       "  -p, --hi-prio-events <number> Total number of high priority events (overrides the\n"

> +	       "				number of events per queue)\n"

> +	       "  -r  --sample-per-prio Allocate a separate sample event for each priority. By default\n"

> +	       "			a single sample event is used and its priority is changed after\n"

> +	       "			each processing round.\n"

> +	       "  -s, --sync  Scheduled queues' sync type\n"

> +	       "               0: ODP_SCHED_SYNC_PARALLEL (default)\n"

> +	       "               1: ODP_SCHED_SYNC_ATOMIC\n"

> +	       "               2: ODP_SCHED_SYNC_ORDERED\n"

> +	       "  -h, --help   Display help and exit.\n\n"

> +	       );

> +}

> +

> +/**

> + * Parse arguments

> + *

> + * @param argc  Argument count

> + * @param argv  Argument vector

> + * @param args  Test arguments

> + */

> +static void parse_args(int argc, char *argv[], test_args_t *args)

> +{

> +	int opt;

> +	int long_index;

> +	int i;

> +

> +	static const struct option longopts[] = {

> +		{"count", required_argument, NULL, 'c'},

> +		{"lo-prio-queues", required_argument, NULL, 'l'},

> +		{"hi-prio-queues", required_argument, NULL, 't'},

> +		{"lo-prio-events-per-queue", required_argument, NULL, 'm'},

> +		{"hi-prio-events-per-queue", required_argument, NULL, 'n'},

> +		{"lo-prio-events", required_argument, NULL, 'o'},

> +		{"hi-prio-events", required_argument, NULL, 'p'},

> +		{"sample-per-prio", no_argument, NULL, 'r'},

> +		{"sync", required_argument, NULL, 's'},

> +		{"help", no_argument, NULL, 'h'},

> +		{NULL, 0, NULL, 0}

> +	};

> +

> +	static const char *shortopts = "+c:s:l:t:m:n:o:p:rh";

> +

> +	/* Let helper collect its own arguments (e.g. --odph_proc) */

> +	odph_parse_options(argc, argv, shortopts, longopts);

> +

> +	args->sync_type = ODP_SCHED_SYNC_PARALLEL;

> +	args->sample_per_prio = SAMPLE_EVENT_PER_PRIO;

> +	args->prio[LO_PRIO].queues = LO_PRIO_QUEUES;

> +	args->prio[HI_PRIO].queues = HI_PRIO_QUEUES;

> +	args->prio[LO_PRIO].events = LO_PRIO_EVENTS;

> +	args->prio[HI_PRIO].events = HI_PRIO_EVENTS;

> +	args->prio[LO_PRIO].events_per_queue = EVENTS_PER_LO_PRIO_QUEUE;

> +	args->prio[HI_PRIO].events_per_queue = EVENTS_PER_HI_PRIO_QUEUE;

> +

> +	opterr = 0; /* Do not issue errors on helper options */

> +	while (1) {

> +		opt = getopt_long(argc, argv, shortopts, longopts, &long_index);

> +

> +		if (opt == -1)

> +			break;	/* No more options */

> +

> +		switch (opt) {

> +		case 'c':

> +			args->cpu_count = atoi(optarg);

> +			break;

> +		case 'l':

> +			args->prio[LO_PRIO].queues = atoi(optarg);

> +			break;

> +		case 't':

> +			args->prio[HI_PRIO].queues = atoi(optarg);

> +			break;

> +		case 'm':

> +			args->prio[LO_PRIO].events = atoi(optarg);

> +			args->prio[LO_PRIO].events_per_queue = 1;

> +			break;

> +		case 'n':

> +			args->prio[HI_PRIO].events = atoi(optarg);

> +			args->prio[HI_PRIO].events_per_queue = 1;

> +			break;

> +		case 'o':

> +			args->prio[LO_PRIO].events = atoi(optarg);

> +			args->prio[LO_PRIO].events_per_queue = 0;

> +			break;

> +		case 'p':

> +			args->prio[HI_PRIO].events = atoi(optarg);

> +			args->prio[HI_PRIO].events_per_queue = 0;

> +			break;

> +		case 's':

> +			i = atoi(optarg);

> +			if (i == 1)

> +				args->sync_type = ODP_SCHED_SYNC_ATOMIC;

> +			else if (i == 2)

> +				args->sync_type = ODP_SCHED_SYNC_ORDERED;

> +			else

> +				args->sync_type = ODP_SCHED_SYNC_PARALLEL;

> +			break;

> +		case 'r':

> +			args->sample_per_prio = 1;

> +			break;

> +		case 'h':

> +			usage();

> +			exit(EXIT_SUCCESS);

> +			break;

> +

> +		default:

> +			break;

> +		}

> +	}

> +

> +	/* Make sure arguments are valid */

> +	if (args->cpu_count > MAX_WORKERS)

> +		args->cpu_count = MAX_WORKERS;

> +	if (args->prio[LO_PRIO].queues > MAX_QUEUES)

> +		args->prio[LO_PRIO].queues = MAX_QUEUES;

> +	if (args->prio[HI_PRIO].queues > MAX_QUEUES)

> +		args->prio[HI_PRIO].queues = MAX_QUEUES;

> +	if (!args->prio[HI_PRIO].queues && !args->prio[LO_PRIO].queues) {

> +		printf("No queues configured\n");

> +		usage();

> +		exit(EXIT_FAILURE);

> +	}

> +}

> +

> +/**

> + * Test main function

> + */

> +int main(int argc, char *argv[])

> +{

> +	odp_instance_t instance;

> +	odph_odpthread_t *thread_tbl;

> +	odph_odpthread_params_t thr_params;

> +	odp_cpumask_t cpumask;

> +	odp_pool_t pool;

> +	odp_pool_param_t params;

> +	odp_shm_t shm;

> +	test_globals_t *globals;

> +	test_args_t args;

> +	char cpumaskstr[ODP_CPUMASK_STR_SIZE];

> +	int i, j;

> +	int ret = 0;

> +	int num_workers = 0;

> +

> +	printf("\nODP scheduling latency benchmark starts\n\n");

> +

> +	memset(&args, 0, sizeof(args));

> +	parse_args(argc, argv, &args);

> +

> +	/* ODP global init */

> +	if (odp_init_global(&instance, NULL, NULL)) {

> +		LOG_ERR("ODP global init failed.\n");

> +		return -1;

> +	}

> +

> +	/*

> +	 * Init this thread. It makes also ODP calls when

> +	 * setting up resources for worker threads.

> +	 */

> +	if (odp_init_local(instance, ODP_THREAD_CONTROL)) {

> +		LOG_ERR("ODP global init failed.\n");

> +		return -1;

> +	}

> +

> +	printf("\n");

> +	printf("ODP system info\n");

> +	printf("---------------\n");

> +	printf("ODP API version:  %s\n",        odp_version_api_str());

> +	printf("ODP impl name:    %s\n",        odp_version_impl_name());

> +	printf("ODP impl details: %s\n",        odp_version_impl_str());

> +	printf("CPU model:        %s\n",        odp_cpu_model_str());

> +	printf("CPU freq (hz):    %" PRIu64 "\n", odp_cpu_hz_max());

> +	printf("Cache line size:  %i\n",        odp_sys_cache_line_size());

> +	printf("Max CPU count:    %i\n",        odp_cpu_count());

> +

> +	/* Get default worker cpumask */

> +	if (args.cpu_count)

> +		num_workers = args.cpu_count;

> +

> +	num_workers = odp_cpumask_default_worker(&cpumask, num_workers);

> +	args.cpu_count = num_workers;

> +

> +	(void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));

> +

> +	printf("Worker threads:   %i\n", num_workers);

> +	printf("First CPU:        %i\n", odp_cpumask_first(&cpumask));

> +	printf("CPU mask:         %s\n\n", cpumaskstr);

> +

> +	thread_tbl = calloc(sizeof(odph_odpthread_t), num_workers);

> +	if (!thread_tbl) {

> +		LOG_ERR("no memory for thread_tbl\n");

> +		return -1;

> +	}

> +

> +	shm = odp_shm_reserve("test_globals",

> +			      sizeof(test_globals_t), ODP_CACHE_LINE_SIZE, 0);

> +	if (shm == ODP_SHM_INVALID) {

> +		LOG_ERR("Shared memory reserve failed.\n");

> +		return -1;

> +	}

> +

> +	globals = odp_shm_addr(shm);

> +	memset(globals, 0, sizeof(test_globals_t));

> +	memcpy(&globals->args, &args, sizeof(test_args_t));

> +

> +	/*

> +	 * Create event pool

> +	 */

> +	odp_pool_param_init(&params);

> +	params.buf.size  = sizeof(test_event_t);

> +	params.buf.align = 0;

> +	params.buf.num   = EVENT_POOL_SIZE;

> +	params.type      = ODP_POOL_BUFFER;

> +

> +	pool = odp_pool_create("event_pool", &params);

> +

> +	if (pool == ODP_POOL_INVALID) {

> +		LOG_ERR("Pool create failed.\n");

> +		return -1;

> +	}

> +	globals->pool = pool;

> +

> +	/*

> +	 * Create queues for schedule test

> +	 */

> +	for (i = 0; i < NUM_PRIOS; i++) {

> +		char name[] = "sched_XX_YY";

> +		odp_queue_t queue;

> +		odp_queue_param_t param;

> +		int prio;

> +

> +		if (i == HI_PRIO)

> +			prio = ODP_SCHED_PRIO_HIGHEST;

> +		else

> +			prio = ODP_SCHED_PRIO_LOWEST;

> +

> +		name[6] = '0' + (prio / 10);

> +		name[7] = '0' + prio - (10 * (prio / 10));

> +

> +		odp_queue_param_init(&param);

> +		param.type        = ODP_QUEUE_TYPE_SCHED;

> +		param.sched.prio  = prio;

> +		param.sched.sync  = args.sync_type;

> +		param.sched.group = ODP_SCHED_GROUP_ALL;

> +

> +		for (j = 0; j < args.prio[i].queues; j++) {

> +			name[9]  = '0' + j / 10;

> +			name[10] = '0' + j - 10 * (j / 10);

> +

> +			queue = odp_queue_create(name, &param);

> +

> +			if (queue == ODP_QUEUE_INVALID) {

> +				LOG_ERR("Scheduled queue create failed.\n");

> +				return -1;

> +			}

> +

> +			globals->queue[i][j] = queue;

> +		}

> +	}

> +

> +	odp_barrier_init(&globals->barrier, num_workers);

> +

> +	/* Create and launch worker threads */

> +	memset(&thr_params, 0, sizeof(thr_params));

> +	thr_params.thr_type = ODP_THREAD_WORKER;

> +	thr_params.instance = instance;

> +	thr_params.start = run_thread;

> +	thr_params.arg   = NULL;

> +	odph_odpthreads_create(thread_tbl, &cpumask, &thr_params);

> +

> +	/* Wait for worker threads to terminate */

> +	odph_odpthreads_join(thread_tbl);

> +	free(thread_tbl);

> +

> +	printf("ODP scheduling latency test complete\n\n");

> +

> +	for (i = 0; i < NUM_PRIOS; i++) {

> +		odp_queue_t queue;

> +		int num_queues;

> +

> +		num_queues = args.prio[i].queues;

> +

> +		for (j = 0; j < num_queues; j++) {

> +			queue = globals->queue[i][j];

> +			ret += odp_queue_destroy(queue);

> +		}

> +	}

> +

> +	ret += odp_shm_free(shm);

> +	ret += odp_pool_destroy(pool);

> +	ret += odp_term_local();

> +	ret += odp_term_global(instance);

> +

> +	return ret;

> +}

> -- 

> 2.7.4

>
Brian Brooks Sept. 22, 2016, 12:12 a.m. UTC | #5
On 09/20 08:01:49, Savolainen, Petri (Nokia - FI/Espoo) wrote:
> Hi,

> 

> First, this app is written according to the current API and we'd like to start latency testing schedulers ASAP. A review of the app code itself would be appreciated.


Reviewed and tested.

> Anyway, I'll answer those API related comments below.

> 

> 

> > -----Original Message-----

> > From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Bill

> > Fischofer

> > Sent: Monday, September 19, 2016 11:41 PM

> > To: Brian Brooks <brian.brooks@linaro.org>

> > Cc: Elo, Matias (Nokia - FI/Espoo) <matias.elo@nokia-bell-labs.com>; lng-

> > odp@lists.linaro.org

> > Subject: Re: [lng-odp] [PATCH v2 1/2] test: perf: add new scheduling

> > latency test

> > 

> > On Mon, Sep 19, 2016 at 2:11 PM, Brian Brooks <brian.brooks@linaro.org>

> > wrote:

> > 

> > > On 09/19 07:55:22, Elo, Matias (Nokia - FI/Espoo) wrote:

> > > > >

> > > > > On 09/14 11:53:06, Matias Elo wrote:

> > > > > > +

> 

> 

> > >

> > > Thinking in the general sense..

> > >

> > > Should applications have to reason about _and_ code around pre-scheduled

> > > and non-scheduled events? If the event hasn't crossed the API boundary

> > to

> > > be

> > > delivered to the application according to the scheduling group policies

> > for

> > > that core, what is the difference to the application?

> > >

> > > If a scheduler implementation uses TLS to pre-schedule events it also

> > seems

> > > like it should be able to support work-stealing of those pre-scheduled

> > > events

> > > by other threads in the runtime case where odp_schedule() is not called

> > > from

> > > that thread or the thread id is removed from scheduling group masks.

> > From

> > > the application perspective these are all implementation details.

> > >

> 

> Pause signals a (HW) scheduler that the application will leave the schedule loop soon (the app stops calling schedule() for a long time or forever). Without the signal, the scheduler would not see any difference between a "mid" schedule call and the last call. A schedule() call starts and ends a schedule context (e.g. atomic locking of a queue). If the application just leaves the loop, the last context will not be freed and e.g. an atomic queue would deadlock.


It is the scheduler providing exclusive access to the atomic queue. At any
one point in time there may only be one core processing an event from an
atomic queue. Multiple cores can participate in processing from an atomic
queue, but the scheduler will ensure exclusive access.

If the core processing an event from an atomic queue finishes its work and
asks the scheduler for more work, the atomic context is implicitly released
by the application. The scheduler may give that core an event from a higher
priority queue and an event from the original atomic queue to another core.

Another scenario is when the core processing an event from an atomic queue
finishes the critical section work but still needs to continue processing the
event, it may release the atomic context explicitly. At this point, the
scheduler may dispatch the next event from the atomic queue to another core
and there could be parallel processing of events from an atomic queue. Maybe
switching the queue to be ordered instead of atomic could be considered here.
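A sketch of that explicit release, using the existing
odp_schedule_release_atomic() hint (process_critical() and process_rest() are
hypothetical application functions):

ev = odp_schedule(&src_queue, ODP_SCHED_WAIT);

process_critical(ev);          /* work that needs the atomic context */
odp_schedule_release_atomic(); /* hint that the context may be released */
process_rest(ev);              /* may now overlap with the next event
                                  scheduled from the same atomic queue */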

Do you have something in mind as to why the odp_schedule_release_xxx() APIs
are insufficient for the 'last' schedule call?

> Also generally pre-scheduled work cannot be "stolen" since:

> 1) it would be a costly operation to unwind already-made decisions

> 2) packet order must also be maintained in this case. It's costly to reorder / force order for stolen events (other events may already have been processed on other cores before you "steal" some events).


A scheduler implementation may pre-schedule work to cores, but you're right
it could end up being costly if data is being moved like that. Ensuring
correctness could become challenging too.

> > You're making an argument I made some time back. :)  As I recall, the

> > rationale for pause/resume was to make life easier for existing code that

> > is introducing ODP on a more gradual basis. Presumably Nokia has examples

> > of such code in house.

> 

> No. See the rationale above. It's based on the functionality of existing SoC HW schedulers. HW is bad at unwinding already-made decisions. The application is in the best position to decide what to do with the last events before a thread exits. Typically, those are processed like any other event.

> 

> > 

> > From a design standpoint worker threads shouldn't "change their minds" and

> > go off to do something else for a while. For whatever else they might want

> > to do it would seem that such requirements would be better served by

> > simply

> > having another thread to do the other things that wakes up periodically to

> > do them.

> > 

> 

> Pause/resume should not be something that a thread does very often. But without it, no worker thread could ever exit the schedule loop - doing so could deadlock a queue (or a number of queues).

> 

> > 

> > >

> > > This pause state may also cause some confusion for application writers

> > > because

> > > it is now possible to write two different event loops for the same core

> > > depending on how a particular scheduler implementation behaves. The

> > > semantics

> > > seem to blur a bit with scheduling groups. Level of abstraction can be

> > > raised

> > > by deprecating the scheduler pause state and APIs.

> > >

> 

> Those cannot be just deprecated. The same signal is needed in some form to avoid deadlocks.

> 

> > 

> > This is a worthwhile discussion to have. I'll add it to the agenda for

> > tomorrow's ODP call and we can include it in the wider scheduler

> > discussions scheduled for next week. The other rationale for not wanting

> > this behavior (another argument I advanced earlier) is that it greatly

> > complicates recovery processing. A robustly designed application should be

> > able to recover from the failure of an individual thread (this is

> > especially true if the ODP thread is in fact a separate process). If the

> > implementation has prescheduled events to a failed thread then how are

> > they

> > recovered gracefully? Conversely, if the implementation can recover from

> > such a scenario than it would seem it could equally "unschedule" prestaged

> > events as needed due to thread termination (normal or abnormal) or for

> > load

> > balancing purposes.

> 

> Unwinding is hard in HW schedulers and something that is not generally supported.


A description of the unwinding process may help here.

> > 

> > We may not be able to fully deprecate these APIs, but perhaps we can make

> > it clearer how they are intended to be used and classify them as

> > "discouraged" for new code.

> 

> I explained above why those are needed. I think there's no real reason to change the current API. It's optimized for normal operation (== threads don't exit), but offers a way to exit the loop gracefully (== without ruining the performance or ease of use of normal operation).

> 

> -Petri

> 

> 

>
Maxim Uvarov Sept. 22, 2016, 5:05 p.m. UTC | #6
Merged,
Maxim.

On 09/22/16 01:11, Brian Brooks wrote:
> For series:

>

> Reviewed-and-tested-by: Brian Brooks <brian.brooks@linaro.org>

>

> On 09/14 11:53:06, Matias Elo wrote:

>> Add new scheduling latency benchmark application. The application

>> measures delays (avg, min, max) for high and low priority events.

>>

>> The test has a configurable number of TRAFFIC events and few SAMPLE events

>> (one common or one per priority). The scheduling latency is only measured

>> from the SAMPLE events to minimize measurement overhead.

>>

>> The application's command line arguments enable configuring:

>> - Number of processing threads

>> - Number of high/low priority queues

>> - Number of high/low priority events

>> - Use separate SAMPLE events for each priority

>> - Scheduled queue type (PARALLEL, ATOMIC, ORDERED)

>>

>> Signed-off-by: Matias Elo <matias.elo@nokia.com>

>> ---

>>

>> V2:

>> - Remove unnecessary 'num_workers' initialization (Maxim)

>>

>>   test/common_plat/performance/.gitignore          |   1 +

>>   test/common_plat/performance/Makefile.am         |   4 +

>>   test/common_plat/performance/odp_sched_latency.c | 767 +++++++++++++++++++++++

>>   3 files changed, 772 insertions(+)

>>   create mode 100644 test/common_plat/performance/odp_sched_latency.c

>>

>> diff --git a/test/common_plat/performance/.gitignore b/test/common_plat/performance/.gitignore

>> index edcc832..1527d25 100644

>> --- a/test/common_plat/performance/.gitignore

>> +++ b/test/common_plat/performance/.gitignore

>> @@ -4,4 +4,5 @@ odp_atomic

>>   odp_crypto

>>   odp_l2fwd

>>   odp_pktio_perf

>> +odp_sched_latency

>>   odp_scheduling

>> diff --git a/test/common_plat/performance/Makefile.am b/test/common_plat/performance/Makefile.am

>> index d23bb3e..f5dd8dd 100644

>> --- a/test/common_plat/performance/Makefile.am

>> +++ b/test/common_plat/performance/Makefile.am

>> @@ -5,6 +5,7 @@ TESTS_ENVIRONMENT += TEST_DIR=${builddir}

>>   EXECUTABLES = odp_crypto$(EXEEXT) odp_pktio_perf$(EXEEXT)

>>   

>>   COMPILE_ONLY = odp_l2fwd$(EXEEXT) \

>> +	       odp_sched_latency$(EXEEXT) \

>>   	       odp_scheduling$(EXEEXT)

>>   

>>   TESTSCRIPTS = odp_l2fwd_run.sh \

>> @@ -20,6 +21,8 @@ bin_PROGRAMS = $(EXECUTABLES) $(COMPILE_ONLY)

>>   

>>   odp_crypto_LDFLAGS = $(AM_LDFLAGS) -static

>>   odp_crypto_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

>> +odp_sched_latency_LDFLAGS = $(AM_LDFLAGS) -static

>> +odp_sched_latency_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

>>   odp_scheduling_LDFLAGS = $(AM_LDFLAGS) -static

>>   odp_scheduling_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test

>>   

>> @@ -27,6 +30,7 @@ noinst_HEADERS = \

>>   		  $(top_srcdir)/test/test_debug.h

>>   

>>   dist_odp_crypto_SOURCES = odp_crypto.c

>> +dist_odp_sched_latency_SOURCES = odp_sched_latency.c

>>   dist_odp_scheduling_SOURCES = odp_scheduling.c

>>   dist_odp_pktio_perf_SOURCES = odp_pktio_perf.c

>>   

>> diff --git a/test/common_plat/performance/odp_sched_latency.c b/test/common_plat/performance/odp_sched_latency.c

>> new file mode 100644

>> index 0000000..063fb21

>> --- /dev/null

>> +++ b/test/common_plat/performance/odp_sched_latency.c

>> @@ -0,0 +1,767 @@

>> +/* Copyright (c) 2016, Linaro Limited

>> + * All rights reserved.

>> + *

>> + * SPDX-License-Identifier:     BSD-3-Clause

>> + */

>> +

>> +/**

>> + * @file

>> + *

>> + * @example odp_sched_latency.c  ODP scheduling latency benchmark application

>> + */

>> +

>> +#include <string.h>

>> +#include <stdlib.h>

>> +#include <inttypes.h>

>> +

>> +#include <test_debug.h>

>> +

>> +/* ODP main header */

>> +#include <odp_api.h>

>> +

>> +/* ODP helper for Linux apps */

>> +#include <odp/helper/linux.h>

>> +

>> +/* GNU lib C */

>> +#include <getopt.h>

>> +

>> +#define MAX_WORKERS	  64		/**< Maximum number of worker threads */

>> +#define MAX_QUEUES	  4096		/**< Maximum number of queues */

>> +#define EVENT_POOL_SIZE	  (1024 * 1024) /**< Event pool size */

>> +#define TEST_ROUNDS (4 * 1024 * 1024)	/**< Test rounds for each thread */

>> +#define MAIN_THREAD	   1 /**< Thread ID performing maintenance tasks */

>> +

>> +/* Default values for command line arguments */

>> +#define SAMPLE_EVENT_PER_PRIO	  0 /**< Allocate a separate sample event for

>> +					 each priority */

>> +#define HI_PRIO_EVENTS		  0 /**< Number of high priority events */

>> +#define LO_PRIO_EVENTS		 32 /**< Number of low priority events */

>> +#define HI_PRIO_QUEUES		 16 /**< Number of high priority queues */

>> +#define LO_PRIO_QUEUES		 64 /**< Number of low priority queues */

>> +

>> +#define EVENTS_PER_HI_PRIO_QUEUE 0  /**< Alloc HI_PRIO_QUEUES x HI_PRIO_EVENTS

>> +					 events */

>> +#define EVENTS_PER_LO_PRIO_QUEUE 1  /**< Alloc LO_PRIO_QUEUES x LO_PRIO_EVENTS

>> +					 events */

>> +ODP_STATIC_ASSERT(HI_PRIO_QUEUES <= MAX_QUEUES, "Too many HI priority queues");

>> +ODP_STATIC_ASSERT(LO_PRIO_QUEUES <= MAX_QUEUES, "Too many LO priority queues");

>> +

>> +#define CACHE_ALIGN_ROUNDUP(x)\

>> +	((ODP_CACHE_LINE_SIZE) * \

>> +	 (((x) + ODP_CACHE_LINE_SIZE - 1) / (ODP_CACHE_LINE_SIZE)))

>> +

>> +/* Test priorities */

>> +#define NUM_PRIOS 2 /**< Number of tested priorities */

>> +#define HI_PRIO	  0

>> +#define LO_PRIO	  1

>> +

>> +/** Test event types */

>> +typedef enum {

>> +	WARM_UP, /**< Warm up event */

>> +	TRAFFIC, /**< Event used only as traffic load */

>> +	SAMPLE	 /**< Event used to measure latency */

>> +} event_type_t;

>> +

>> +/** Test event */

>> +typedef struct {

>> +	uint64_t ts;		/**< Send timestamp */

>> +	event_type_t type;	/**< Message type */

>> +	int src_idx[NUM_PRIOS]; /**< Source ODP queue */

>> +	int prio;		/**< Source queue priority */

>> +} test_event_t;

>> +

>> +/** Test arguments */

>> +typedef struct {

>> +	int cpu_count;			/**< CPU count */

>> +	odp_schedule_sync_t sync_type;	/**< Scheduler sync type */

>> +	struct {

>> +		int queues;	/**< Number of scheduling queues */

>> +		int events;	/**< Number of events */

>> +		odp_bool_t events_per_queue; /**< Allocate 'queues' x 'events'

>> +						  test events */

>> +	} prio[NUM_PRIOS];

>> +	odp_bool_t sample_per_prio; /**< Allocate a separate sample event for

>> +					 each priority */

>> +} test_args_t;

>> +

>> +/** Latency measurements statistics */

>> +typedef struct {

>> +	uint64_t events;   /**< Total number of received events */

>> +	uint64_t sample_events;  /**< Number of received sample events */

>> +	uint64_t tot;	   /**< Total event latency. Sum of all events. */

>> +	uint64_t min;	   /**< Minimum event latency */

>> +	uint64_t max;	   /**< Maximum event latency */

>> +} test_stat_t;

>> +

>> +/** Performance test statistics (per core) */

>> +typedef union {

>> +	test_stat_t prio[NUM_PRIOS]; /**< Test statistics per priority */

>> +

>> +	uint8_t pad[CACHE_ALIGN_ROUNDUP(NUM_PRIOS * sizeof(test_stat_t))];

>> +} core_stat_t ODP_ALIGNED_CACHE;

>> +

>> +/** Test global variables */

>> +typedef struct {

>> +	core_stat_t	 core_stat[MAX_WORKERS]; /**< Core specific stats */

>> +	odp_barrier_t    barrier; /**< Barrier for thread synchronization */

>> +	odp_pool_t       pool;	  /**< Pool for allocating test events */

>> +	test_args_t      args;	  /**< Parsed command line arguments */

>> +	odp_queue_t      queue[NUM_PRIOS][MAX_QUEUES]; /**< Scheduled queues */

>> +} test_globals_t;

>> +

>> +/**

>> + * Clear all scheduled queues.

>> + *

>> + * Retry to be sure that all buffers have been scheduled.

>> + */

>> +static void clear_sched_queues(void)

>> +{

>> +	odp_event_t ev;

>> +

>> +	while (1) {

>> +		ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT);

>> +

>> +		if (ev == ODP_EVENT_INVALID)

>> +			break;

>> +

>> +		odp_event_free(ev);


Patch

diff --git a/test/common_plat/performance/.gitignore b/test/common_plat/performance/.gitignore
index edcc832..1527d25 100644
--- a/test/common_plat/performance/.gitignore
+++ b/test/common_plat/performance/.gitignore
@@ -4,4 +4,5 @@  odp_atomic
 odp_crypto
 odp_l2fwd
 odp_pktio_perf
+odp_sched_latency
 odp_scheduling
diff --git a/test/common_plat/performance/Makefile.am b/test/common_plat/performance/Makefile.am
index d23bb3e..f5dd8dd 100644
--- a/test/common_plat/performance/Makefile.am
+++ b/test/common_plat/performance/Makefile.am
@@ -5,6 +5,7 @@  TESTS_ENVIRONMENT += TEST_DIR=${builddir}
 EXECUTABLES = odp_crypto$(EXEEXT) odp_pktio_perf$(EXEEXT)
 
 COMPILE_ONLY = odp_l2fwd$(EXEEXT) \
+	       odp_sched_latency$(EXEEXT) \
 	       odp_scheduling$(EXEEXT)
 
 TESTSCRIPTS = odp_l2fwd_run.sh \
@@ -20,6 +21,8 @@  bin_PROGRAMS = $(EXECUTABLES) $(COMPILE_ONLY)
 
 odp_crypto_LDFLAGS = $(AM_LDFLAGS) -static
 odp_crypto_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test
+odp_sched_latency_LDFLAGS = $(AM_LDFLAGS) -static
+odp_sched_latency_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test
 odp_scheduling_LDFLAGS = $(AM_LDFLAGS) -static
 odp_scheduling_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/test
 
@@ -27,6 +30,7 @@  noinst_HEADERS = \
 		  $(top_srcdir)/test/test_debug.h
 
 dist_odp_crypto_SOURCES = odp_crypto.c
+dist_odp_sched_latency_SOURCES = odp_sched_latency.c
 dist_odp_scheduling_SOURCES = odp_scheduling.c
 dist_odp_pktio_perf_SOURCES = odp_pktio_perf.c
 
diff --git a/test/common_plat/performance/odp_sched_latency.c b/test/common_plat/performance/odp_sched_latency.c
new file mode 100644
index 0000000..063fb21
--- /dev/null
+++ b/test/common_plat/performance/odp_sched_latency.c
@@ -0,0 +1,767 @@ 
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+/**
+ * @file
+ *
+ * @example odp_sched_latency.c  ODP scheduling latency benchmark application
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <inttypes.h>
+
+#include <test_debug.h>
+
+/* ODP main header */
+#include <odp_api.h>
+
+/* ODP helper for Linux apps */
+#include <odp/helper/linux.h>
+
+/* GNU lib C */
+#include <getopt.h>
+
+#define MAX_WORKERS	  64		/**< Maximum number of worker threads */
+#define MAX_QUEUES	  4096		/**< Maximum number of queues */
+#define EVENT_POOL_SIZE	  (1024 * 1024) /**< Event pool size */
+#define TEST_ROUNDS (4 * 1024 * 1024)	/**< Test rounds for each thread */
+#define MAIN_THREAD	   1 /**< Thread ID performing maintenance tasks */
+
+/* Default values for command line arguments */
+#define SAMPLE_EVENT_PER_PRIO	  0 /**< Allocate a separate sample event for
+					 each priority */
+#define HI_PRIO_EVENTS		  0 /**< Number of high priority events */
+#define LO_PRIO_EVENTS		 32 /**< Number of low priority events */
+#define HI_PRIO_QUEUES		 16 /**< Number of high priority queues */
+#define LO_PRIO_QUEUES		 64 /**< Number of low priority queues */
+
+#define EVENTS_PER_HI_PRIO_QUEUE 0  /**< Alloc HI_PRIO_QUEUES x HI_PRIO_EVENTS
+					 events */
+#define EVENTS_PER_LO_PRIO_QUEUE 1  /**< Alloc LO_PRIO_QUEUES x LO_PRIO_EVENTS
+					 events */
+ODP_STATIC_ASSERT(HI_PRIO_QUEUES <= MAX_QUEUES, "Too many HI priority queues");
+ODP_STATIC_ASSERT(LO_PRIO_QUEUES <= MAX_QUEUES, "Too many LO priority queues");
+
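+/** Round up 'x' to a multiple of the cache line size */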
+#define CACHE_ALIGN_ROUNDUP(x)\
+	((ODP_CACHE_LINE_SIZE) * \
+	 (((x) + ODP_CACHE_LINE_SIZE - 1) / (ODP_CACHE_LINE_SIZE)))
+
+/* Test priorities */
+#define NUM_PRIOS 2 /**< Number of tested priorities */
+#define HI_PRIO	  0
+#define LO_PRIO	  1
+
+/** Test event types */
+typedef enum {
+	WARM_UP, /**< Warm up event */
+	TRAFFIC, /**< Event used only as traffic load */
+	SAMPLE	 /**< Event used to measure latency */
+} event_type_t;
+
+/** Test event */
+typedef struct {
+	uint64_t ts;		/**< Send timestamp */
+	event_type_t type;	/**< Message type */
+	int src_idx[NUM_PRIOS]; /**< Source ODP queue */
+	int prio;		/**< Source queue priority */
+} test_event_t;
+
+/** Test arguments */
+typedef struct {
+	int cpu_count;			/**< CPU count */
+	odp_schedule_sync_t sync_type;	/**< Scheduler sync type */
+	struct {
+		int queues;	/**< Number of scheduling queues */
+		int events;	/**< Number of events */
+		odp_bool_t events_per_queue; /**< Allocate 'queues' x 'events'
+						  test events */
+	} prio[NUM_PRIOS];
+	odp_bool_t sample_per_prio; /**< Allocate a separate sample event for
+					 each priority */
+} test_args_t;
+
+/** Latency measurements statistics */
+typedef struct {
+	uint64_t events;   /**< Total number of received events */
+	uint64_t sample_events;  /**< Number of received sample events */
+	uint64_t tot;	   /**< Total event latency. Sum of all events. */
+	uint64_t min;	   /**< Minimum event latency */
+	uint64_t max;	   /**< Maximum event latency */
+} test_stat_t;
+
+/** Performance test statistics (per core) */
+typedef union {
+	test_stat_t prio[NUM_PRIOS]; /**< Test statistics per priority */
+
+	uint8_t pad[CACHE_ALIGN_ROUNDUP(NUM_PRIOS * sizeof(test_stat_t))];
+} core_stat_t ODP_ALIGNED_CACHE;
+
+/** Test global variables */
+typedef struct {
+	core_stat_t	 core_stat[MAX_WORKERS]; /**< Core specific stats */
+	odp_barrier_t    barrier; /**< Barrier for thread synchronization */
+	odp_pool_t       pool;	  /**< Pool for allocating test events */
+	test_args_t      args;	  /**< Parsed command line arguments */
+	odp_queue_t      queue[NUM_PRIOS][MAX_QUEUES]; /**< Scheduled queues */
+} test_globals_t;
+
+/**
+ * Clear all scheduled queues.
+ *
+ * Retry to be sure that all buffers have been scheduled.
+ */
+static void clear_sched_queues(void)
+{
+	odp_event_t ev;
+
+	while (1) {
+		ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT);
+
+		if (ev == ODP_EVENT_INVALID)
+			break;
+
+		odp_event_free(ev);
+	}
+}
+
+/**
+ * Enqueue events into queues
+ *
+ * @param prio        Queue priority (HI_PRIO/LO_PRIO)
+ * @param num_queues  Number of queues
+ * @param num_events  Number of 'TRAFFIC' events
+ * @param num_samples Number of 'SAMPLE' events
+ * @param div_events  If true, divide 'num_events' between 'num_queues'. If
+ *		      false, enqueue 'num_events' to each queue.
+ * @param globals     Test shared data
+ *
+ * @retval 0 on success
+ * @retval -1 on failure
+ */
+static int enqueue_events(int prio, int num_queues, int num_events,
+			  int num_samples, odp_bool_t div_events,
+			  test_globals_t *globals)
+{
+	odp_buffer_t buf[num_events + num_samples];
+	odp_event_t ev[num_events + num_samples];
+	odp_queue_t queue;
+	test_event_t *event;
+	int i, j, ret;
+	int enq_events;
+	int events_per_queue;
+	int tot_events;
+	int rdy_events = 0;
+
+	tot_events = num_events + num_samples;
+
+	if (!num_queues || !tot_events)
+		return 0;
+
+	events_per_queue = tot_events;
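+	/* Ceiling division: enqueue at least 'tot_events' events in total */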
+	if (div_events)
+		events_per_queue = (tot_events + num_queues - 1) / num_queues;
+
+	for (i = 0; i < num_queues; i++) {
+		queue = globals->queue[prio][i];
+
+		ret = odp_buffer_alloc_multi(globals->pool, buf,
+					     events_per_queue);
+		if (ret != events_per_queue) {
+			LOG_ERR("Buffer alloc failed. Try increasing EVENT_POOL_SIZE.\n");
+			ret = ret < 0 ? 0 : ret;
+			odp_buffer_free_multi(buf, ret);
+			return -1;
+		}
+		for (j = 0; j < events_per_queue; j++) {
+			if (!odp_buffer_is_valid(buf[j])) {
+				LOG_ERR("Buffer alloc failed\n");
+				odp_buffer_free_multi(buf, events_per_queue);
+				return -1;
+			}
+
+			event = odp_buffer_addr(buf[j]);
+			memset(event, 0, sizeof(test_event_t));
+
+			/* Latency isn't measured from the first processing
+			 * round. */
+			if (num_samples > 0) {
+				event->type = WARM_UP;
+				num_samples--;
+			} else {
+				event->type = TRAFFIC;
+			}
+			event->src_idx[prio] = i;
+			event->prio = prio;
+			ev[j] = odp_buffer_to_event(buf[j]);
+		}
+
+		enq_events = 0;
+		do {
+			ret = odp_queue_enq_multi(queue, &ev[enq_events],
+						  events_per_queue -
+						  enq_events);
+			if (ret < 0) {
+				LOG_ERR("Queue enqueue failed.\n");
+				return -1;
+			}
+			enq_events += ret;
+		} while (enq_events < events_per_queue);
+
+		rdy_events += events_per_queue;
+		if (div_events && rdy_events >= tot_events)
+			return 0;
+	}
+	return 0;
+}
+
+/**
+ * Print latency measurement results
+ *
+ * @param globals  Test shared data
+ */
+static void print_results(test_globals_t *globals)
+{
+	test_stat_t *lat;
+	odp_schedule_sync_t stype;
+	test_stat_t total;
+	test_args_t *args;
+	uint64_t avg;
+	int i, j;
+
+	args = &globals->args;
+	stype = globals->args.sync_type;
+
+	printf("\n%s queue scheduling latency\n",
+	       (stype == ODP_SCHED_SYNC_ATOMIC) ? "ATOMIC" :
+	       ((stype == ODP_SCHED_SYNC_ORDERED) ? "ORDERED" : "PARALLEL"));
+
+	printf("  LO_PRIO queues: %i\n", args->prio[LO_PRIO].queues);
+	if (args->prio[LO_PRIO].events_per_queue)
+		printf("  LO_PRIO events per queue: %i\n",
+		       args->prio[LO_PRIO].events);
+	else
+		printf("  LO_PRIO events: %i\n", args->prio[LO_PRIO].events);
+
+	printf("  HI_PRIO queues: %i\n", args->prio[HI_PRIO].queues);
+	if (args->prio[HI_PRIO].events_per_queue)
+		printf("  HI_PRIO events per queue: %i\n\n",
+		       args->prio[HI_PRIO].events);
+	else
+		printf("  HI_PRIO events: %i\n\n", args->prio[HI_PRIO].events);
+
+	for (i = 0; i < NUM_PRIOS; i++) {
+		memset(&total, 0, sizeof(test_stat_t));
+		total.min = UINT64_MAX;
+
+		printf("%s priority\n"
+		       "Thread   Avg[ns]    Min[ns]    Max[ns]    Samples    Total\n"
+		       "---------------------------------------------------------------\n",
+		       i == HI_PRIO ? "HIGH" : "LOW");
+		for (j = 1; j <= args->cpu_count; j++) {
+			lat = &globals->core_stat[j].prio[i];
+
+			if (lat->sample_events == 0) {
+				printf("%-8d N/A\n", j);
+				continue;
+			}
+
+			if (lat->max > total.max)
+				total.max = lat->max;
+			if (lat->min < total.min)
+				total.min = lat->min;
+			total.tot += lat->tot;
+			total.sample_events += lat->sample_events;
+			total.events += lat->events;
+
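+			/* 'tot' sums the latencies of sample events only */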
+			avg = lat->events ? lat->tot / lat->sample_events : 0;
+			printf("%-8d %-10" PRIu64 " %-10" PRIu64 " "
+			       "%-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 "\n",
+			       j, avg, lat->min, lat->max, lat->sample_events,
+			       lat->events);
+		}
+		printf("---------------------------------------------------------------\n");
+		if (total.sample_events == 0) {
+			printf("Total    N/A\n\n");
+			continue;
+		}
+		avg = total.events ? total.tot / total.sample_events : 0;
+		printf("Total    %-10" PRIu64 " %-10" PRIu64 " %-10" PRIu64 " "
+		       "%-10" PRIu64 " %-10" PRIu64 "\n\n", avg, total.min,
+		       total.max, total.sample_events, total.events);
+	}
+}
+
+/**
+ * Measure latency of scheduled ODP events
+ *
+ * Schedule and enqueue events until 'TEST_ROUNDS' events have been processed.
+ * Scheduling latency is measured only from type 'SAMPLE' events. Other events
+ * are simply enqueued back to the scheduling queues.
+ *
+ * For 'TRAFFIC' type events the destination queue is selected from the same
+ * priority class as the source queue. 'SAMPLE' type events may change priority
+ * depending on the command line arguments.
+ *
+ * @param thr      Thread ID
+ * @param globals  Test shared data
+ *
+ * @retval 0 on success
+ * @retval -1 on failure
+ */
+static int test_schedule(int thr, test_globals_t *globals)
+{
+	odp_event_t ev;
+	odp_buffer_t buf;
+	odp_queue_t src_queue;
+	odp_queue_t dst_queue;
+	uint64_t latency;
+	uint32_t i;
+	test_event_t *event;
+	test_stat_t *stats;
+	int dst_idx;
+
+	memset(&globals->core_stat[thr], 0, sizeof(core_stat_t));
+	globals->core_stat[thr].prio[HI_PRIO].min = UINT64_MAX;
+	globals->core_stat[thr].prio[LO_PRIO].min = UINT64_MAX;
+
+	for (i = 0; i < TEST_ROUNDS; i++) {
+		ev = odp_schedule(&src_queue, ODP_SCHED_WAIT);
+
+		buf = odp_buffer_from_event(ev);
+		event = odp_buffer_addr(buf);
+
+		stats = &globals->core_stat[thr].prio[event->prio];
+
+		if (event->type == SAMPLE) {
+			latency = odp_time_to_ns(odp_time_global()) - event->ts;
+
+			if (latency > stats->max)
+				stats->max = latency;
+			if (latency < stats->min)
+				stats->min = latency;
+			stats->tot += latency;
+			stats->sample_events++;
+
+			/* Move sample event to a different priority */
+			if (!globals->args.sample_per_prio &&
+			    globals->args.prio[!event->prio].queues)
+				event->prio = !event->prio;
+		}
+
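+		/* A warm-up event becomes a sample event after its first round */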
+		if (odp_unlikely(event->type == WARM_UP))
+			event->type = SAMPLE;
+		else
+			stats->events++;
+
+		/* Move event to next queue */
+		dst_idx = event->src_idx[event->prio] + 1;
+		if (dst_idx >= globals->args.prio[event->prio].queues)
+			dst_idx = 0;
+		event->src_idx[event->prio] = dst_idx;
+		dst_queue = globals->queue[event->prio][dst_idx];
+
+		if (event->type == SAMPLE)
+			event->ts = odp_time_to_ns(odp_time_global());
+
+		if (odp_queue_enq(dst_queue, ev)) {
+			LOG_ERR("[%i] Queue enqueue failed.\n", thr);
+			odp_event_free(ev);
+			return -1;
+		}
+	}
+
+	/* Clear possible locally stored buffers */
+	odp_schedule_pause();
+
+	while (1) {
+		ev = odp_schedule(&src_queue, ODP_SCHED_NO_WAIT);
+
+		if (ev == ODP_EVENT_INVALID)
+			break;
+
+		if (odp_queue_enq(src_queue, ev)) {
+			LOG_ERR("[%i] Queue enqueue failed.\n", thr);
+			odp_event_free(ev);
+			return -1;
+		}
+	}
+
+	odp_schedule_resume();
+
+	odp_barrier_wait(&globals->barrier);
+
+	clear_sched_queues();
+
+	if (thr == MAIN_THREAD)
+		print_results(globals);
+
+	return 0;
+}
+
+/**
+ * Worker thread
+ *
+ * @param arg  Arguments
+ *
+ * @retval 0 on success
+ * @retval -1 on failure
+ */
+static int run_thread(void *arg ODP_UNUSED)
+{
+	odp_shm_t shm;
+	test_globals_t *globals;
+	test_args_t *args;
+	int thr;
+	int sample_events = 0;
+
+	thr = odp_thread_id();
+
+	shm     = odp_shm_lookup("test_globals");
+	globals = odp_shm_addr(shm);
+
+	if (globals == NULL) {
+		LOG_ERR("Shared mem lookup failed\n");
+		return -1;
+	}
+
+	if (thr == MAIN_THREAD) {
+		args = &globals->args;
+
+		if (enqueue_events(HI_PRIO, args->prio[HI_PRIO].queues,
+				   args->prio[HI_PRIO].events, 1,
+				   !args->prio[HI_PRIO].events_per_queue,
+				   globals))
+			return -1;
+
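+		/* Add a LO_PRIO sample event when there are no HI_PRIO queues
+		 * or a separate sample event per priority is requested. */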
+		if (!args->prio[HI_PRIO].queues || args->sample_per_prio)
+			sample_events = 1;
+
+		if (enqueue_events(LO_PRIO, args->prio[LO_PRIO].queues,
+				   args->prio[LO_PRIO].events, sample_events,
+				   !args->prio[LO_PRIO].events_per_queue,
+				   globals))
+			return -1;
+	}
+
+	odp_barrier_wait(&globals->barrier);
+
+	if (test_schedule(thr, globals))
+		return -1;
+
+	return 0;
+}
+
+/**
+ * Print usage information
+ */
+static void usage(void)
+{
+	printf("\n"
+	       "OpenDataPlane scheduler latency benchmark application.\n"
+	       "\n"
+	       "Usage: ./odp_sched_latency [options]\n"
+	       "Optional OPTIONS:\n"
+	       "  -c, --count <number> CPU count\n"
+	       "  -l, --lo-prio-queues <number> Number of low priority scheduled queues\n"
+	       "  -t, --hi-prio-queues <number> Number of high priority scheduled queues\n"
+	       "  -m, --lo-prio-events-per-queue <number> Number of events per low priority queue\n"
+	       "  -n, --hi-prio-events-per-queue <number> Number of events per high priority queue\n"
+	       "  -o, --lo-prio-events <number> Total number of low priority events (overrides the\n"
+	       "				number of events per queue)\n"
+	       "  -p, --hi-prio-events <number> Total number of high priority events (overrides the\n"
+	       "				number of events per queue)\n"
+	       "  -r  --sample-per-prio Allocate a separate sample event for each priority. By default\n"
+	       "			a single sample event is used and its priority is changed after\n"
+	       "			each processing round.\n"
+	       "  -s, --sync  Scheduled queues' sync type\n"
+	       "               0: ODP_SCHED_SYNC_PARALLEL (default)\n"
+	       "               1: ODP_SCHED_SYNC_ATOMIC\n"
+	       "               2: ODP_SCHED_SYNC_ORDERED\n"
+	       "  -h, --help   Display help and exit.\n\n"
+	       );
+}
+
+/**
+ * Parse arguments
+ *
+ * @param argc  Argument count
+ * @param argv  Argument vector
+ * @param args  Test arguments
+ */
+static void parse_args(int argc, char *argv[], test_args_t *args)
+{
+	int opt;
+	int long_index;
+	int i;
+
+	static const struct option longopts[] = {
+		{"count", required_argument, NULL, 'c'},
+		{"lo-prio-queues", required_argument, NULL, 'l'},
+		{"hi-prio-queues", required_argument, NULL, 't'},
+		{"lo-prio-events-per-queue", required_argument, NULL, 'm'},
+		{"hi-prio-events-per-queue", required_argument, NULL, 'n'},
+		{"lo-prio-events", required_argument, NULL, 'o'},
+		{"hi-prio-events", required_argument, NULL, 'p'},
+		{"sample-per-prio", no_argument, NULL, 'r'},
+		{"sync", required_argument, NULL, 's'},
+		{"help", no_argument, NULL, 'h'},
+		{NULL, 0, NULL, 0}
+	};
+
+	static const char *shortopts = "+c:s:l:t:m:n:o:p:rh";
+
+	/* Let helper collect its own arguments (e.g. --odph_proc) */
+	odph_parse_options(argc, argv, shortopts, longopts);
+
+	args->sync_type = ODP_SCHED_SYNC_PARALLEL;
+	args->sample_per_prio = SAMPLE_EVENT_PER_PRIO;
+	args->prio[LO_PRIO].queues = LO_PRIO_QUEUES;
+	args->prio[HI_PRIO].queues = HI_PRIO_QUEUES;
+	args->prio[LO_PRIO].events = LO_PRIO_EVENTS;
+	args->prio[HI_PRIO].events = HI_PRIO_EVENTS;
+	args->prio[LO_PRIO].events_per_queue = EVENTS_PER_LO_PRIO_QUEUE;
+	args->prio[HI_PRIO].events_per_queue = EVENTS_PER_HI_PRIO_QUEUE;
+
+	opterr = 0; /* Do not issue errors on helper options */
+	while (1) {
+		opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
+
+		if (opt == -1)
+			break;	/* No more options */
+
+		switch (opt) {
+		case 'c':
+			args->cpu_count = atoi(optarg);
+			break;
+		case 'l':
+			args->prio[LO_PRIO].queues = atoi(optarg);
+			break;
+		case 't':
+			args->prio[HI_PRIO].queues = atoi(optarg);
+			break;
+		case 'm':
+			args->prio[LO_PRIO].events = atoi(optarg);
+			args->prio[LO_PRIO].events_per_queue = 1;
+			break;
+		case 'n':
+			args->prio[HI_PRIO].events = atoi(optarg);
+			args->prio[HI_PRIO].events_per_queue = 1;
+			break;
+		case 'o':
+			args->prio[LO_PRIO].events = atoi(optarg);
+			args->prio[LO_PRIO].events_per_queue = 0;
+			break;
+		case 'p':
+			args->prio[HI_PRIO].events = atoi(optarg);
+			args->prio[HI_PRIO].events_per_queue = 0;
+			break;
+		case 's':
+			i = atoi(optarg);
+			if (i == 1)
+				args->sync_type = ODP_SCHED_SYNC_ATOMIC;
+			else if (i == 2)
+				args->sync_type = ODP_SCHED_SYNC_ORDERED;
+			else
+				args->sync_type = ODP_SCHED_SYNC_PARALLEL;
+			break;
+		case 'r':
+			args->sample_per_prio = 1;
+			break;
+		case 'h':
+			usage();
+			exit(EXIT_SUCCESS);
+			break;
+
+		default:
+			break;
+		}
+	}
+
+	/* Make sure arguments are valid */
+	if (args->cpu_count > MAX_WORKERS)
+		args->cpu_count = MAX_WORKERS;
+	if (args->prio[LO_PRIO].queues > MAX_QUEUES)
+		args->prio[LO_PRIO].queues = MAX_QUEUES;
+	if (args->prio[HI_PRIO].queues > MAX_QUEUES)
+		args->prio[HI_PRIO].queues = MAX_QUEUES;
+	if (!args->prio[HI_PRIO].queues && !args->prio[LO_PRIO].queues) {
+		printf("No queues configured\n");
+		usage();
+		exit(EXIT_FAILURE);
+	}
+}
+
+/**
+ * Test main function
+ */
+int main(int argc, char *argv[])
+{
+	odp_instance_t instance;
+	odph_odpthread_t *thread_tbl;
+	odph_odpthread_params_t thr_params;
+	odp_cpumask_t cpumask;
+	odp_pool_t pool;
+	odp_pool_param_t params;
+	odp_shm_t shm;
+	test_globals_t *globals;
+	test_args_t args;
+	char cpumaskstr[ODP_CPUMASK_STR_SIZE];
+	int i, j;
+	int ret = 0;
+	int num_workers = 0;
+
+	printf("\nODP scheduling latency benchmark starts\n\n");
+
+	memset(&args, 0, sizeof(args));
+	parse_args(argc, argv, &args);
+
+	/* ODP global init */
+	if (odp_init_global(&instance, NULL, NULL)) {
+		LOG_ERR("ODP global init failed.\n");
+		return -1;
+	}
+
+	/*
+	 * Init this thread. It also makes ODP calls when
+	 * setting up resources for worker threads.
+	 */
+	if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+		LOG_ERR("ODP local init failed.\n");
+		return -1;
+	}
+
+	printf("\n");
+	printf("ODP system info\n");
+	printf("---------------\n");
+	printf("ODP API version:  %s\n",        odp_version_api_str());
+	printf("ODP impl name:    %s\n",        odp_version_impl_name());
+	printf("ODP impl details: %s\n",        odp_version_impl_str());
+	printf("CPU model:        %s\n",        odp_cpu_model_str());
+	printf("CPU freq (hz):    %" PRIu64 "\n", odp_cpu_hz_max());
+	printf("Cache line size:  %i\n",        odp_sys_cache_line_size());
+	printf("Max CPU count:    %i\n",        odp_cpu_count());
+
+	/* Get default worker cpumask */
+	if (args.cpu_count)
+		num_workers = args.cpu_count;
+
+	num_workers = odp_cpumask_default_worker(&cpumask, num_workers);
+	args.cpu_count = num_workers;
+
+	(void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));
+
+	printf("Worker threads:   %i\n", num_workers);
+	printf("First CPU:        %i\n", odp_cpumask_first(&cpumask));
+	printf("CPU mask:         %s\n\n", cpumaskstr);
+
+	thread_tbl = calloc(sizeof(odph_odpthread_t), num_workers);
+	if (!thread_tbl) {
+		LOG_ERR("no memory for thread_tbl\n");
+		return -1;
+	}
+
+	shm = odp_shm_reserve("test_globals",
+			      sizeof(test_globals_t), ODP_CACHE_LINE_SIZE, 0);
+	if (shm == ODP_SHM_INVALID) {
+		LOG_ERR("Shared memory reserve failed.\n");
+		return -1;
+	}
+
+	globals = odp_shm_addr(shm);
+	memset(globals, 0, sizeof(test_globals_t));
+	memcpy(&globals->args, &args, sizeof(test_args_t));
+
+	/*
+	 * Create event pool
+	 */
+	odp_pool_param_init(&params);
+	params.buf.size  = sizeof(test_event_t);
+	params.buf.align = 0;
+	params.buf.num   = EVENT_POOL_SIZE;
+	params.type      = ODP_POOL_BUFFER;
+
+	pool = odp_pool_create("event_pool", &params);
+
+	if (pool == ODP_POOL_INVALID) {
+		LOG_ERR("Pool create failed.\n");
+		return -1;
+	}
+	globals->pool = pool;
+
+	/*
+	 * Create queues for schedule test
+	 */
+	for (i = 0; i < NUM_PRIOS; i++) {
+		char name[] = "sched_XX_YY";
+		odp_queue_t queue;
+		odp_queue_param_t param;
+		int prio;
+
+		if (i == HI_PRIO)
+			prio = ODP_SCHED_PRIO_HIGHEST;
+		else
+			prio = ODP_SCHED_PRIO_LOWEST;
+
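+		/* Encode the queue priority as two digits in the name */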
+		name[6] = '0' + (prio / 10);
+		name[7] = '0' + prio - (10 * (prio / 10));
+
+		odp_queue_param_init(&param);
+		param.type        = ODP_QUEUE_TYPE_SCHED;
+		param.sched.prio  = prio;
+		param.sched.sync  = args.sync_type;
+		param.sched.group = ODP_SCHED_GROUP_ALL;
+
+		for (j = 0; j < args.prio[i].queues; j++) {
+			name[9]  = '0' + j / 10;
+			name[10] = '0' + j - 10 * (j / 10);
+
+			queue = odp_queue_create(name, &param);
+
+			if (queue == ODP_QUEUE_INVALID) {
+				LOG_ERR("Scheduled queue create failed.\n");
+				return -1;
+			}
+
+			globals->queue[i][j] = queue;
+		}
+	}
+
+	odp_barrier_init(&globals->barrier, num_workers);
+
+	/* Create and launch worker threads */
+	memset(&thr_params, 0, sizeof(thr_params));
+	thr_params.thr_type = ODP_THREAD_WORKER;
+	thr_params.instance = instance;
+	thr_params.start = run_thread;
+	thr_params.arg   = NULL;
+	odph_odpthreads_create(thread_tbl, &cpumask, &thr_params);
+
+	/* Wait for worker threads to terminate */
+	odph_odpthreads_join(thread_tbl);
+	free(thread_tbl);
+
+	printf("ODP scheduling latency test complete\n\n");
+
+	for (i = 0; i < NUM_PRIOS; i++) {
+		odp_queue_t queue;
+		int num_queues;
+
+		num_queues = args.prio[i].queues;
+
+		for (j = 0; j < num_queues; j++) {
+			queue = globals->queue[i][j];
+			ret += odp_queue_destroy(queue);
+		}
+	}
+
+	ret += odp_shm_free(shm);
+	ret += odp_pool_destroy(pool);
+	ret += odp_term_local();
+	ret += odp_term_global(instance);
+
+	return ret;
+}