diff mbox series

[3/3] linux-gen: sched: optimize group scheduling

Message ID 1491479944-31232-3-git-send-email-petri.savolainen@linaro.org
State Accepted
Commit 0a94dd3322dcbbd791e45647b238997553bfd1bc
Headers show
Series [1/3] test: l2fwd: add group option

Commit Message

Petri Savolainen April 6, 2017, 11:59 a.m. UTC
Use separate priority queues for different groups. Sharing
the same priority queue over multiple groups caused multiple
issues:
* latency and ordering issues when threads push back
  events (from wrong groups) to the tail of the priority queue
* unnecessary contention (scaling issues) when threads belong
  to different groups

Lowered the maximum number of groups from 256 to 32 (in the default
configuration) to limit memory usage of priority queues. This should
be enough for most users.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
 platform/linux-generic/odp_schedule.c | 284 +++++++++++++++++++++++-----------
 1 file changed, 195 insertions(+), 89 deletions(-)

-- 
2.8.1
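
Editor's note: the memory argument in the commit message can be made concrete with a small standalone sketch. The sizes below are illustrative assumptions (NUM_PRIO and PRIO_QUEUE_RING_SIZE are hypothetical placeholders; only QUEUES_PER_PRIO = 4 comes from the patch context). It only shows that the prio_q array now scales linearly with the group count, which is why keeping 256 groups would be costly.

/* Illustrative sketch only: the prio_queue_t layout and constants here are
 * simplified placeholders, not the real ODP definitions. It demonstrates
 * why prio_q memory scales with the number of scheduling groups. */
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>

#define NUM_PRIO             8     /* assumption for illustration */
#define QUEUES_PER_PRIO      4     /* matches the patch context */
#define PRIO_QUEUE_RING_SIZE 1024  /* assumption for illustration */

typedef struct {
	uint32_t head;
	uint32_t tail;
	uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
} prio_queue_t;

static size_t prio_q_bytes(int num_grps)
{
	/* prio_q[num_grps][NUM_PRIO][QUEUES_PER_PRIO] */
	return (size_t)num_grps * NUM_PRIO * QUEUES_PER_PRIO *
	       sizeof(prio_queue_t);
}

int main(void)
{
	printf("prio_q with 256 groups: %zu kB\n", prio_q_bytes(256) / 1024);
	printf("prio_q with  32 groups: %zu kB\n", prio_q_bytes(32) / 1024);
	return 0;
}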

Comments

Savolainen, Petri (Nokia - FI/Espoo) April 12, 2017, 9:56 a.m. UTC | #1
Ping.

This patch set removes the non-deterministic latency, lowered QoS and potential queue starvation caused by this code ...

-			if (grp > ODP_SCHED_GROUP_ALL &&
-			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
-					       sched_local.thr)) {
-				/* This thread is not eligible for work from
-				 * this queue, so continue scheduling it.
-				 */
-				ring_enq(ring, PRIO_QUEUE_MASK, qi);
-
-				i++;
-				id++;
-				continue;
-			}


... which sends queues of the "wrong" group back to the end of the priority queue. If, for example, tens of threads keep sending a queue back and only one thread would accept it, it is very likely that the queue's service level ends up much lower than it should be.

The improved latency can already be seen with the new l2fwd -g option.
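
Editor's note: to illustrate the new behaviour without wading through the full diff below, the patch makes each thread round-robin only over the groups it actually belongs to. The standalone sketch below mirrors the grp[] / grp_weight[] logic from the patch with simplified, hypothetical types; it is not the ODP implementation itself.

/* Standalone sketch (simplified, hypothetical types, not the ODP API):
 * round-robins over only the groups this thread belongs to, mirroring
 * the grp_weight[] / do_schedule_grp() logic added by the patch. */
#include <stdint.h>
#include <stdio.h>

#define NUM_SCHED_GRPS  32
#define WEIGHT_TBL_SIZE 32

typedef struct {
	int      num_grp;                     /* groups this thread is in */
	uint8_t  grp[NUM_SCHED_GRPS];         /* their group ids */
	uint8_t  grp_weight[WEIGHT_TBL_SIZE]; /* round-robin start offsets */
	uint16_t round;
} sched_local_t;

/* Stand-in for the scheduler core: nonzero when group 'grp' had work */
static int do_schedule_grp(int grp)
{
	return grp == 5; /* example: only group 5 has events */
}

static int do_schedule(sched_local_t *local)
{
	uint16_t round = (uint16_t)((local->round + 1) % WEIGHT_TBL_SIZE);
	int grp_id = local->grp_weight[round];
	int i;

	local->round = round;

	/* Try each of the thread's own groups once, starting at a
	 * per-round offset so no group is permanently favored. */
	for (i = 0; i < local->num_grp; i++) {
		int ret = do_schedule_grp(local->grp[grp_id]);

		if (ret)
			return ret;

		grp_id = (grp_id + 1) % local->num_grp;
	}
	return 0;
}

int main(void)
{
	sched_local_t local = { .num_grp = 3, .grp = {2, 5, 7} };
	int i;

	for (i = 0; i < WEIGHT_TBL_SIZE; i++)
		local.grp_weight[i] = (uint8_t)(i % local.num_grp);

	for (i = 0; i < 4; i++)
		printf("round %d -> found work: %d\n", i, do_schedule(&local));

	return 0;
}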

-Petri


> -----Original Message-----
> From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Petri Savolainen
> Sent: Thursday, April 06, 2017 2:59 PM
> To: lng-odp@lists.linaro.org
> Subject: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling
> 
> Use separate priority queues for different groups. Sharing
> the same priority queue over multiple groups caused multiple
> issues:
> * latency and ordering issues when threads push back
>   events (from wrong groups) to the tail of the priority queue
> * unnecessary contention (scaling issues) when threads belong
>   to different groups
> 
> Lowered the maximum number of groups from 256 to 32 (in the default
> configuration) to limit memory usage of priority queues. This should
> be enough for most users.
> 
> Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>
> ---
>  platform/linux-generic/odp_schedule.c | 284 +++++++++++++++++++++++-----------
>  1 file changed, 195 insertions(+), 89 deletions(-)
> 
> diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
> index e7079b9..f366e7e 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -34,7 +34,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
>  		  "normal_prio_is_not_between_highest_and_lowest");
> 
>  /* Number of scheduling groups */
> -#define NUM_SCHED_GRPS 256
> +#define NUM_SCHED_GRPS 32
> 
>  /* Priority queues per priority */
>  #define QUEUES_PER_PRIO  4
> @@ -163,7 +163,11 @@ typedef struct {
>  		ordered_stash_t stash[MAX_ORDERED_STASH];
>  	} ordered;
> 
> +	uint32_t grp_epoch;
> +	int num_grp;
> +	uint8_t grp[NUM_SCHED_GRPS];
>  	uint8_t weight_tbl[WEIGHT_TBL_SIZE];
> +	uint8_t grp_weight[WEIGHT_TBL_SIZE];
> 
>  } sched_local_t;
> 
> @@ -199,7 +203,7 @@ typedef struct {
>  	pri_mask_t     pri_mask[NUM_PRIO];
>  	odp_spinlock_t mask_lock;
> 
> -	prio_queue_t   prio_q[NUM_PRIO][QUEUES_PER_PRIO];
> +	prio_queue_t   prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
> 
>  	odp_spinlock_t poll_cmd_lock;
>  	/* Number of commands in a command queue */
> @@ -214,8 +218,10 @@ typedef struct {
>  	odp_shm_t      shm;
>  	uint32_t       pri_count[NUM_PRIO][QUEUES_PER_PRIO];
> 
> -	odp_spinlock_t grp_lock;
> -	odp_thrmask_t mask_all;
> +	odp_thrmask_t    mask_all;
> +	odp_spinlock_t   grp_lock;
> +	odp_atomic_u32_t grp_epoch;
> +
>  	struct {
>  		char           name[ODP_SCHED_GROUP_NAME_LEN];
>  		odp_thrmask_t  mask;
> @@ -223,6 +229,7 @@ typedef struct {
>  	} sched_grp[NUM_SCHED_GRPS];
> 
>  	struct {
> +		int         grp;
>  		int         prio;
>  		int         queue_per_prio;
>  	} queue[ODP_CONFIG_QUEUES];
> @@ -273,7 +280,7 @@ static void sched_local_init(void)
>  static int schedule_init_global(void)
>  {
>  	odp_shm_t shm;
> -	int i, j;
> +	int i, j, grp;
> 
>  	ODP_DBG("Schedule init ... ");
> 
> @@ -293,15 +300,20 @@ static int schedule_init_global(void)
>  	sched->shm  = shm;
>  	odp_spinlock_init(&sched->mask_lock);
> 
> -	for (i = 0; i < NUM_PRIO; i++) {
> -		for (j = 0; j < QUEUES_PER_PRIO; j++) {
> -			int k;
> +	for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
> +		for (i = 0; i < NUM_PRIO; i++) {
> +			for (j = 0; j < QUEUES_PER_PRIO; j++) {
> +				prio_queue_t *prio_q;
> +				int k;
> 
> -			ring_init(&sched->prio_q[i][j].ring);
> +				prio_q = &sched->prio_q[grp][i][j];
> +				ring_init(&prio_q->ring);
> 
> -			for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
> -				sched->prio_q[i][j].queue_index[k] =
> -				PRIO_QUEUE_EMPTY;
> +				for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) {
> +					prio_q->queue_index[k] =
> +					PRIO_QUEUE_EMPTY;
> +				}
> +			}
>  		}
>  	}
> 
> @@ -317,12 +329,17 @@ static int schedule_init_global(void)
>  		sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
> 
>  	odp_spinlock_init(&sched->grp_lock);
> +	odp_atomic_init_u32(&sched->grp_epoch, 0);
> 
>  	for (i = 0; i < NUM_SCHED_GRPS; i++) {
>  		memset(sched->sched_grp[i].name, 0, ODP_SCHED_GROUP_NAME_LEN);
>  		odp_thrmask_zero(&sched->sched_grp[i].mask);
>  	}
> 
> +	sched->sched_grp[ODP_SCHED_GROUP_ALL].allocated = 1;
> +	sched->sched_grp[ODP_SCHED_GROUP_WORKER].allocated = 1;
> +	sched->sched_grp[ODP_SCHED_GROUP_CONTROL].allocated = 1;
> +
>  	odp_thrmask_setall(&sched->mask_all);
> 
>  	ODP_DBG("done\n");
> @@ -330,29 +347,38 @@ static int schedule_init_global(void)
>  	return 0;
>  }
> 
> +static inline void queue_destroy_finalize(uint32_t qi)
> +{
> +	sched_cb_queue_destroy_finalize(qi);
> +}
> +
>  static int schedule_term_global(void)
>  {
>  	int ret = 0;
>  	int rc = 0;
> -	int i, j;
> +	int i, j, grp;
> 
> -	for (i = 0; i < NUM_PRIO; i++) {
> -		for (j = 0; j < QUEUES_PER_PRIO; j++) {
> -			ring_t *ring = &sched->prio_q[i][j].ring;
> -			uint32_t qi;
> +	for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
> +		for (i = 0; i < NUM_PRIO; i++) {
> +			for (j = 0; j < QUEUES_PER_PRIO; j++) {
> +				ring_t *ring = &sched->prio_q[grp][i][j].ring;
> +				uint32_t qi;
> 
> -			while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
> -			       RING_EMPTY) {
> -				odp_event_t events[1];
> -				int num;
> +				while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
> +				       RING_EMPTY) {
> +					odp_event_t events[1];
> +					int num;
> 
> -				num = sched_cb_queue_deq_multi(qi, events, 1);
> +					num = sched_cb_queue_deq_multi(qi,
> +								       events,
> +								       1);
> 
> -				if (num < 0)
> -					sched_cb_queue_destroy_finalize(qi);
> +					if (num < 0)
> +						queue_destroy_finalize(qi);
> 
> -				if (num > 0)
> -					ODP_ERR("Queue not empty\n");
> +					if (num > 0)
> +						ODP_ERR("Queue not empty\n");
> +				}
>  			}
>  		}
>  	}
> @@ -383,6 +409,40 @@ static int schedule_term_local(void)
>  	return 0;
>  }
> 
> +static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
> +{
> +	odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
> +	odp_atomic_add_rel_u32(&sched->grp_epoch, 1);
> +}
> +
> +static inline int grp_update_tbl(void)
> +{
> +	int i;
> +	int num = 0;
> +	int thr = sched_local.thr;
> +
> +	odp_spinlock_lock(&sched->grp_lock);
> +
> +	for (i = 0; i < NUM_SCHED_GRPS; i++) {
> +		if (sched->sched_grp[i].allocated == 0)
> +			continue;
> +
> +		if (odp_thrmask_isset(&sched->sched_grp[i].mask, thr)) {
> +			sched_local.grp[num] = i;
> +			num++;
> +		}
> +	}
> +
> +	odp_spinlock_unlock(&sched->grp_lock);
> +
> +	/* Update group weights. Round robin over all thread's groups. */
> +	for (i = 0; i < WEIGHT_TBL_SIZE; i++)
> +		sched_local.grp_weight[i] = i % num;
> +
> +	sched_local.num_grp = num;
> +	return num;
> +}
> +
>  static unsigned schedule_max_ordered_locks(void)
>  {
>  	return MAX_ORDERED_LOCKS_PER_QUEUE;
> @@ -433,6 +493,7 @@ static int schedule_init_queue(uint32_t queue_index,
>  	int prio = sched_param->prio;
> 
>  	pri_set_queue(queue_index, prio);
> +	sched->queue[queue_index].grp  = sched_param->group;
>  	sched->queue[queue_index].prio = prio;
>  	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
> 
> @@ -444,6 +505,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
>  	int prio = sched->queue[queue_index].prio;
> 
>  	pri_clr_queue(queue_index, prio);
> +	sched->queue[queue_index].grp = 0;
>  	sched->queue[queue_index].prio = 0;
>  	sched->queue[queue_index].queue_per_prio = 0;
>  }
> @@ -535,9 +597,10 @@ static void schedule_release_atomic(void)
>  	uint32_t qi = sched_local.queue_index;
> 
>  	if (qi != PRIO_QUEUE_EMPTY && sched_local.num  == 0) {
> -		int prio           = sched->queue[qi].prio;
> +		int grp = sched->queue[qi].grp;
> +		int prio = sched->queue[qi].prio;
>  		int queue_per_prio = sched->queue[qi].queue_per_prio;
> -		ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
> +		ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
> 
>  		/* Release current atomic queue */
>  		ring_enq(ring, PRIO_QUEUE_MASK, qi);
> @@ -688,42 +751,14 @@ static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
>  	return 1;
>  }
> 
> -/*
> - * Schedule queues
> - */
> -static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
> -		       unsigned int max_num)
> +static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
> +				  unsigned int max_num, int grp, int first)
>  {
>  	int prio, i;
>  	int ret;
> -	int id, first;
> +	int id;
>  	unsigned int max_deq = MAX_DEQ;
>  	uint32_t qi;
> -	uint16_t round;
> -
> -	if (sched_local.num) {
> -		ret = copy_events(out_ev, max_num);
> -
> -		if (out_queue)
> -			*out_queue = sched_local.queue;
> -
> -		return ret;
> -	}
> -
> -	schedule_release_context();
> -
> -	if (odp_unlikely(sched_local.pause))
> -		return 0;
> -
> -	/* Each thread prefers a priority queue. Poll weight table avoids
> -	 * starvation of other priority queues on low thread counts. */
> -	round = sched_local.round + 1;
> -
> -	if (odp_unlikely(round == WEIGHT_TBL_SIZE))
> -		round = 0;
> -
> -	sched_local.round = round;
> -	first = sched_local.weight_tbl[round];
> 
>  	/* Schedule events */
>  	for (prio = 0; prio < NUM_PRIO; prio++) {
> @@ -736,7 +771,6 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
> 
>  		for (i = 0; i < QUEUES_PER_PRIO;) {
>  			int num;
> -			int grp;
>  			int ordered;
>  			odp_queue_t handle;
>  			ring_t *ring;
> @@ -753,7 +787,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
>  			}
> 
>  			/* Get queue index from the priority queue */
> -			ring = &sched->prio_q[prio][id].ring;
> +			ring = &sched->prio_q[grp][prio][id].ring;
>  			qi   = ring_deq(ring, PRIO_QUEUE_MASK);
> 
>  			/* Priority queue empty */
> @@ -763,21 +797,6 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
>  				continue;
>  			}
> 
> -			grp = sched_cb_queue_grp(qi);
> -
> -			if (grp > ODP_SCHED_GROUP_ALL &&
> -			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
> -					       sched_local.thr)) {
> -				/* This thread is not eligible for work from
> -				 * this queue, so continue scheduling it.
> -				 */
> -				ring_enq(ring, PRIO_QUEUE_MASK, qi);
> -
> -				i++;
> -				id++;
> -				continue;
> -			}
> -
>  			/* Low priorities have smaller batch size to limit
>  			 * head of line blocking latency. */
>  			if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
> @@ -845,6 +864,70 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
>  		}
>  	}
> 
> +	return 0;
> +}
> +
> +/*
> + * Schedule queues
> + */
> +static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
> +			      unsigned int max_num)
> +{
> +	int i, num_grp;
> +	int ret;
> +	int id, first, grp_id;
> +	uint16_t round;
> +	uint32_t epoch;
> +
> +	if (sched_local.num) {
> +		ret = copy_events(out_ev, max_num);
> +
> +		if (out_queue)
> +			*out_queue = sched_local.queue;
> +
> +		return ret;
> +	}
> +
> +	schedule_release_context();
> +
> +	if (odp_unlikely(sched_local.pause))
> +		return 0;
> +
> +	/* Each thread prefers a priority queue. Poll weight table avoids
> +	 * starvation of other priority queues on low thread counts. */
> +	round = sched_local.round + 1;
> +
> +	if (odp_unlikely(round == WEIGHT_TBL_SIZE))
> +		round = 0;
> +
> +	sched_local.round = round;
> +	first = sched_local.weight_tbl[round];
> +
> +	epoch = odp_atomic_load_acq_u32(&sched->grp_epoch);
> +	num_grp = sched_local.num_grp;
> +
> +	if (odp_unlikely(sched_local.grp_epoch != epoch)) {
> +		num_grp = grp_update_tbl();
> +		sched_local.grp_epoch = epoch;
> +	}
> +
> +	grp_id = sched_local.grp_weight[round];
> +
> +	/* Schedule queues per group and priority */
> +	for (i = 0; i < num_grp; i++) {
> +		int grp;
> +
> +		grp = sched_local.grp[grp_id];
> +		ret = do_schedule_grp(out_queue, out_ev, max_num, grp, first);
> +
> +		if (odp_likely(ret))
> +			return ret;
> +
> +		grp_id++;
> +		if (odp_unlikely(grp_id >= num_grp))
> +			grp_id = 0;
> +	}
> +
>  	/*
>  	 * Poll packet input when there are no events
>  	 *   * Each thread starts the search for a poll command from its
> @@ -1050,7 +1133,8 @@ static odp_schedule_group_t schedule_group_create(const char *name,
>  					ODP_SCHED_GROUP_NAME_LEN - 1);
>  				grp_name[ODP_SCHED_GROUP_NAME_LEN - 1] = 0;
>  			}
> -			odp_thrmask_copy(&sched->sched_grp[i].mask, mask);
> +
> +			grp_update_mask(i, mask);
>  			group = (odp_schedule_group_t)i;
>  			sched->sched_grp[i].allocated = 1;
>  			break;
> @@ -1063,13 +1147,16 @@ static odp_schedule_group_t schedule_group_create(const char *name,
> 
>  static int schedule_group_destroy(odp_schedule_group_t group)
>  {
> +	odp_thrmask_t zero;
>  	int ret;
> 
> +	odp_thrmask_zero(&zero);
> +
>  	odp_spinlock_lock(&sched->grp_lock);
> 
>  	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
>  	    sched->sched_grp[group].allocated) {
> -		odp_thrmask_zero(&sched->sched_grp[group].mask);
> +		grp_update_mask(group, &zero);
>  		memset(sched->sched_grp[group].name, 0,
>  		       ODP_SCHED_GROUP_NAME_LEN);
>  		sched->sched_grp[group].allocated = 0;
> @@ -1109,9 +1196,11 @@ static int schedule_group_join(odp_schedule_group_t group,
> 
>  	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
>  	    sched->sched_grp[group].allocated) {
> -		odp_thrmask_or(&sched->sched_grp[group].mask,
> -			       &sched->sched_grp[group].mask,
> -			       mask);
> +		odp_thrmask_t new_mask;
> +
> +		odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, mask);
> +		grp_update_mask(group, &new_mask);
> +
>  		ret = 0;
>  	} else {
>  		ret = -1;
> @@ -1124,18 +1213,19 @@ static int schedule_group_join(odp_schedule_group_t group,
>  static int schedule_group_leave(odp_schedule_group_t group,
>  				const odp_thrmask_t *mask)
>  {
> +	odp_thrmask_t new_mask;
>  	int ret;
> 
> +	odp_thrmask_xor(&new_mask, mask, &sched->mask_all);
> +
>  	odp_spinlock_lock(&sched->grp_lock);
> 
>  	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
>  	    sched->sched_grp[group].allocated) {
> -		odp_thrmask_t leavemask;
> +		odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask,
> +				&new_mask);
> +		grp_update_mask(group, &new_mask);
> 
> -		odp_thrmask_xor(&leavemask, mask, &sched->mask_all);
> -		odp_thrmask_and(&sched->sched_grp[group].mask,
> -				&sched->sched_grp[group].mask,
> -				&leavemask);
>  		ret = 0;
>  	} else {
>  		ret = -1;
> @@ -1186,12 +1276,19 @@ static int schedule_group_info(odp_schedule_group_t group,
> 
>  static int schedule_thr_add(odp_schedule_group_t group, int thr)
>  {
> +	odp_thrmask_t mask;
> +	odp_thrmask_t new_mask;
> +
>  	if (group < 0 || group >= SCHED_GROUP_NAMED)
>  		return -1;
> 
> +	odp_thrmask_zero(&mask);
> +	odp_thrmask_set(&mask, thr);
> +
>  	odp_spinlock_lock(&sched->grp_lock);
> 
> -	odp_thrmask_set(&sched->sched_grp[group].mask, thr);
> +	odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, &mask);
> +	grp_update_mask(group, &new_mask);
> 
>  	odp_spinlock_unlock(&sched->grp_lock);
> 
> @@ -1200,12 +1297,20 @@ static int schedule_thr_add(odp_schedule_group_t group, int thr)
> 
>  static int schedule_thr_rem(odp_schedule_group_t group, int thr)
>  {
> +	odp_thrmask_t mask;
> +	odp_thrmask_t new_mask;
> +
>  	if (group < 0 || group >= SCHED_GROUP_NAMED)
>  		return -1;
> 
> +	odp_thrmask_zero(&mask);
> +	odp_thrmask_set(&mask, thr);
> +	odp_thrmask_xor(&new_mask, &mask, &sched->mask_all);
> +
>  	odp_spinlock_lock(&sched->grp_lock);
> 
> -	odp_thrmask_clr(&sched->sched_grp[group].mask, thr);
> +	odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask, &new_mask);
> +	grp_update_mask(group, &new_mask);
> 
>  	odp_spinlock_unlock(&sched->grp_lock);
> 
> @@ -1219,9 +1324,10 @@ static void schedule_prefetch(int num ODP_UNUSED)
> 
>  static int schedule_sched_queue(uint32_t queue_index)
>  {
> +	int grp            = sched->queue[queue_index].grp;
>  	int prio           = sched->queue[queue_index].prio;
>  	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
> -	ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
> +	ring_t *ring       = &sched->prio_q[grp][prio][queue_per_prio].ring;
> 
>  	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
>  	return 0;
> --
> 2.8.1
Savolainen, Petri (Nokia - FI/Espoo) April 20, 2017, 6:37 a.m. UTC | #2
Ping. Fixes bug https://bugs.linaro.org/show_bug.cgi?id=2945



> -----Original Message-----
> From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Savolainen, Petri (Nokia - FI/Espoo)
> Sent: Wednesday, April 12, 2017 12:57 PM
> To: lng-odp@lists.linaro.org
> Subject: Re: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling
> 
> Ping.
> 
> This patch set removes the non-deterministic latency, lowered QoS and
> potential queue starvation caused by this code ...
> 
> -			if (grp > ODP_SCHED_GROUP_ALL &&
> -			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
> -					       sched_local.thr)) {
> -				/* This thread is not eligible for work from
> -				 * this queue, so continue scheduling it.
> -				 */
> -				ring_enq(ring, PRIO_QUEUE_MASK, qi);
> -
> -				i++;
> -				id++;
> -				continue;
> -			}
> 
> ... which sends queues of the "wrong" group back to the end of the priority
> queue. If, for example, tens of threads keep sending a queue back and only
> one thread would accept it, it is very likely that the queue's service level
> ends up much lower than it should be.
> 
> The improved latency can already be seen with the new l2fwd -g option.
> 
> -Petri
> 

> > [original patch quoted in full above; trimmed]
Wallen, Carl (Nokia - FI/Espoo) April 20, 2017, 12:11 p.m. UTC | #3
For the entire patch set:
Reviewed-and-tested-by: Carl Wallén <carl.wallen@nokia.com>

-----Original Message-----
From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Savolainen, Petri (Nokia - FI/Espoo)

Sent: Thursday, April 20, 2017 9:38 AM
To: lng-odp@lists.linaro.org
Subject: Re: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling

Ping. Fixes bug https://bugs.linaro.org/show_bug.cgi?id=2945



> [earlier messages quoted in full above; trimmed]
Savolainen, Petri (Nokia - FI/Espoo) April 24, 2017, 11:06 a.m. UTC | #4
Ping.

> -----Original Message-----
> From: Wallen, Carl (Nokia - FI/Espoo)
> Sent: Thursday, April 20, 2017 3:11 PM
> To: lng-odp@lists.linaro.org
> Cc: Savolainen, Petri (Nokia - FI/Espoo) <petri.savolainen@nokia-bell-labs.com>
> Subject: RE: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling
> 
> For the entire patch set:
> Reviewed-and-tested-by: Carl Wallén <carl.wallen@nokia.com>
> 
> -----Original Message-----
> From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Savolainen, Petri (Nokia - FI/Espoo)
> Sent: Thursday, April 20, 2017 9:38 AM
> To: lng-odp@lists.linaro.org
> Subject: Re: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling
> 
> Ping. Fixes bug https://bugs.linaro.org/show_bug.cgi?id=2945
> 
> > -----Original Message-----
> > From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Savolainen, Petri (Nokia - FI/Espoo)
> > Sent: Wednesday, April 12, 2017 12:57 PM
> > To: lng-odp@lists.linaro.org
> > Subject: Re: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling
> >
> > Ping.
> >
> > This patch set removes the non-deterministic latency, lower QoS and
> > potential queue starvation that is caused by this code ...
> >
> > -			if (grp > ODP_SCHED_GROUP_ALL &&
> > -			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
> > -					       sched_local.thr)) {
> > -				/* This thread is not eligible for work from
> > -				 * this queue, so continue scheduling it.
> > -				 */
> > -				ring_enq(ring, PRIO_QUEUE_MASK, qi);
> > -
> > -				i++;
> > -				id++;
> > -				continue;
> > -			}
> >
> > ... which sends queues of "wrong" group back to the end of the priority
> > queue. If e.g. tens of threads are sending it back and only one thread
> > would accept it, it's actually very likely that queue service level is
> > much lower than it should be.
> >
> > Improved latency can be seen already with the new l2fwd -g option.
> >
> > -Petri
> >
> > > -----Original Message-----
> > > From: lng-odp [mailto:lng-odp-bounces@lists.linaro.org] On Behalf Of Petri Savolainen
> > > Sent: Thursday, April 06, 2017 2:59 PM
> > > To: lng-odp@lists.linaro.org
> > > Subject: [lng-odp] [PATCH 3/3] linux-gen: sched: optimize group scheduling

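To see what the patch below is doing at a glance: every (group, priority, spread) combination now has its own ring (prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO]), and each thread caches the list of groups it belongs to, refreshing that cache only when the atomic group epoch changes. The following is a minimal, self-contained sketch of that epoch-guarded cache idea, using simplified stand-in types and hypothetical names (grp_has_thr, update_local_groups) rather than the actual odp_schedule.c internals; the real grp_update_tbl() additionally takes sched->grp_lock while it scans the group masks.

/* Sketch only: epoch-guarded per-thread group table (simplified, not the
 * actual odp_schedule.c code). */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define NUM_SCHED_GRPS 32
#define MAX_THREADS    64

static atomic_uint grp_epoch;                          /* bumped on every mask change */
static bool grp_allocated[NUM_SCHED_GRPS];
static bool grp_has_thr[NUM_SCHED_GRPS][MAX_THREADS];  /* stand-in for odp_thrmask_t */

static _Thread_local int      local_grp[NUM_SCHED_GRPS]; /* groups this thread serves */
static _Thread_local int      local_num_grp;
static _Thread_local unsigned local_epoch;

/* Control path: any group mask change publishes a new epoch. */
static void group_mask_changed(void)
{
	atomic_fetch_add_explicit(&grp_epoch, 1, memory_order_release);
}

/* Fast path: rebuild the cached group list only when the epoch moved. */
static int update_local_groups(int thr)
{
	unsigned epoch = atomic_load_explicit(&grp_epoch, memory_order_acquire);

	if (epoch != local_epoch) {
		local_num_grp = 0;
		for (int i = 0; i < NUM_SCHED_GRPS; i++)
			if (grp_allocated[i] && grp_has_thr[i][thr])
				local_grp[local_num_grp++] = i;
		local_epoch = epoch;
	}
	return local_num_grp;
}

int main(void)
{
	grp_allocated[0] = true;
	grp_has_thr[0][0] = true;
	group_mask_changed();
	printf("thread 0 serves %d group(s)\n", update_local_groups(0));
	return 0;
}

The point of the epoch counter is that the common case (no membership change) costs a single acquire load per schedule call, instead of taking the group lock and scanning all group masks every time.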
diff mbox series

Patch

diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index e7079b9..f366e7e 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -34,7 +34,7 @@  ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 		  "normal_prio_is_not_between_highest_and_lowest");
 
 /* Number of scheduling groups */
-#define NUM_SCHED_GRPS 256
+#define NUM_SCHED_GRPS 32
 
 /* Priority queues per priority */
 #define QUEUES_PER_PRIO  4
@@ -163,7 +163,11 @@  typedef struct {
 		ordered_stash_t stash[MAX_ORDERED_STASH];
 	} ordered;
 
+	uint32_t grp_epoch;
+	int num_grp;
+	uint8_t grp[NUM_SCHED_GRPS];
 	uint8_t weight_tbl[WEIGHT_TBL_SIZE];
+	uint8_t grp_weight[WEIGHT_TBL_SIZE];
 
 } sched_local_t;
 
@@ -199,7 +203,7 @@  typedef struct {
 	pri_mask_t     pri_mask[NUM_PRIO];
 	odp_spinlock_t mask_lock;
 
-	prio_queue_t   prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+	prio_queue_t   prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
 
 	odp_spinlock_t poll_cmd_lock;
 	/* Number of commands in a command queue */
@@ -214,8 +218,10 @@  typedef struct {
 	odp_shm_t      shm;
 	uint32_t       pri_count[NUM_PRIO][QUEUES_PER_PRIO];
 
-	odp_spinlock_t grp_lock;
-	odp_thrmask_t mask_all;
+	odp_thrmask_t    mask_all;
+	odp_spinlock_t   grp_lock;
+	odp_atomic_u32_t grp_epoch;
+
 	struct {
 		char           name[ODP_SCHED_GROUP_NAME_LEN];
 		odp_thrmask_t  mask;
@@ -223,6 +229,7 @@  typedef struct {
 	} sched_grp[NUM_SCHED_GRPS];
 
 	struct {
+		int         grp;
 		int         prio;
 		int         queue_per_prio;
 	} queue[ODP_CONFIG_QUEUES];
@@ -273,7 +280,7 @@  static void sched_local_init(void)
 static int schedule_init_global(void)
 {
 	odp_shm_t shm;
-	int i, j;
+	int i, j, grp;
 
 	ODP_DBG("Schedule init ... ");
 
@@ -293,15 +300,20 @@  static int schedule_init_global(void)
 	sched->shm  = shm;
 	odp_spinlock_init(&sched->mask_lock);
 
-	for (i = 0; i < NUM_PRIO; i++) {
-		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			int k;
+	for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
+		for (i = 0; i < NUM_PRIO; i++) {
+			for (j = 0; j < QUEUES_PER_PRIO; j++) {
+				prio_queue_t *prio_q;
+				int k;
 
-			ring_init(&sched->prio_q[i][j].ring);
+				prio_q = &sched->prio_q[grp][i][j];
+				ring_init(&prio_q->ring);
 
-			for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
-				sched->prio_q[i][j].queue_index[k] =
-				PRIO_QUEUE_EMPTY;
+				for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++) {
+					prio_q->queue_index[k] =
+					PRIO_QUEUE_EMPTY;
+				}
+			}
 		}
 	}
 
@@ -317,12 +329,17 @@  static int schedule_init_global(void)
 		sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
 
 	odp_spinlock_init(&sched->grp_lock);
+	odp_atomic_init_u32(&sched->grp_epoch, 0);
 
 	for (i = 0; i < NUM_SCHED_GRPS; i++) {
 		memset(sched->sched_grp[i].name, 0, ODP_SCHED_GROUP_NAME_LEN);
 		odp_thrmask_zero(&sched->sched_grp[i].mask);
 	}
 
+	sched->sched_grp[ODP_SCHED_GROUP_ALL].allocated = 1;
+	sched->sched_grp[ODP_SCHED_GROUP_WORKER].allocated = 1;
+	sched->sched_grp[ODP_SCHED_GROUP_CONTROL].allocated = 1;
+
 	odp_thrmask_setall(&sched->mask_all);
 
 	ODP_DBG("done\n");
@@ -330,29 +347,38 @@  static int schedule_init_global(void)
 	return 0;
 }
 
+static inline void queue_destroy_finalize(uint32_t qi)
+{
+	sched_cb_queue_destroy_finalize(qi);
+}
+
 static int schedule_term_global(void)
 {
 	int ret = 0;
 	int rc = 0;
-	int i, j;
+	int i, j, grp;
 
-	for (i = 0; i < NUM_PRIO; i++) {
-		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			ring_t *ring = &sched->prio_q[i][j].ring;
-			uint32_t qi;
+	for (grp = 0; grp < NUM_SCHED_GRPS; grp++) {
+		for (i = 0; i < NUM_PRIO; i++) {
+			for (j = 0; j < QUEUES_PER_PRIO; j++) {
+				ring_t *ring = &sched->prio_q[grp][i][j].ring;
+				uint32_t qi;
 
-			while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
-			       RING_EMPTY) {
-				odp_event_t events[1];
-				int num;
+				while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+				       RING_EMPTY) {
+					odp_event_t events[1];
+					int num;
 
-				num = sched_cb_queue_deq_multi(qi, events, 1);
+					num = sched_cb_queue_deq_multi(qi,
+								       events,
+								       1);
 
-				if (num < 0)
-					sched_cb_queue_destroy_finalize(qi);
+					if (num < 0)
+						queue_destroy_finalize(qi);
 
-				if (num > 0)
-					ODP_ERR("Queue not empty\n");
+					if (num > 0)
+						ODP_ERR("Queue not empty\n");
+				}
 			}
 		}
 	}
@@ -383,6 +409,40 @@  static int schedule_term_local(void)
 	return 0;
 }
 
+static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
+{
+	odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
+	odp_atomic_add_rel_u32(&sched->grp_epoch, 1);
+}
+
+static inline int grp_update_tbl(void)
+{
+	int i;
+	int num = 0;
+	int thr = sched_local.thr;
+
+	odp_spinlock_lock(&sched->grp_lock);
+
+	for (i = 0; i < NUM_SCHED_GRPS; i++) {
+		if (sched->sched_grp[i].allocated == 0)
+			continue;
+
+		if (odp_thrmask_isset(&sched->sched_grp[i].mask, thr)) {
+			sched_local.grp[num] = i;
+			num++;
+		}
+	}
+
+	odp_spinlock_unlock(&sched->grp_lock);
+
+	/* Update group weights. Round robin over all thread's groups. */
+	for (i = 0; i < WEIGHT_TBL_SIZE; i++)
+		sched_local.grp_weight[i] = i % num;
+
+	sched_local.num_grp = num;
+	return num;
+}
+
 static unsigned schedule_max_ordered_locks(void)
 {
 	return MAX_ORDERED_LOCKS_PER_QUEUE;
@@ -433,6 +493,7 @@  static int schedule_init_queue(uint32_t queue_index,
 	int prio = sched_param->prio;
 
 	pri_set_queue(queue_index, prio);
+	sched->queue[queue_index].grp  = sched_param->group;
 	sched->queue[queue_index].prio = prio;
 	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
 
@@ -444,6 +505,7 @@  static void schedule_destroy_queue(uint32_t queue_index)
 	int prio = sched->queue[queue_index].prio;
 
 	pri_clr_queue(queue_index, prio);
+	sched->queue[queue_index].grp = 0;
 	sched->queue[queue_index].prio = 0;
 	sched->queue[queue_index].queue_per_prio = 0;
 }
@@ -535,9 +597,10 @@  static void schedule_release_atomic(void)
 	uint32_t qi = sched_local.queue_index;
 
 	if (qi != PRIO_QUEUE_EMPTY && sched_local.num  == 0) {
-		int prio           = sched->queue[qi].prio;
+		int grp = sched->queue[qi].grp;
+		int prio = sched->queue[qi].prio;
 		int queue_per_prio = sched->queue[qi].queue_per_prio;
-		ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
+		ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
 		/* Release current atomic queue */
 		ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -688,42 +751,14 @@  static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
 	return 1;
 }
 
-/*
- * Schedule queues
- */
-static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
-		       unsigned int max_num)
+static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
+				  unsigned int max_num, int grp, int first)
 {
 	int prio, i;
 	int ret;
-	int id, first;
+	int id;
 	unsigned int max_deq = MAX_DEQ;
 	uint32_t qi;
-	uint16_t round;
-
-	if (sched_local.num) {
-		ret = copy_events(out_ev, max_num);
-
-		if (out_queue)
-			*out_queue = sched_local.queue;
-
-		return ret;
-	}
-
-	schedule_release_context();
-
-	if (odp_unlikely(sched_local.pause))
-		return 0;
-
-	/* Each thread prefers a priority queue. Poll weight table avoids
-	 * starvation of other priority queues on low thread counts. */
-	round = sched_local.round + 1;
-
-	if (odp_unlikely(round == WEIGHT_TBL_SIZE))
-		round = 0;
-
-	sched_local.round = round;
-	first = sched_local.weight_tbl[round];
 
 	/* Schedule events */
 	for (prio = 0; prio < NUM_PRIO; prio++) {
@@ -736,7 +771,6 @@  static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 		for (i = 0; i < QUEUES_PER_PRIO;) {
 			int num;
-			int grp;
 			int ordered;
 			odp_queue_t handle;
 			ring_t *ring;
@@ -753,7 +787,7 @@  static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 			}
 
 			/* Get queue index from the priority queue */
-			ring = &sched->prio_q[prio][id].ring;
+			ring = &sched->prio_q[grp][prio][id].ring;
 			qi   = ring_deq(ring, PRIO_QUEUE_MASK);
 
 			/* Priority queue empty */
@@ -763,21 +797,6 @@  static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				continue;
 			}
 
-			grp = sched_cb_queue_grp(qi);
-
-			if (grp > ODP_SCHED_GROUP_ALL &&
-			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
-					       sched_local.thr)) {
-				/* This thread is not eligible for work from
-				 * this queue, so continue scheduling it.
-				 */
-				ring_enq(ring, PRIO_QUEUE_MASK, qi);
-
-				i++;
-				id++;
-				continue;
-			}
-
 			/* Low priorities have smaller batch size to limit
 			 * head of line blocking latency. */
 			if (odp_unlikely(prio > ODP_SCHED_PRIO_DEFAULT))
@@ -845,6 +864,70 @@  static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		}
 	}
 
+	return 0;
+}
+
+/*
+ * Schedule queues
+ */
+static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
+			      unsigned int max_num)
+{
+	int i, num_grp;
+	int ret;
+	int id, first, grp_id;
+	uint16_t round;
+	uint32_t epoch;
+
+	if (sched_local.num) {
+		ret = copy_events(out_ev, max_num);
+
+		if (out_queue)
+			*out_queue = sched_local.queue;
+
+		return ret;
+	}
+
+	schedule_release_context();
+
+	if (odp_unlikely(sched_local.pause))
+		return 0;
+
+	/* Each thread prefers a priority queue. Poll weight table avoids
+	 * starvation of other priority queues on low thread counts. */
+	round = sched_local.round + 1;
+
+	if (odp_unlikely(round == WEIGHT_TBL_SIZE))
+		round = 0;
+
+	sched_local.round = round;
+	first = sched_local.weight_tbl[round];
+
+	epoch = odp_atomic_load_acq_u32(&sched->grp_epoch);
+	num_grp = sched_local.num_grp;
+
+	if (odp_unlikely(sched_local.grp_epoch != epoch)) {
+		num_grp = grp_update_tbl();
+		sched_local.grp_epoch = epoch;
+	}
+
+	grp_id = sched_local.grp_weight[round];
+
+	/* Schedule queues per group and priority */
+	for (i = 0; i < num_grp; i++) {
+		int grp;
+
+		grp = sched_local.grp[grp_id];
+		ret = do_schedule_grp(out_queue, out_ev, max_num, grp, first);
+
+		if (odp_likely(ret))
+			return ret;
+
+		grp_id++;
+		if (odp_unlikely(grp_id >= num_grp))
+			grp_id = 0;
+	}
+
 	/*
 	 * Poll packet input when there are no events
 	 *   * Each thread starts the search for a poll command from its
@@ -1050,7 +1133,8 @@  static odp_schedule_group_t schedule_group_create(const char *name,
 					ODP_SCHED_GROUP_NAME_LEN - 1);
 				grp_name[ODP_SCHED_GROUP_NAME_LEN - 1] = 0;
 			}
-			odp_thrmask_copy(&sched->sched_grp[i].mask, mask);
+
+			grp_update_mask(i, mask);
 			group = (odp_schedule_group_t)i;
 			sched->sched_grp[i].allocated = 1;
 			break;
@@ -1063,13 +1147,16 @@  static odp_schedule_group_t schedule_group_create(const char *name,
 
 static int schedule_group_destroy(odp_schedule_group_t group)
 {
+	odp_thrmask_t zero;
 	int ret;
 
+	odp_thrmask_zero(&zero);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_zero(&sched->sched_grp[group].mask);
+		grp_update_mask(group, &zero);
 		memset(sched->sched_grp[group].name, 0,
 		       ODP_SCHED_GROUP_NAME_LEN);
 		sched->sched_grp[group].allocated = 0;
@@ -1109,9 +1196,11 @@  static int schedule_group_join(odp_schedule_group_t group,
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_or(&sched->sched_grp[group].mask,
-			       &sched->sched_grp[group].mask,
-			       mask);
+		odp_thrmask_t new_mask;
+
+		odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, mask);
+		grp_update_mask(group, &new_mask);
+
 		ret = 0;
 	} else {
 		ret = -1;
@@ -1124,18 +1213,19 @@  static int schedule_group_join(odp_schedule_group_t group,
 static int schedule_group_leave(odp_schedule_group_t group,
 				const odp_thrmask_t *mask)
 {
+	odp_thrmask_t new_mask;
 	int ret;
 
+	odp_thrmask_xor(&new_mask, mask, &sched->mask_all);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
 	if (group < NUM_SCHED_GRPS && group >= SCHED_GROUP_NAMED &&
 	    sched->sched_grp[group].allocated) {
-		odp_thrmask_t leavemask;
+		odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask,
+				&new_mask);
+		grp_update_mask(group, &new_mask);
 
-		odp_thrmask_xor(&leavemask, mask, &sched->mask_all);
-		odp_thrmask_and(&sched->sched_grp[group].mask,
-				&sched->sched_grp[group].mask,
-				&leavemask);
 		ret = 0;
 	} else {
 		ret = -1;
@@ -1186,12 +1276,19 @@  static int schedule_group_info(odp_schedule_group_t group,
 
 static int schedule_thr_add(odp_schedule_group_t group, int thr)
 {
+	odp_thrmask_t mask;
+	odp_thrmask_t new_mask;
+
 	if (group < 0 || group >= SCHED_GROUP_NAMED)
 		return -1;
 
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
-	odp_thrmask_set(&sched->sched_grp[group].mask, thr);
+	odp_thrmask_or(&new_mask, &sched->sched_grp[group].mask, &mask);
+	grp_update_mask(group, &new_mask);
 
 	odp_spinlock_unlock(&sched->grp_lock);
 
@@ -1200,12 +1297,20 @@  static int schedule_thr_add(odp_schedule_group_t group, int thr)
 
 static int schedule_thr_rem(odp_schedule_group_t group, int thr)
 {
+	odp_thrmask_t mask;
+	odp_thrmask_t new_mask;
+
 	if (group < 0 || group >= SCHED_GROUP_NAMED)
 		return -1;
 
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr);
+	odp_thrmask_xor(&new_mask, &mask, &sched->mask_all);
+
 	odp_spinlock_lock(&sched->grp_lock);
 
-	odp_thrmask_clr(&sched->sched_grp[group].mask, thr);
+	odp_thrmask_and(&new_mask, &sched->sched_grp[group].mask, &new_mask);
+	grp_update_mask(group, &new_mask);
 
 	odp_spinlock_unlock(&sched->grp_lock);
 
@@ -1219,9 +1324,10 @@  static void schedule_prefetch(int num ODP_UNUSED)
 
 static int schedule_sched_queue(uint32_t queue_index)
 {
+	int grp            = sched->queue[queue_index].grp;
 	int prio           = sched->queue[queue_index].prio;
 	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
-	ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
+	ring_t *ring       = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
 	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
 	return 0;
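
For context on what the per-group rings serve, a hedged usage-level sketch against the ODP API of this era (the names "grp_a" and "queue_a" are made up, and teardown plus most error handling is omitted): a thread puts itself into a scheduling group and creates a queue in that group.

/* Illustrative usage only, not part of the patch. */
#include <odp_api.h>

int schedule_group_example(void)
{
	odp_instance_t inst;
	odp_thrmask_t mask;
	odp_schedule_group_t grp_a;
	odp_queue_param_t qparam;
	odp_queue_t queue_a;

	if (odp_init_global(&inst, NULL, NULL) ||
	    odp_init_local(inst, ODP_THREAD_WORKER))
		return -1;

	/* Group that contains only the calling thread */
	odp_thrmask_zero(&mask);
	odp_thrmask_set(&mask, odp_thread_id());
	grp_a = odp_schedule_group_create("grp_a", &mask);

	/* Scheduled queue that belongs to that group */
	odp_queue_param_init(&qparam);
	qparam.type        = ODP_QUEUE_TYPE_SCHED;
	qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
	qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
	qparam.sched.group = grp_a;
	queue_a = odp_queue_create("queue_a", &qparam);

	return queue_a == ODP_QUEUE_INVALID ? -1 : 0;
}

Before this patch such a queue still landed in a ring shared by all groups, so threads outside grp_a kept dequeuing its index and pushing it back to the tail; with one ring per (group, priority, spread) that round trip disappears.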