diff mbox series

[API-NEXT,v1,14/14] example: generator: replace atomic counter with per worker counters

Message ID 1513008012-9231-15-git-send-email-odpbot@yandex.ru
State New
Headers show
Series None | expand

Commit Message

Github ODP bot Dec. 11, 2017, 4 p.m. UTC
From: Bogdan Pricope <bogdan.pricope@linaro.org>


Use of atomic counters reduces total packet throughput.

Signed-off-by: Bogdan Pricope <bogdan.pricope@linaro.org>

Reviewed-by: Ilias Apalodimas <ilias.apalodimas@linaro.org>

Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>

---
/** Email created from pull request 335 (muvarov:devel/api-next_master_merge)
 ** https://github.com/Linaro/odp/pull/335
 ** Patch: https://github.com/Linaro/odp/pull/335.patch
 ** Base sha: 630d8422564ebf4991b468cb38a29e9483fc2ec2
 ** Merge commit sha: 2de2af9eca01b713e51c66fee7a5c7e535502f85
 **/
 example/generator/odp_generator.c | 266 +++++++++++++++++++++++---------------
 1 file changed, 165 insertions(+), 101 deletions(-)
diff mbox series

Patch

diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c
index 0a81cdeb3..861e98369 100644
--- a/example/generator/odp_generator.c
+++ b/example/generator/odp_generator.c
@@ -34,6 +34,9 @@ 
 #define APPL_MODE_PING   1			/**< ping mode */
 #define APPL_MODE_RCV    2			/**< receive mode */
 
+#define PING_THR_TX 0
+#define PING_THR_RX 1
+
 /** print appl mode */
 #define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
 
