[v2,2/6] linux-gen: sched: optimize atomic packet input queue throughput

Message ID 1520002815-30682-3-git-send-email-odpbot@yandex.ru
State New
Headers show
Series
  • [v2,1/6] linux-gen: sched: optimize packet input polling
Related show

Commit Message

Github ODP bot March 2, 2018, 3 p.m.
From: Petri Savolainen <petri.savolainen@linaro.org>


When packet input queue is atomic, packets received from packet
input are passed directly to the application. Other queue types
may have events stashed on other threads, so for those incoming
packets are always enqueued (to maintain packet order).

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 504 (psavol:master-sched-optim-2)
 ** https://github.com/Linaro/odp/pull/504
 ** Patch: https://github.com/Linaro/odp/pull/504.patch
 ** Base sha: e1c0e4570a45d05dd9f2e8e052ce71164209d112
 ** Merge commit sha: 964132736e0785222be184065d6ac73121cd46ac
 **/
 .../linux-generic/include/odp_buffer_inlines.h     |  5 ++
 .../linux-generic/include/odp_queue_internal.h     | 22 ++++++++
 platform/linux-generic/include/odp_queue_lf.h      |  1 -
 platform/linux-generic/include/odp_schedule_if.h   |  4 +-
 platform/linux-generic/odp_packet_io.c             | 46 +++++++++++++----
 platform/linux-generic/odp_queue_basic.c           | 17 +-----
 platform/linux-generic/odp_queue_lf.c              |  2 +-
 platform/linux-generic/odp_schedule_basic.c        | 60 ++++++++++++++++++----
 platform/linux-generic/odp_schedule_iquery.c       |  6 +--
 platform/linux-generic/odp_schedule_sp.c           |  5 +-
 10 files changed, 123 insertions(+), 45 deletions(-)

Patch

diff --git a/platform/linux-generic/include/odp_buffer_inlines.h b/platform/linux-generic/include/odp_buffer_inlines.h
index 9abe79fc7..9e3b6ef70 100644
--- a/platform/linux-generic/include/odp_buffer_inlines.h
+++ b/platform/linux-generic/include/odp_buffer_inlines.h
@@ -28,6 +28,11 @@  static inline odp_buffer_t buf_from_buf_hdr(odp_buffer_hdr_t *hdr)
 	return (odp_buffer_t)hdr;
 }
 
