diff mbox

[RFC] example: traffic_mgmt: enhancement with multiple pktios

Message ID 1478844499-20434-1-git-send-email-forrest.shi@linaro.org
State New
Headers show

Commit Message

Forrest Shi Nov. 11, 2016, 6:08 a.m. UTC
From: Xuelin Shi <forrest.shi@linaro.org>


This patch made the following changes:
	- receiving packets from multiple pktios other than generating packets
	- identifying TM user by ip with individual class of service
	- print the user service while starting the program
	- pirnt the packets counts every 10 seconds for each user

Signed-off-by: Xuelin Shi <forrest.shi@linaro.org>

---
 example/traffic_mgmt/Makefile.am         |   7 +-
 example/traffic_mgmt/odp_traffic_mgmt.c  | 476 +++++++++---------
 example/traffic_mgmt/odp_traffic_mgmt.h  |  47 ++
 example/traffic_mgmt/odp_traffic_pktio.c | 794 +++++++++++++++++++++++++++++++
 4 files changed, 1091 insertions(+), 233 deletions(-)
 create mode 100644 example/traffic_mgmt/odp_traffic_mgmt.h
 create mode 100644 example/traffic_mgmt/odp_traffic_pktio.c

-- 
1.8.3.1
diff mbox

Patch

diff --git a/example/traffic_mgmt/Makefile.am b/example/traffic_mgmt/Makefile.am
index c8ff797..d2c7929 100644
--- a/example/traffic_mgmt/Makefile.am
+++ b/example/traffic_mgmt/Makefile.am
@@ -2,8 +2,9 @@  include $(top_srcdir)/example/Makefile.inc
 
 bin_PROGRAMS = odp_traffic_mgmt$(EXEEXT)
 odp_traffic_mgmt_LDFLAGS = $(AM_LDFLAGS) -static
-odp_traffic_mgmt_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/example
+odp_traffic_mgmt_CFLAGS = $(AM_CFLAGS) -I${top_srcdir}/example -I${top_srcdir}/test
 
-noinst_HEADERS = $(top_srcdir)/example/example_debug.h
+noinst_HEADERS = $(top_srcdir)/example/example_debug.h \
+		 $(top_srcdir)/example/traffic_mgmt/odp_traffic_mgmt.h
 
-dist_odp_traffic_mgmt_SOURCES = odp_traffic_mgmt.c
+dist_odp_traffic_mgmt_SOURCES = odp_traffic_mgmt.c odp_traffic_pktio.c
diff --git a/example/traffic_mgmt/odp_traffic_mgmt.c b/example/traffic_mgmt/odp_traffic_mgmt.c
index c4f5356..f0fe198 100644
--- a/example/traffic_mgmt/odp_traffic_mgmt.c
+++ b/example/traffic_mgmt/odp_traffic_mgmt.c
@@ -9,16 +9,22 @@ 
 #define _GNU_SOURCE
 
 #include <unistd.h>
+#include <stdio.h>
 #include <signal.h>
+#include <inttypes.h>
 #include <sys/resource.h>
 #include <execinfo.h>
 #include <odp_api.h>
 #include <example_debug.h>
+#include <odp/helper/ip.h>
 
+#include "odp_traffic_mgmt.h"
+
+#define TM_USER_IP_START 0x01010101
 #define NUM_SVC_CLASSES     4
 #define USERS_PER_SVC_CLASS 2
-#define APPS_PER_USER       2
-#define TM_QUEUES_PER_APP   2
+#define APPS_PER_USER       1
+#define TM_QUEUES_PER_APP   1
 #define NUM_USERS           (USERS_PER_SVC_CLASS * NUM_SVC_CLASSES)
 #define NUM_TM_QUEUES       (NUM_USERS * APPS_PER_USER * TM_QUEUES_PER_APP)
 #define TM_QUEUES_PER_USER  (TM_QUEUES_PER_APP * APPS_PER_USER)
@@ -49,11 +55,6 @@  typedef struct {
 	odp_tm_wred_t      wred_profiles[ODP_NUM_PACKET_COLORS];
 } profile_set_t;
 
