[v3,6/10] linux-gen: queue: ring based queue implementation

Message ID 1519819218-27901-7-git-send-email-odpbot@yandex.ru
State New
Series [v3,1/10] linux-gen: queue: inline queue from index conversion

Commit Message

Github ODP bot Feb. 28, 2018, noon UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Change the queue implementation from a linked list of bursts to a ring.
Queues now have a maximum size, but the code is simpler and performance
is slightly better. This also prepares for a possible future step of
implementing queues on a lock-free ring.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 492 (psavol:master-sched-optim)
 ** https://github.com/Linaro/odp/pull/492
 ** Patch: https://github.com/Linaro/odp/pull/492.patch
 ** Base sha: f5e12df388352b27f09787028a0040afb28564f4
 ** Merge commit sha: 56e6340663c8679516a24dc81df13a53488b86b8
 **/
 .../linux-generic/include/odp_buffer_internal.h    |  34 ++---
 platform/linux-generic/include/odp_pool_internal.h |  28 ++++
 .../linux-generic/include/odp_queue_internal.h     |   5 +-
 platform/linux-generic/odp_packet.c                |   3 +-
 platform/linux-generic/odp_pool.c                  |  22 +--
 platform/linux-generic/odp_queue_basic.c           | 170 +++++++++------------
 6 files changed, 130 insertions(+), 132 deletions(-)
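
For orientation, the core of the change is that a queue ring now stores events as plain 32-bit values: 8 bits select the pool and 24 bits select the buffer within it, and the buffer header pointer is recomputed on dequeue from the pool's base address and block size. Below is a minimal standalone sketch of that round-trip. The toy_* names are invented for illustration only; the patch itself uses buffer_index_t, buf_hdr_from_index() and buf_hdr_from_index_u32().

/* Standalone illustration only; all toy_* names are invented for this
 * example and are not part of the patch. */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

typedef union {
	uint32_t u32;

	struct {
		uint32_t pool   : 8;   /* up to 256 pools */
		uint32_t buffer : 24;  /* up to 16M buffers per pool */
	};
} toy_index_t;

typedef struct {
	uint8_t *base_addr;   /* start of the pool's buffer memory */
	uint32_t block_size;  /* bytes reserved per buffer */
} toy_pool_t;

static toy_pool_t toy_pool_tbl[256];

/* Pack pool and buffer indexes into the 32-bit value stored in the ring */
static uint32_t toy_index_pack(uint32_t pool_idx, uint32_t buf_idx)
{
	toy_index_t index;

	index.u32    = 0;
	index.pool   = pool_idx;
	index.buffer = buf_idx;

	return index.u32;
}

/* Resolve the packed value back to the buffer header address */
static void *toy_index_unpack(uint32_t u32)
{
	toy_index_t index;
	toy_pool_t *pool;
	uint64_t offset;

	index.u32 = u32;
	pool      = &toy_pool_tbl[index.pool];
	offset    = (uint64_t)index.buffer * pool->block_size;

	return &pool->base_addr[offset];
}

int main(void)
{
	static uint8_t pool_mem[16 * 64];
	uint32_t packed;

	toy_pool_tbl[3].base_addr  = pool_mem;
	toy_pool_tbl[3].block_size = 64;

	packed = toy_index_pack(3, 5);

	printf("packed 0x%08" PRIx32 " resolves to %p (expected %p)\n",
	       packed, toy_index_unpack(packed), (void *)&pool_mem[5 * 64]);

	return 0;
}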

Patch

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index bd90ee156..e50dd604a 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -33,19 +33,31 @@  extern "C" {
 #include <odp_schedule_if.h>
 #include <stddef.h>
 
-#define BUFFER_BURST_SIZE    CONFIG_BURST_SIZE
-
 typedef struct seg_entry_t {
 	void     *hdr;
 	uint8_t  *data;
 	uint32_t  len;
 } seg_entry_t;
 
+typedef union buffer_index_t {
+	uint32_t u32;
+
+	struct {
+		uint32_t pool   :8;
+		uint32_t buffer :24;
+	};
+} buffer_index_t;
+
+/* Check that the pool index fits into its bit field */
+ODP_STATIC_ASSERT(ODP_CONFIG_POOLS    <= (0xFF + 1), "TOO_MANY_POOLS");
+
+/* Check that the buffer index fits into its bit field */
+ODP_STATIC_ASSERT(CONFIG_POOL_MAX_NUM <= (0xFFFFFF + 1), "TOO_LARGE_POOL");
+
 /* Common buffer header */
 struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
-	/* Buffer index in the pool */
-	uint32_t  index;
+	/* Combined pool and buffer index */
+	buffer_index_t index;
 
 	/* Total segment count */
 	uint16_t  segcount;
@@ -73,16 +85,6 @@  struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 	/* Segments */
 	seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];
 
-	/* Burst counts */
-	uint8_t   burst_num;
-	uint8_t   burst_first;
-
-	/* Next buf in a list */
-	struct odp_buffer_hdr_t *next;
-
-	/* Burst table */
-	struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
 	/* --- Mostly read only data --- */
 	const void *user_ptr;
 
@@ -109,8 +111,6 @@  struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 ODP_STATIC_ASSERT(CONFIG_PACKET_SEGS_PER_HDR < 256,
 		  "CONFIG_PACKET_SEGS_PER_HDR_TOO_LARGE");
 
-ODP_STATIC_ASSERT(BUFFER_BURST_SIZE < 256, "BUFFER_BURST_SIZE_TOO_LARGE");
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h
index faa019f48..e3de2b65d 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -104,6 +104,34 @@  static inline odp_buffer_hdr_t *buf_hdl_to_hdr(odp_buffer_t buf)
 	return (odp_buffer_hdr_t *)(uintptr_t)buf;
 }
 
+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+						   uint32_t buffer_idx)
+{
+	uint64_t block_offset;
+	odp_buffer_hdr_t *buf_hdr;
+
+	block_offset = buffer_idx * (uint64_t)pool->block_size;
+
+	/* clang requires cast to uintptr_t */
+	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+	return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+	buffer_index_t index;
+	uint32_t pool_idx, buffer_idx;
+	pool_t *pool;
+
+	index.u32  = u32;
+	pool_idx   = index.pool;
+	buffer_idx = index.buffer;
+	pool       = pool_entry(pool_idx);
+
+	return buf_hdr_from_index(pool, buffer_idx);
+}
+
 int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
 void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
 
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 8215a8180..0540bf72d 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -29,6 +29,7 @@  extern "C" {
 #include <odp/api/hints.h>
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
+#include <odp_ring_st_internal.h>
 
 #define QUEUE_STATUS_FREE         0
 #define QUEUE_STATUS_DESTROYED    1
@@ -38,9 +39,7 @@  extern "C" {
 
 struct queue_entry_s {
 	odp_ticketlock_t  ODP_ALIGNED_CACHE lock;
-
-	odp_buffer_hdr_t *head;
-	odp_buffer_hdr_t *tail;
+	ring_st_t         ring_st;
 	int               status;
 
 	queue_enq_fn_t       ODP_ALIGNED_CACHE enqueue;
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 22df7d670..e24aa0c7a 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1819,7 +1819,8 @@  void odp_packet_print_data(odp_packet_t pkt, uint32_t offset,
 	len += snprintf(&str[len], n - len,
 			"  pool index    %" PRIu32 "\n", pool->pool_idx);
 	len += snprintf(&str[len], n - len,
-			"  buf index     %" PRIu32 "\n", hdr->buf_hdr.index);
+			"  buf index     %" PRIu32 "\n",
+			hdr->buf_hdr.index.buffer);
 	len += snprintf(&str[len], n - len,
 			"  segcount      %" PRIu16 "\n", hdr->buf_hdr.segcount);
 	len += snprintf(&str[len], n - len,
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index 687edb8f0..998fc649e 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -80,20 +80,6 @@  static inline pool_t *pool_from_buf(odp_buffer_t buf)
 	return buf_hdr->pool_ptr;
 }
 
-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
-						   uint32_t buffer_idx)
-{
-	uint64_t block_offset;
-	odp_buffer_hdr_t *buf_hdr;
-
-	block_offset = buffer_idx * (uint64_t)pool->block_size;
-
-	/* clang requires cast to uintptr_t */
-	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
-	return buf_hdr;
-}
-
 int odp_pool_init_global(void)
 {
 	uint32_t i;
@@ -296,7 +282,9 @@  static void init_buffers(pool_t *pool)
 		memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);
 
 		/* Initialize buffer metadata */
-		buf_hdr->index = i;
+		buf_hdr->index.u32    = 0;
+		buf_hdr->index.pool   = pool->pool_idx;
+		buf_hdr->index.buffer = i;
 		buf_hdr->type = type;
 		buf_hdr->event_type = type;
 		buf_hdr->pool_ptr = pool;
@@ -785,7 +773,7 @@  static inline void buffer_free_to_pool(pool_t *pool,
 		ring  = &pool->ring->hdr;
 		mask  = pool->ring_mask;
 		for (i = 0; i < num; i++)
-			buf_index[i] = buf_hdr[i]->index;
+			buf_index[i] = buf_hdr[i]->index.buffer;
 
 		ring_enq_multi(ring, mask, buf_index, num);
 
@@ -825,7 +813,7 @@  static inline void buffer_free_to_pool(pool_t *pool,
 	}
 
 	for (i = 0; i < num; i++)
-		cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+		cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;
 
 	cache->num = cache_num + num;
 }
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index 2801b220f..9e99994da 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -40,9 +40,14 @@ 
 static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);
 
+typedef struct ODP_ALIGNED_CACHE {
+	/* Storage space for ring data */
+	uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
 typedef struct queue_global_t {
 	queue_entry_t   queue[ODP_CONFIG_QUEUES];
-
+	ring_data_t     ring_data[ODP_CONFIG_QUEUES];
 	uint32_t        queue_lf_num;
 	uint32_t        queue_lf_size;
 	queue_lf_func_t queue_lf_func;
@@ -154,9 +159,11 @@  static int queue_capability(odp_queue_capability_t *capa)
 	capa->max_sched_groups  = sched_fn->num_grps();
 	capa->sched_prios       = odp_schedule_num_prio();
 	capa->plain.max_num     = capa->max_queues;
-	capa->sched.max_num     = capa->max_queues;
+	capa->plain.max_size    = CONFIG_QUEUE_SIZE;
 	capa->plain.lockfree.max_num  = queue_glb->queue_lf_num;
 	capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
+	capa->sched.max_num     = capa->max_queues;
+	capa->sched.max_size    = CONFIG_QUEUE_SIZE;
 
 	return 0;
 }
@@ -204,6 +211,20 @@  static odp_queue_t queue_create(const char *name,
 		param = &default_param;
 	}
 
+	if (param->nonblocking == ODP_BLOCKING) {
+		if (param->size > CONFIG_QUEUE_SIZE)
+			return ODP_QUEUE_INVALID;
+	} else if (param->nonblocking == ODP_NONBLOCKING_LF) {
+		/* Only plain type lock-free queues supported */
+		if (param->type != ODP_QUEUE_TYPE_PLAIN)
+			return ODP_QUEUE_INVALID;
+		if (param->size > queue_glb->queue_lf_size)
+			return ODP_QUEUE_INVALID;
+	} else {
+		/* Wait-free queues not supported */
+		return ODP_QUEUE_INVALID;
+	}
+
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
 		queue = &queue_glb->queue[i];
 
@@ -297,7 +318,7 @@  static int queue_destroy(odp_queue_t handle)
 		ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
 		return -1;
 	}
-	if (queue->s.head != NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st) == 0) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
 		return -1;
@@ -364,81 +385,71 @@  static odp_queue_t queue_lookup(const char *name)
 	return ODP_QUEUE_INVALID;
 }
 
+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+					 odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++)
+		buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+				       uint32_t buffer_index[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++) {
+		buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+		odp_prefetch(buf_hdr[i]);
+	}
+}
+
 static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
 	int sched = 0;
-	int i, ret;
+	int ret;
 	queue_entry_t *queue;
-	odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+	int num_enq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
 
 	queue = qentry_from_int(q_int);
+	ring_st = &queue->s.ring_st;
+
 	if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
 		return ret;
 
-	/* Optimize the common case of single enqueue */
-	if (num == 1) {
-		tail = buf_hdr[0];
-		hdr  = tail;
-		hdr->burst_num = 0;
-		hdr->next = NULL;
-	} else {
-		int next;
-
-		/* Start from the last buffer header */
-		tail = buf_hdr[num - 1];
-		hdr  = tail;
-		hdr->next = NULL;
-		next = num - 2;
-
-		while (1) {
-			/* Build a burst. The buffer header carrying
-			 * a burst is the last buffer of the burst. */
-			for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
-			     i++, next--)
-				hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
-					buf_hdr[next];
-
-			hdr->burst_num   = i;
-			hdr->burst_first = BUFFER_BURST_SIZE - i;
-
-			if (odp_likely(next < 0))
-				break;
-
-			/* Get another header and link it */
-			next_hdr  = hdr;
-			hdr       = buf_hdr[next];
-			hdr->next = next_hdr;
-			next--;
-		}
-	}
+	buffer_index_from_buf(buf_idx, buf_hdr, num);
 
 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}
 
-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = hdr;
-	else
-		queue->s.tail->next = hdr;
+	num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
 
-	queue->s.tail = tail;
+	if (odp_unlikely(num_enq == 0)) {
+		UNLOCK(&queue->s.lock);
+		return 0;
+	}
 
 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
+		sched = 1;
 	}
+
 	UNLOCK(&queue->s.lock);
 
 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");
 
-	return num; /* All events enqueued */
+	return num_enq;
 }
 
 static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -484,12 +495,15 @@  static int queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
-	odp_buffer_hdr_t *hdr, *next;
-	int i, j;
-	int updated = 0;
 	int status_sync = sched_fn->status_sync;
+	int num_deq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
+
+	ring_st = &queue->s.ring_st;
 
 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		/* Bad queue, or queue has been destroyed.
 		 * Scheduler finalizes queue destroy after this. */
@@ -497,9 +511,9 @@  static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		return -1;
 	}
 
-	hdr = queue->s.head;
+	num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
 
-	if (hdr == NULL) {
+	if (num_deq == 0) {
 		/* Already empty queue */
 		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -509,51 +523,18 @@  static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		}
 
 		UNLOCK(&queue->s.lock);
-		return 0;
-	}
-
-	for (i = 0; i < num && hdr; ) {
-		int burst_num = hdr->burst_num;
-		int first     = hdr->burst_first;
-
-		/* First, get bursted buffers */
-		for (j = 0; j < burst_num && i < num; j++, i++) {
-			buf_hdr[i] = hdr->burst[first + j];
-			odp_prefetch(buf_hdr[i]);
-		}
-
-		if (burst_num) {
-			hdr->burst_num   = burst_num - j;
-			hdr->burst_first = first + j;
-		}
 
-		if (i == num)
-			break;
-
-		/* When burst is empty, consume the current buffer header and
-		 * move to the next header */
-		buf_hdr[i] = hdr;
-		next       = hdr->next;
-		hdr->next  = NULL;
-		hdr        = next;
-		updated++;
-		i++;
+		return 0;
 	}
 
-	/* Write head only if updated */
-	if (updated)
-		queue->s.head = hdr;
-
-	/* Queue is empty */
-	if (hdr == NULL)
-		queue->s.tail = NULL;
-
 	if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
 		sched_fn->save_context(queue->s.index);
 
 	UNLOCK(&queue->s.lock);
 
-	return i;
+	buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+	return num_deq;
 }
 
 static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -622,8 +603,9 @@  static int queue_init(queue_entry_t *queue, const char *name,
 	queue->s.pktin = PKTIN_INVALID;
 	queue->s.pktout = PKTOUT_INVALID;
 
-	queue->s.head = NULL;
-	queue->s.tail = NULL;
+	ring_st_init(&queue->s.ring_st,
+		     queue_glb->ring_data[queue->s.index].data,
+		     CONFIG_QUEUE_SIZE);
 
 	return 0;
 }
@@ -699,7 +681,7 @@  int sched_cb_queue_empty(uint32_t queue_index)
 		return -1;
 	}
 
-	if (queue->s.head == NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st)) {
 		/* Already empty queue. Update status. */
 		if (queue->s.status == QUEUE_STATUS_SCHED)
 			queue->s.status = QUEUE_STATUS_NOTSCHED;