
[RFC,API-NEXT,07/12] linux-generic: tm: add pkt_queue support routines

Message ID 1437249827-578-8-git-send-email-bill.fischofer@linaro.org
State New

Commit Message

Bill Fischofer July 18, 2015, 8:03 p.m. UTC
Signed-off-by: Barry Spinney <spinney@ezchip.com>
Signed-off-by: Mike Holmes <mike.holmes@linaro.org>
Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_pkt_queue_internal.h |  62 ++++
 platform/linux-generic/odp_pkt_queue.c             | 376 +++++++++++++++++++++
 3 files changed, 440 insertions(+)
 create mode 100644 platform/linux-generic/include/odp_pkt_queue_internal.h
 create mode 100644 platform/linux-generic/odp_pkt_queue.c
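
For context, a minimal sketch of the intended call sequence for this
internal API (not part of the patch; error handling elided, and the pool
and queue sizes below are purely illustrative):

    odp_int_queue_pool_t pool;
    odp_int_pkt_queue_t  pkt_queue;
    odp_packet_t         pkt;

    /* One pool per TM system; both size arguments are illustrative. */
    pool = odp_queue_pool_create(1024 * 1024, 4 * 1024 * 1024);

    pkt_queue = odp_pkt_queue_create(pool);

    /* Append/remove must be called from the single thread servicing this
     * TM system, since the pool does no locking.  pkt is assumed to have
     * been obtained elsewhere, e.g. via odp_packet_alloc(). */
    odp_pkt_queue_append(pool, pkt_queue, pkt);
    if (odp_pkt_queue_remove(pool, pkt_queue, &pkt) == 1) {
        /* pkt now holds the oldest pkt queued on pkt_queue. */
    }

    odp_pkt_queue_stats_print(pool);
    odp_queue_pool_destroy(pool);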

Patch

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index f1815e7..2eb271a 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -131,6 +131,7 @@  noinst_HEADERS = \
 		  ${srcdir}/include/odp_spin_internal.h \
 		  ${srcdir}/include/odp_timer_internal.h \
 	          ${srcdir}/include/odp_name_table_internal.h \
+		  ${srcdir}/include/odp_pkt_queue_internal.h \
 		  ${srcdir}/Makefile.inc
 
 __LIB__libodp_la_SOURCES = \
@@ -143,6 +144,7 @@  __LIB__libodp_la_SOURCES = \
 			   odp_event.c \
 			   odp_init.c \
 			   odp_name_table.c \
+			   odp_pkt_queue.c \
 			   odp_impl.c \
 			   odp_packet.c \
 			   odp_packet_flags.c \
diff --git a/platform/linux-generic/include/odp_pkt_queue_internal.h b/platform/linux-generic/include/odp_pkt_queue_internal.h
new file mode 100644
index 0000000..8138255
--- /dev/null
+++ b/platform/linux-generic/include/odp_pkt_queue_internal.h
@@ -0,0 +1,62 @@ 
+/* Copyright 2015 EZchip Semiconductor Ltd. All Rights Reserved.
+ *
+ * Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_INT_PKT_QUEUE_H_
+#define ODP_INT_PKT_QUEUE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdint.h>
+#include <odp.h>
+
+typedef uint64_t odp_int_queue_pool_t;
+typedef uint32_t odp_int_pkt_queue_t;
+
+#define ODP_INT_QUEUE_POOL_INVALID  0
+#define ODP_INT_PKT_QUEUE_INVALID   0
+
+/* None of the functions in this file do any locking.  That's because the
+ * expected usage model is that each TM system will create its own
+ * odp_int_queue_pool, and then only call odp_pkt_queue_append and
+ * odp_pkt_queue_remove from a single thread associated/dedicated to this
+ * same TM system/odp_int_queue_pool.  The main difficulty that this file
+ * tries to deal with is the possibility of a huge number of queues (e.g. 16
+ * million), where each such queue could have a huge range in the number of
+ * pkts queued (say 0 to > 1,000) - yet the total "peak" number of pkts queued
+ * is many orders of magnitude smaller than the product of max_num_queues
+ * times max_queue_cnt.  In particular, it is assumed that even at peak usage,
+ * only a small fraction of max_num_queues will be "active" - i.e. have any
+ * pkts queued - yet over time it is expected that almost every queue will
+ * have some sort of backlog.
+ */
+
+/* max_num_queues must be <= 16 * 1024 * 1024. */
+odp_int_queue_pool_t odp_queue_pool_create(uint32_t max_num_queues,
+					   uint32_t max_queued_pkts);
+
+odp_int_pkt_queue_t odp_pkt_queue_create(odp_int_queue_pool_t queue_pool);
+
+int odp_pkt_queue_append(odp_int_queue_pool_t queue_pool,
+			 odp_int_pkt_queue_t  pkt_queue,
+			 odp_packet_t         pkt);
+
+int odp_pkt_queue_remove(odp_int_queue_pool_t queue_pool,
+			 odp_int_pkt_queue_t  pkt_queue,
+			 odp_packet_t        *pkt);
+
+void odp_pkt_queue_stats_print(odp_int_queue_pool_t queue_pool);
+
+void odp_queue_pool_destroy(odp_int_queue_pool_t queue_pool);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_pkt_queue.c b/platform/linux-generic/odp_pkt_queue.c
new file mode 100644
index 0000000..5a4e937
--- /dev/null
+++ b/platform/linux-generic/odp_pkt_queue.c
@@ -0,0 +1,376 @@ 
+/* Copyright 2015 EZchip Semiconductor Ltd. All Rights Reserved.
+ *
+ * Copyright (c) 2015, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include <stdint.h>
+#include <inttypes.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <odp.h>
+#include <odp_pkt_queue_internal.h>
+#include <odp_debug_internal.h>
+
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+#define MIN(a, b) (((a) < (b)) ? (a) : (b))
+
+typedef struct /* Must be exactly 64 bytes long AND cacheline aligned! */ {
+	uint32_t next_queue_blk_idx;
+	uint32_t tail_queue_blk_idx;
+	odp_packet_t pkts[7];
+} ODP_ALIGNED_CACHE queue_blk_t;
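+/* Note: on an LP64 platform (odp_packet_t being 8 bytes) the struct above
+ * is exactly 64 bytes: two 4-byte blk indices plus seven 8-byte pkt
+ * handles - which is why pkts[] holds 7 entries. */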
+
+typedef struct {
+	queue_blk_t blks[0];
+} ODP_ALIGNED_CACHE queue_blks_t;
+
+/* The queue_num_tbl is used to map from a queue_num to the blk idx of the
+ * first queue_blk_t for that queue.  The rationale is the assumption that
+ * usually only a small fraction of the max_num_queues will have more than
+ * 1 pkt associated with them, so the active queue_blks can be dynamically
+ * allocated and freed according to the actual usage pattern.
+ */
+typedef struct {
+	uint32_t queue_num_to_blk_idx[0];
+} queue_num_tbl_t;
+
+typedef struct {
+	uint32_t num_blks;
+	uint32_t next_blk_idx; /* blk_idx of queue_blks not yet added. */
+	queue_blks_t *queue_blks;
+} queue_region_desc_t;
+
+typedef struct {
+	uint64_t total_pkt_appends;
+	uint64_t total_pkt_removes;
+	uint64_t total_bad_removes;
+	uint32_t free_list_size;
+	uint32_t min_free_list_size;
+	uint32_t peak_free_list_size;
+	uint32_t free_list_head_idx;
+	uint32_t max_queue_num;
+	uint32_t max_queued_pkts;
+	uint32_t next_queue_num;
+	queue_region_desc_t queue_region_descs[16];
+	uint32_t *queue_num_tbl;
+	uint8_t current_region;
+	uint8_t all_regions_used;
+} queue_pool_t;
+
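+/* A queue_blk_idx packs a 4-bit region number (bits 31:28) and a 28-bit
+ * blk index within that region (bits 27:0) - hence the 16-entry
+ * queue_region_descs array above. */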
+static queue_blk_t *blk_idx_to_queue_blk(queue_pool_t *queue_pool,
+					 uint32_t queue_blk_idx)
+{
+	queue_region_desc_t *queue_region_desc;
+	uint32_t which_region, blk_tbl_idx;
+
+	which_region = queue_blk_idx >> 28;
+	blk_tbl_idx = queue_blk_idx & ((1 << 28) - 1);
+	queue_region_desc = &queue_pool->queue_region_descs[which_region];
+	return &queue_region_desc->queue_blks->blks[blk_tbl_idx];
+}
+
+static int pkt_queue_free_list_add(queue_pool_t *pool,
+				   uint32_t num_queue_blks)
+{
+	queue_region_desc_t *region_desc;
+	queue_blks_t *queue_blks;
+	queue_blk_t *queue_blk;
+	uint32_t which_region, blks_added, num_blks, start_idx;
+	uint32_t malloc_len, blks_to_add, cnt;
+
+	which_region = pool->current_region;
+	blks_added = 0;
+	while ((blks_added < num_queue_blks) && (pool->all_regions_used == 0)) {
+		region_desc = &pool->queue_region_descs[which_region];
+		start_idx = region_desc->next_blk_idx;
+		num_blks = region_desc->num_blks;
+		queue_blks = region_desc->queue_blks;
+		if (!queue_blks) {
+			malloc_len = num_blks * sizeof(queue_blk_t);
+			queue_blks = malloc(malloc_len);
+			if (!queue_blks)
+				return blks_added;
+
+			memset(queue_blks, 0, malloc_len);
+			region_desc->queue_blks = queue_blks;
+		}
+
+		/* Now add as many queue_blks to the free list as this region
+		 * can supply, bounded by the number still requested.  Each
+		 * blk is chained to its sequential successor, with the region
+		 * number encoded in the top 4 bits of the blk idx. */
+		blks_to_add = MIN(num_blks - start_idx,
+				  num_queue_blks - blks_added);
+		queue_blk = &queue_blks->blks[start_idx];
+		for (cnt = 1; cnt <= blks_to_add; cnt++) {
+			queue_blk->next_queue_blk_idx =
+				(which_region << 28) | (start_idx + cnt);
+			queue_blk++;
+		}
+
+		blks_added += blks_to_add;
+		pool->free_list_size += blks_to_add;
+		region_desc->next_blk_idx += blks_to_add;
+		if (blks_to_add == (num_blks - start_idx)) {
+			/* Advance to the next region */
+			pool->current_region++;
+			if (16 <= pool->current_region) {
+				pool->all_regions_used = 1;
+				return blks_added;
+			}
+
+			which_region = pool->current_region;
+			/* Re-chain the last blk of the exhausted region to
+			 * the first blk of the next region. */
+			queue_blk[-1].next_queue_blk_idx = which_region << 28;
+		}
+	}
+
+	return blks_added;
+}
+
+static queue_blk_t *queue_blk_alloc(queue_pool_t *pool,
+				    uint32_t *queue_blk_idx)
+{
+	queue_blk_t *head_queue_blk;
+	uint32_t head_queue_blk_idx;
+	int rc;
+
+	if (pool->free_list_size <= 1) {
+		/* Replenish the queue_blk_t free list. */
+		pool->min_free_list_size = pool->free_list_size;
+		rc = pkt_queue_free_list_add(pool, 64);
+		if (rc <= 0)
+			return NULL;
+	}
+
+	head_queue_blk_idx = pool->free_list_head_idx;
+	head_queue_blk = blk_idx_to_queue_blk(pool, head_queue_blk_idx);
+	pool->free_list_size--;
+	pool->free_list_head_idx = head_queue_blk->next_queue_blk_idx;
+	*queue_blk_idx = head_queue_blk_idx;
+	if (pool->free_list_size < pool->min_free_list_size)
+		pool->min_free_list_size = pool->free_list_size;
+
+	return head_queue_blk;
+}
+
+static void queue_blk_free(queue_pool_t *pool, queue_blk_t *queue_blk,
+			   uint32_t queue_blk_idx)
+{
+	if ((!queue_blk) || (queue_blk_idx == 0))
+		return;
+
+	queue_blk->next_queue_blk_idx = pool->free_list_head_idx;
+	pool->free_list_head_idx = queue_blk_idx;
+	pool->free_list_size++;
+	if (pool->peak_free_list_size < pool->free_list_size)
+		pool->peak_free_list_size = pool->free_list_size;
+}
+
+static void queue_region_desc_init(queue_pool_t *pool, uint32_t which_region,
+				   uint32_t num_blks)
+{
+	queue_region_desc_t *queue_region_desc;
+
+	queue_region_desc = &pool->queue_region_descs[which_region];
+	queue_region_desc->num_blks = num_blks;
+}
+
+odp_int_queue_pool_t odp_queue_pool_create(uint32_t max_num_queues,
+					   uint32_t max_queued_pkts)
+{
+	queue_pool_t *pool;
+	uint32_t idx, initial_free_list_size, malloc_len, first_queue_blk_idx;
+	int rc;
+
+	pool = malloc(sizeof(queue_pool_t));
+	if (!pool)
+		return ODP_INT_QUEUE_POOL_INVALID;
+
+	memset(pool, 0, sizeof(queue_pool_t));
+
+	/* Initialize the queue_region_descs array based upon the
+	 * max_queued_pkts.
+	 */
+	max_queued_pkts = MAX(max_queued_pkts, 64 * 1024);
+	queue_region_desc_init(pool, 0, max_queued_pkts / 4);
+	queue_region_desc_init(pool, 1, max_queued_pkts / 64);
+	queue_region_desc_init(pool, 2, max_queued_pkts / 64);
+	queue_region_desc_init(pool, 3, max_queued_pkts / 64);
+	queue_region_desc_init(pool, 4, max_queued_pkts / 64);
+	for (idx = 5; idx < 16; idx++)
+		queue_region_desc_init(pool, idx, max_queued_pkts / 16);
+
+	/* Now allocate the first batch of queue_blks and add them to the
+	 * free list.
+	 */
+	initial_free_list_size = MIN(64 * 1024, max_queued_pkts / 4);
+	rc = pkt_queue_free_list_add(pool, initial_free_list_size);
+	if (rc <= 0)
+		return ODP_INT_QUEUE_POOL_INVALID;
+
+	/* Discard the first queue_blk with idx 0, since blk idx 0 is
+	 * reserved as the invalid/null idx. */
+	queue_blk_alloc(pool, &first_queue_blk_idx);
+
+	pool->max_queue_num = max_num_queues;
+	pool->max_queued_pkts = max_queued_pkts;
+	pool->next_queue_num = 1;
+
+	/* Queue numbers are 1-based, so allocate one extra table entry. */
+	malloc_len = (max_num_queues + 1) * sizeof(uint32_t);
+	pool->queue_num_tbl = malloc(malloc_len);
+	if (!pool->queue_num_tbl) {
+		free(pool);
+		return ODP_INT_QUEUE_POOL_INVALID;
+	}
+
+	memset(pool->queue_num_tbl, 0, malloc_len);
+
+	pool->min_free_list_size = pool->free_list_size;
+	pool->peak_free_list_size = pool->free_list_size;
+	return (odp_int_queue_pool_t)pool;
+}
+
+odp_int_pkt_queue_t odp_pkt_queue_create(odp_int_queue_pool_t queue_pool)
+{
+	queue_pool_t *pool;
+	uint32_t queue_num;
+
+	pool = (queue_pool_t *)queue_pool;
+	queue_num = pool->next_queue_num++;
+	if (pool->max_queue_num < queue_num)
+		return ODP_INT_PKT_QUEUE_INVALID;
+
+	return (odp_int_pkt_queue_t)queue_num;
+}
+
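+/* Append is O(1): the first blk of each queue caches the blk idx of the
+ * current tail blk (tail_queue_blk_idx), so no chain walk is needed - only
+ * the (at most 7) pkt slots of the tail blk are scanned. */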
+int odp_pkt_queue_append(odp_int_queue_pool_t queue_pool,
+			 odp_int_pkt_queue_t pkt_queue, odp_packet_t pkt)
+{
+	queue_pool_t *pool;
+	queue_blk_t *first_blk, *tail_blk, *new_tail_blk;
+	uint32_t queue_num, first_blk_idx, tail_blk_idx, new_tail_blk_idx;
+	uint32_t idx;
+
+	pool = (queue_pool_t *)queue_pool;
+	queue_num = (uint32_t)pkt_queue;
+	if ((queue_num == 0) || (pool->max_queue_num < queue_num))
+		return -2;
+
+	if (pkt == ODP_PACKET_INVALID)
+		return -3;
+
+	pool->total_pkt_appends++;
+	first_blk_idx = pool->queue_num_tbl[queue_num];
+	if (first_blk_idx == 0) {
+		first_blk = queue_blk_alloc(pool, &first_blk_idx);
+		if (!first_blk)
+			return -1;
+
+		pool->queue_num_tbl[queue_num] = first_blk_idx;
+		memset(first_blk, 0, sizeof(queue_blk_t));
+		first_blk->pkts[0] = pkt;
+		return 0;
+	}
+
+	first_blk = blk_idx_to_queue_blk(pool, first_blk_idx);
+	tail_blk_idx = first_blk->tail_queue_blk_idx;
+	if (tail_blk_idx == 0)
+		tail_blk = first_blk;
+	else
+		tail_blk = blk_idx_to_queue_blk(pool, tail_blk_idx);
+
+	/* Insert the pkt in the first empty slot AFTER the last occupied
+	 * slot, so that FIFO order is kept even when leading slots of this
+	 * blk have already been emptied by odp_pkt_queue_remove. */
+	for (idx = 7; idx > 0; idx--) {
+		if (tail_blk->pkts[idx - 1] != ODP_PACKET_INVALID)
+			break;
+	}
+
+	if (idx < 7) {
+		tail_blk->pkts[idx] = pkt;
+		return 0;
+	}
+
+	/* If we reach here, the tail_blk was full, so we need to allocate a
+	 * new one and link it in.
+	 */
+	new_tail_blk = queue_blk_alloc(pool, &new_tail_blk_idx);
+	if (!new_tail_blk)
+		return -1;
+
+	memset(new_tail_blk, 0, sizeof(queue_blk_t));
+	new_tail_blk->pkts[0] = pkt;
+	tail_blk->next_queue_blk_idx = new_tail_blk_idx;
+	first_blk->tail_queue_blk_idx = new_tail_blk_idx;
+	return 0;
+}
+
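+/* Returns 1 and the oldest queued pkt via *pkt on success, 0 if the queue
+ * is empty, or a negative code on error.  When the head blk is drained,
+ * its cached tail idx is copied into the next blk (the new head) before
+ * the old head blk is freed. */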
+int odp_pkt_queue_remove(odp_int_queue_pool_t queue_pool,
+			 odp_int_pkt_queue_t pkt_queue, odp_packet_t *pkt)
+{
+	queue_pool_t *pool;
+	queue_blk_t *first_blk, *second_blk;
+	uint32_t queue_num, first_blk_idx, next_blk_idx, idx;
+
+	pool = (queue_pool_t *)queue_pool;
+	queue_num = (uint32_t)pkt_queue;
+	if ((queue_num == 0) || (pool->max_queue_num < queue_num))
+		return -2;
+
+	first_blk_idx = pool->queue_num_tbl[queue_num];
+	if (first_blk_idx == 0)
+		return 0; /* pkt queue is empty. */
+
+	/* Now remove the first valid odp_packet_t handle value we find. */
+	first_blk = blk_idx_to_queue_blk(pool, first_blk_idx);
+	for (idx = 0; idx < 7; idx++) {
+		if (first_blk->pkts[idx] != ODP_PACKET_INVALID) {
+			*pkt = first_blk->pkts[idx];
+			first_blk->pkts[idx] = ODP_PACKET_INVALID;
+
+			/* Now see if there are any more pkts in this queue. */
+			if ((idx == 6) ||
+			    (first_blk->pkts[idx + 1] == ODP_PACKET_INVALID)) {
+				/* We have reached the end of this queue_blk.
+				 * Check to see if there is a following block
+				 * or not
+				 */
+				next_blk_idx = first_blk->next_queue_blk_idx;
+				if (next_blk_idx != 0) {
+					second_blk = blk_idx_to_queue_blk(
+						pool, next_blk_idx);
+					second_blk->tail_queue_blk_idx =
+						first_blk->tail_queue_blk_idx;
+				}
+
+				pool->queue_num_tbl[queue_num] = next_blk_idx;
+				queue_blk_free(pool, first_blk, first_blk_idx);
+			}
+
+			pool->total_pkt_removes++;
+			return 1;
+		}
+	}
+
+	/* It is an error to not find at least one pkt in the first_blk! */
+	pool->total_bad_removes++;
+	return -1;
+}
+
+void odp_pkt_queue_stats_print(odp_int_queue_pool_t queue_pool)
+{
+	queue_pool_t *pool;
+
+	pool = (queue_pool_t *)queue_pool;
+	ODP_DBG("pkt_queue_stats - queue_pool=0x%" PRIX64 "\n", queue_pool);
+	ODP_DBG("  max_queue_num=%u max_queued_pkts=%u next_queue_num=%u\n",
+		pool->max_queue_num, pool->max_queued_pkts,
+		pool->next_queue_num);
+	ODP_DBG("  total pkt appends=%" PRIu64 " total pkt removes=%" PRIu64
+		" bad removes=%" PRIu64 "\n",
+		pool->total_pkt_appends, pool->total_pkt_removes,
+		pool->total_bad_removes);
+	ODP_DBG("  free_list size=%u  min size=%u peak size=%u\n",
+		pool->free_list_size, pool->min_free_list_size,
+		pool->peak_free_list_size);
+}
+
+void odp_queue_pool_destroy(odp_int_queue_pool_t queue_pool)
+{
+	queue_region_desc_t *queue_region_desc;
+	queue_pool_t *pool;
+	uint32_t idx;
+
+	pool = (queue_pool_t *)queue_pool;
+	for (idx = 0; idx < 16; idx++) {
+		queue_region_desc = &pool->queue_region_descs[idx];
+		if (queue_region_desc->queue_blks)
+			free(queue_region_desc->queue_blks);
+	}
+
+	free(pool->queue_num_tbl);
+	free(pool);
+}
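
A note on the region sizing in odp_queue_pool_create(): ignoring
integer-division rounding, the sixteen region sizes (in queue_blks) sum
exactly to max_queued_pkts:

    max/4 + 4 * (max/64) + 11 * (max/16)
      = (4/16 + 1/16 + 11/16) * max
      = max_queued_pkts

Since each queue_blk_t holds up to 7 pkts, the pool can therefore back
max_queued_pkts queued pkts even in the worst case of one pkt per blk
(e.g. every active queue holding exactly one pkt).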