+static inline odp_event_t event_from_buf_hdr(odp_buffer_hdr_t *hdr)
+{
+	return (odp_event_t)hdr;
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 0540bf72d..3aec3fe9d 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -30,6 +30,7 @@  extern "C" {
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
 #include <odp_ring_st_internal.h>
+#include <odp_queue_lf.h>
 
 #define QUEUE_STATUS_FREE         0
 #define QUEUE_STATUS_DESTROYED    1
@@ -62,6 +63,27 @@  union queue_entry_u {
 	uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
 };
 
+typedef struct ODP_ALIGNED_CACHE {
+	/* Storage space for ring data */
+	uint32_t data[CONFIG_QUEUE_SIZE];
+} queue_ring_data_t;
+
+typedef struct queue_global_t {
+	queue_entry_t     queue[ODP_CONFIG_QUEUES];
+	queue_ring_data_t ring_data[ODP_CONFIG_QUEUES];
+	uint32_t          queue_lf_num;
+	uint32_t          queue_lf_size;
+	queue_lf_func_t   queue_lf_func;
+
+} queue_global_t;
+
+extern queue_global_t *queue_glb;
+
+static inline queue_t queue_index_to_qint(uint32_t queue_id)
+{
+	return (queue_t)&queue_glb->queue[queue_id];
+}
+
 static inline uint32_t queue_to_index(odp_queue_t handle)
 {
 	return _odp_typeval(handle) - 1;
diff --git a/platform/linux-generic/include/odp_queue_lf.h b/platform/linux-generic/include/odp_queue_lf.h
index ef178e07b..8a973a859 100644
--- a/platform/linux-generic/include/odp_queue_lf.h
+++ b/platform/linux-generic/include/odp_queue_lf.h
@@ -12,7 +12,6 @@  extern "C" {
 #endif
 
 #include <odp_queue_if.h>
-#include <odp_queue_internal.h>
 
 /* Lock-free queue functions */
 typedef struct {
diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index dd2097bfb..4bd127a42 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -77,7 +77,9 @@  typedef struct schedule_fn_t {
 extern const schedule_fn_t *sched_fn;
 
 /* Interface for the scheduler */
-int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
+int sched_cb_pktin_poll(int pktio_index, int pktin_index,
+			odp_buffer_hdr_t *hdr_tbl[], int num);
+int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[]);
 int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]);
 void sched_cb_pktio_stop_finalize(int pktio_index);
 odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index ae8e390b1..e76ff8140 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -559,7 +559,7 @@  odp_pktio_t odp_pktio_lookup(const char *name)
 	return hdl;
 }
 
-static inline int pktin_recv_buf(odp_pktin_queue_t queue,
+static inline int pktin_recv_buf(pktio_entry_t *entry, int pktin_index,
 				 odp_buffer_hdr_t *buffer_hdrs[], int num)
 {
 	odp_packet_t pkt;
@@ -570,7 +570,7 @@  static inline int pktin_recv_buf(odp_pktin_queue_t queue,
 	int pkts;
 	int num_rx = 0;
 
-	pkts = odp_pktin_recv(queue, packets, num);
+	pkts = entry->s.ops->recv(entry, pktin_index, packets, num);
 
 	for (i = 0; i < pkts; i++) {
 		pkt = packets[i];
@@ -624,13 +624,16 @@  static odp_buffer_hdr_t *pktin_dequeue(queue_t q_int)
 	odp_buffer_hdr_t *buf_hdr;
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	int pkts;
+	odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int);
+	odp_pktio_t pktio = pktin_queue.pktio;
+	int pktin_index   = pktin_queue.index;
+	pktio_entry_t *entry = get_pktio_entry(pktio);
 
 	buf_hdr = queue_fn->deq(q_int);
 	if (buf_hdr != NULL)
 		return buf_hdr;
 
-	pkts = pktin_recv_buf(queue_fn->get_pktin(q_int),
-			      hdr_tbl, QUEUE_MULTI_MAX);
+	pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
 
 	if (pkts <= 0)
 		return NULL;
@@ -646,6 +649,10 @@  static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num)
 	int nbr;
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	int pkts, i, j;
+	odp_pktin_queue_t pktin_queue = queue_fn->get_pktin(q_int);
+	odp_pktio_t pktio = pktin_queue.pktio;
+	int pktin_index   = pktin_queue.index;
+	pktio_entry_t *entry = get_pktio_entry(pktio);
 
 	nbr = queue_fn->deq_multi(q_int, buf_hdr, num);
 	if (odp_unlikely(nbr > num))
@@ -657,8 +664,8 @@  static int pktin_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[], int num)
 	if (nbr == num)
 		return nbr;
 
-	pkts = pktin_recv_buf(queue_fn->get_pktin(q_int),
-			      hdr_tbl, QUEUE_MULTI_MAX);
+	pkts = pktin_recv_buf(entry, pktin_index, hdr_tbl, QUEUE_MULTI_MAX);
+
 	if (pkts <= 0)
 		return nbr;
 
@@ -720,12 +727,29 @@  int sched_cb_pktin_poll_one(int pktio_index,
 	return num_rx;
 }
 
-int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
+int sched_cb_pktin_poll(int pktio_index, int pktin_index,
+			odp_buffer_hdr_t *hdr_tbl[], int num)
+{
+	pktio_entry_t *entry = pktio_entry_by_index(pktio_index);
+	int state = entry->s.state;
+
+	if (odp_unlikely(state != PKTIO_STATE_STARTED)) {
+		if (state < PKTIO_STATE_ACTIVE ||
+		    state == PKTIO_STATE_STOP_PENDING)
+			return -1;
+
+		ODP_DBG("Interface %s not started\n", entry->s.name);
+		return 0;
+	}
+
+	return pktin_recv_buf(entry, pktin_index, hdr_tbl, num);
+}
+
+int sched_cb_pktin_poll_old(int pktio_index, int num_queue, int index[])
 {
 	odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
 	int num, idx;
-	pktio_entry_t *entry;
-	entry = pktio_entry_by_index(pktio_index);
+	pktio_entry_t *entry = pktio_entry_by_index(pktio_index);
 	int state = entry->s.state;
 
 	if (odp_unlikely(state != PKTIO_STATE_STARTED)) {
@@ -739,9 +763,9 @@  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
 
 	for (idx = 0; idx < num_queue; idx++) {
 		queue_t q_int;
-		odp_pktin_queue_t pktin = entry->s.in_queue[index[idx]].pktin;
 
-		num = pktin_recv_buf(pktin, hdr_tbl, QUEUE_MULTI_MAX);
+		num = pktin_recv_buf(entry, index[idx], hdr_tbl,
+				     QUEUE_MULTI_MAX);
 
 		if (num == 0)
 			continue;
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index d9e5fdcea..399c86ed8 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -8,7 +8,6 @@ 
 
 #include <odp/api/queue.h>
 #include <odp_queue_internal.h>
-#include <odp_queue_lf.h>
 #include <odp_queue_if.h>
 #include <odp/api/std_types.h>
 #include <odp/api/align.h>
@@ -40,21 +39,7 @@ 
 static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);
 
-typedef struct ODP_ALIGNED_CACHE {
-	/* Storage space for ring data */
-	uint32_t data[CONFIG_QUEUE_SIZE];
-} ring_data_t;
-
-typedef struct queue_global_t {
-	queue_entry_t   queue[ODP_CONFIG_QUEUES];
-	ring_data_t     ring_data[ODP_CONFIG_QUEUES];
-	uint32_t        queue_lf_num;
-	uint32_t        queue_lf_size;
-	queue_lf_func_t queue_lf_func;
-
-} queue_global_t;
-
-static queue_global_t *queue_glb;
+queue_global_t *queue_glb;
 
 static inline queue_entry_t *get_qentry(uint32_t queue_id)
 {
diff --git a/platform/linux-generic/odp_queue_lf.c b/platform/linux-generic/odp_queue_lf.c
index 74f529469..066e6a674 100644
--- a/platform/linux-generic/odp_queue_lf.c
+++ b/platform/linux-generic/odp_queue_lf.c
@@ -7,7 +7,7 @@ 
 #include <odp/api/queue.h>
 #include <odp/api/atomic.h>
 #include <odp/api/shared_memory.h>
-#include <odp_queue_lf.h>
+#include <odp_queue_internal.h>
 #include <string.h>
 #include <stdio.h>
 
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index b3847ab91..b251bdeef 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -26,6 +26,7 @@ 
 #include <odp_ring_internal.h>
 #include <odp_timer_internal.h>
 #include <odp_queue_internal.h>
+#include <odp_buffer_inlines.h>
 
 /* Number of priority levels  */
 #define NUM_PRIO 8
@@ -699,14 +700,20 @@  static inline int queue_is_pktin(uint32_t queue_index)
 	return sched->queue[queue_index].poll_pktin;
 }
 
-static inline int poll_pktin(uint32_t qi)
+static inline int poll_pktin(uint32_t qi, int stash)
 {
-	int pktio_index, pktin_index, num, num_pktin;
+	odp_buffer_hdr_t *b_hdr[MAX_DEQ];
+	int pktio_index, pktin_index, num, num_pktin, i;
+	int ret;
+	queue_t qint;
 
 	pktio_index = sched->queue[qi].pktio_index;
 	pktin_index = sched->queue[qi].pktin_index;
 
-	num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index);
+	num = sched_cb_pktin_poll(pktio_index, pktin_index, b_hdr, MAX_DEQ);
+
+	if (num == 0)
+		return 0;
 
 	/* Pktio stopped or closed. Call stop_finalize when we have stopped
 	 * polling all pktin queues of the pktio. */
@@ -720,9 +727,33 @@  static inline int poll_pktin(uint32_t qi)
 
 		if (num_pktin == 0)
 			sched_cb_pktio_stop_finalize(pktio_index);
+
+		return num;
 	}
 
-	return num;
+	if (stash) {
+		for (i = 0; i < num; i++)
+			sched_local.ev_stash[i] = event_from_buf_hdr(b_hdr[i]);
+
+		return num;
+	}
+
+	qint = queue_index_to_qint(qi);
+
+	ret = queue_fn->enq_multi(qint, b_hdr, num);
+
+	/* Drop packets that were not enqueued */
+	if (odp_unlikely(ret < num)) {
+		int num_enq = ret;
+
+		if (odp_unlikely(ret < 0))
+			num_enq = 0;
+
+		ODP_DBG("Dropped %i packets\n", num - num_enq);
+		buffer_free_multi(&b_hdr[num_enq], num - num_enq);
+	}
+
+	return ret;
 }
 
 static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
@@ -805,17 +836,26 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 				 * priorities. Stop scheduling queue when pktio
 				 * has been stopped. */
 				if (pktin) {
-					int num_pkt = poll_pktin(qi);
+					int atomic = queue_is_atomic(qi);
+					int num_pkt = poll_pktin(qi, atomic);
 
-					if (odp_likely(num_pkt >= 0)) {
+					if (odp_unlikely(num_pkt < 0))
+						continue;
+
+					if (num_pkt == 0 || !atomic) {
 						ring_enq(ring, RING_MASK, qi);
 						break;
 					}
-				}
 
-				/* Remove empty queue from scheduling. Continue
-				 * scheduling the same priority queue. */
-				continue;
+					/* Process packets from an atomic queue
+					 * right away */
+					num = num_pkt;
+				} else {
+					/* Remove empty queue from scheduling.
+					 * Continue scheduling the same priority
+					 * queue. */
+					continue;
+				}
 			}
 
 			handle            = queue_from_index(qi);
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
index 1a82f48da..ddd97bea7 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -674,9 +674,9 @@  static inline void pktio_poll_input(void)
 		cmd = &sched->pktio_poll.commands[index];
 
 		/* Poll packet input */
-		if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio,
-						     cmd->count,
-						     cmd->pktin))) {
+		if (odp_unlikely(sched_cb_pktin_poll_old(cmd->pktio,
+							 cmd->count,
+							 cmd->pktin))) {
 			/* Pktio stopped or closed. Remove poll
 			 * command and call stop_finalize when all
 			 * commands of the pktio has been removed.
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 50390274b..84d16d3c1 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -524,8 +524,9 @@  static int schedule_multi(odp_queue_t *from, uint64_t wait,
 		cmd = sched_cmd();
 
 		if (cmd && cmd->s.type == CMD_PKTIO) {
-			if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin,
-						cmd->s.pktin_idx)) {
+			if (sched_cb_pktin_poll_old(cmd->s.index,
+						    cmd->s.num_pktin,
+						    cmd->s.pktin_idx)) {
 				/* Pktio stopped or closed. */
 				sched_cb_pktio_stop_finalize(cmd->s.index);
 			} else {