diff mbox series

[v3,4/5] example: generator: add direct mode packet RX

Message ID 1515499216-4272-5-git-send-email-odpbot@yandex.ru
State New
Headers show
Series [v3,1/5] example: generator: add configuration option for RX burst size | expand

Commit Message

Github ODP bot Jan. 9, 2018, noon UTC
From: Bogdan Pricope <bogdan.pricope@linaro.org>


Update packet RX processing by adding direct pktin mode.
Direct mode should increase throughput on RX side.

Signed-off-by: Bogdan Pricope <bogdan.pricope@linaro.org>

---
/** Email created from pull request 343 (bogdanPricope:generator_rx_direct_pr)
 ** https://github.com/Linaro/odp/pull/343
 ** Patch: https://github.com/Linaro/odp/pull/343.patch
 ** Base sha: 49ebafae0edebbc750742d8874ad0a7588286dea
 ** Merge commit sha: 2eefe24f19e219515f14085c88dda09761f71845
 **/
 example/generator/odp_generator.c | 150 +++++++++++++++++++++++++-------------
 1 file changed, 99 insertions(+), 51 deletions(-)
diff mbox series

Patch

diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c
index 6ab25280c..e3a523843 100644
--- a/example/generator/odp_generator.c
+++ b/example/generator/odp_generator.c
@@ -114,8 +114,6 @@  typedef struct {
 		} tx;
 		struct {
 			odp_pktin_queue_t pktin; /**< Packet input queue */
-			interface_t *ifs; /**< Interfaces array */
-			int ifs_count; /**< Interfaces array size */
 		} rx;
 	};
 	odp_pool_t pool;	/**< Pool for packet IO */
@@ -777,19 +775,18 @@  static int gen_send_thread(void *arg)
 /**
  * Process icmp packets
  *
+ * @param  thr worker id
+ * @param  thr_args worker argument
  * @param  icmp icmp header address
- * @param  msg output buffer
  */
 