-static const odp_init_t ODP_INIT_PARAMS = {
-	.log_fn   = odp_override_log,
-	.abort_fn = odp_override_abort
-};
-
 static profile_params_set_t COMPANY_PROFILE_PARAMS = {
 	.shaper_params = {
 		.commit_bps = 50  * MBPS,  .commit_burst      = 1000000,
@@ -161,8 +162,8 @@  static profile_params_set_t COS2_PROFILE_PARAMS = {
 	},
 
 	.threshold_params = {
-		.max_pkts  = 1000,    .enable_max_pkts  = TRUE,
-		.max_bytes = 100000,  .enable_max_bytes = TRUE
+		.max_pkts  = 1000,    .enable_max_pkts  = FALSE,
+		.max_bytes = 100000,  .enable_max_bytes = FALSE
 	},
 
 	.wred_params = {
@@ -194,8 +195,8 @@  static profile_params_set_t COS3_PROFILE_PARAMS = {
 	},
 
 	.threshold_params = {
-		.max_pkts  = 400,    .enable_max_pkts  = TRUE,
-		.max_bytes = 60000,  .enable_max_bytes = TRUE
+		.max_pkts  = 400,    .enable_max_pkts  = FALSE,
+		.max_bytes = 60000,  .enable_max_bytes = FALSE
 	},
 
 	.wred_params = {
@@ -226,19 +227,46 @@  static profile_set_t APP_PROFILE_SETS[NUM_SVC_CLASSES][APPS_PER_USER];
 
 static odp_tm_t odp_tm_test;
 
-static odp_pool_t odp_pool;
-
 static odp_tm_queue_t queue_num_tbls[NUM_SVC_CLASSES][TM_QUEUES_PER_CLASS + 1];
 static uint32_t       next_queue_nums[NUM_SVC_CLASSES];
 
 static uint8_t  random_buf[RANDOM_BUF_LEN];
 static uint32_t next_rand_byte;
 
-static odp_atomic_u32_t atomic_pkts_into_tm;
-static odp_atomic_u32_t atomic_pkts_from_tm;
+typedef struct {
+	struct {
+		odp_atomic_u64_t tm_pkts_in;
+		odp_atomic_u64_t tm_bytes_in;
+		odp_atomic_u64_t tm_pkts_out;
+		odp_atomic_u64_t tm_bytes_out;
+	} s;			/* statistics info for each user of tm */
+	uint64_t max_pkts_limit; /* max packets allowed for this user */
+	uint64_t max_bytes_limit; /* max bytes allowed for this user */
+	odp_bool_t enable_pkts_limit;	/* max packets limit is valid */
+	odp_bool_t enable_bytes_limit;	/* max bytes limit is valid */
+	uint32_t ipaddr;	/* user ipaddr */
+	int svc;		/* service class */
+} tm_user_t;
+
+/* statistics value info, used for param pass */
+typedef struct {
+	uint64_t pkts_in;
+	uint64_t pkts_out;
+	uint64_t bytes_in;
+	uint64_t bytes_out;
+} stat_info_t;
+
+static tm_user_t tm_users[NUM_USERS];
+
+static profile_params_set_t *cos_profile_params[NUM_SVC_CLASSES] = {
+	&COS0_PROFILE_PARAMS,
+	&COS1_PROFILE_PARAMS,
+	&COS2_PROFILE_PARAMS,
+	&COS3_PROFILE_PARAMS
+};
 
-static uint32_t g_num_pkts_to_send = 1000;
-static uint8_t  g_print_tm_stats   = TRUE;
+static odp_atomic_u64_t atomic_pkts_into_tm;
+static odp_atomic_u64_t atomic_pkts_from_tm;
 
 static void tester_egress_fcn(odp_packet_t odp_pkt);
 
@@ -308,17 +336,15 @@  static uint32_t create_profile_set(profile_params_set_t *profile_params_set,
 	return err_cnt;
 }
 
-/* Returns the number of errors encountered. */
-
 static uint32_t init_profile_sets(void)
 {
 	uint32_t class_shaper_scale, class_threshold_scale, user_shaper_scale;
 	uint32_t user_threshold_scale, err_cnt, app_idx;
 
-	class_shaper_scale    = TM_QUEUES_PER_CLASS / 2;
-	class_threshold_scale = TM_QUEUES_PER_CLASS;
-	user_shaper_scale     = TM_QUEUES_PER_USER / 2;
-	user_threshold_scale  = TM_QUEUES_PER_USER;
+	class_shaper_scale    = 1;
+	class_threshold_scale = 1;
+	user_shaper_scale     = 1;
+	user_threshold_scale  = 1;
 	err_cnt               = 0;
 
 	err_cnt += create_profile_set(&COMPANY_PROFILE_PARAMS,
@@ -384,7 +410,7 @@  static int config_example_user(odp_tm_node_t cos_tm_node,
 	profile_set_t        *profile_set;
 	uint32_t              app_idx, queue_idx, svc_class_queue_num;
 	char                  user_name[64];
-	int                   rc;
+	int                   rc, numq = 0;
 
 	profile_set = &USER_PROFILE_SETS[svc_class];
 
@@ -429,6 +455,7 @@  static int config_example_user(odp_tm_node_t cos_tm_node,
 			if (rc < 0)
 				return rc;
 
+			numq++;
 			svc_class_queue_num = next_queue_nums[svc_class]++;
 			queue_num_tbls[svc_class][svc_class_queue_num + 1] =
 				tm_queue;
@@ -440,11 +467,13 @@  static int config_example_user(odp_tm_node_t cos_tm_node,
 
 static int config_company_node(const char *company_name)
 {
+	profile_params_set_t *param;
 	odp_tm_node_params_t tm_node_params;
 	profile_set_t       *profile_set;
 	odp_tm_node_t        company_tm_node, cos_tm_node;
 	uint32_t             cos_idx, user_idx;
 	char                 cos_node_name[64];
+	int idx = 0;
 
 	profile_set = &COMPANY_PROFILE_SET;
 	odp_tm_node_params_init(&tm_node_params);
@@ -484,12 +513,29 @@  static int config_company_node(const char *company_name)
 						 &tm_node_params);
 		odp_tm_node_connect(cos_tm_node, company_tm_node);
 
-		for (user_idx = 0; user_idx < USERS_PER_SVC_CLASS; user_idx++)
+		param = cos_profile_params[cos_idx];
+		for (user_idx = 0; user_idx < USERS_PER_SVC_CLASS; user_idx++) {
 			config_example_user(cos_tm_node, cos_idx,
 					    cos_idx * 256 + user_idx);
+			tm_users[idx].svc = cos_idx;
+			tm_users[idx].ipaddr = TM_USER_IP_START + idx;
+
+			tm_users[idx].max_bytes_limit =
+				param->threshold_params.max_bytes;
+			tm_users[idx].max_pkts_limit =
+				param->threshold_params.max_pkts;
+
+			if (param->threshold_params.enable_max_pkts)
+				tm_users[idx].enable_pkts_limit = TRUE;
+
+			if (param->threshold_params.enable_max_bytes)
+				tm_users[idx].enable_bytes_limit = TRUE;
+			idx++;
+		}
 	}
 
 	odp_tm_node_connect(company_tm_node, ODP_TM_ROOT);
+
 	return 0;
 }
 
@@ -500,6 +546,9 @@  static int create_and_config_tm(void)
 	odp_tm_egress_t              egress;
 	uint32_t                     level, err_cnt;
 
+	if (NUM_USERS < NUM_SVC_CLASSES * USERS_PER_SVC_CLASS)
+		return 1;
+
 	odp_tm_requirements_init(&requirements);
 	odp_tm_egress_init(&egress);
 
@@ -532,20 +581,8 @@  static int create_and_config_tm(void)
 		       __func__, err_cnt);
 
 	config_company_node("TestCompany");
-	return err_cnt;
-}
-
-static uint32_t random_8(void)
-{
-	uint32_t rand8;
-
-	if (RANDOM_BUF_LEN <= next_rand_byte) {
-		odp_random_data(random_buf, RANDOM_BUF_LEN, 1);
-		next_rand_byte = 0;
-	}
 
-	rand8 = random_buf[next_rand_byte++];
-	return rand8;
+	return err_cnt;
 }
 
 static uint32_t random_16(void)
@@ -562,232 +599,211 @@  static uint32_t random_16(void)
 	return (((uint16_t)byte1) << 8) | ((uint16_t)byte2);
 }
 
-static uint32_t pkt_service_class(void)
+static inline uint32_t pkt_to_user(odp_packet_t pkt)
 {
-	uint32_t rand8;
-
-       /* Make most of the traffic use service class 3 to increase the amount
-	* of delayed traffic so as to stimulate more interesting behaviors.
-	*/
-	rand8 = random_8();
-	switch (rand8) {
-	case 0   ... 24:  return 0;
-	case 25  ... 49:  return 1;
-	case 50  ... 150: return 2;
-	case 151 ... 255: return 3;
-	default:          return 3;
-	}
+	odph_ipv4hdr_t *ip;
+	uint32_t idx;
+
+	ip = odp_packet_l3_ptr(pkt, NULL);
+	idx = odp_be_to_cpu_32(ip->src_addr) - TM_USER_IP_START;
+
+	return idx & (NUM_USERS - 1);
 }
 
-static odp_packet_t make_odp_packet(uint16_t pkt_len)
+/**
+ * find the packet from which user and eligible for which service.
+ * this should be a lookup table implementation. Here for simplicity,
+ * only check last byte of ip src addr to classify the users.
+ */
+static inline uint32_t pkt_service_class(odp_packet_t pkt)
 {
-	odp_packet_t odp_pkt;
-	uint8_t      rand8a, rand8b, pkt_color, drop_eligible;
-
-	rand8a        = random_8();
-	rand8b        = random_8();
-	pkt_color     = (rand8a < 224) ? 0 : ((rand8a < 248) ? 1 : 2);
-	drop_eligible = (rand8b < 240) ? 1 : 0;
-	odp_pkt       = odp_packet_alloc(odp_pool, pkt_len);
-	if (odp_pkt == ODP_PACKET_INVALID) {
-		printf("%s odp_packet_alloc failure *******\n", __func__);
-		return 0;
-	}
+	uint32_t idx;
+	tm_user_t *u;
+
+	idx = pkt_to_user(pkt);
+	u = &tm_users[idx];
 
-	odp_packet_color_set(odp_pkt, pkt_color);
-	odp_packet_drop_eligible_set(odp_pkt, drop_eligible);
-	odp_packet_shaper_len_adjust_set(odp_pkt, 24);
-	return odp_pkt;
+	return u->svc;
 }
 
-void tester_egress_fcn(odp_packet_t odp_pkt ODP_UNUSED)
+void tester_egress_fcn(odp_packet_t odp_pkt)
 {
-	odp_atomic_inc_u32(&atomic_pkts_from_tm);
+	tm_user_t *u;
+	uint32_t idx = pkt_to_user(odp_pkt);
+
+	odp_atomic_inc_u64(&atomic_pkts_from_tm);
+	u = &tm_users[idx];
+	odp_atomic_inc_u64(&u->s.tm_pkts_out);
+	odp_atomic_add_u64(&u->s.tm_bytes_out, odp_packet_len(odp_pkt));
+
+	/* not forwarding, need to free, otherwise packet pool will be full */
+	odp_packet_free(odp_pkt);
 }
 
-static int traffic_generator(uint32_t pkts_to_send)
+int tm_send_packet(odp_packet_t pkt)
 {
-	odp_pool_param_t pool_params;
 	odp_tm_queue_t   tm_queue;
-	odp_packet_t     pkt;
-	odp_bool_t       tm_is_idle;
-	uint32_t         svc_class, queue_num, pkt_len, pkts_into_tm;
-	uint32_t         pkts_from_tm, pkt_cnt, millisecs, odp_tm_enq_errs;
+	uint32_t         svc_class, queue_num;
 	int              rc;
+	tm_user_t *u;
+	uint32_t idx;
+	uint64_t bytes_in;
 
-	memset(&pool_params, 0, sizeof(odp_pool_param_t));
-	pool_params.type           = ODP_POOL_PACKET;
-	pool_params.pkt.num        = pkts_to_send + 10;
-	pool_params.pkt.len        = 1600;
-	pool_params.pkt.seg_len    = 0;
-	pool_params.pkt.uarea_size = 0;
-
-	odp_pool        = odp_pool_create("MyPktPool", &pool_params);
-	odp_tm_enq_errs = 0;
-
-	pkt_cnt = 0;
-	while (pkt_cnt < pkts_to_send) {
-		svc_class = pkt_service_class();
-		queue_num = random_16() & (TM_QUEUES_PER_CLASS - 1);
-		tm_queue  = queue_num_tbls[svc_class][queue_num + 1];
-		pkt_len   = ((uint32_t)((random_8() & 0x7F) + 2)) * 32;
-		pkt_len   = MIN(pkt_len, 1500);
-		pkt       = make_odp_packet(pkt_len);
-
-		pkt_cnt++;
-		rc = odp_tm_enq(tm_queue, pkt);
-		if (rc < 0) {
-			odp_tm_enq_errs++;
-			continue;
-		}
+	idx = pkt_to_user(pkt);
+	u = &tm_users[idx];
 
-		odp_atomic_inc_u32(&atomic_pkts_into_tm);
-	}
+	if (u->enable_pkts_limit &&
+	    u->max_pkts_limit <= odp_atomic_load_u64(&u->s.tm_pkts_in))
+		return -1;
 
-	printf("%s odp_tm_enq_errs=%u\n", __func__, odp_tm_enq_errs);
-
-       /* Wait until the main traffic mgmt worker thread is idle and has no
-	* outstanding events (i.e. no timers, empty work queue, etc), but
-	* not longer than 60 seconds.
-	*/
-	for (millisecs = 0; millisecs < 600000; millisecs++) {
-		usleep(100);
-		tm_is_idle = odp_tm_is_idle(odp_tm_test);
-		if (tm_is_idle)
-			break;
-	}
+	bytes_in = odp_atomic_load_u64(&u->s.tm_bytes_in);
+	if (u->enable_bytes_limit &&
+	    u->max_bytes_limit <= bytes_in + odp_packet_len(pkt))
+		return -1;
+
+	svc_class = pkt_service_class(pkt);
+	queue_num = random_16() & (TM_QUEUES_PER_CLASS - 1);
+	tm_queue  = queue_num_tbls[svc_class][queue_num + 1];
 
-	if (!tm_is_idle)
-		printf("%s WARNING stopped waiting for the TM system "
-		       "to be IDLE!\n", __func__);
-
-	/* Wait for up to 2 seconds for pkts_from_tm to match pkts_into_tm. */
-	for (millisecs = 0; millisecs < 2000; millisecs++) {
-		usleep(1000);
-		pkts_into_tm = odp_atomic_load_u32(&atomic_pkts_into_tm);
-		pkts_from_tm = odp_atomic_load_u32(&atomic_pkts_from_tm);
-		if (pkts_into_tm <= pkts_from_tm)
-			break;
+	rc = odp_tm_enq(tm_queue, pkt);
+	if (rc > 0) {
+		odp_atomic_inc_u64(&atomic_pkts_into_tm);
+		odp_atomic_inc_u64(&u->s.tm_pkts_in);
+		odp_atomic_add_u64(&u->s.tm_bytes_in, odp_packet_len(pkt));
 	}
 
-	return 0;
+	return rc;
 }
 
-static int process_cmd_line_options(uint32_t argc, char *argv[])
+int tm_config_and_init(void)
 {
-	uint32_t arg_idx;
-	char    *arg;
-
-	arg_idx = 1;
-	while (arg_idx < argc) {
-		arg = argv[arg_idx++];
-		if (!arg) {
-			return -1;
-		} else if (arg[0] == '-') {
-			switch (arg[1]) {
-			case 'n':
-				if (argc <= arg_idx)
-					return -1;
-				g_num_pkts_to_send =
-					atoi(argv[arg_idx++]);
-				break;
-
-			case 'q':
-				g_print_tm_stats = FALSE;
-				break;
-
-			default:
-				printf("Unrecognized cmd line option '%s'\n",
-				       arg);
-				return -1;
-			}
-		} else {
-			/* Currently all cmd line options are '-' flag based. */
-			return -1;
+	int i, rc;
+	tm_user_t *u;
+
+	rc = create_and_config_tm();
+	if (!rc) {
+		odp_random_data(random_buf, RANDOM_BUF_LEN, 1);
+		next_rand_byte = 0;
+
+		odp_atomic_init_u64(&atomic_pkts_into_tm, 0);
+		odp_atomic_init_u64(&atomic_pkts_from_tm, 0);
+
+		for (i = 0; i < NUM_USERS; i++) {
+			u = &tm_users[i];
+			odp_atomic_init_u64(&u->s.tm_pkts_in, 0);
+			odp_atomic_init_u64(&u->s.tm_pkts_out, 0);
+			odp_atomic_init_u64(&u->s.tm_bytes_in, 0);
+			odp_atomic_init_u64(&u->s.tm_bytes_out, 0);
 		}
 	}
 
-	return 0;
+	return rc;
 }
 
-static void signal_handler(int signal)
+static void tm_print_stat_impl(int interval, stat_info_t *prev)
 {
-	size_t num_stack_frames;
-	const char  *signal_name;
-	void  *bt_array[128];
-
-	switch (signal) {
-	case SIGILL:
-		signal_name = "SIGILL";   break;
-	case SIGFPE:
-		signal_name = "SIGFPE";   break;
-	case SIGSEGV:
-		signal_name = "SIGSEGV";  break;
-	case SIGTERM:
-		signal_name = "SIGTERM";  break;
-	case SIGBUS:
-		signal_name = "SIGBUS";   break;
-	default:
-		signal_name = "UNKNOWN";  break;
+	int i;
+
+	printf("\nTM toal pkts_in=%" PRIu64 ", pkts_out=%" PRIu64 "\n",
+	       odp_atomic_load_u64(&atomic_pkts_into_tm),
+	       odp_atomic_load_u64(&atomic_pkts_from_tm));
+
+	printf("----------------------\n");
+	for (i = 0; i < NUM_USERS; i++) {
+		uint64_t bps;
+		tm_user_t *u;
+		stat_info_t si, *prev_si;
+
+		u = &tm_users[i];
+		prev_si = &prev[i];
+		si.pkts_in = odp_atomic_load_u64(&u->s.tm_pkts_in);
+		si.pkts_out = odp_atomic_load_u64(&u->s.tm_pkts_out);
+		si.bytes_in = odp_atomic_load_u64(&u->s.tm_bytes_in);
+		si.bytes_out = odp_atomic_load_u64(&u->s.tm_bytes_out);
+		bps = (si.bytes_out - prev_si->bytes_out) * 8 / interval;
+		*prev_si = si;
+
+		printf("user %d: pkts_in=%" PRIu64 ", pkts_out=%" PRIu64
+		       ", bytes_in=%" PRIu64 ", bytes_out=%" PRIu64
+		       ", bps=%" PRIu64 "\n", i, si.pkts_in, si.pkts_out,
+		       si.bytes_in, si.bytes_out, bps);
 	}
 
-	num_stack_frames = backtrace(bt_array, 100);
-	printf("Received signal=%u (%s) exiting.", signal, signal_name);
-	backtrace_symbols_fd(bt_array, num_stack_frames, fileno(stderr));
-	fflush(NULL);
-	sync();
-	abort();
+	printf("\n");
 }
 
-int main(int argc, char *argv[])
+void tm_print_stat(int duration, int interval)
 {
-	struct sigaction signal_action;
-	struct rlimit    rlimit;
-	uint32_t pkts_into_tm, pkts_from_tm;
-	odp_instance_t instance;
-	int rc;
-
-	memset(&signal_action, 0, sizeof(signal_action));
-	signal_action.sa_handler = signal_handler;
-	sigfillset(&signal_action.sa_mask);
-	sigaction(SIGILL,  &signal_action, NULL);
-	sigaction(SIGFPE,  &signal_action, NULL);
-	sigaction(SIGSEGV, &signal_action, NULL);
-	sigaction(SIGTERM, &signal_action, NULL);
-	sigaction(SIGBUS,  &signal_action, NULL);
-
-	getrlimit(RLIMIT_CORE, &rlimit);
-	rlimit.rlim_cur = rlimit.rlim_max;
-	setrlimit(RLIMIT_CORE, &rlimit);
-
-	rc = odp_init_global(&instance, &ODP_INIT_PARAMS, NULL);
-	if (rc != 0) {
-		printf("Error: odp_init_global() failed, rc = %d\n", rc);
-		abort();
+	int i;
+	int elapsed = 0;
+	int loop_forever = (duration == 0);
+	stat_info_t prev[NUM_USERS];
+
+	for (i = 0; i < NUM_USERS; i++) {
+		tm_user_t *u;
+		stat_info_t *si;
+
+		u = &tm_users[i];
+		si = &prev[i];
+		si->pkts_in = odp_atomic_load_u64(&u->s.tm_pkts_in);
+		si->pkts_out = odp_atomic_load_u64(&u->s.tm_pkts_out);
+		si->bytes_in = odp_atomic_load_u64(&u->s.tm_bytes_in);
+		si->bytes_out = odp_atomic_load_u64(&u->s.tm_bytes_out);
 	}
-	rc = odp_init_local(instance, ODP_THREAD_CONTROL);
-	if (rc != 0) {
-		printf("Error: odp_init_local() failed, rc = %d\n", rc);
-		abort();
-	}
-
-	if (process_cmd_line_options(argc, argv) < 0)
-		return -1;
-
-	create_and_config_tm();
-
-	odp_random_data(random_buf, RANDOM_BUF_LEN, 1);
-	next_rand_byte = 0;
-
-	odp_atomic_init_u32(&atomic_pkts_into_tm, 0);
-	odp_atomic_init_u32(&atomic_pkts_from_tm, 0);
-
-	traffic_generator(g_num_pkts_to_send);
 
-	pkts_into_tm = odp_atomic_load_u32(&atomic_pkts_into_tm);
-	pkts_from_tm = odp_atomic_load_u32(&atomic_pkts_from_tm);
-	printf("pkts_into_tm=%u pkts_from_tm=%u\n", pkts_into_tm, pkts_from_tm);
+	do {
+		sleep(interval);
+		tm_print_stat_impl(interval, &prev[0]);
+		elapsed += interval;
+	} while (loop_forever || (elapsed < duration));
+}
 
-	odp_tm_stats_print(odp_tm_test);
-	return 0;
+void tm_print_user_cos(void)
+{
+	int i;
+	char buf[80];
+
+	printf("\nClass Of Service\n"
+	       "----------------------\n");
+
+	for (i = 0; i < NUM_SVC_CLASSES; i++) {
+		profile_params_set_t *p;
+		char *b = buf;
+		int n;
+
+		p = cos_profile_params[i];
+		snprintf(buf, 32, "COS%d:", i);
+		printf("%-12s%-16scommit bps=%" PRIu64 ", burst=%d\n",
+		       buf, "shaper: ", p->shaper_params.commit_bps,
+		       p->shaper_params.commit_burst);
+		printf("%-28speak bps=%" PRIu64 ", burst=%d\n", "",
+		       p->shaper_params.peak_bps,
+		       p->shaper_params.peak_burst);
+
+		n = snprintf(buf, 80, "%-12s%-16s", "", "threshold: ");
+		b = buf + n;
+		if (p->threshold_params.enable_max_pkts) {
+			n = snprintf(b, 80, "max pkts=%" PRIu64,
+				     p->threshold_params.max_pkts);
+			b += n;
+		}
+		if (p->threshold_params.enable_max_bytes) {
+			n = snprintf(b, 80, ", bytes=%" PRIu64,
+				     p->threshold_params.max_bytes);
+			b += n;
+		}
+		printf("%s\n", buf);
+	}
+	printf("\nTM Users\n"
+	       "--%6s----%6s--------%3s--\n", "userid", "ipaddr", "cos");
+	for (i = 0; i < NUM_USERS; i++) {
+		uint8_t *p;
+		tm_user_t *u;
+
+		u = &tm_users[i];
+		p = (uint8_t *)&u->ipaddr;
+		snprintf(buf, 16, "%d.%d.%d.%d", p[3], p[2], p[1], p[0]);
+		printf("%6d  %10s  %8d\n", i, buf, u->svc);
+	}
+	printf("\n");
 }
diff --git a/example/traffic_mgmt/odp_traffic_mgmt.h b/example/traffic_mgmt/odp_traffic_mgmt.h
new file mode 100644
index 0000000..9081ca1
--- /dev/null
+++ b/example/traffic_mgmt/odp_traffic_mgmt.h
@@ -0,0 +1,47 @@ 
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef _ODP_TRAFFIC_MODULE_H_
+#define _ODP_TRAFFIC_MODULE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * Config and initialize the tm system.
+ *
+ * @return 0 if success else -1
+ */
+int tm_config_and_init(void);
+
+/**
+ * Print tm user stastics for each interval seconds
+ *
+ * @param duration how many seconds this function will run
+ * @param interval how many seconds for each print
+ */
+void tm_print_stat(int duration, int interval);
+
+/**
+ * Print tm service information for a user
+ */
+void tm_print_user_cos(void);
+
+/**
+ * Send packets to traffic management system
+ *
+ * @param pkt the packet will be sent
+ *
+ * @return 1 if success else <= 0
+ */
+int tm_send_packet(odp_packet_t pkt);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/example/traffic_mgmt/odp_traffic_pktio.c b/example/traffic_mgmt/odp_traffic_pktio.c
new file mode 100644
index 0000000..a1ed45a
--- /dev/null
+++ b/example/traffic_mgmt/odp_traffic_pktio.c
@@ -0,0 +1,794 @@ 
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+/**
+ * @file
+ *
+ * @example odp_traffic_mgmt.c
+ */
+
+/** enable strtok */
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <getopt.h>
+#include <unistd.h>
+#include <errno.h>
+#include <inttypes.h>
+#include <sys/resource.h>
+#include <execinfo.h>
+#include <test_debug.h>
+
+#include <odp_api.h>
+#include <odp/helper/linux.h>
+#include <odp/helper/eth.h>
+#include <odp/helper/ip.h>
+
+#include "odp_traffic_mgmt.h"
+
+/** @def MAX_WORKERS
+ * @brief Maximum number of worker threads
+ */
+#define MAX_WORKERS            32
+
+/** @def SHM_PKT_POOL_SIZE
+ * @brief Size of the shared memory block
+ */
+#define SHM_PKT_POOL_SIZE      8192
+
+/** @def SHM_PKT_POOL_BUF_SIZE
+ * @brief Buffer size of the packet pool buffer
+ */
+#define SHM_PKT_POOL_BUF_SIZE  1856
+
+/** @def MAX_PKT_BURST
+ * @brief Maximum number of packet in a burst
+ */
+#define MAX_PKT_BURST          32
+
+/** Maximum number of pktio queues per interface */
+#define MAX_QUEUES             32
+
+/** Maximum number of pktio interfaces */
+#define MAX_PKTIOS             8
+
+/** Default seconds to run */
+#define DEFAULT_RUN_SECONDS    60
+
+/** Get rid of path in filename - only for unix-type paths using '/' */
+#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
+			    strrchr((file_name), '/') + 1 : (file_name))
+/**
+ * Parsed command line application arguments
+ */
+typedef struct {
+	int cpu_count;
+	int duration;		/**< Number of seconds to run */
+	int if_count;		/**< Number of interfaces to be used */
+	int num_workers;	/**< Number of worker threads */
+	char **if_names;	/**< Array of pointers to interface names */
+	char *if_str;		/**< Storage for interface names */
+	int error_check;        /**< Check packet errors */
+} appl_args_t;
+
+static int exit_threads;	/**< Break workers loop if set to 1 */
+
+static const odp_init_t ODP_INIT_PARAMS = {
+	.log_fn   = odp_override_log,
+	.abort_fn = odp_override_abort
+};
+
+/**
+ * Statistics
+ */
+typedef union {
+	struct {
+		/** Number of packets received */
+		odp_atomic_u64_t packets;
+		odp_atomic_u64_t bytes;
+		/** Packets dropped due to receive error */
+		odp_atomic_u64_t rx_drops;
+		odp_atomic_u64_t rx_drop_bytes;
+		/** Packets dropped due to enqueue traffic management error */
+		odp_atomic_u64_t tm_drops;
+		odp_atomic_u64_t tm_drop_bytes;
+	} s;
+
+	uint8_t padding[ODP_CACHE_LINE_SIZE];
+} stats_t ODP_ALIGNED_CACHE;
+
+/**
+ * Thread specific arguments
+ */
+typedef struct thread_args_t {
+	uint64_t pkts;
+
+	int thr_idx;
+	int num_pktio;
+
+	struct {
+		odp_pktio_t rx_pktio;
+		odp_pktin_queue_t pktin;
+		odp_queue_t rx_queue;
+		int rx_idx;
+		int rx_queue_idx;
+	} pktio[MAX_PKTIOS];
+} thread_args_t;
+
+/**
+ * Grouping of all global data
+ */
+typedef struct {
+	/** Application (parsed) arguments */
+	appl_args_t appl;
+	/** Thread specific arguments */
+	thread_args_t thread[MAX_WORKERS];
+
+	/** Table of pktio handles */
+	struct {
+		odp_pktio_t pktio;
+		odp_pktin_queue_t pktin[MAX_QUEUES];
+		odp_queue_t rx_q[MAX_QUEUES];
+		int num_rx_thr;
+		int num_rx_queue;
+		int next_rx_queue;
+	} pktios[MAX_PKTIOS];
+} args_t;
+
+/** Global pointer to args */
+static args_t *gbl_args;
+/** Global barrier to synchronize main and workers */
+static odp_barrier_t barrier;
+
+/**
+ * Drop packets which input parsing marked as containing errors.
+ *
+ * Frees packets with error and modifies pkt_tbl[] to only contain packets with
+ * no detected errors.
+ *
+ * @param pkt_tbl  Array of packets
+ * @param num      Number of packets in pkt_tbl[]
+ *
+ * @return Number of packets dropped
+ */
+static inline int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned num)
+{
+	odp_packet_t pkt;
+	unsigned dropped = 0;
+	unsigned i, j;
+
+	for (i = 0, j = 0; i < num; ++i) {
+		pkt = pkt_tbl[i];
+
+		if (odp_unlikely(odp_packet_has_error(pkt))) {
+			odp_packet_free(pkt); /* Drop */
+			dropped++;
+		} else if (odp_unlikely(i != j++)) {
+			pkt_tbl[j - 1] = pkt;
+		}
+	}
+
+	return dropped;
+}
+
+/**
+ * Packet IO worker thread accessing IO resources directly
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static int run_worker_direct_mode(void *arg)
+{
+	int thr;
+	int pkts, i;
+	odp_packet_t pkt_tbl[MAX_PKT_BURST];
+	int num_pktio;
+	odp_pktin_queue_t pktin;
+	int pktio = 0;
+	thread_args_t *thr_args = arg;
+
+	thr = odp_thread_id();
+
+	num_pktio = thr_args->num_pktio;
+	pktin     = thr_args->pktio[pktio].pktin;
+
+	printf("[%02i] num pktios %i, PKTIN_DIRECT\n", thr, num_pktio);
+
+	/* Loop packets */
+	while (!exit_threads) {
+		if (num_pktio > 1) {
+			pktin     = thr_args->pktio[pktio].pktin;
+			pktio++;
+			if (pktio == num_pktio)
+				pktio = 0;
+		}
+
+		pkts = odp_pktin_recv(pktin, pkt_tbl, MAX_PKT_BURST);
+		if (odp_unlikely(pkts <= 0))
+			continue;
+
+		if (gbl_args->appl.error_check) {
+			int rx_drops;
+
+			/* Drop packets with errors */
+			rx_drops = drop_err_pkts(pkt_tbl, pkts);
+
+			if (odp_unlikely(rx_drops)) {
+				if (pkts == rx_drops)
+					continue;
+
+				pkts -= rx_drops;
+			}
+		}
+
+		for (i = 0; i < pkts; i++) {
+			/* try to send packets to Traffic Management System */
+			if (tm_send_packet(pkt_tbl[i]) <= 0)
+				odp_packet_free(pkt_tbl[i]);
+		}
+	}
+
+	/* Make sure that latest stat writes are visible to other threads */
+	odp_mb_full();
+
+	return 0;
+}
+
+/**
+ * Create a pktio handle, optionally associating a default input queue.
+ *
+ * @param dev   Name of device to open
+ * @param index Pktio index
+ * @param pool  Pool to associate with device for packet RX/TX
+ *
+ * @retval 0 on success
+ * @retval -1 on failure
+ */
+static int create_pktio(const char *dev, int idx, int num_rx, odp_pool_t pool)
+{
+	odp_pktio_t pktio;
+	odp_pktio_param_t pktio_param;
+	odp_pktio_capability_t capa;
+	odp_pktin_queue_param_t pktin_param;
+	odp_pktio_op_mode_t mode_rx;
+
+	odp_pktio_param_init(&pktio_param);
+
+	pktio = odp_pktio_open(dev, pool, &pktio_param);
+	if (pktio == ODP_PKTIO_INVALID) {
+		LOG_ERR("Error: failed to open %s\n", dev);
+		return -1;
+	}
+
+	printf("created pktio %" PRIu64 " (%s)\n",
+	       odp_pktio_to_u64(pktio), dev);
+
+	if (odp_pktio_capability(pktio, &capa)) {
+		LOG_ERR("Error: capability query failed %s\n", dev);
+		return -1;
+	}
+
+	odp_pktin_queue_param_init(&pktin_param);
+
+	mode_rx = ODP_PKTIO_OP_MT_UNSAFE;
+
+	if (num_rx > (int)capa.max_input_queues) {
+		printf("Sharing %i input queues between %i workers\n",
+		       capa.max_input_queues, num_rx);
+		num_rx  = capa.max_input_queues;
+		mode_rx = ODP_PKTIO_OP_MT;
+	}
+
+	pktin_param.hash_enable = 1;
+	pktin_param.hash_proto.proto.ipv4_udp = 1;
+	pktin_param.num_queues  = num_rx;
+	pktin_param.op_mode     = mode_rx;
+
+	if (odp_pktin_queue_config(pktio, &pktin_param)) {
+		LOG_ERR("Error: input queue config failed %s\n", dev);
+		return -1;
+	}
+
+	if (odp_pktin_queue(pktio, gbl_args->pktios[idx].pktin,
+			    num_rx) != num_rx) {
+		LOG_ERR("Error: pktin queue query failed %s\n", dev);
+		return -1;
+	}
+
+	printf("created %i input on (%s)\n", num_rx, dev);
+
+	gbl_args->pktios[idx].num_rx_queue = num_rx;
+	gbl_args->pktios[idx].pktio        = pktio;
+
+	return 0;
+}
+
+/*
+ * Bind worker threads to interfaces and calculate number of queues needed
+ *
+ * less workers (N) than interfaces (M)
+ *  - assign each worker to process every Nth interface
+ *  - workers process inequal number of interfaces, when M is not divisible by N
+ *  - needs only single queue per interface
+ * otherwise
+ *  - assign an interface to every Mth worker
+ *  - interfaces are processed by inequal number of workers, when N is not
+ *    divisible by M
+ *  - tries to configure a queue per worker per interface
+ *  - shares queues, if interface capability does not allows a queue per worker
+ */
+static void bind_workers(void)
+{
+	int if_count, num_workers;
+	int rx_idx, thr, pktio;
+	thread_args_t *thr_args;
+
+	if_count    = gbl_args->appl.if_count;
+	num_workers = gbl_args->appl.num_workers;
+
+	if (if_count > num_workers) {
+		thr = 0;
+
+		for (rx_idx = 0; rx_idx < if_count; rx_idx++) {
+			thr_args = &gbl_args->thread[thr];
+			pktio    = thr_args->num_pktio;
+			thr_args->pktio[pktio].rx_idx = rx_idx;
+			thr_args->num_pktio++;
+
+			gbl_args->pktios[rx_idx].num_rx_thr++;
+
+			thr++;
+			if (thr >= num_workers)
+				thr = 0;
+		}
+	} else {
+		rx_idx = 0;
+
+		for (thr = 0; thr < num_workers; thr++) {
+			thr_args = &gbl_args->thread[thr];
+			pktio    = thr_args->num_pktio;
+			thr_args->pktio[pktio].rx_idx = rx_idx;
+			thr_args->num_pktio++;
+
+			gbl_args->pktios[rx_idx].num_rx_thr++;
+
+			rx_idx++;
+			if (rx_idx >= if_count)
+				rx_idx = 0;
+		}
+	}
+}
+
+/*
+ * Bind queues to threads and fill in missing thread arguments (handles)
+ */
+static void bind_queues(void)
+{
+	int num_workers;
+	int thr, pktio;
+
+	num_workers = gbl_args->appl.num_workers;
+
+	for (thr = 0; thr < num_workers; thr++) {
+		int rx_idx;
+		thread_args_t *thr_args = &gbl_args->thread[thr];
+		int num = thr_args->num_pktio;
+
+		for (pktio = 0; pktio < num; pktio++) {
+			int rx_queue;
+
+			rx_idx   = thr_args->pktio[pktio].rx_idx;
+			rx_queue = gbl_args->pktios[rx_idx].next_rx_queue;
+
+			thr_args->pktio[pktio].rx_queue_idx = rx_queue;
+			thr_args->pktio[pktio].pktin =
+				gbl_args->pktios[rx_idx].pktin[rx_queue];
+			thr_args->pktio[pktio].rx_queue =
+				gbl_args->pktios[rx_idx].rx_q[rx_queue];
+			thr_args->pktio[pktio].rx_pktio =
+				gbl_args->pktios[rx_idx].pktio;
+
+			rx_queue++;
+
+			if (rx_queue >= gbl_args->pktios[rx_idx].num_rx_queue)
+				rx_queue = 0;
+
+			gbl_args->pktios[rx_idx].next_rx_queue = rx_queue;
+		}
+	}
+}
+
+/**
+ * Prinf usage information
+ */
+static void usage(char *progname)
+{
+	printf("\n"
+	       "OpenDataPlane traffic management application.\n"
+	       "\n"
+	       "Usage: %s OPTIONS\n"
+	       "  E.g. %s -i eth0,eth1 -c 2\n"
+	       " In the above example,\n"
+	       " two threads will be used for receiving pkts from eth0 and eth1\n"
+	       "\n"
+	       "Mandatory OPTIONS:\n"
+	       "  -i, --interface Eth interfaces (comma-separated, no spaces)\n"
+	       "                  Interface count min 1, max %i\n"
+	       "\n"
+	       "Optional OPTIONS:\n"
+	       "  -c, --count <number> CPU count.\n"
+	       "  -t, --time <number> seconds to run.\n"
+	       "  -e, --error_check 0: Don't check packet errors (default)\n"
+	       "                    1: Check packet errors\n"
+	       "  -h, --help           Display help and exit.\n\n"
+	       "\n", NO_PATH(progname), NO_PATH(progname), MAX_PKTIOS
+	    );
+}
+
+/**
+ * Parse and store the command line arguments
+ *
+ * @param argc       argument count
+ * @param argv[]     argument vector
+ * @param appl_args  Store application arguments here
+ */
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
+{
+	int opt;
+	int long_index;
+	char *token;
+	size_t len;
+	int i;
+	static const struct option longopts[] = {
+		{"count", required_argument, NULL, 'c'},
+		{"time", required_argument, NULL, 't'},
+		{"interface", required_argument, NULL, 'i'},
+		{"error_check", required_argument, NULL, 'e'},
+		{"help", no_argument, NULL, 'h'},
+		{NULL, 0, NULL, 0}
+	};
+
+	static const char *shortopts =  "+c:t:i:e:h";
+
+	/* let helper collect its own arguments (e.g. --odph_proc) */
+	odph_parse_options(argc, argv, shortopts, longopts);
+
+	appl_args->error_check = 0; /* don't check packet errors by default */
+
+	opterr = 0; /* do not issue errors on helper options */
+
+	while (1) {
+		opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
+
+		if (opt == -1)
+			break;	/* No more options */
+
+		switch (opt) {
+		case 'c':
+			appl_args->cpu_count = atoi(optarg);
+			break;
+		case 't':
+			appl_args->duration = atoi(optarg);
+			break;
+		case 'i':
+			len = strlen(optarg);
+			if (len == 0) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+			len += 1;	/* add room for '\0' */
+
+			appl_args->if_str = malloc(len);
+			if (appl_args->if_str == NULL) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* count the number of tokens separated by ',' */
+			strcpy(appl_args->if_str, optarg);
+			for (token = strtok(appl_args->if_str, ","), i = 0;
+			     token != NULL;
+			     token = strtok(NULL, ","), i++)
+				;
+
+			appl_args->if_count = i;
+
+			if (appl_args->if_count < 1 ||
+			    appl_args->if_count > MAX_PKTIOS) {
+				usage(argv[0]);
+				exit(EXIT_FAILURE);
+			}
+
+			/* allocate storage for the if names */
+			appl_args->if_names =
+			    calloc(appl_args->if_count, sizeof(char *));
+
+			/* store the if names (reset names string) */
+			strcpy(appl_args->if_str, optarg);
+			for (token = strtok(appl_args->if_str, ","), i = 0;
+			     token != NULL; token = strtok(NULL, ","), i++) {
+				appl_args->if_names[i] = token;
+			}
+			break;
+		case 'e':
+			appl_args->error_check = atoi(optarg);
+			break;
+		case 'h':
+			usage(argv[0]);
+			exit(EXIT_SUCCESS);
+			break;
+		default:
+			break;
+		}
+	}
+
+	if (appl_args->if_count == 0) {
+		usage(argv[0]);
+		exit(EXIT_FAILURE);
+	}
+
+	optind = 1;		/* reset 'extern optind' from the getopt lib */
+}
+
+/**
+ * Print system and application info
+ */
+static void print_info(char *progname, appl_args_t *appl_args)
+{
+	int i;
+
+	printf("\n"
+	       "ODP system info\n"
+	       "---------------\n"
+	       "ODP API version: %s\n"
+	       "ODP impl name:   %s\n"
+	       "CPU model:       %s\n"
+	       "CPU freq (hz):   %" PRIu64 "\n"
+	       "Cache line size: %i\n"
+	       "CPU count:       %i\n"
+	       "\n",
+	       odp_version_api_str(), odp_version_impl_name(),
+	       odp_cpu_model_str(), odp_cpu_hz_max(),
+	       odp_sys_cache_line_size(), odp_cpu_count());
+
+	printf("Running ODP appl: \"%s\"\n"
+	       "-----------------\n"
+	       "IF-count:        %i\n"
+	       "Using IFs:      ",
+	       progname, appl_args->if_count);
+	for (i = 0; i < appl_args->if_count; ++i)
+		printf(" %s", appl_args->if_names[i]);
+	printf("\n"
+	       "Mode: PKTIN_DIRECT, ");
+
+	printf("\n\n");
+	fflush(NULL);
+}
+
+static void gbl_args_init(args_t *args)
+{
+	int pktio, queue;
+
+	memset(args, 0, sizeof(args_t));
+
+	for (pktio = 0; pktio < MAX_PKTIOS; pktio++) {
+		args->pktios[pktio].pktio = ODP_PKTIO_INVALID;
+
+		for (queue = 0; queue < MAX_QUEUES; queue++)
+			args->pktios[pktio].rx_q[queue] = ODP_QUEUE_INVALID;
+	}
+}
+
+static void signal_handler(int signal)
+{
+	size_t num_stack_frames;
+	const char  *signal_name;
+	void  *bt_array[128];
+
+	switch (signal) {
+	case SIGILL:
+		signal_name = "SIGILL";   break;
+	case SIGFPE:
+		signal_name = "SIGFPE";   break;
+	case SIGSEGV:
+		signal_name = "SIGSEGV";  break;
+	case SIGTERM:
+		signal_name = "SIGTERM";  break;
+	case SIGBUS:
+		signal_name = "SIGBUS";   break;
+	default:
+		signal_name = "UNKNOWN";  break;
+	}
+
+	num_stack_frames = backtrace(bt_array, 100);
+	printf("Received signal=%u (%s) exiting.", signal, signal_name);
+	backtrace_symbols_fd(bt_array, num_stack_frames, fileno(stderr));
+	fflush(NULL);
+	sync();
+	abort();
+}
+
+int main(int argc, char *argv[])
+{
+	odph_odpthread_t thread_tbl[MAX_WORKERS];
+	odp_pool_t pool;
+	int i;
+	int cpu;
+	int num_workers;
+	odp_shm_t shm;
+	odp_cpumask_t cpumask;
+	char cpumaskstr[ODP_CPUMASK_STR_SIZE];
+	odp_pool_param_t params;
+	int ret;
+	int if_count;
+	int duration;
+	int (*thr_run_func)(void *);
+	odp_instance_t instance;
+	struct sigaction signal_action;
+
+	memset(&signal_action, 0, sizeof(signal_action));
+	signal_action.sa_handler = signal_handler;
+	sigfillset(&signal_action.sa_mask);
+	sigaction(SIGILL,  &signal_action, NULL);
+	sigaction(SIGFPE,  &signal_action, NULL);
+	sigaction(SIGSEGV, &signal_action, NULL);
+	sigaction(SIGTERM, &signal_action, NULL);
+	sigaction(SIGBUS,  &signal_action, NULL);
+
+	/* Init ODP before calling anything else */
+	if (odp_init_global(&instance, NULL, NULL)) {
+		LOG_ERR("Error: ODP global init failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	/* Init this thread */
+	if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+		LOG_ERR("Error: ODP local init failed.\n");
+		exit(EXIT_FAILURE);
+	}
+
+	/* Reserve memory for args from shared mem */
+	shm = odp_shm_reserve("shm_args", sizeof(args_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+	gbl_args = odp_shm_addr(shm);
+
+	if (gbl_args == NULL) {
+		LOG_ERR("Error: shared mem alloc failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	gbl_args_init(gbl_args);
+
+	/* Parse and store the application arguments */
+	parse_args(argc, argv, &gbl_args->appl);
+
+	/* Print both system and application information */
+	print_info(NO_PATH(argv[0]), &gbl_args->appl);
+
+	/* Default to system CPU count unless user specified */
+	num_workers = MAX_WORKERS;
+	if (gbl_args->appl.cpu_count)
+		num_workers = gbl_args->appl.cpu_count;
+
+	/* Get default worker cpumask */
+	num_workers = odp_cpumask_default_worker(&cpumask, num_workers);
+	(void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr));
+
+	gbl_args->appl.num_workers = num_workers;
+
+	for (i = 0; i < num_workers; i++)
+		gbl_args->thread[i].thr_idx    = i;
+
+	if_count = gbl_args->appl.if_count;
+
+	printf("num worker threads: %i\n", num_workers);
+	printf("first CPU:          %i\n", odp_cpumask_first(&cpumask));
+	printf("cpu mask:           %s\n", cpumaskstr);
+
+	/* Create packet pool */
+	odp_pool_param_init(&params);
+	params.pkt.seg_len = SHM_PKT_POOL_BUF_SIZE;
+	params.pkt.len     = SHM_PKT_POOL_BUF_SIZE;
+	params.pkt.num     = SHM_PKT_POOL_SIZE;
+	params.type        = ODP_POOL_PACKET;
+
+	pool = odp_pool_create("packet pool", &params);
+
+	if (pool == ODP_POOL_INVALID) {
+		LOG_ERR("Error: packet pool create failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	odp_pool_print(pool);
+
+	bind_workers();
+
+	for (i = 0; i < if_count; ++i) {
+		const char *dev = gbl_args->appl.if_names[i];
+		int num_rx;
+
+		/* A queue per assigned worker */
+		num_rx = gbl_args->pktios[i].num_rx_thr;
+
+		if (create_pktio(dev, i, num_rx, pool))
+			exit(EXIT_FAILURE);
+	}
+
+	gbl_args->pktios[i].pktio = ODP_PKTIO_INVALID;
+
+	bind_queues();
+
+	if (tm_config_and_init()) {
+		LOG_ERR("Error: tm system initialization failed.\n");
+		exit(EXIT_FAILURE);
+	}
+	tm_print_user_cos();
+
+	memset(thread_tbl, 0, sizeof(thread_tbl));
+
+	odp_barrier_init(&barrier, num_workers + 1);
+
+	thr_run_func = run_worker_direct_mode;
+
+	/* Create worker threads */
+	cpu = odp_cpumask_first(&cpumask);
+	for (i = 0; i < num_workers; ++i) {
+		odp_cpumask_t thd_mask;
+		odph_odpthread_params_t thr_params;
+
+		memset(&thr_params, 0, sizeof(thr_params));
+		thr_params.start    = thr_run_func;
+		thr_params.arg      = &gbl_args->thread[i];
+		thr_params.thr_type = ODP_THREAD_WORKER;
+		thr_params.instance = instance;
+
+		odp_cpumask_zero(&thd_mask);
+		odp_cpumask_set(&thd_mask, cpu);
+		odph_odpthreads_create(&thread_tbl[i], &thd_mask,
+				       &thr_params);
+		cpu = odp_cpumask_next(&cpumask, cpu);
+	}
+
+	/* Start packet receive and transmit */
+	for (i = 0; i < if_count; ++i) {
+		odp_pktio_t pktio;
+		uint8_t mac[ODPH_ETHADDR_LEN];
+		char buf[32];
+		const char *dev;
+
+		pktio = gbl_args->pktios[i].pktio;
+		ret   = odp_pktio_start(pktio);
+		if (ret) {
+			LOG_ERR("Error: unable to start %s\n",
+				gbl_args->appl.if_names[i]);
+			exit(EXIT_FAILURE);
+		} else {
+			dev = gbl_args->appl.if_names[i];
+			odp_pktio_mac_addr(pktio, mac, ODPH_ETHADDR_LEN);
+			sprintf(buf, "%02x:%02x:%02x:%02x:%02x:%02x",
+				mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
+			printf("start pktio: %s, mac %s\n", dev, buf);
+		}
+	}
+
+	/* Print packets count every 10 seconds */
+	duration = gbl_args->appl.duration;
+	if (duration < 10)
+			duration = DEFAULT_RUN_SECONDS;
+	tm_print_stat(duration, 10);
+	exit_threads = 1;
+
+	/* Master thread waits for other threads to exit */
+	for (i = 0; i < num_workers; ++i)
+		odph_odpthreads_join(&thread_tbl[i]);
+
+	free(gbl_args->appl.if_names);
+	free(gbl_args->appl.if_str);
+
+	printf("Exit %d\n\n", ret);
+	return ret;
+}