
[1/2,v3] linux-gen: traffic_mngr: support termination in barrier.

Message ID 1488252137-14939-1-git-send-email-forrest.shi@linaro.org
State New
Series [1/2,v3] linux-gen: traffic_mngr: support termination in barrier.

Commit Message

Forrest Shi Feb. 28, 2017, 3:22 a.m. UTC
From: Xuelin Shi <forrest.shi@linaro.org>


While the tm_group thread is waiting at the barrier for the first
enqueue, it may receive a destroy call. In this case a mechanism is
needed to exit the wait. This patch adds a variant of
odp_barrier_wait() that allows this.

Signed-off-by: Xuelin Shi <forrest.shi@linaro.org>

---
 platform/linux-generic/odp_traffic_mngr.c | 93 ++++++++++++++++++++++++-------
 1 file changed, 73 insertions(+), 20 deletions(-)

-- 
1.8.3.1
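
[Editor's note] For readers not familiar with the ODP barrier internals, the
idea behind the new odp_barrier_wait_stop() below can be sketched as a
sense-reversing barrier whose spin loop polls a caller-supplied stop
predicate. The sketch is illustrative only: it uses C11 atomics and made-up
names (stop_barrier_t, stop_barrier_wait), and is not the ODP implementation,
which appears in the hunk further down.

#include <stdatomic.h>
#include <stdbool.h>
#include <sched.h>

typedef struct {
	unsigned int num_threads; /* expected number of waiters */
	atomic_uint  arrived;     /* counts arrivals, wraps at 2 * num_threads */
} stop_barrier_t;

static void stop_barrier_init(stop_barrier_t *b, unsigned int num_threads)
{
	b->num_threads = num_threads;
	atomic_init(&b->arrived, 0);
}

/* Returns true if the wait was abandoned because stop(arg) returned true,
 * false if all num_threads participants reached the barrier normally. */
static bool stop_barrier_wait(stop_barrier_t *b,
			      bool (*stop)(void *arg), void *arg)
{
	unsigned int count   = atomic_fetch_add(&b->arrived, 1);
	bool         wasless = count < b->num_threads;

	if (count == 2 * b->num_threads - 1) {
		/* Last arrival of this phase: wrap the counter around. */
		atomic_fetch_sub(&b->arrived, 2 * b->num_threads);
	} else {
		/* Spin until the phase flips, polling the stop predicate
		 * on every iteration so a waiter can bail out early. */
		while ((atomic_load(&b->arrived) < b->num_threads) == wasless) {
			if (stop(arg))
				return true;
			sched_yield();
		}
	}

	return false;
}

In the patch itself the stop predicate (try_stop_tm_group) also performs the
destroy-side rendezvous and group removal, so a tm_system can be torn down
even if the service thread never saw an enqueue.
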

Patch

diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c
index 309f237..9b61c50 100644
--- a/platform/linux-generic/odp_traffic_mngr.c
+++ b/platform/linux-generic/odp_traffic_mngr.c
@@ -91,6 +91,8 @@  static int g_main_thread_cpu = -1;
 static int g_tm_cpu_num;
 
 /* Forward function declarations. */
+static int _odp_tm_group_remove(_odp_tm_group_t odp_tm_group, odp_tm_t odp_tm);
+
 static void tm_queue_cnts_decrement(tm_system_t *tm_system,
 				    tm_wred_node_t *tm_wred_node,
 				    uint32_t priority,
@@ -2326,6 +2328,58 @@  static int thread_affinity_get(odp_cpumask_t *odp_cpu_mask)
 	return 0;
 }
 
+static tm_system_t *tm_system_find_run(tm_system_group_t *grp,
+				       tm_system_t *tm_system)
+{
+	while (tm_system && odp_atomic_load_u64(&tm_system->destroying)) {
+		tm_system_t *odp_tm;
+
+		odp_tm = tm_system->next;
+		odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
+		_odp_tm_group_remove(MAKE_ODP_TM_SYSTEM_GROUP(grp),
+				     MAKE_ODP_TM_HANDLE(tm_system));
+		tm_system = odp_tm;
+		if (!grp->num_tm_systems)
+			tm_system = NULL;
+	}
+
+	return tm_system;
+}
+
+static bool odp_barrier_wait_stop(tm_system_group_t *grp,
+				  bool (*stop)(tm_system_group_t *grp))
+{
+	uint32_t count;
+	int wasless;
+	odp_barrier_t *barrier = &grp->tm_group_barrier;
+
+	odp_mb_full();
+
+	count   = odp_atomic_fetch_inc_u32(&barrier->bar);
+	wasless = count < barrier->count;
+
+	if (count == 2 * barrier->count - 1) {
+		/* Wrap around *atomically* */
+		odp_atomic_sub_u32(&barrier->bar, 2 * barrier->count);
+	} else {
+		while ((odp_atomic_load_u32(&barrier->bar) < barrier->count)
+				== wasless)
+			if (!stop(grp))
+				odp_cpu_pause();
+			else
+				return true;
+	}
+
+	odp_mb_full();
+
+	return false;
+}
+
+static bool try_stop_tm_group(tm_system_group_t *grp)
+{
+	return tm_system_find_run(grp, grp->first_tm_system) == NULL;
+}
+
 static void *tm_system_thread(void *arg)
 {
 	_odp_timer_wheel_t _odp_int_timer_wheel;
@@ -2333,7 +2387,7 @@  static void *tm_system_thread(void *arg)
 	tm_system_group_t  *tm_group;
 	tm_system_t *tm_system;
 	uint64_t current_ns;
-	uint32_t destroying, work_queue_cnt, timer_cnt;
+	uint32_t work_queue_cnt, timer_cnt;
 	int rc;
 
 	rc = odp_init_local((odp_instance_t)odp_global_data.main_pid,
@@ -2341,20 +2395,28 @@  static void *tm_system_thread(void *arg)
 	ODP_ASSERT(rc == 0);
 	tm_group = arg;
 
-	tm_system = tm_group->first_tm_system;
-	_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
-	input_work_queue = tm_system->input_work_queue;
+	/* Wait here until we have seen the first enqueue operation
+	   or destroy. */
+	if (odp_barrier_wait_stop(tm_group, try_stop_tm_group)) {
+		odp_term_local();
+		return NULL;
+	}
 
-	/* Wait here until we have seen the first enqueue operation. */
-	odp_barrier_wait(&tm_group->tm_group_barrier);
 	main_loop_running = true;
+	tm_system = tm_system_find_run(tm_group, tm_group->first_tm_system);
+	if (!tm_system) {
+		odp_term_local();
+		return NULL;
+	}
 
-	destroying = odp_atomic_load_u64(&tm_system->destroying);
-
+	_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
 	current_ns = odp_time_to_ns(odp_time_local());
 	_odp_timer_wheel_start(_odp_int_timer_wheel, current_ns);
 
-	while (destroying == 0) {
+	while (tm_system) {
+		_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
+		input_work_queue = tm_system->input_work_queue;
+
 		/* See if another thread wants to make a configuration
 		 * change. */
 		check_for_request();
@@ -2392,16 +2454,12 @@  static void *tm_system_thread(void *arg)
 		tm_system->current_time = current_ns;
 		tm_system->is_idle = (timer_cnt == 0) &&
 			(work_queue_cnt == 0);
-		destroying = odp_atomic_load_u64(&tm_system->destroying);
 
 		/* Advance to the next tm_system in the tm_system_group. */
-		tm_system = tm_system->next;
-		_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
-		input_work_queue = tm_system->input_work_queue;
+		tm_system = tm_system_find_run(tm_group, tm_system->next);
 	}
-
-	odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
 	odp_term_local();
+
 	return NULL;
 }
 
@@ -2992,11 +3050,6 @@  int odp_tm_destroy(odp_tm_t odp_tm)
 	odp_atomic_inc_u64(&tm_system->destroying);
 	odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
 
-	/* Remove ourselves from the group.  If we are the last tm_system in
-	 * this group, odp_tm_group_remove will destroy any service threads
-	 * allocated by this group. */
-	_odp_tm_group_remove(tm_system->odp_tm_group, odp_tm);
-
 	input_work_queue_destroy(tm_system->input_work_queue);
 	_odp_sorted_pool_destroy(tm_system->_odp_int_sorted_pool);
 	_odp_queue_pool_destroy(tm_system->_odp_int_queue_pool);