-static void process_icmp_pkt(thread_args_t *thr_args,
-			     odph_icmphdr_t *icmp, char *msg)
+static void process_icmp_pkt(int thr, thread_args_t *thr_args,
+			     odph_icmphdr_t *icmp)
 {
 	uint64_t trecv;
 	uint64_t tsend;
 	uint64_t rtt_ms, rtt_us;
 
-	msg[0] = 0;
-
 	if (icmp->type == ICMP_ECHOREPLY) {
 		thr_args->counters.ctr_icmp_reply_rcv++;
 
@@ -799,33 +796,32 @@  static void process_icmp_pkt(thread_args_t *thr_args,
 		rtt_ms = (trecv - tsend) / ODP_TIME_MSEC_IN_NS;
 		rtt_us = (trecv - tsend) / ODP_TIME_USEC_IN_NS -
 				1000 * rtt_ms;
-		sprintf(msg,
-			"ICMP Echo Reply seq %d time %"
-			PRIu64 ".%.03" PRIu64" ms",
+		printf("  [%02i] ICMP Echo Reply seq %d time %"
+			PRIu64 ".%.03" PRIu64" ms\n", thr,
 			odp_be_to_cpu_16(icmp->un.echo.sequence),
 			rtt_ms, rtt_us);
 	} else if (icmp->type == ICMP_ECHO) {
-		sprintf(msg, "Icmp Echo Request");
+		printf("  [%02i] ICMP Echo Request\n", thr);
 	}
 }
 
 /**
- * Print odp packets
+ * Process odp packets
  *
  * @param  thr worker id
+ * @param  thr_args worker argument
  * @param  pkt_tbl packets to be print
  * @param  len packet number
  */
-static void print_pkts(int thr, thread_args_t *thr_args,
-		       odp_packet_t pkt_tbl[], unsigned len)
+static void process_pkts(int thr, thread_args_t *thr_args,
+			 odp_packet_t pkt_tbl[], unsigned len)
 {
 	odp_packet_t pkt;
+	odp_packet_chksum_status_t csum_status;
 	char *buf;
 	odph_ipv4hdr_t *ip;
 	odph_icmphdr_t *icmp;
 	unsigned i;
-	size_t offset;
-	char msg[1024];
 
 	for (i = 0; i < len; ++i) {
 		pkt = pkt_tbl[i];
@@ -834,10 +830,21 @@  static void print_pkts(int thr, thread_args_t *thr_args,
 		if (!odp_packet_has_ipv4(pkt))
 			continue;
 
+		csum_status = odp_packet_l3_chksum_status(pkt);
+		if (csum_status == ODP_PACKET_CHKSUM_BAD)
+			printf("L3 checksum error detected.\n");
+
+		csum_status = odp_packet_l4_chksum_status(pkt);
+		if (csum_status == ODP_PACKET_CHKSUM_BAD)
+			printf("L4 checksum error detected.\n");
+
+		/* Drop packets with errors */
+		if (odp_unlikely(odp_packet_has_error(pkt)))
+			continue;
+
 		thr_args->counters.ctr_pkt_rcv++;
 		buf = odp_packet_data(pkt);
 		ip = (odph_ipv4hdr_t *)(buf + odp_packet_l3_offset(pkt));
-		offset = odp_packet_l4_offset(pkt);
 
 		/* udp */
 		if (ip->proto == ODPH_IPPROTO_UDP)
@@ -845,16 +852,16 @@  static void print_pkts(int thr, thread_args_t *thr_args,
 
 		/* icmp */
 		if (ip->proto == ODPH_IPPROTO_ICMPv4) {
-			icmp = (odph_icmphdr_t *)(buf + offset);
+			icmp = (odph_icmphdr_t *)(buf +
+				odp_packet_l4_offset(pkt));
 
-			process_icmp_pkt(thr_args, icmp, msg);
-			printf("  [%02i] %s\n", thr, msg);
+			process_icmp_pkt(thr, thr_args, icmp);
 		}
 	}
 }
 
 /**
- * Main receive function
+ * Scheduler receive function
  *
  * @param arg  thread arguments of type 'thread_args_t *'
  */
@@ -862,17 +869,16 @@  static int gen_recv_thread(void *arg)
 {
 	int thr;
 	thread_args_t *thr_args;
-	odp_packet_t pkts[MAX_RX_BURST], pkt;
-	odp_event_t events[MAX_RX_BURST];
+	odp_packet_t pkts[MAX_RX_BURST];
+	odp_event_t events[MAX_RX_BURST], ev;
 	int pkt_cnt, ev_cnt, i;
-	odp_packet_chksum_status_t csum_status;
 	int burst_size;
 
 	thr = odp_thread_id();
 	thr_args = (thread_args_t *)arg;
 	burst_size = args->rx_burst_size;
 
-	printf("  [%02i] created mode: RECEIVE\n", thr);
+	printf("  [%02i] created mode: RECEIVE SCHEDULER\n", thr);
 	odp_barrier_wait(&barrier);
 
 	for (;;) {
@@ -884,29 +890,62 @@  static int gen_recv_thread(void *arg)
 					    events, burst_size);
 		if (ev_cnt == 0)
 			continue;
+
 		for (i = 0, pkt_cnt = 0; i < ev_cnt; i++) {
-			pkt = odp_packet_from_event(events[i]);
+			ev = events[i];
 
-			csum_status = odp_packet_l3_chksum_status(pkt);
-			if (csum_status == ODP_PACKET_CHKSUM_BAD)
-				printf("L3 checksum error detected.\n");
+			if (odp_event_type(ev) == ODP_EVENT_PACKET)
+				pkts[pkt_cnt++] = odp_packet_from_event(ev);
+			else
+				odp_event_free(ev);
+		}
 
-			csum_status = odp_packet_l4_chksum_status(pkt);
-			if (csum_status == ODP_PACKET_CHKSUM_BAD)
-				printf("L4 checksum error detected.\n");
+		if (pkt_cnt) {
+			process_pkts(thr, thr_args, pkts, pkt_cnt);
 
-			/* Drop packets with errors */
-			if (odp_unlikely(odp_packet_has_error(pkt))) {
-				odp_packet_free(pkt);
-				continue;
-			}
-			pkts[pkt_cnt++] = pkt;
+			odp_packet_free_multi(pkts, pkt_cnt);
 		}
+	}
 
-		if (pkt_cnt) {
-			print_pkts(thr, thr_args, pkts, pkt_cnt);
+	return 0;
+}
+
+/**
+ * Direct receive function
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static int gen_recv_direct_thread(void *arg)
+{
+	int thr;
+	thread_args_t *thr_args;
+	odp_packet_t pkts[MAX_RX_BURST];
+	int pkt_cnt, burst_size;
+	odp_pktin_queue_t pktin;
+
+	thr = odp_thread_id();
+	thr_args = (thread_args_t *)arg;
+	pktin = thr_args->rx.pktin;
+	burst_size = args->rx_burst_size;
+
+	printf("  [%02i] created mode: RECEIVE\n", thr);
+	odp_barrier_wait(&barrier);
+
+	for (;;) {
+		if (thr_args->stop)
+			break;
+
+		pkt_cnt = odp_pktin_recv_tmo(pktin, pkts, burst_size,
+					     ODP_PKTIN_NO_WAIT);
+
+		if (pkt_cnt > 0) {
+			process_pkts(thr, thr_args, pkts, pkt_cnt);
 
 			odp_packet_free_multi(pkts, pkt_cnt);
+		} else if (pkt_cnt == 0) {
+			continue;
+		} else {
+			break;
 		}
 	}
 
@@ -1223,8 +1262,8 @@  int main(int argc, char *argv[])
 			abort();
 		}
 		thr_args = &args->thread[PING_THR_RX];
-		thr_args->rx.ifs = ifs;
-		thr_args->rx.ifs_count = args->appl.if_count;
+		if (!args->appl.sched)
+			thr_args->rx.pktin = ifs[0].pktin[0];
 		thr_args->pool = pool;
 		thr_args->tp = tp;
 		thr_args->tq = tq;
@@ -1241,7 +1280,10 @@  int main(int argc, char *argv[])
 		thr_args->mode = args->appl.mode;
 
 		memset(&thr_params, 0, sizeof(thr_params));
-		thr_params.start    = gen_recv_thread;
+		if (args->appl.sched)
+			thr_params.start = gen_recv_thread;
+		else
+			thr_params.start = gen_recv_direct_thread;
 		thr_params.arg      = thr_args;
 		thr_params.thr_type = ODP_THREAD_WORKER;
 		thr_params.instance = instance;
@@ -1287,21 +1329,24 @@  int main(int argc, char *argv[])
 		for (i = 0; i < num_workers; ++i) {
 			odp_cpumask_t thd_mask;
 			int (*thr_run_func)(void *);
-			int if_idx, pktout_idx;
+			int if_idx, pktq_idx;
 			uint64_t start_seq;
 
+			if_idx = i % args->appl.if_count;
+
 			if (args->appl.mode == APPL_MODE_RCV) {
-				args->thread[i].rx.ifs = ifs;
-				args->thread[i].rx.ifs_count =
-					args->appl.if_count;
+				pktq_idx = (i / args->appl.if_count) %
+					ifs[if_idx].pktin_count;
+				if (!args->appl.sched)
+					args->thread[i].rx.pktin =
+						ifs[if_idx].pktin[pktq_idx];
 			} else {
-				if_idx = i % args->appl.if_count;
-				pktout_idx = (i / args->appl.if_count) %
+				pktq_idx = (i / args->appl.if_count) %
 					ifs[if_idx].pktout_count;
 				start_seq = i * args->tx_burst_size;
 
 				args->thread[i].tx.pktout =
-					ifs[if_idx].pktout[pktout_idx];
+					ifs[if_idx].pktout[pktq_idx];
 				args->thread[i].tx.pktout_cfg =
 					&ifs[if_idx].config.pktout;
 				args->thread[i].counters.ctr_seq = start_seq;
@@ -1329,7 +1374,10 @@  int main(int argc, char *argv[])
 			if (args->appl.mode == APPL_MODE_UDP) {
 				thr_run_func = gen_send_thread;
 			} else if (args->appl.mode == APPL_MODE_RCV) {
-				thr_run_func = gen_recv_thread;
+				if (args->appl.sched)
+					thr_run_func = gen_recv_thread;
+				else
+					thr_run_func = gen_recv_direct_thread;
 			} else {
 				EXAMPLE_ERR("ERR MODE\n");
 				exit(EXIT_FAILURE);