@@ -82,18 +85,21 @@  typedef struct {
 /**
  * counters
 */
-static struct {
-	odp_atomic_u64_t seq;	/**< ip seq to be send */
-	odp_atomic_u64_t ip;	/**< ip packets */
-	odp_atomic_u64_t udp;	/**< udp packets */
-	odp_atomic_u64_t icmp;	/**< icmp packets */
-	odp_atomic_u64_t cnt;	/**< sent packets*/
-	odp_atomic_u64_t tx_drops; /**< packets dropped in transmit */
-} counters;
+typedef struct {
+	uint64_t ctr_pkt_snd;	/**< sent packets*/
+	uint64_t ctr_pkt_snd_drop; /**< packets dropped in transmit */
+
+	uint64_t ctr_pkt_rcv;	/**< recv packets */
+	uint64_t ctr_seq;	/**< ip seq to be send */
+	uint64_t ctr_udp_rcv;	/**< udp packets */
+	uint64_t ctr_icmp_reply_rcv;	/**< icmp reply packets */
+} counters_t;
 
 /** * Thread specific arguments
  */
 typedef struct {
+	counters_t counters;	/**< Packet conters */
+	odp_bool_t stop; /**< Stop packet processing */
 	union {
 		struct {
 			odp_pktout_queue_t pktout; /**< Packet output queue */
@@ -120,6 +126,9 @@  typedef struct {
 	appl_args_t appl;
 	/** Thread specific arguments */
 	thread_args_t thread[MAX_WORKERS];
+	/** Global arguments */
+	int thread_cnt;
+	int tx_burst_size;
 } args_t;
 
 /** Global pointer to args */
@@ -131,7 +140,8 @@  static odp_barrier_t barrier;
 /** Packet processing function types */
 typedef odp_packet_t (*setup_pkt_ref_fn_t)(odp_pool_t,
 					   odp_pktout_config_opt_t *);
-typedef int (*setup_pkt_fn_t)(odp_packet_t, odp_pktout_config_opt_t *);
+typedef int (*setup_pkt_fn_t)(odp_packet_t, odp_pktout_config_opt_t *,
+			      counters_t *);
 
 /* helper funcs */
 static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
@@ -245,6 +255,7 @@  static int setup_pkt_ref_array(odp_pool_t pool,
  * @return 0 success, -1 failed
 */
 static int setup_pkt_array(odp_pktout_config_opt_t *pktout_cfg,
+			   counters_t *counters,
 			   odp_packet_t *pkt_ref_array,
 			   odp_packet_t  *pkt_array,
 			   int pkt_array_size,
@@ -253,7 +264,7 @@  static int setup_pkt_array(odp_pktout_config_opt_t *pktout_cfg,
 	int i;
 
 	for (i = 0; i < pkt_array_size; i++) {
-		if ((*setup_pkt)(pkt_ref_array[i], pktout_cfg))
+		if ((*setup_pkt)(pkt_ref_array[i], pktout_cfg, counters))
 			break;
 
 		pkt_array[i] = odp_packet_ref_static(pkt_ref_array[i]);
@@ -342,7 +353,8 @@  static odp_packet_t setup_udp_pkt_ref(odp_pool_t pool,
  * @return Success/Failed
  * @retval 0 on success, -1 on fail
  */
-static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg)
+static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg,
+			 counters_t *counters)
 {
 	char *buf;
 	odph_ipv4hdr_t *ip;
@@ -352,7 +364,8 @@  static int setup_udp_pkt(odp_packet_t pkt, odp_pktout_config_opt_t *pktout_cfg)
 
 	/*Update IP ID and checksum*/
 	ip = (odph_ipv4hdr_t *)(buf + ODPH_ETHHDR_LEN);
-	seq = odp_atomic_fetch_add_u64(&counters.seq, 1) % 0xFFFF;
+	seq = counters->ctr_seq % 0xFFFF;
+	counters->ctr_seq++;
 	ip->id = odp_cpu_to_be_16(seq);
 	if (!pktout_cfg->bit.ipv4_chksum) {
 		ip->chksum = 0;
@@ -437,7 +450,8 @@  static odp_packet_t setup_icmp_pkt_ref(odp_pool_t pool,
  * @retval 0 on success, -1 on fail
  */
 static int setup_icmp_pkt(odp_packet_t pkt,
-			  odp_pktout_config_opt_t *pktout_cfg)
+			  odp_pktout_config_opt_t *pktout_cfg,
+			  counters_t *counters)
 {
 	char *buf;
 	odph_ipv4hdr_t *ip;
@@ -450,7 +464,8 @@  static int setup_icmp_pkt(odp_packet_t pkt,
 
 	/* ip */
 	ip = (odph_ipv4hdr_t *)(buf + ODPH_ETHHDR_LEN);
-	seq = odp_atomic_fetch_add_u64(&counters.seq, 1) % 0xffff;
+	seq = counters->ctr_seq % 0xffff;
+	counters->ctr_seq++;
 	ip->id = odp_cpu_to_be_16(seq);
 	if (!pktout_cfg->bit.ipv4_chksum) {
 		ip->chksum = 0;
@@ -607,30 +622,39 @@  static int gen_send_thread(void *arg)
 	odp_pktout_config_opt_t *pktout_cfg;
 	odp_packet_t pkt_ref_array[MAX_UDP_TX_BURST];
 	odp_packet_t pkt_array[MAX_UDP_TX_BURST];
-	int pkt_array_size;
+	int pkt_array_size, seq_step;
 	int burst_start, burst_size;
 	setup_pkt_ref_fn_t setup_pkt_ref = NULL;
 	setup_pkt_fn_t setup_pkt = NULL;
+	counters_t *counters;
+	uint64_t pkt_count_max = 0;
 
 	thr = odp_thread_id();
 	thr_args = arg;
 	pktout = thr_args->tx.pktout;
 	pktout_cfg = thr_args->tx.pktout_cfg;
+	counters = &thr_args->counters;
 
 	/* Create reference packets*/
 	if (args->appl.mode == APPL_MODE_UDP) {
-		pkt_array_size = args->appl.udp_tx_burst;
 		setup_pkt_ref = setup_udp_pkt_ref;
 		setup_pkt = setup_udp_pkt;
+		seq_step = args->tx_burst_size * (args->thread_cnt - 1);
+		if (args->appl.number != -1)
+			pkt_count_max = args->appl.number / args->thread_cnt +
+				(args->appl.number % args->thread_cnt ? 1 : 0);
 	} else if (args->appl.mode == APPL_MODE_PING) {
-		pkt_array_size = 1;
 		setup_pkt_ref = setup_icmp_pkt_ref;
 		setup_pkt = setup_icmp_pkt;
+		seq_step = 0;
+		if (args->appl.number != -1)
+			pkt_count_max = args->appl.number;
 	} else {
 		EXAMPLE_ERR("  [%02i] Error: invalid processing mode %d\n",
 			    thr, args->appl.mode);
 		return -1;
 	}
+	pkt_array_size = args->tx_burst_size;
 
 	if (setup_pkt_ref_array(thr_args->pool, pktout_cfg,
 				pkt_ref_array, pkt_array_size,
@@ -645,13 +669,17 @@  static int gen_send_thread(void *arg)
 	odp_barrier_wait(&barrier);
 
 	for (;;) {
-		if (args->appl.number != -1 &&
-		    odp_atomic_fetch_add_u64(&counters.cnt, pkt_array_size) >=
-				(unsigned int)args->appl.number)
+		if (thr_args->stop)
 			break;
 
+		if (pkt_count_max && counters->ctr_pkt_snd > pkt_count_max) {
+			sleep(1); /* wait for stop command */
+			continue;
+		}
+
 		/* Setup TX burst*/
-		if (setup_pkt_array(pktout_cfg, pkt_ref_array, pkt_array,
+		if (setup_pkt_array(pktout_cfg, counters,
+				    pkt_ref_array, pkt_array,
 				    pkt_array_size, setup_pkt)) {
 			EXAMPLE_ERR("[%02i] Error: failed to setup packets\n",
 				    thr);
@@ -663,10 +691,12 @@  static int gen_send_thread(void *arg)
 			ret = odp_pktout_send(pktout, &pkt_array[burst_start],
 					      burst_size);
 			if (ret == burst_size) {
+				burst_size = 0;
 				break;
 			} else if (ret >= 0 && ret < burst_size) {
-				odp_atomic_add_u64(&counters.tx_drops,
-						   burst_size - ret);
+				thr_args->counters.ctr_pkt_snd_drop +=
+					burst_size - ret;
+
 				burst_start += ret;
 				burst_size -= ret;
 				continue;
@@ -677,32 +707,20 @@  static int gen_send_thread(void *arg)
 			break;
 		}
 
+		counters->ctr_pkt_snd += pkt_array_size - burst_size;
+
 		if (args->appl.interval != 0) {
 			printf("  [%02i] send pkt no:%ju seq %ju\n",
 			       thr,
-			       odp_atomic_load_u64(&counters.seq),
-			       odp_atomic_load_u64(&counters.seq)%0xffff);
+			       counters->ctr_seq,
+			       counters->ctr_seq % 0xffff);
 			millisleep(args->appl.interval,
 				   thr_args->tp,
 				   thr_args->tim,
 				   thr_args->tq,
 				   thr_args->tmo_ev);
 		}
-	}
-
-	/* receive number of reply pks until timeout */
-	if (args->appl.mode == APPL_MODE_PING && args->appl.number > 0) {
-		while (args->appl.timeout >= 0) {
-			if (odp_atomic_load_u64(&counters.icmp) >=
-			    (unsigned int)args->appl.number)
-				break;
-			millisleep(DEFAULT_PKT_INTERVAL,
-				   thr_args->tp,
-				   thr_args->tim,
-				   thr_args->tq,
-				   thr_args->tmo_ev);
-			args->appl.timeout--;
-		}
+		counters->ctr_seq += seq_step;
 	}
 
 	odp_packet_free_multi(pkt_ref_array, pkt_array_size);
@@ -717,7 +735,8 @@  static int gen_send_thread(void *arg)
  * @param  msg output buffer
  */
 
-static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
+static void process_icmp_pkt(thread_args_t *thr_args,
+			     odph_icmphdr_t *icmp, char *msg)
 {
 	uint64_t trecv;
 	uint64_t tsend;
@@ -726,7 +745,7 @@  static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
 	msg[0] = 0;
 
 	if (icmp->type == ICMP_ECHOREPLY) {
-		odp_atomic_inc_u64(&counters.icmp);
+		thr_args->counters.ctr_icmp_reply_rcv++;
 
 		memcpy(&tsend, (uint8_t *)icmp + ODPH_ICMPHDR_LEN,
 		       sizeof(uint64_t));
@@ -751,7 +770,8 @@  static void process_icmp_pkt(odph_icmphdr_t *icmp, char *msg)
  * @param  pkt_tbl packets to be print
  * @param  len packet number
  */
-static void print_pkts(int thr, odp_packet_t pkt_tbl[], unsigned len)
+static void print_pkts(int thr, thread_args_t *thr_args,
+		       odp_packet_t pkt_tbl[], unsigned len)
 {
 	odp_packet_t pkt;
 	char *buf;
@@ -768,21 +788,20 @@  static void print_pkts(int thr, odp_packet_t pkt_tbl[], unsigned len)
 		if (!odp_packet_has_ipv4(pkt))
 			continue;
 
-		odp_atomic_inc_u64(&counters.ip);
+		thr_args->counters.ctr_pkt_rcv++;
 		buf = odp_packet_data(pkt);
 		ip = (odph_ipv4hdr_t *)(buf + odp_packet_l3_offset(pkt));
 		offset = odp_packet_l4_offset(pkt);
 
 		/* udp */
-		if (ip->proto == ODPH_IPPROTO_UDP) {
-			odp_atomic_inc_u64(&counters.udp);
-		}
+		if (ip->proto == ODPH_IPPROTO_UDP)
+			thr_args->counters.ctr_udp_rcv++;
 
 		/* icmp */
 		if (ip->proto == ODPH_IPPROTO_ICMPv4) {
 			icmp = (odph_icmphdr_t *)(buf + offset);
 
-			process_icmp_pkt(icmp, msg);
+			process_icmp_pkt(thr_args, icmp, msg);
 			printf("  [%02i] %s\n", thr, msg);
 		}
 	}
@@ -810,14 +829,11 @@  static int gen_recv_thread(void *arg)
 	odp_barrier_wait(&barrier);
 
 	for (;;) {
-		if (args->appl.number != -1 &&
-		    odp_atomic_load_u64(&counters.icmp) >=
-		    (unsigned int)args->appl.number) {
+		if (thr_args->stop)
 			break;
-		}
 
 		/* Use schedule to get buf from any input queue */
-		ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_WAIT,
+		ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
 					    events, MAX_RX_BURST);
 		if (ev_cnt == 0)
 			continue;
@@ -848,7 +864,7 @@  static int gen_recv_thread(void *arg)
 		}
 
 		if (pkt_cnt) {
-			print_pkts(thr, pkts, pkt_cnt);
+			print_pkts(thr, thr_args, pkts, pkt_cnt);
 
 			odp_packet_free_multi(pkts, pkt_cnt);
 		}
@@ -857,6 +873,35 @@  static int gen_recv_thread(void *arg)
 	return 0;
 }
 
+#define COUNTER_SUM(_c, _nw)						\
+({									\
+	int _itr;							\
+	uint64_t _result = 0;						\
+									\
+	for (_itr = 0; _itr < _nw; _itr++)				\
+		_result += args->thread[_itr].counters.ctr_ ## _c;	\
+									\
+	_result;							\
+})
+
+static void garceful_stop_ping(void)
+{
+	uint64_t snd, rcv;
+
+	if (args->appl.mode != APPL_MODE_PING)
+		return;
+
+	while (args->appl.timeout >= 0) {
+		snd = COUNTER_SUM(pkt_snd, 2);
+		rcv = COUNTER_SUM(icmp_reply_rcv, 2);
+		if (rcv >= snd)
+			break;
+
+		sleep(1);
+		args->appl.timeout--;
+	}
+}
+
 /**
  * printing verbose statistics
  *
@@ -868,8 +913,8 @@  static void print_global_stats(int num_workers)
 	uint64_t pps_snd = 0, maximum_pps_snd = 0;
 	uint64_t pkts_rcv = 0, pkts_rcv_prev = 0;
 	uint64_t pps_rcv = 0, maximum_pps_rcv = 0;
-	uint64_t stall;
-	int verbose_interval = 20;
+	uint64_t stall, pkts_snd_drop;
+	int verbose_interval = 20, i;
 	odp_thrmask_t thrd_mask;
 
 	odp_barrier_wait(&barrier);
@@ -878,12 +923,15 @@  static void print_global_stats(int num_workers)
 	next = odp_time_sum(odp_time_local(), wait);
 
 	while (odp_thrmask_worker(&thrd_mask) == num_workers) {
-		if (args->appl.number != -1 &&
-		    odp_atomic_load_u64(&counters.cnt) >=
-		    (unsigned int)args->appl.number) {
-			break;
-		}
+		if (args->appl.mode != APPL_MODE_RCV &&
+		    args->appl.number != -1) {
+			uint64_t cnt = COUNTER_SUM(pkt_snd, num_workers);
 
+			if (cnt >= (unsigned int)args->appl.number) {
+				garceful_stop_ping();
+				break;
+			}
+		}
 		cur = odp_time_local();
 		if (odp_time_cmp(next, cur) > 0) {
 			left = odp_time_diff(next, cur);
@@ -894,18 +942,22 @@  static void print_global_stats(int num_workers)
 				usleep(stall / ODP_TIME_USEC_IN_NS);
 			continue;
 		}
-
 		next = odp_time_sum(cur, wait);
+
 		switch (args->appl.mode) {
 		case APPL_MODE_RCV:
-			pkts_rcv = odp_atomic_load_u64(&counters.ip);
+			pkts_rcv = COUNTER_SUM(pkt_rcv, num_workers);
+			pkts_snd = 0;
+			pkts_snd_drop = 0;
 			break;
 		case APPL_MODE_PING:
-			pkts_snd = odp_atomic_load_u64(&counters.seq);
-			pkts_rcv = odp_atomic_load_u64(&counters.icmp);
+			pkts_snd = COUNTER_SUM(pkt_snd, num_workers);
+			pkts_snd_drop = COUNTER_SUM(pkt_snd_drop, num_workers);
+			pkts_rcv = COUNTER_SUM(icmp_reply_rcv, num_workers);
 			break;
 		case APPL_MODE_UDP:
-			pkts_snd = odp_atomic_load_u64(&counters.seq);
+			pkts_snd = COUNTER_SUM(pkt_snd, num_workers);
+			pkts_snd_drop = COUNTER_SUM(pkt_snd_drop, num_workers);
 			break;
 		default:
 			continue;
@@ -927,11 +979,14 @@  static void print_global_stats(int num_workers)
 			"rcv: %" PRIu64 ", "
 			"recv rate: %" PRIu64 " pps, "
 			"max recv rate: %" PRIu64 " pps\n",
-			pkts_snd, odp_atomic_load_u64(&counters.tx_drops),
+			pkts_snd, pkts_snd_drop,
 			pps_snd, maximum_pps_snd,
 			pkts_rcv, pps_rcv, maximum_pps_rcv);
 		fflush(NULL);
 	}
+
+	for (i = 0; i < num_workers; i++)
+		args->thread[i].stop = 1;
 }
 
 /**
@@ -968,14 +1023,6 @@  int main(int argc, char *argv[])
 		exit(EXIT_FAILURE);
 	}
 
-	/* init counters */
-	odp_atomic_init_u64(&counters.seq, 0);
-	odp_atomic_init_u64(&counters.ip, 0);
-	odp_atomic_init_u64(&counters.udp, 0);
-	odp_atomic_init_u64(&counters.icmp, 0);
-	odp_atomic_init_u64(&counters.cnt, 0);
-	odp_atomic_init_u64(&counters.tx_drops, 0);
-
 	/* Reserve memory for args from shared mem */
 	shm = odp_shm_reserve("shm_args", sizeof(args_t),
 			      ODP_CACHE_LINE_SIZE, 0);
@@ -1024,6 +1071,15 @@  int main(int argc, char *argv[])
 			num_workers = 2;
 		}
 	}
+	args->thread_cnt = num_workers;
+
+	/* Burst size */
+	if (args->appl.mode == APPL_MODE_PING)
+		args->tx_burst_size = 1;
+	else if (args->appl.mode == APPL_MODE_UDP)
+		args->tx_burst_size = args->appl.udp_tx_burst;
+	else
+		args->tx_burst_size = 0;
 
 	/* Create packet pool */
 	odp_pool_param_init(&params);
@@ -1104,6 +1160,7 @@  int main(int argc, char *argv[])
 	if (args->appl.mode == APPL_MODE_PING) {
 		odp_cpumask_t cpu_mask;
 		int cpu_first, cpu_next;
+		thread_args_t *thr_args;
 
 		odp_cpumask_zero(&cpu_mask);
 		cpu_first = odp_cpumask_first(&cpumask);
@@ -1114,60 +1171,64 @@  int main(int argc, char *argv[])
 			EXAMPLE_ERR("queue_create failed\n");
 			abort();
 		}
-		args->thread[1].rx.ifs = ifs;
-		args->thread[1].rx.ifs_count = args->appl.if_count;
-		args->thread[1].pool = pool;
-		args->thread[1].tp = tp;
-		args->thread[1].tq = tq;
-		args->thread[1].tim = odp_timer_alloc(tp, tq, NULL);
-		if (args->thread[1].tim == ODP_TIMER_INVALID) {
+		thr_args = &args->thread[PING_THR_RX];
+		thr_args->rx.ifs = ifs;
+		thr_args->rx.ifs_count = args->appl.if_count;
+		thr_args->pool = pool;
+		thr_args->tp = tp;
+		thr_args->tq = tq;
+		thr_args->tim = odp_timer_alloc(tp, tq, NULL);
+		if (thr_args->tim == ODP_TIMER_INVALID) {
 			EXAMPLE_ERR("timer_alloc failed\n");
 			abort();
 		}
-		args->thread[1].tmo_ev = odp_timeout_alloc(tmop);
-		if (args->thread[1].tmo_ev == ODP_TIMEOUT_INVALID) {
+		thr_args->tmo_ev = odp_timeout_alloc(tmop);
+		if (thr_args->tmo_ev == ODP_TIMEOUT_INVALID) {
 			EXAMPLE_ERR("timeout_alloc failed\n");
 			abort();
 		}
-		args->thread[1].mode = args->appl.mode;
+		thr_args->mode = args->appl.mode;
 
 		memset(&thr_params, 0, sizeof(thr_params));
 		thr_params.start    = gen_recv_thread;
-		thr_params.arg      = &args->thread[1];
+		thr_params.arg      = thr_args;
 		thr_params.thr_type = ODP_THREAD_WORKER;
 		thr_params.instance = instance;
 
-		odph_odpthreads_create(&thread_tbl[1], &cpu_mask, &thr_params);
+		odph_odpthreads_create(&thread_tbl[PING_THR_RX],
+				       &cpu_mask, &thr_params);
 
 		tq = odp_queue_create("", NULL);
 		if (tq == ODP_QUEUE_INVALID) {
 			EXAMPLE_ERR("queue_create failed\n");
 			abort();
 		}
-		args->thread[0].tx.pktout = ifs[0].pktout[0];
-		args->thread[0].tx.pktout_cfg = &ifs[0].config.pktout;
-		args->thread[0].pool = pool;
-		args->thread[0].tp = tp;
-		args->thread[0].tq = tq;
-		args->thread[0].tim = odp_timer_alloc(tp, tq, NULL);
-		if (args->thread[0].tim == ODP_TIMER_INVALID) {
+		thr_args = &args->thread[PING_THR_TX];
+		thr_args->tx.pktout = ifs[0].pktout[0];
+		thr_args->tx.pktout_cfg = &ifs[0].config.pktout;
+		thr_args->pool = pool;
+		thr_args->tp = tp;
+		thr_args->tq = tq;
+		thr_args->tim = odp_timer_alloc(tp, tq, NULL);
+		if (thr_args->tim == ODP_TIMER_INVALID) {
 			EXAMPLE_ERR("timer_alloc failed\n");
 			abort();
 		}
-		args->thread[0].tmo_ev = odp_timeout_alloc(tmop);
-		if (args->thread[0].tmo_ev == ODP_TIMEOUT_INVALID) {
+		thr_args->tmo_ev = odp_timeout_alloc(tmop);
+		if (thr_args->tmo_ev == ODP_TIMEOUT_INVALID) {
 			EXAMPLE_ERR("timeout_alloc failed\n");
 			abort();
 		}
-		args->thread[0].mode = args->appl.mode;
+		thr_args->mode = args->appl.mode;
 		cpu_next = odp_cpumask_next(&cpumask, cpu_first);
 		odp_cpumask_zero(&cpu_mask);
 		odp_cpumask_set(&cpu_mask, cpu_next);
 
 		thr_params.start = gen_send_thread;
-		thr_params.arg   = &args->thread[0];
+		thr_params.arg   = thr_args;
 
-		odph_odpthreads_create(&thread_tbl[0], &cpu_mask, &thr_params);
+		odph_odpthreads_create(&thread_tbl[PING_THR_TX],
+				       &cpu_mask, &thr_params);
 
 	} else {
 		int cpu = odp_cpumask_first(&cpumask);
@@ -1176,6 +1237,7 @@  int main(int argc, char *argv[])
 			odp_cpumask_t thd_mask;
 			int (*thr_run_func)(void *);
 			int if_idx, pktout_idx;
+			uint64_t start_seq;
 
 			if (args->appl.mode == APPL_MODE_RCV) {
 				args->thread[i].rx.ifs = ifs;
@@ -1185,11 +1247,13 @@  int main(int argc, char *argv[])
 				if_idx = i % args->appl.if_count;
 				pktout_idx = (i / args->appl.if_count) %
 					ifs[if_idx].pktout_count;
+				start_seq = i * args->tx_burst_size;
 
 				args->thread[i].tx.pktout =
 					ifs[if_idx].pktout[pktout_idx];
 				args->thread[i].tx.pktout_cfg =
 					&ifs[if_idx].config.pktout;
+				args->thread[i].counters.ctr_seq = start_seq;
 			}
 			tq = odp_queue_create("", NULL);
 			if (tq == ODP_QUEUE_INVALID) {
@@ -1531,9 +1595,9 @@  static void usage(char *progname)
 	       "OpenDataPlane example application.\n"
 	       "\n"
 	       "  Work mode:\n"
-	       "    1.send udp packets\n"
+	       "    1.send ipv4 udp packets\n"
 	       "      odp_generator -I eth0 --srcmac fe:0f:97:c9:e0:44  --dstmac 32:cb:9b:27:2f:1a --srcip 192.168.0.1 --dstip 192.168.0.2 -m u\n"
-	       "    2.receive udp packets\n"
+	       "    2.receive ipv4 packets\n"
 	       "      odp_generator -I eth0 -m r\n"
 	       "    3.work likes ping\n"
 	       "      odp_generator -I eth0 --srcmac fe:0f:97:c9:e0:44  --dstmac 32:cb:9b:27:2f:1a --srcip 192.168.0.1 --dstip 192.168.0.2 --cpumask 0xc -m p\n"