diff mbox series

[v1,3/6] linux-gen: sched: single variable for sync context status

Message ID 1536674416-8465-4-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v1,1/6] linux-gen: queue: remove extra checks | expand

Commit Message

Github ODP bot Sept. 11, 2018, 2 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Use single thread local variable to keep track if
a synchronization context is held and the type of the context
(atomic or ordered). Performance is improved as sync context
status is located on single (the first) cache line of
sched_local_t.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 699 (psavol:master-sched-optim-clean-ups)
 ** https://github.com/Linaro/odp/pull/699
 ** Patch: https://github.com/Linaro/odp/pull/699.patch
 ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508
 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f
 **/
 platform/linux-generic/odp_schedule_basic.c | 130 +++++++++++---------
 1 file changed, 72 insertions(+), 58 deletions(-)
diff mbox series

Patch

diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 89c0a5c42..46ae7f1c1 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -30,6 +30,9 @@ 
 #include <odp_libconfig_internal.h>
 #include <odp/api/plat/queue_inlines.h>
 
+/* No synchronization context */
+#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL
+
 /* Number of priority levels  */
 #define NUM_PRIO 8
 
@@ -124,7 +127,8 @@  ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
 /* Scheduler local data */
 typedef struct ODP_ALIGNED_CACHE {
 	uint16_t thr;
-	uint16_t pause;
+	uint8_t  pause;
+	uint8_t  sync_ctx;
 	uint16_t grp_round;
 	uint16_t spread_round;
 
@@ -241,9 +245,6 @@  static sched_global_t *sched;
 /* Thread local scheduler context */
 static __thread sched_local_t sched_local;
 
-/* Function prototypes */
-static inline void schedule_release_context(void);
-
 static int read_config_file(sched_global_t *sched)
 {
 	const char *str;
@@ -311,6 +312,7 @@  static void sched_local_init(void)
 	memset(&sched_local, 0, sizeof(sched_local_t));
 
 	sched_local.thr         = odp_thread_id();
+	sched_local.sync_ctx    = NO_SYNC_CONTEXT;
 	sched_local.stash.queue = ODP_QUEUE_INVALID;
 	sched_local.stash.qi    = PRIO_QUEUE_EMPTY;
 	sched_local.ordered.src_queue = NULL_INDEX;
@@ -450,17 +452,6 @@  static int schedule_init_local(void)
 	return 0;
 }
 
-static int schedule_term_local(void)
-{
-	if (sched_local.stash.num_ev) {
-		ODP_ERR("Locally pre-scheduled events exist.\n");
-		return -1;
-	}
-
-	schedule_release_context();
-	return 0;
-}
-
 static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
 {
 	odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
@@ -565,14 +556,9 @@  static int schedule_init_queue(uint32_t queue_index,
 	return 0;
 }
 
-static inline int queue_is_atomic(uint32_t queue_index)
+static inline uint8_t sched_sync_type(uint32_t queue_index)
 {
-	return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
-static inline int queue_is_ordered(uint32_t queue_index)
-{
-	return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED;
+	return sched->queue[queue_index].sync;
 }
 
 static void schedule_destroy_queue(uint32_t queue_index)
@@ -584,7 +570,7 @@  static void schedule_destroy_queue(uint32_t queue_index)
 	sched->queue[queue_index].prio   = 0;
 	sched->queue[queue_index].spread = 0;
 
-	if (queue_is_ordered(queue_index) &&
+	if ((sched_sync_type(queue_index) == ODP_SCHED_SYNC_ORDERED) &&
 	    odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
 	    odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
 		ODP_ERR("queue reorder incomplete\n");
@@ -623,21 +609,26 @@  static void schedule_pktio_start(int pktio_index, int num_pktin,
 	}
 }
 
-static void schedule_release_atomic(void)
+static inline void release_atomic(void)
 {
-	uint32_t qi = sched_local.stash.qi;
+	uint32_t qi  = sched_local.stash.qi;
+	int grp      = sched->queue[qi].grp;
+	int prio     = sched->queue[qi].prio;
+	int spread   = sched->queue[qi].spread;
+	ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
 
-	if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev  == 0) {
-		int grp      = sched->queue[qi].grp;
-		int prio     = sched->queue[qi].prio;
-		int spread   = sched->queue[qi].spread;
-		ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+	/* Release current atomic queue */
+	ring_enq(ring, sched->ring_mask, qi);
 
-		/* Release current atomic queue */
-		ring_enq(ring, sched->ring_mask, qi);
+	/* We don't hold sync context anymore */
+	sched_local.sync_ctx = NO_SYNC_CONTEXT;
+}
 
-		sched_local.stash.qi = PRIO_QUEUE_EMPTY;
-	}
+static void schedule_release_atomic(void)
+{
+	if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC &&
+	    sched_local.stash.num_ev == 0)
+		release_atomic();
 }
 
 static inline int ordered_own_turn(uint32_t queue_index)
@@ -709,9 +700,11 @@  static inline void release_ordered(void)
 	}
 
 	sched_local.ordered.lock_called.all = 0;
-	sched_local.ordered.src_queue = NULL_INDEX;
 	sched_local.ordered.in_order = 0;
 
+	/* We don't hold sync context anymore */
+	sched_local.sync_ctx = NO_SYNC_CONTEXT;
+
 	ordered_stash_release();
 
 	/* Next thread can continue processing */
@@ -720,23 +713,26 @@  static inline void release_ordered(void)
 
 static void schedule_release_ordered(void)
 {
-	uint32_t queue_index;
-
-	queue_index = sched_local.ordered.src_queue;
-
-	if (odp_unlikely((queue_index == NULL_INDEX) ||
+	if (odp_unlikely((sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) ||
 			 sched_local.stash.num_ev))
 		return;
 
 	release_ordered();
 }
 
-static inline void schedule_release_context(void)
+static int schedule_term_local(void)
 {
-	if (sched_local.ordered.src_queue != NULL_INDEX)
-		release_ordered();
-	else
+	if (sched_local.stash.num_ev) {
+		ODP_ERR("Locally pre-scheduled events exist.\n");
+		return -1;
+	}
+
+	if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
 		schedule_release_atomic();
+	else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+		schedule_release_ordered();
+
+	return 0;
 }
 
 static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max)
@@ -758,13 +754,22 @@  static int schedule_ord_enq_multi(odp_queue_t dst_queue, void *buf_hdr[],
 				  int num, int *ret)
 {
 	int i;
-	uint32_t stash_num = sched_local.ordered.stash_num;
-	queue_entry_t *dst_qentry = qentry_from_handle(dst_queue);
-	uint32_t src_queue = sched_local.ordered.src_queue;
+	uint32_t stash_num;
+	queue_entry_t *dst_qentry;
+	uint32_t src_queue;
 
-	if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order)
+	/* This check is done for every queue enqueue operation, also for plain
+	 * queues. Return fast when not holding a scheduling context. */
+	if (odp_likely(sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED))
 		return 0;
 
+	if (sched_local.ordered.in_order)
+		return 0;
+
+	src_queue  = sched_local.ordered.src_queue;
+	stash_num  = sched_local.ordered.stash_num;
+	dst_qentry = qentry_from_handle(dst_queue);
+
 	if (ordered_own_turn(src_queue)) {
 		/* Own turn, so can do enqueue directly. */
 		sched_local.ordered.in_order = 1;
@@ -891,7 +896,7 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 		for (i = 0; i < num_spread;) {
 			int num;
-			int ordered;
+			uint8_t sync_ctx, ordered;
 			odp_queue_t handle;
 			ring_t *ring;
 			int pktin;
@@ -921,7 +926,8 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 				continue;
 			}
 
-			ordered = queue_is_ordered(qi);
+			sync_ctx = sched_sync_type(qi);
+			ordered  = (sync_ctx == ODP_SCHED_SYNC_ORDERED);
 
 			/* When application's array is larger than max burst
 			 * size, output all events directly there. Also, ordered
@@ -989,10 +995,12 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 				/* Continue scheduling ordered queues */
 				ring_enq(ring, ring_mask, qi);
+				sched_local.sync_ctx = sync_ctx;
 
-			} else if (queue_is_atomic(qi)) {
+			} else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
 				/* Hold queue during atomic access */
 				sched_local.stash.qi = qi;
+				sched_local.sync_ctx = sync_ctx;
 			} else {
 				/* Continue scheduling the queue */
 				ring_enq(ring, ring_mask, qi);
@@ -1042,7 +1050,11 @@  static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		return ret;
 	}
 
-	schedule_release_context();
+	/* Release schedule context */
+	if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
+		release_atomic();
+	else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+		release_ordered();
 
 	if (odp_unlikely(sched_local.pause))
 		return 0;
@@ -1141,14 +1153,10 @@  static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
 
 static inline void order_lock(void)
 {
-	uint32_t queue_index;
-
-	queue_index = sched_local.ordered.src_queue;
-
-	if (queue_index == NULL_INDEX)
+	if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
 		return;
 
-	wait_for_order(queue_index);
+	wait_for_order(sched_local.ordered.src_queue);
 }
 
 static void order_unlock(void)
@@ -1160,6 +1168,9 @@  static void schedule_order_lock(uint32_t lock_index)
 	odp_atomic_u64_t *ord_lock;
 	uint32_t queue_index;
 
+	if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+		return;
+
 	queue_index = sched_local.ordered.src_queue;
 
 	ODP_ASSERT(queue_index != NULL_INDEX &&
@@ -1187,6 +1198,9 @@  static void schedule_order_unlock(uint32_t lock_index)
 	odp_atomic_u64_t *ord_lock;
 	uint32_t queue_index;
 
+	if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+		return;
+
 	queue_index = sched_local.ordered.src_queue;
 
 	ODP_ASSERT(queue_index != NULL_INDEX &&