diff mbox series

[v2,2/2] example: generator: add direct pktin mode

Message ID 1513342811-4728-3-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v2,1/2] example: generator: add configuration option for RX burst size | expand

Commit Message

Github ODP bot Dec. 15, 2017, 1 p.m. UTC
From: Bogdan Pricope <bogdan.pricope@linaro.org>


Update packet receive mode by adding direct pktin mode.
Direct mode should increase throughput on RX side.

Signed-off-by: Bogdan Pricope <bogdan.pricope@linaro.org>

---
/** Email created from pull request 343 (bogdanPricope:generator_rx_direct_pr)
 ** https://github.com/Linaro/odp/pull/343
 ** Patch: https://github.com/Linaro/odp/pull/343.patch
 ** Base sha: 6b5cdc77eb9759a2349b10372a964648559bc92c
 ** Merge commit sha: d3f54aff752fe7ae7f51c4caa2f714dc659dd7a5
 **/
 example/generator/odp_generator.c | 257 ++++++++++++++++++++++++++------------
 1 file changed, 174 insertions(+), 83 deletions(-)
diff mbox series

Patch

diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c
index 956161a61..3bd844b2a 100644
--- a/example/generator/odp_generator.c
+++ b/example/generator/odp_generator.c
@@ -54,6 +54,8 @@  typedef struct {
 	odp_pktio_config_t config;
 	odp_pktout_queue_t pktout[MAX_WORKERS];
 	unsigned pktout_count;
+	odp_pktin_queue_t pktin[MAX_WORKERS];
+	unsigned pktin_count;
 } interface_t;
 
 /**
@@ -83,6 +85,7 @@  typedef struct {
 	int rx_burst;	/**< number of packets to receive with one
 				      API call */
 	odp_bool_t csum;	/**< use platform csum support if available */
+	odp_bool_t sched;	/**< use scheduler API to receive packets */
 } appl_args_t;
 
 /**
@@ -109,6 +112,7 @@  typedef struct {
 			odp_pktout_config_opt_t *pktout_cfg; /**< Packet output config*/
 		} tx;
 		struct {
+			odp_pktin_queue_t pktin; /**< Packet input queue */
 			interface_t *ifs; /**< Interfaces array */
 			int ifs_count; /**< Interfaces array size */
 		} rx;
