From patchwork Thu Feb 22 10:00:18 2018
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Github ODP bot
X-Patchwork-Id: 129177
From: Github ODP bot
To: lng-odp@lists.linaro.org
Date: Thu, 22 Feb 2018 13:00:18 +0300
Message-Id: <1519293622-14665-7-git-send-email-odpbot@yandex.ru>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1519293622-14665-1-git-send-email-odpbot@yandex.ru>
References: <1519293622-14665-1-git-send-email-odpbot@yandex.ru>
Github-pr-num: 492
Subject: [lng-odp] [PATCH v2 6/10] linux-gen: queue: ring based queue implementation
List-Id: "The OpenDataPlane \(ODP\) List"

From: Petri Savolainen

Change from a linked list of bursts to a ring implementation. Queues now have
a maximum size, but the code is simpler and performance is a bit better. This
also helps a potential future step: implementing the queues with a lockless
ring.
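For readers not familiar with the ring_st_t API that the diff below starts
using (ring_st_init(), ring_st_enq_multi(), ring_st_deq_multi(),
ring_st_is_empty()), here is a minimal, self-contained sketch of a
single-threaded index ring with the same shape of interface. It is an
illustration only: the sketch_* names, the modulo indexing and the fixed
power-of-two capacity are assumptions made for this example, not the actual
ODP implementation; in the patch the ring stores 32-bit buffer indices in
per-queue storage and is protected by the queue's ticketlock.

#include <stdint.h>
#include <stdio.h>

/* Stand-in for CONFIG_QUEUE_SIZE; must divide 2^32 (e.g. a power of two) so
 * the free-running head/tail counters keep indexing correctly after wrap. */
#define SKETCH_QUEUE_SIZE 8

typedef struct {
	uint32_t *data; /* caller-provided storage for buffer indices */
	uint32_t size;  /* capacity in elements */
	uint32_t head;  /* next element to dequeue */
	uint32_t tail;  /* next free slot to enqueue */
} sketch_ring_st_t;

static void sketch_ring_st_init(sketch_ring_st_t *r, uint32_t *data,
				uint32_t size)
{
	r->data = data;
	r->size = size;
	r->head = 0;
	r->tail = 0;
}

static int sketch_ring_st_is_empty(const sketch_ring_st_t *r)
{
	return r->head == r->tail;
}

/* Enqueue up to num indices, return how many actually fit (may be 0). */
static uint32_t sketch_ring_st_enq_multi(sketch_ring_st_t *r,
					 const uint32_t idx[], uint32_t num)
{
	uint32_t room = r->size - (r->tail - r->head);
	uint32_t i;

	if (num > room)
		num = room;

	for (i = 0; i < num; i++)
		r->data[(r->tail + i) % r->size] = idx[i];

	r->tail += num;
	return num;
}

/* Dequeue up to num indices, return how many were available (may be 0). */
static uint32_t sketch_ring_st_deq_multi(sketch_ring_st_t *r,
					 uint32_t idx[], uint32_t num)
{
	uint32_t avail = r->tail - r->head;
	uint32_t i;

	if (num > avail)
		num = avail;

	for (i = 0; i < num; i++)
		idx[i] = r->data[(r->head + i) % r->size];

	r->head += num;
	return num;
}

int main(void)
{
	uint32_t storage[SKETCH_QUEUE_SIZE];
	uint32_t in[3] = {10, 11, 12}, out[3];
	sketch_ring_st_t ring;

	sketch_ring_st_init(&ring, storage, SKETCH_QUEUE_SIZE);
	printf("enqueued %u\n", sketch_ring_st_enq_multi(&ring, in, 3));
	printf("dequeued %u\n", sketch_ring_st_deq_multi(&ring, out, 3));
	printf("empty    %d\n", sketch_ring_st_is_empty(&ring));
	return 0;
}

The enqueue path in the patch follows the same pattern under the queue lock:
convert buffer headers to 32-bit indices, call ring_st_enq_multi(), and return
the number actually enqueued, which is why enq_multi() can now return less
than num when the fixed-size ring is full.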
Signed-off-by: Petri Savolainen
---
/** Email created from pull request 492 (psavol:master-sched-optim)
 ** https://github.com/Linaro/odp/pull/492
 ** Patch: https://github.com/Linaro/odp/pull/492.patch
 ** Base sha: 5a58bbf2bb331fd7dde2ebbc0430634ace6900fb
 ** Merge commit sha: b29563293c1bca56419d2dc355a8e64d961e024a
 **/
 .../linux-generic/include/odp_buffer_internal.h    |  34 ++---
 platform/linux-generic/include/odp_pool_internal.h |  28 ++++
 .../linux-generic/include/odp_queue_internal.h     |   5 +-
 platform/linux-generic/odp_packet.c                |   3 +-
 platform/linux-generic/odp_pool.c                  |  22 +--
 platform/linux-generic/odp_queue.c                 | 159 +++++++++------------
 6 files changed, 120 insertions(+), 131 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index b52669387..b7af785aa 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -33,19 +33,31 @@ extern "C" {
 #include
 #include

-#define BUFFER_BURST_SIZE CONFIG_BURST_SIZE
-
 typedef struct seg_entry_t {
 	void *hdr;
 	uint8_t *data;
 	uint32_t len;
 } seg_entry_t;

+typedef union buffer_index_t {
+	uint32_t u32;
+
+	struct {
+		uint32_t pool   :8;
+		uint32_t buffer :24;
+	};
+} buffer_index_t;
+
+/* Check that pool index fit into bit field */
+ODP_STATIC_ASSERT(ODP_CONFIG_POOLS <= (0xFF + 1), "TOO_MANY_POOLS");
+
+/* Check that buffer index fit into bit field */
+ODP_STATIC_ASSERT(CONFIG_POOL_MAX_NUM <= (0xFFFFFF + 1), "TOO_LARGE_POOL");
+
 /* Common buffer header */
 struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
-	/* Buffer index in the pool */
-	uint32_t index;
+	/* Combined pool and buffer index */
+	buffer_index_t index;

 	/* Total segment count */
 	uint16_t segcount;
@@ -73,16 +85,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 	/* Segments */
 	seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];

-	/* Burst counts */
-	uint8_t burst_num;
-	uint8_t burst_first;
-
-	/* Next buf in a list */
-	struct odp_buffer_hdr_t *next;
-
-	/* Burst table */
-	struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
 	/* --- Mostly read only data --- */

 	/* User context pointer or u64 */
@@ -115,8 +117,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 ODP_STATIC_ASSERT(CONFIG_PACKET_SEGS_PER_HDR < 256,
 		  "CONFIG_PACKET_SEGS_PER_HDR_TOO_LARGE");

-ODP_STATIC_ASSERT(BUFFER_BURST_SIZE < 256, "BUFFER_BURST_SIZE_TOO_LARGE");
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_pool_internal.h b/platform/linux-generic/include/odp_pool_internal.h
index 8284bcd7d..276032e7b 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -104,6 +104,34 @@ static inline odp_buffer_hdr_t *buf_hdl_to_hdr(odp_buffer_t buf)
 	return (odp_buffer_hdr_t *)(uintptr_t)buf;
 }

+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+						   uint32_t buffer_idx)
+{
+	uint32_t block_offset;
+	odp_buffer_hdr_t *buf_hdr;
+
+	block_offset = buffer_idx * pool->block_size;
+
+	/* clang requires cast to uintptr_t */
+	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+	return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+	buffer_index_t index;
+	uint32_t pool_idx, buffer_idx;
+	pool_t *pool;
+
+	index.u32 = u32;
+	pool_idx = index.pool;
+	buffer_idx = index.buffer;
+	pool = pool_entry(pool_idx);
+
+	return buf_hdr_from_index(pool, buffer_idx);
+}
+
 int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
 void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 6fe3fb462..68dddbb0b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -29,6 +29,7 @@ extern "C" {
 #include
 #include
 #include
+#include

 #define QUEUE_STATUS_FREE      0
 #define QUEUE_STATUS_DESTROYED 1
@@ -38,9 +39,7 @@ extern "C" {
 struct queue_entry_s {
 	odp_ticketlock_t ODP_ALIGNED_CACHE lock;
-
-	odp_buffer_hdr_t *head;
-	odp_buffer_hdr_t *tail;
+	ring_st_t ring_st;
 	int status;

 	queue_enq_fn_t ODP_ALIGNED_CACHE enqueue;
diff --git a/platform/linux-generic/odp_packet.c b/platform/linux-generic/odp_packet.c
index 1c1b45ff1..6f8f2d039 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1814,7 +1814,8 @@ void odp_packet_print_data(odp_packet_t pkt, uint32_t offset,
 	len += snprintf(&str[len], n - len,
 			" pool index %" PRIu32 "\n", pool->pool_idx);
 	len += snprintf(&str[len], n - len,
-			" buf index %" PRIu32 "\n", hdr->buf_hdr.index);
+			" buf index %" PRIu32 "\n",
+			hdr->buf_hdr.index.buffer);
 	len += snprintf(&str[len], n - len,
 			" segcount %" PRIu16 "\n", hdr->buf_hdr.segcount);
 	len += snprintf(&str[len], n - len,
diff --git a/platform/linux-generic/odp_pool.c b/platform/linux-generic/odp_pool.c
index e5ba8982a..a9c57f4cb 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -80,20 +80,6 @@ static inline pool_t *pool_from_buf(odp_buffer_t buf)
 	return buf_hdr->pool_ptr;
 }

-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
-						   uint32_t buffer_idx)
-{
-	uint32_t block_offset;
-	odp_buffer_hdr_t *buf_hdr;
-
-	block_offset = buffer_idx * pool->block_size;
-
-	/* clang requires cast to uintptr_t */
-	buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
-	return buf_hdr;
-}
-
 int odp_pool_init_global(void)
 {
 	uint32_t i;
@@ -296,7 +282,9 @@ static void init_buffers(pool_t *pool)
 		memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);

 		/* Initialize buffer metadata */
-		buf_hdr->index = i;
+		buf_hdr->index.u32 = 0;
+		buf_hdr->index.pool = pool->pool_idx;
+		buf_hdr->index.buffer = i;
 		buf_hdr->type = type;
 		buf_hdr->event_type = type;
 		buf_hdr->pool_ptr = pool;
@@ -782,7 +770,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
 	ring = &pool->ring->hdr;
 	mask = pool->ring_mask;

 	for (i = 0; i < num; i++)
-		buf_index[i] = buf_hdr[i]->index;
+		buf_index[i] = buf_hdr[i]->index.buffer;

 	ring_enq_multi(ring, mask, buf_index, num);

@@ -822,7 +810,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
 	}

 	for (i = 0; i < num; i++)
-		cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+		cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;

 	cache->num = cache_num + num;
 }
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 21135df63..eda872321 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -39,8 +39,15 @@ static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);

+typedef struct ODP_ALIGNED_CACHE {
+	/* Storage space for ring data */
+	uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
 typedef struct queue_table_t {
-	queue_entry_t queue[ODP_CONFIG_QUEUES];
+	queue_entry_t queue[ODP_CONFIG_QUEUES];
+	ring_data_t   ring_data[ODP_CONFIG_QUEUES];
+
 } queue_table_t;

 static queue_table_t *queue_tbl;
@@ -143,8 +150,10 @@ static int queue_capability(odp_queue_capability_t *capa)
 	capa->max_sched_groups  = sched_fn->num_grps();
 	capa->sched_prios       = odp_schedule_num_prio();
 	capa->plain.max_num     = capa->max_queues;
+	capa->plain.max_size    = CONFIG_QUEUE_SIZE;
 	capa->plain.nonblocking = ODP_BLOCKING;
 	capa->sched.max_num     = capa->max_queues;
+	capa->sched.max_size    = CONFIG_QUEUE_SIZE;
 	capa->sched.nonblocking = ODP_BLOCKING;

 	return 0;
@@ -192,6 +201,9 @@ static odp_queue_t queue_create(const char *name,
 		param = &default_param;
 	}

+	if (param->size > CONFIG_QUEUE_SIZE)
+		return ODP_QUEUE_INVALID;
+
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
 		queue = &queue_tbl->queue[i];

@@ -263,7 +275,7 @@ static int queue_destroy(odp_queue_t handle)
 		ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
 		return -1;
 	}
-	if (queue->s.head != NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st) == 0) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
 		return -1;
@@ -326,81 +338,71 @@ static odp_queue_t queue_lookup(const char *name)
 	return ODP_QUEUE_INVALID;
 }

+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+					 odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++)
+		buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+				       uint32_t buffer_index[], int num)
+{
+	int i;
+
+	for (i = 0; i < num; i++) {
+		buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+		odp_prefetch(buf_hdr[i]);
+	}
+}
+
 static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
 	int sched = 0;
-	int i, ret;
+	int ret;
 	queue_entry_t *queue;
-	odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+	int num_enq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];

 	queue = qentry_from_int(q_int);
+	ring_st = &queue->s.ring_st;
+
 	if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
 		return ret;

-	/* Optimize the common case of single enqueue */
-	if (num == 1) {
-		tail = buf_hdr[0];
-		hdr = tail;
-		hdr->burst_num = 0;
-		hdr->next = NULL;
-	} else {
-		int next;
-
-		/* Start from the last buffer header */
-		tail = buf_hdr[num - 1];
-		hdr = tail;
-		hdr->next = NULL;
-		next = num - 2;
-
-		while (1) {
-			/* Build a burst. The buffer header carrying
-			 * a burst is the last buffer of the burst. */
-			for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
-			     i++, next--)
-				hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
-					buf_hdr[next];
-
-			hdr->burst_num = i;
-			hdr->burst_first = BUFFER_BURST_SIZE - i;
-
-			if (odp_likely(next < 0))
-				break;
-
-			/* Get another header and link it */
-			next_hdr = hdr;
-			hdr = buf_hdr[next];
-			hdr->next = next_hdr;
-			next--;
-		}
-	}
+	buffer_index_from_buf(buf_idx, buf_hdr, num);

 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		UNLOCK(&queue->s.lock);
 		ODP_ERR("Bad queue status\n");
 		return -1;
 	}

-	/* Empty queue */
-	if (queue->s.head == NULL)
-		queue->s.head = hdr;
-	else
-		queue->s.tail->next = hdr;
+	num_enq = ring_st_enq_multi(ring_st, buf_idx, num);

-	queue->s.tail = tail;
+	if (odp_unlikely(num_enq == 0)) {
+		UNLOCK(&queue->s.lock);
+		return 0;
+	}

 	if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
 		queue->s.status = QUEUE_STATUS_SCHED;
-		sched = 1; /* retval: schedule queue */
+		sched = 1;
 	}
+
 	UNLOCK(&queue->s.lock);

 	/* Add queue to scheduling */
 	if (sched && sched_fn->sched_queue(queue->s.index))
 		ODP_ABORT("schedule_queue failed\n");

-	return num; /* All events enqueued */
+	return num_enq;
 }

 static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -446,12 +448,15 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 			    int num)
 {
-	odp_buffer_hdr_t *hdr, *next;
-	int i, j;
-	int updated = 0;
 	int status_sync = sched_fn->status_sync;
+	int num_deq;
+	ring_st_t *ring_st;
+	uint32_t buf_idx[num];
+
+	ring_st = &queue->s.ring_st;

 	LOCK(&queue->s.lock);
+
 	if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
 		/* Bad queue, or queue has been destroyed.
 		 * Scheduler finalizes queue destroy after this. */
@@ -459,9 +464,9 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		return -1;
 	}

-	hdr = queue->s.head;
+	num_deq = ring_st_deq_multi(ring_st, buf_idx, num);

-	if (hdr == NULL) {
+	if (num_deq == 0) {
 		/* Already empty queue */
 		if (queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -471,51 +476,18 @@ static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 		}

 		UNLOCK(&queue->s.lock);
-		return 0;
-	}
-
-	for (i = 0; i < num && hdr; ) {
-		int burst_num = hdr->burst_num;
-		int first = hdr->burst_first;
-		/* First, get bursted buffers */
-		for (j = 0; j < burst_num && i < num; j++, i++) {
-			buf_hdr[i] = hdr->burst[first + j];
-			odp_prefetch(buf_hdr[i]);
-		}
-
-		if (burst_num) {
-			hdr->burst_num = burst_num - j;
-			hdr->burst_first = first + j;
-		}
-
-		if (i == num)
-			break;
-
-		/* When burst is empty, consume the current buffer header and
-		 * move to the next header */
-		buf_hdr[i] = hdr;
-		next = hdr->next;
-		hdr->next = NULL;
-		hdr = next;
-		updated++;
-		i++;
+		return 0;
 	}

-	/* Write head only if updated */
-	if (updated)
-		queue->s.head = hdr;
-
-	/* Queue is empty */
-	if (hdr == NULL)
-		queue->s.tail = NULL;
-
 	if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
 		sched_fn->save_context(queue->s.index);

 	UNLOCK(&queue->s.lock);

-	return i;
+	buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+	return num_deq;
 }

 static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -584,8 +556,9 @@ static int queue_init(queue_entry_t *queue, const char *name,
 	queue->s.pktin = PKTIN_INVALID;
 	queue->s.pktout = PKTOUT_INVALID;

-	queue->s.head = NULL;
-	queue->s.tail = NULL;
+	ring_st_init(&queue->s.ring_st,
+		     queue_tbl->ring_data[queue->s.index].data,
+		     CONFIG_QUEUE_SIZE);

 	return 0;
 }
@@ -661,7 +634,7 @@ int sched_cb_queue_empty(uint32_t queue_index)
 		return -1;
 	}

-	if (queue->s.head == NULL) {
+	if (ring_st_is_empty(&queue->s.ring_st)) {
 		/* Already empty queue. Update status. */
 		if (queue->s.status == QUEUE_STATUS_SCHED)
 			queue->s.status = QUEUE_STATUS_NOTSCHED;