From patchwork Wed Dec 20 15:00:09 2017
X-Patchwork-Submitter: Github ODP bot
X-Patchwork-Id: 122472
From: Github ODP bot
To: lng-odp@lists.linaro.org
Date: Wed, 20 Dec 2017 18:00:09 +0300
Message-Id: <1513782010-27654-3-git-send-email-odpbot@yandex.ru>
X-Mailer: git-send-email 2.7.4
In-Reply-To: <1513782010-27654-1-git-send-email-odpbot@yandex.ru>
References: <1513782010-27654-1-git-send-email-odpbot@yandex.ru>
Github-pr-num: 353
Subject: [lng-odp] [PATCH API-NEXT v1 2/3] linux-gen: queue: lock-free implementation
List-Id: "The OpenDataPlane (ODP) List"
Errors-To: lng-odp-bounces@lists.linaro.org
Sender: "lng-odp"

From: Petri Savolainen

Simple implementation of non-blocking, lock-free plain queues.
Signed-off-by: Petri Savolainen
---
/** Email created from pull request 353 (psavol:next-lockfree-queue-impl2)
 ** https://github.com/Linaro/odp/pull/353
 ** Patch: https://github.com/Linaro/odp/pull/353.patch
 ** Base sha: 12fd3a9224a856271934986a1bad981843915d68
 ** Merge commit sha: 291f28b061ad4fa89a0346d6da4c6324c6335eca
 **/
 platform/linux-generic/Makefile.am                 |   2 +
 .../linux-generic/include/odp_queue_internal.h     |   1 +
 platform/linux-generic/include/odp_queue_lf.h      |  36 +++
 platform/linux-generic/odp_queue.c                 |  71 ++++-
 platform/linux-generic/odp_queue_lf.c              | 302 +++++++++++++++++++++
 5 files changed, 398 insertions(+), 14 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_queue_lf.h
 create mode 100644 platform/linux-generic/odp_queue_lf.c

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 4371e7a99..d4673136a 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -161,6 +161,7 @@ noinst_HEADERS = \
 		  include/odp_queue_scalable_internal.h \
 		  include/odp_ring_internal.h \
 		  include/odp_queue_if.h \
+		  include/odp_queue_lf.h \
 		  include/odp_schedule_if.h \
 		  include/odp_schedule_scalable.h \
 		  include/odp_schedule_scalable_config.h \
@@ -223,6 +224,7 @@ __LIB__libodp_linux_la_SOURCES = \
 			   odp_pool.c \
 			   odp_queue.c \
 			   odp_queue_if.c \
+			   odp_queue_lf.c \
 			   odp_queue_scalable.c \
 			   odp_rwlock.c \
 			   odp_rwlock_recursive.c \
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index dd846d592..26410ac7b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -53,6 +53,7 @@ struct queue_entry_s {
 	odp_queue_param_t param;
 	odp_pktin_queue_t pktin;
 	odp_pktout_queue_t pktout;
+	void *queue_lf;
 	char name[ODP_QUEUE_NAME_LEN];
 };
diff --git a/platform/linux-generic/include/odp_queue_lf.h b/platform/linux-generic/include/odp_queue_lf.h
new file mode 100644
index 000000000..34ea48737
--- /dev/null
+++ b/platform/linux-generic/include/odp_queue_lf.h
@@ -0,0 +1,36 @@
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_QUEUE_LF_H_
+#define ODP_QUEUE_LF_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp_queue_if.h>
+#include <odp_queue_internal.h>
+
+/* Lock-free queue functions */
+typedef struct {
+	queue_enq_fn_t enq;
+	queue_enq_multi_fn_t enq_multi;
+	queue_deq_fn_t deq;
+	queue_deq_multi_fn_t deq_multi;
+
+} queue_lf_func_t;
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func);
+void queue_lf_term_global(void);
+void *queue_lf_create(queue_entry_t *queue);
+void queue_lf_destroy(void *queue_lf);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 6a69eb849..7cb60562b 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -8,6 +8,7 @@
 #include
 #include
+#include <odp_queue_lf.h>
 #include
 #include
 #include
@@ -39,11 +40,16 @@ static int queue_init(queue_entry_t *queue, const char *name,
 		      const odp_queue_param_t *param);
 
-typedef struct queue_table_t {
-	queue_entry_t queue[ODP_CONFIG_QUEUES];
-} queue_table_t;
+typedef struct queue_global_t {
+	queue_entry_t queue[ODP_CONFIG_QUEUES];
 
-static queue_table_t *queue_tbl;
+	uint32_t queue_lf_num;
+	uint32_t queue_lf_size;
+	queue_lf_func_t queue_lf_func;
+
+} queue_global_t;
+
+static queue_global_t *queue_glb;
 
 static queue_entry_t *get_qentry(uint32_t queue_id);
 
@@ -64,26 +70,28 @@ static inline odp_queue_t queue_from_id(uint32_t queue_id)
 
 static queue_entry_t *get_qentry(uint32_t queue_id)
 {
-	return &queue_tbl->queue[queue_id];
+	return &queue_glb->queue[queue_id];
 }
 
 static int queue_init_global(void)
 {
 	uint32_t i;
 	odp_shm_t shm;
+	uint32_t lf_size = 0;
+	queue_lf_func_t *lf_func;
 
 	ODP_DBG("Queue init ... ");
 
 	shm = odp_shm_reserve("odp_queues",
-			      sizeof(queue_table_t),
+			      sizeof(queue_global_t),
 			      sizeof(queue_entry_t), 0);
 
-	queue_tbl = odp_shm_addr(shm);
+	queue_glb = odp_shm_addr(shm);
 
-	if (queue_tbl == NULL)
+	if (queue_glb == NULL)
 		return -1;
 
-	memset(queue_tbl, 0, sizeof(queue_table_t));
+	memset(queue_glb, 0, sizeof(queue_global_t));
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
 		/* init locks */
@@ -93,6 +101,10 @@ static int queue_init_global(void)
 		queue->s.handle = queue_from_id(i);
 	}
 
+	lf_func = &queue_glb->queue_lf_func;
+	queue_glb->queue_lf_num = queue_lf_init_global(&lf_size, lf_func);
+	queue_glb->queue_lf_size = lf_size;
+
 	ODP_DBG("done\n");
 	ODP_DBG("Queue init global\n");
 	ODP_DBG("  struct queue_entry_s size %zu\n",
@@ -122,7 +134,7 @@ static int queue_term_global(void)
 	int i;
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue = &queue_tbl->queue[i];
+		queue = &queue_glb->queue[i];
 		LOCK(&queue->s.lock);
 		if (queue->s.status != QUEUE_STATUS_FREE) {
 			ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
@@ -131,6 +143,8 @@
 		UNLOCK(&queue->s.lock);
 	}
 
+	queue_lf_term_global();
+
 	ret = odp_shm_free(odp_shm_lookup("odp_queues"));
 	if (ret < 0) {
 		ODP_ERR("shm free failed for odp_queues");
@@ -151,6 +165,8 @@ static int queue_capability(odp_queue_capability_t *capa)
 	capa->sched_prios = odp_schedule_num_prio();
 	capa->plain.max_num = capa->max_queues;
 	capa->sched.max_num = capa->max_queues;
+	capa->plain.lockfree.max_num = queue_glb->queue_lf_num;
+	capa->plain.lockfree.max_size = queue_glb->queue_lf_size;
 
 	return 0;
 }
@@ -188,6 +204,7 @@ static odp_queue_t queue_create(const char *name,
 {
 	uint32_t i;
 	queue_entry_t *queue;
+	void *queue_lf;
 	odp_queue_t handle = ODP_QUEUE_INVALID;
 	odp_queue_type_t type = ODP_QUEUE_TYPE_PLAIN;
 	odp_queue_param_t default_param;
@@ -198,7 +215,7 @@ static odp_queue_t queue_create(const char *name,
 	}
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue = &queue_tbl->queue[i];
+		queue = &queue_glb->queue[i];
 
 		if (queue->s.status != QUEUE_STATUS_FREE)
 			continue;
@@ -207,7 +224,26 @@ static odp_queue_t queue_create(const char *name,
 		if (queue->s.status == QUEUE_STATUS_FREE) {
 			if (queue_init(queue, name, param)) {
 				UNLOCK(&queue->s.lock);
-				return handle;
+				return ODP_QUEUE_INVALID;
+			}
+
+			if (param->nonblocking == ODP_NONBLOCKING_LF) {
+				queue_lf_func_t *lf_func;
+
+				lf_func = &queue_glb->queue_lf_func;
+
+				queue_lf = queue_lf_create(queue);
+
+				if (queue_lf == NULL) {
+					UNLOCK(&queue->s.lock);
+					return ODP_QUEUE_INVALID;
+				}
+				queue->s.queue_lf = queue_lf;
+
+				queue->s.enqueue = lf_func->enq;
+				queue->s.enqueue_multi = lf_func->enq_multi;
+				queue->s.dequeue = lf_func->deq;
+				queue->s.dequeue_multi = lf_func->deq_multi;
 			}
 
 			type = queue->s.type;
@@ -224,7 +260,10 @@
 		UNLOCK(&queue->s.lock);
 	}
 
-	if (handle != ODP_QUEUE_INVALID && type == ODP_QUEUE_TYPE_SCHED) {
+	if (handle == ODP_QUEUE_INVALID)
+		return ODP_QUEUE_INVALID;
+
+	if (type == ODP_QUEUE_TYPE_SCHED) {
 		if (sched_fn->init_queue(queue->s.index,
 					 &queue->s.param.sched)) {
 			queue->s.status = QUEUE_STATUS_FREE;
@@ -289,6 +328,10 @@ static int queue_destroy(odp_queue_t handle)
 	default:
 		ODP_ABORT("Unexpected queue status\n");
 	}
+
+	if (queue->s.param.nonblocking == ODP_NONBLOCKING_LF)
+		queue_lf_destroy(queue->s.queue_lf);
+
 	UNLOCK(&queue->s.lock);
 
 	return 0;
@@ -313,7 +356,7 @@ static odp_queue_t queue_lookup(const char *name)
 	uint32_t i;
 
 	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		queue_entry_t *queue = &queue_tbl->queue[i];
+		queue_entry_t *queue = &queue_glb->queue[i];
 
 		if (queue->s.status == QUEUE_STATUS_FREE ||
 		    queue->s.status == QUEUE_STATUS_DESTROYED)
diff --git a/platform/linux-generic/odp_queue_lf.c b/platform/linux-generic/odp_queue_lf.c
new file mode 100644
index 000000000..756422cf3
--- /dev/null
+++ b/platform/linux-generic/odp_queue_lf.c
@@ -0,0 +1,302 @@
+/* Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "config.h"
+#include
+
+#define LF_NODE_EMPTY 0
+#define LF_NODE_DATA  1
+#define LF_NODE_MARK  2
+
+#define RING_LF_SIZE  64
+#define QUEUE_LF_NUM  128
+
+/* Node in lock-free ring */
+typedef union {
+	odp_atomic_u64_t atomic_u64;
+
+	struct {
+		/* marker == DATA */
+		uint64_t marker: 2;
+
+		/* Max 62 bits stored from a pointer */
+		uint64_t ptr: 62;
+	} data;
+
+	struct {
+		/* marker == EMPTY or MARK */
+		uint64_t marker: 2;
+
+		/* Counter to avoid ABA issues */
+		uint64_t counter: 62;
+	};
+
+	uint64_t u64;
+
+} ring_lf_node_t;
+
+/* Lock-free ring */
+typedef struct {
+	ring_lf_node_t node[RING_LF_SIZE];
+
+	int used;
+	odp_atomic_u64_t aba_counter;
+
+} queue_lf_t ODP_ALIGNED_CACHE;
+
+/* Lock-free queue globals */
+typedef struct {
+	queue_lf_t queue_lf[QUEUE_LF_NUM];
+
+	odp_shm_t shm;
+
+} queue_lf_global_t ODP_ALIGNED_CACHE;
+
+static queue_lf_global_t *queue_lf_glb;
+
+static inline int next_idx(int idx)
+{
+	int next = idx + 1;
+
+	if (next == RING_LF_SIZE)
+		next = 0;
+
+	return next;
+}
+
+static inline int next_is_tail(ring_lf_node_t node, ring_lf_node_t next,
+			       ring_lf_node_t next_next)
+{
+	if ((node.marker == LF_NODE_MARK || node.marker == LF_NODE_DATA) &&
+	    next.marker == LF_NODE_EMPTY)
+		return 1;
+
+	if (node.marker == LF_NODE_DATA && next.marker == LF_NODE_MARK &&
+	    next_next.marker == LF_NODE_MARK)
+		return 1;
+
+	return 0;
+}
+
+static inline int next_is_head(ring_lf_node_t node, ring_lf_node_t next)
+{
+	if ((node.marker == LF_NODE_MARK && next.marker == LF_NODE_DATA))
+		return 1;
+
+	return 0;
+}
+
+static int queue_lf_enq(queue_t q_int, odp_buffer_hdr_t *buf_hdr)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t cur_val;
+
+	queue = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+		i_next = next_idx(i);
+		i_nn = next_idx(i_next);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		nn = &queue_lf->node[i_nn];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+		nn_val.u64 = odp_atomic_load_u64(&nn->atomic_u64);
+
+		if (!next_is_tail(node_val, next_val, nn_val))
+			continue;
+
+		/* Next node is the tail. Replace it with data.
+		 */
+		new_val.data.marker = LF_NODE_DATA;
+		new_val.data.ptr = ((uintptr_t)buf_hdr) >> 2;
+
+		cur_val = next_val.u64;
+
+		if (odp_atomic_cas_rel_u64(&next->atomic_u64, &cur_val,
+					   new_val.u64))
+			return 0;
+	}
+
+	return -1;
+}
+
+static int queue_lf_enq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	(void)num;
+
+	if (queue_lf_enq(q_int, buf_hdr[0]) == 0)
+		return 1;
+
+	return 0;
+}
+
+static odp_buffer_hdr_t *queue_lf_deq(queue_t q_int)
+{
+	queue_entry_t *queue;
+	queue_lf_t *queue_lf;
+	int i, i_next, i_nn;
+	ring_lf_node_t node_val;
+	ring_lf_node_t next_val;
+	ring_lf_node_t nn_val;
+	ring_lf_node_t new_val;
+	ring_lf_node_t *node;
+	ring_lf_node_t *next;
+	ring_lf_node_t *nn;
+	uint64_t counter;
+
+	queue = qentry_from_int(q_int);
+	queue_lf = queue->s.queue_lf;
+
+	for (i = 0; i < RING_LF_SIZE; i++) {
+retry:
+		i_next = next_idx(i);
+
+		node = &queue_lf->node[i];
+		next = &queue_lf->node[i_next];
+		node_val.u64 = odp_atomic_load_u64(&node->atomic_u64);
+		next_val.u64 = odp_atomic_load_u64(&next->atomic_u64);
+
+		if (!next_is_head(node_val, next_val))
+			continue;
+
+		/* Next node is the head.*/
+		i_nn = next_idx(i_next);
+		nn = &queue_lf->node[i_nn];
+		nn_val.u64 = odp_atomic_load_u64(&nn->atomic_u64);
+
+		counter = odp_atomic_fetch_inc_u64(&queue_lf->aba_counter);
+		new_val.counter = counter;
+
+		/* Cannot replace next with marker, as long as next-next is
+		 * also a marker. Otherwise, the ring would be full of markers.
+		 * Empty it and retry. */
+		if (nn_val.marker == LF_NODE_MARK) {
+			new_val.marker = LF_NODE_EMPTY;
+			odp_atomic_cas_u64(&nn->atomic_u64, &nn_val.u64,
+					   new_val.u64);
+			goto retry;
+		}
+
+		new_val.marker = LF_NODE_MARK;
+
+		if (odp_atomic_cas_acq_u64(&next->atomic_u64, &next_val.u64,
+					   new_val.u64)) {
+			/* Successfully replaced data with marker.
+			 */
+			return (void *)(uintptr_t)(next_val.data.ptr << 2);
+		}
+	}
+
+	return NULL;
+}
+
+static int queue_lf_deq_multi(queue_t q_int, odp_buffer_hdr_t **buf_hdr,
+			      int num)
+{
+	odp_buffer_hdr_t *buf;
+
+	(void)num;
+
+	buf = queue_lf_deq(q_int);
+
+	if (buf == NULL)
+		return 0;
+
+	buf_hdr[0] = buf;
+	return 1;
+}
+
+uint32_t queue_lf_init_global(uint32_t *queue_lf_size,
+			      queue_lf_func_t *lf_func)
+{
+	odp_shm_t shm;
+	odp_atomic_op_t lockfree;
+
+	odp_atomic_lock_free_u64(&lockfree);
+
+	ODP_DBG("\nLock-free queue init\n");
+	ODP_DBG("  u64 lock-free: %i\n\n", lockfree.op.cas);
+
+	if (lockfree.op.cas == 0)
+		return 0;
+
+	shm = odp_shm_reserve("odp_queues_lf",
+			      sizeof(queue_lf_global_t),
+			      ODP_CACHE_LINE_SIZE, 0);
+
+	queue_lf_glb = odp_shm_addr(shm);
+	memset(queue_lf_glb, 0, sizeof(queue_lf_global_t));
+
+	queue_lf_glb->shm = shm;
+
+	memset(lf_func, 0, sizeof(queue_lf_func_t));
+	lf_func->enq = queue_lf_enq;
+	lf_func->enq_multi = queue_lf_enq_multi;
+	lf_func->deq = queue_lf_deq;
+	lf_func->deq_multi = queue_lf_deq_multi;
+
+	*queue_lf_size = RING_LF_SIZE - 1;
+
+	return QUEUE_LF_NUM;
+}
+
+void queue_lf_term_global(void)
+{
+	odp_shm_t shm;
+
+	if (queue_lf_glb == NULL)
+		return;
+
+	shm = queue_lf_glb->shm;
+
+	if (odp_shm_free(shm) < 0)
+		ODP_ERR("shm free failed");
+}
+
+void *queue_lf_create(queue_entry_t *queue)
+{
+	int i;
+	queue_lf_t *queue_lf = NULL;
+
+	if (queue->s.type != ODP_QUEUE_TYPE_PLAIN)
+		return NULL;
+
+	for (i = 0; i < QUEUE_LF_NUM; i++) {
+		if (queue_lf_glb->queue_lf[i].used == 0) {
+			queue_lf = &queue_lf_glb->queue_lf[i];
+			memset(queue_lf, 0, sizeof(queue_lf_t));
+			queue_lf->node[0].marker = LF_NODE_MARK;
+			queue_lf->used = 1;
+			break;
+		}
+	}
+
+	return queue_lf;
+}
+
+void queue_lf_destroy(void *queue_lf_ptr)
+{
+	queue_lf_t *queue_lf = queue_lf_ptr;
+
+	queue_lf->used = 0;
+}
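
--
Not part of the patch: below is a minimal usage sketch showing how an application
would request one of these lock-free plain queues through the public ODP API. It
assumes an api-next build where odp_queue_param_t carries the nonblocking field and
odp_queue_capability() reports the plain.lockfree limits (both are referenced by the
patch above); the queue name "queue_lf_demo" and the use of ODP_THREAD_CONTROL are
arbitrary choices for the example.

#include <odp_api.h>
#include <stdio.h>

int main(void)
{
	odp_instance_t instance;
	odp_queue_capability_t capa;
	odp_queue_param_t param;
	odp_queue_t queue;

	if (odp_init_global(&instance, NULL, NULL) ||
	    odp_init_local(instance, ODP_THREAD_CONTROL))
		return 1;

	/* With this patch, plain.lockfree.max_num is QUEUE_LF_NUM (128) and
	 * plain.lockfree.max_size is RING_LF_SIZE - 1 (63). Both are zero
	 * when 64 bit CAS is not lock-free on the target CPU, since
	 * queue_lf_init_global() then registers no lock-free queues. */
	if (odp_queue_capability(&capa) || capa.plain.lockfree.max_num == 0) {
		printf("Lock-free plain queues not available\n");
		return 1;
	}

	odp_queue_param_init(&param);
	param.type = ODP_QUEUE_TYPE_PLAIN;
	param.nonblocking = ODP_NONBLOCKING_LF;

	/* queue_create() in the patch installs queue_lf_enq()/queue_lf_deq()
	 * as this queue's enqueue/dequeue functions, so subsequent
	 * odp_queue_enq()/odp_queue_deq() calls on the handle run lock-free. */
	queue = odp_queue_create("queue_lf_demo", &param);
	if (queue == ODP_QUEUE_INVALID)
		return 1;

	odp_queue_destroy(queue);
	odp_term_local();
	odp_term_global(instance);

	return 0;
}

The capability check comes first on purpose: an application cannot assume lock-free
support, because the implementation falls back to reporting zero lock-free queues
when 64 bit atomic CAS is not lock-free on the platform.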