@@ -517,10 +521,15 @@  static int create_pktio(const char *dev, odp_pool_t pool,
 	odp_pktio_param_t pktio_param;
 	odp_pktin_queue_param_t pktin_param;
 	odp_pktout_queue_param_t pktout_param;
-	odp_pktio_op_mode_t pktout_mode;
+	odp_pktio_op_mode_t pktout_mode, pktin_mode;
+	odp_bool_t sched = args->appl.sched;
 
 	odp_pktio_param_init(&pktio_param);
-	pktio_param.in_mode = ODP_PKTIN_MODE_SCHED;
+	pktio_param.in_mode = num_rx_queues ?
+		(sched ? ODP_PKTIN_MODE_SCHED : ODP_PKTIN_MODE_DIRECT) :
+		ODP_PKTIN_MODE_DISABLED;
+	pktio_param.out_mode = num_tx_queues ? ODP_PKTOUT_MODE_DIRECT :
+		ODP_PKTOUT_MODE_DISABLED;
 
 	/* Open a packet IO instance */
 	itf->pktio = odp_pktio_open(dev, pool, &pktio_param);
@@ -563,31 +572,46 @@  static int create_pktio(const char *dev, odp_pool_t pool,
 		return -1;
 	}
 
-	if (num_rx_queues > capa.max_input_queues)
-		num_rx_queues = capa.max_input_queues;
+	if (num_rx_queues) {
+		pktin_mode = ODP_PKTIO_OP_MT_UNSAFE;
+		if (num_rx_queues > capa.max_input_queues) {
+			num_rx_queues = capa.max_input_queues;
+			pktin_mode = ODP_PKTIO_OP_MT;
+			EXAMPLE_DBG("Warning: Force RX multithread safe mode "
+				    "(slower)on %s\n",	dev);
+		}
 
-	odp_pktin_queue_param_init(&pktin_param);
-	pktin_param.num_queues = num_rx_queues;
-	pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC;
+		odp_pktin_queue_param_init(&pktin_param);
+		pktin_param.num_queues = num_rx_queues;
+		pktin_param.op_mode = pktin_mode;
+		if (sched)
+			pktin_param.queue_param.sched.sync =
+				ODP_SCHED_SYNC_ATOMIC;
 
-	if (odp_pktin_queue_config(itf->pktio, &pktin_param)) {
-		EXAMPLE_ERR("Error: pktin queue config failed for %s\n", dev);
-		return -1;
+		if (odp_pktin_queue_config(itf->pktio, &pktin_param)) {
+			EXAMPLE_ERR("Error: Pktin config failed for %s\n", dev);
+			return -1;
+		}
 	}
 
-	pktout_mode = ODP_PKTIO_OP_MT_UNSAFE;
-	if (num_tx_queues > capa.max_output_queues) {
-		num_tx_queues = capa.max_output_queues;
-		pktout_mode = ODP_PKTIO_OP_MT;
-	}
+	if (num_tx_queues) {
+		pktout_mode = ODP_PKTIO_OP_MT_UNSAFE;
+		if (num_tx_queues > capa.max_output_queues) {
+			num_tx_queues = capa.max_output_queues;
+			pktout_mode = ODP_PKTIO_OP_MT;
+			EXAMPLE_DBG("Warning: Force TX multithread safe mode "
+				    "(slower) on %s\n", dev);
+		}
 
-	odp_pktout_queue_param_init(&pktout_param);
-	pktout_param.num_queues = num_tx_queues;
-	pktout_param.op_mode = pktout_mode;
+		odp_pktout_queue_param_init(&pktout_param);
+		pktout_param.num_queues = num_tx_queues;
+		pktout_param.op_mode = pktout_mode;
 
-	if (odp_pktout_queue_config(itf->pktio, &pktout_param)) {
-		EXAMPLE_ERR("Error: pktout queue config failed for %s\n", dev);
-		return -1;
+		if (odp_pktout_queue_config(itf->pktio, &pktout_param)) {
+			EXAMPLE_ERR("Error: Pktout config failed for %s\n",
+				    dev);
+			return -1;
+		}
 	}
 
 	ret = odp_pktio_start(itf->pktio);
@@ -595,12 +619,21 @@  static int create_pktio(const char *dev, odp_pool_t pool,
 		EXAMPLE_ABORT("Error: unable to start %s\n", dev);
 
 	itf->pktout_count = num_tx_queues;
-	if (odp_pktout_queue(itf->pktio, itf->pktout, itf->pktout_count) !=
-			     (int)itf->pktout_count) {
+	if (itf->pktout_count &&
+	    odp_pktout_queue(itf->pktio, itf->pktout, itf->pktout_count) !=
+	    (int)itf->pktout_count) {
 		EXAMPLE_ERR("Error: failed to get output queues for %s\n", dev);
 		return -1;
 	}
 
+	itf->pktin_count = num_rx_queues;
+	if (!sched && itf->pktin_count &&
+	    odp_pktin_queue(itf->pktio, itf->pktin, itf->pktin_count) !=
+	    (int)itf->pktin_count) {
+		EXAMPLE_ERR("Error: failed to get input queues for %s\n", dev);
+		return -1;
+	}
+
 	printf("  created pktio:%02" PRIu64
 	       ", dev:%s, queue mode (ATOMIC queues)\n"
 	       "          default pktio%02" PRIu64 "\n",
@@ -768,14 +801,14 @@  static void process_icmp_pkt(thread_args_t *thr_args,
 }
 
 /**
- * Print odp packets
+ * Process odp packets
  *
  * @param  thr worker id
  * @param  pkt_tbl packets to be print
  * @param  len packet number
  */
-static void print_pkts(int thr, thread_args_t *thr_args,
-		       odp_packet_t pkt_tbl[], unsigned len)
+static void process_pkts(int thr, thread_args_t *thr_args,
+			 odp_packet_t pkt_tbl[], unsigned len)
 {
 	odp_packet_t pkt;
 	char *buf;
@@ -784,10 +817,33 @@  static void print_pkts(int thr, thread_args_t *thr_args,
 	unsigned i;
 	size_t offset;
 	char msg[1024];
+	interface_t *itfs, *itf;
+
+	itfs = thr_args->rx.ifs;
 
 	for (i = 0; i < len; ++i) {
 		pkt = pkt_tbl[i];
 
+		itf = &itfs[odp_pktio_index(odp_packet_input(pkt))];
+
+		if (odp_packet_has_ipv4(pkt)) {
+			if (itf->config.pktin.bit.ipv4_chksum) {
+				if (odp_packet_has_l3_error(pkt))
+					printf("HW detected L3 error\n");
+			}
+		}
+
+		if (odp_packet_has_udp(pkt)) {
+			if (itf->config.pktin.bit.udp_chksum) {
+				if (odp_packet_has_l4_error(pkt))
+					printf("HW detected L4 error\n");
+			}
+		}
+
+		/* Drop packets with errors */
+		if (odp_unlikely(odp_packet_has_error(pkt)))
+			continue;
+
 		/* only ip pkts */
 		if (!odp_packet_has_ipv4(pkt))
 			continue;
@@ -820,15 +876,13 @@  static int gen_recv_thread(void *arg)
 {
 	int thr;
 	thread_args_t *thr_args;
-	odp_packet_t pkts[MAX_RX_BURST], pkt;
-	odp_event_t events[MAX_RX_BURST];
-	int pkt_cnt, ev_cnt, i;
-	int burst_size;
-	interface_t *itfs, *itf;
+	odp_packet_t pkts[MAX_RX_BURST];
+	int pkt_cnt, burst_size;
+	odp_pktin_queue_t pktin;
 
 	thr = odp_thread_id();
 	thr_args = (thread_args_t *)arg;
-	itfs = thr_args->rx.ifs;
+	pktin = thr_args->rx.pktin;
 	burst_size = args->rx_burst_size;
 
 	printf("  [%02i] created mode: RECEIVE\n", thr);
@@ -838,39 +892,55 @@  static int gen_recv_thread(void *arg)
 		if (thr_args->stop)
 			break;
 
-		/* Use schedule to get buf from any input queue */
-		ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
-					    events, burst_size);
-		if (ev_cnt == 0)
-			continue;
-		for (i = 0, pkt_cnt = 0; i < ev_cnt; i++) {
-			pkt = odp_packet_from_event(events[i]);
-			itf = &itfs[odp_pktio_index(odp_packet_input(pkt))];
-
-			if (odp_packet_has_ipv4(pkt)) {
-				if (itf->config.pktin.bit.ipv4_chksum) {
-					if (odp_packet_has_l3_error(pkt))
-						printf("HW detected L3 error\n");
-				}
-			}
+		pkt_cnt = odp_pktin_recv_tmo(pktin, pkts, burst_size,
+					     ODP_PKTIN_NO_WAIT);
 
-			if (odp_packet_has_udp(pkt)) {
-				if (itf->config.pktin.bit.udp_chksum) {
-					if (odp_packet_has_l4_error(pkt))
-						printf("HW detected L4 error\n");
-				}
-			}
+		if (pkt_cnt > 0) {
+			process_pkts(thr, thr_args, pkts, pkt_cnt);
 
-			/* Drop packets with errors */
-			if (odp_unlikely(odp_packet_has_error(pkt))) {
-				odp_packet_free(pkt);
-				continue;
-			}
-			pkts[pkt_cnt++] = pkt;
+			odp_packet_free_multi(pkts, pkt_cnt);
+		} else if (pkt_cnt == 0) {
+			continue;
+		} else {
+			break;
 		}
+	}
+
+	return 0;
+}
 
-		if (pkt_cnt) {
-			print_pkts(thr, thr_args, pkts, pkt_cnt);
+/**
+ * Scheduler receive function
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static int gen_recv_sched_thread(void *arg)
+{
+	int thr;
+	thread_args_t *thr_args;
+	odp_packet_t pkts[MAX_RX_BURST];
+	odp_event_t events[MAX_RX_BURST];
+	int pkt_cnt, burst_size, i;
+
+	thr = odp_thread_id();
+	thr_args = (thread_args_t *)arg;
+	burst_size = args->rx_burst_size;
+
+	printf("  [%02i] created mode: RECEIVE SCHEDULER\n", thr);
+	odp_barrier_wait(&barrier);
+
+	for (;;) {
+		if (thr_args->stop)
+			break;
+
+		pkt_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
+					     events, burst_size);
+
+		if (pkt_cnt > 0) {
+			for (i = 0; i < pkt_cnt; i++)
+				pkts[i] = odp_packet_from_event(events[i]);
+
+			process_pkts(thr, thr_args, pkts, pkt_cnt);
 
 			odp_packet_free_multi(pkts, pkt_cnt);
 		}
@@ -1133,28 +1203,29 @@  int main(int argc, char *argv[])
 
 	ifs = malloc(sizeof(interface_t) * args->appl.if_count);
 
-	if (args->appl.mode == APPL_MODE_PING ||
-	    args->appl.mode == APPL_MODE_UDP)
-		num_rx_queues = 1;
-	else
-		num_rx_queues = num_workers;
-
-	if (args->appl.mode == APPL_MODE_PING ||
-	    args->appl.mode == APPL_MODE_RCV)
-		num_tx_queues = 1;
-	else {
-		num_tx_queues = num_workers / args->appl.if_count;
-		if (num_workers % args->appl.if_count)
-			num_tx_queues++;
-	}
+	for (i = 0; i < args->appl.if_count; ++i) {
+		if (args->appl.mode == APPL_MODE_PING) {
+			num_rx_queues = 1;
+			num_tx_queues = 1;
+		} else if (args->appl.mode == APPL_MODE_UDP) {
+			num_rx_queues = 0;
+			num_tx_queues = num_workers / args->appl.if_count;
+			if (i < num_workers % args->appl.if_count)
+				num_tx_queues++;
+		} else { /* APPL_MODE_RCV*/
+			num_rx_queues = num_workers / args->appl.if_count;
+			if (i < num_workers % args->appl.if_count)
+				num_rx_queues++;
+			num_tx_queues = 0;
+		}
 
-	for (i = 0; i < args->appl.if_count; ++i)
 		if (create_pktio(args->appl.if_names[i], pool, num_rx_queues,
 				 num_tx_queues, &ifs[i])) {
 			EXAMPLE_ERR("Error: create interface %s failed.\n",
 				    args->appl.if_names[i]);
 			exit(EXIT_FAILURE);
 		}
+	}
 
 	/* Create and init worker threads */
 	memset(thread_tbl, 0, sizeof(thread_tbl));
@@ -1182,6 +1253,8 @@  int main(int argc, char *argv[])
 			abort();
 		}
 		thr_args = &args->thread[PING_THR_RX];
+		if (!args->appl.sched)
+			thr_args->rx.pktin = ifs[0].pktin[0];
 		thr_args->rx.ifs = ifs;
 		thr_args->rx.ifs_count = args->appl.if_count;
 		thr_args->pool = pool;
@@ -1200,7 +1273,10 @@  int main(int argc, char *argv[])
 		thr_args->mode = args->appl.mode;
 
 		memset(&thr_params, 0, sizeof(thr_params));
-		thr_params.start    = gen_recv_thread;
+		if (args->appl.sched)
+			thr_params.start = gen_recv_sched_thread;
+		else
+			thr_params.start = gen_recv_thread;
 		thr_params.arg      = thr_args;
 		thr_params.thr_type = ODP_THREAD_WORKER;
 		thr_params.instance = instance;
@@ -1246,21 +1322,27 @@  int main(int argc, char *argv[])
 		for (i = 0; i < num_workers; ++i) {
 			odp_cpumask_t thd_mask;
 			int (*thr_run_func)(void *);
-			int if_idx, pktout_idx;
+			int if_idx, pktq_idx;
 			uint64_t start_seq;
 
+			if_idx = i % args->appl.if_count;
+
 			if (args->appl.mode == APPL_MODE_RCV) {
+				pktq_idx = (i / args->appl.if_count) %
+					ifs[if_idx].pktin_count;
+				if (!args->appl.sched)
+					args->thread[i].rx.pktin =
+						ifs[if_idx].pktin[pktq_idx];
 				args->thread[i].rx.ifs = ifs;
 				args->thread[i].rx.ifs_count =
 					args->appl.if_count;
 			} else {
-				if_idx = i % args->appl.if_count;
-				pktout_idx = (i / args->appl.if_count) %
+				pktq_idx = (i / args->appl.if_count) %
 					ifs[if_idx].pktout_count;
 				start_seq = i * args->tx_burst_size;
 
 				args->thread[i].tx.pktout =
-					ifs[if_idx].pktout[pktout_idx];
+					ifs[if_idx].pktout[pktq_idx];
 				args->thread[i].tx.pktout_cfg =
 					&ifs[if_idx].config.pktout;
 				args->thread[i].counters.ctr_seq = start_seq;
@@ -1288,7 +1370,10 @@  int main(int argc, char *argv[])
 			if (args->appl.mode == APPL_MODE_UDP) {
 				thr_run_func = gen_send_thread;
 			} else if (args->appl.mode == APPL_MODE_RCV) {
-				thr_run_func = gen_recv_thread;
+				if (args->appl.sched)
+					thr_run_func = gen_recv_sched_thread;
+				else
+					thr_run_func = gen_recv_thread;
 			} else {
 				EXAMPLE_ERR("ERR MODE\n");
 				exit(EXIT_FAILURE);
@@ -1388,10 +1473,11 @@  static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
 		{"udp_tx_burst", required_argument, NULL, 'x'},
 		{"rx_burst", required_argument, NULL, 'r'},
 		{"csum", no_argument, NULL, 'y'},
+		{"sched", no_argument, NULL, 'z'},
 		{NULL, 0, NULL, 0}
 	};
 
-	static const char *shortopts = "+I:a:b:s:d:p:i:m:n:t:w:c:x:he:f:yr:";
+	static const char *shortopts = "+I:a:b:s:d:p:i:m:n:t:w:c:x:he:f:yr:z";
 
 	/* let helper collect its own arguments (e.g. --odph_proc) */
 	odph_parse_options(argc, argv, shortopts, longopts);
@@ -1406,6 +1492,7 @@  static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
 	appl_args->srcport = 0;
 	appl_args->dstport = 0;
 	appl_args->csum = 0;
+	appl_args->sched = 0;
 
 	opterr = 0; /* do not issue errors on helper options */
 
@@ -1557,6 +1644,9 @@  static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
 		case 'y':
 			appl_args->csum = 1;
 			break;
+		case 'z':
+			appl_args->sched = 1;
+			break;
 		case 'h':
 			usage(argv[0]);
 			exit(EXIT_SUCCESS);
@@ -1646,6 +1736,7 @@  static void usage(char *progname)
 	       "  -r, --rx_burst size of RX burst\n"
 	       "  -y, --csum use platform checksum support if available\n"
 	       "	         default is disabled\n"
+	       "  -z, --sched use scheduler API to receive packets\n"
 	       "\n", NO_PATH(progname), NO_PATH(progname)
 	      );
 }