From patchwork Fri Aug 24 14:00:03 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Github ODP bot X-Patchwork-Id: 145069 Delivered-To: patch@linaro.org Received: by 2002:a2e:1648:0:0:0:0:0 with SMTP id 8-v6csp1304502ljw; Fri, 24 Aug 2018 07:02:07 -0700 (PDT) X-Google-Smtp-Source: ANB0VdY9faB2mhnMDyAYCc2T+ypVzUihskm+rGIopsqswgoSpCACFtUGQpVfo2WAemiC0Bm1i6+0 X-Received: by 2002:a5b:950:: with SMTP id x16-v6mr1075329ybq.67.1535119327769; Fri, 24 Aug 2018 07:02:07 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1535119327; cv=none; d=google.com; s=arc-20160816; b=R+loz9QKCTx7GrXagBeFlnCCb87ynK4EZMu6ZEFOeILIKvepjWpyWw0Az2Ei9yEGAM Q7mOBxdohZIPMNMMCIB/FtfiySFBxuZ+ucGbE08+226paRb6OIWsCJnr3ebqgA2LSrH1 s+cI5i22Hfiwuqxa9sXaDfZBDbnXD6YFvoljJQ8t6PqnlkKYQdoDpANqlXQUNq8huALW 6x8VCUpkOvqSfPj0jQey6JKpZnpIbNxzcOyuPuGTfPbxhTjzUO7waB9F1szqJ/Y73q/w UfYpyiVIXuaftNhmU5zBlakPDE/oFzGK31hFR6QIjfWPnXTcb36L2yOUt/MvgBtbMZ7M PrbA== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:list-subscribe:list-help:list-post:list-archive :list-unsubscribe:list-id:precedence:subject:github-pr-num :references:in-reply-to:message-id:date:to:from:delivered-to :arc-authentication-results; bh=Knh+Y+48PVCmpENlD3T14fXDZ0hKKYc31ITJTFV8rEc=; b=mdeeQMcnWXUL18Ddnu1ZPf+cgGEdyt2bbdqaZcwwXLJkyNWfuzhaoglCVMIBpgkg9t 3w9JgZpV+NqrskKUY4pyE8Zn/yV1nkbSXv8run1AIIsiDMA/sFaGaOJnx/uKEHX3gXae UFpEIkCacgO7gm3YBWHYCCGZ4efmG5e8iO23saQgRjvwHCcVBp3evSp7kXsxCBqzoFDD x16iqUPOWJ3K4FtlbIx37TLtMKjUqC3aM/xXGvG2HbT4Gf5x7yolS+mUWt+ZjBeAulrh NV7q0vtnUYEL/2BuOLp52awrmYPqnoGrI1Hinmdgrr6lBVPxt+/OKF/C4T49XdCvUHsY UL8Q== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.197.127.237 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=yandex.ru Return-Path: Received: from lists.linaro.org (ec2-54-197-127-237.compute-1.amazonaws.com. [54.197.127.237]) by mx.google.com with ESMTP id u41-v6si1592152qvc.146.2018.08.24.07.02.07; Fri, 24 Aug 2018 07:02:07 -0700 (PDT) Received-SPF: pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.197.127.237 as permitted sender) client-ip=54.197.127.237; Authentication-Results: mx.google.com; spf=pass (google.com: domain of lng-odp-bounces@lists.linaro.org designates 54.197.127.237 as permitted sender) smtp.mailfrom=lng-odp-bounces@lists.linaro.org; dmarc=fail (p=NONE sp=NONE dis=NONE) header.from=yandex.ru Received: by lists.linaro.org (Postfix, from userid 109) id 5AFDD60F00; Fri, 24 Aug 2018 14:02:07 +0000 (UTC) X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on ip-10-142-244-252 X-Spam-Level: X-Spam-Status: No, score=-3.6 required=5.0 tests=BAYES_00,FREEMAIL_FROM, MAILING_LIST_MULTI, RCVD_IN_DNSWL_LOW autolearn=disabled version=3.4.0 Received: from [127.0.0.1] (localhost [127.0.0.1]) by lists.linaro.org (Postfix) with ESMTP id 0CA7860694; Fri, 24 Aug 2018 14:00:31 +0000 (UTC) X-Original-To: lng-odp@lists.linaro.org Delivered-To: lng-odp@lists.linaro.org Received: by lists.linaro.org (Postfix, from userid 109) id AD272607C8; Fri, 24 Aug 2018 14:00:16 +0000 (UTC) Received: from forward103p.mail.yandex.net (forward103p.mail.yandex.net [77.88.28.106]) by lists.linaro.org (Postfix) with ESMTPS id C9DA1607EC for ; Fri, 24 Aug 2018 14:00:11 +0000 (UTC) Received: from mxback3j.mail.yandex.net (mxback3j.mail.yandex.net [IPv6:2a02:6b8:0:1619::10c]) by forward103p.mail.yandex.net (Yandex) with ESMTP id 58E682185771 for ; Fri, 24 Aug 2018 17:00:09 +0300 (MSK) Received: from smtp3p.mail.yandex.net (smtp3p.mail.yandex.net [2a02:6b8:0:1472:2741:0:8b6:8]) by mxback3j.mail.yandex.net (nwsmtp/Yandex) with ESMTP id 1jMvusfWvl-09QmeGUe; Fri, 24 Aug 2018 17:00:09 +0300 Received: by smtp3p.mail.yandex.net (nwsmtp/Yandex) with ESMTPSA id 2CVvDjLgK3-08jecAE5; Fri, 24 Aug 2018 17:00:08 +0300 (using TLSv1.2 with cipher ECDHE-RSA-AES128-SHA256 (128/128 bits)) (Client certificate not present) From: Github ODP bot To: lng-odp@lists.linaro.org Date: Fri, 24 Aug 2018 14:00:03 +0000 Message-Id: <1535119206-23556-2-git-send-email-odpbot@yandex.ru> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1535119206-23556-1-git-send-email-odpbot@yandex.ru> References: <1535119206-23556-1-git-send-email-odpbot@yandex.ru> Github-pr-num: 683 Subject: [lng-odp] [PATCH v1 1/4] linux-gen: ring_mpmc: new multi-producer, multi-consumer ring X-BeenThere: lng-odp@lists.linaro.org X-Mailman-Version: 2.1.16 Precedence: list List-Id: "The OpenDataPlane \(ODP\) List" List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: lng-odp-bounces@lists.linaro.org Sender: "lng-odp" From: Petri Savolainen The ring is similar to ring_internal.h, but checks for ring fullness. This ring can be used for storing events in a queue as enqueues can be tried on an already full queue. Signed-off-by: Petri Savolainen --- /** Email created from pull request 683 (psavol:master-queue-lockless-enqdeq-3) ** https://github.com/Linaro/odp/pull/683 ** Patch: https://github.com/Linaro/odp/pull/683.patch ** Base sha: 989df5d2f97ab4711328b11282dcc743f5740e00 ** Merge commit sha: 28073c54671148efdd01c9cf38c1a235d5a133f0 **/ platform/linux-generic/Makefile.am | 1 + .../include/odp_ring_mpmc_internal.h | 169 ++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 platform/linux-generic/include/odp_ring_mpmc_internal.h diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 80f968756..ab0b755d5 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -127,6 +127,7 @@ noinst_HEADERS = \ include/odp_queue_lf.h \ include/odp_queue_scalable_internal.h \ include/odp_ring_internal.h \ + include/odp_ring_mpmc_internal.h \ include/odp_ring_spsc_internal.h \ include/odp_ring_st_internal.h \ include/odp_schedule_if.h \ diff --git a/platform/linux-generic/include/odp_ring_mpmc_internal.h b/platform/linux-generic/include/odp_ring_mpmc_internal.h new file mode 100644 index 000000000..74bbb8fc7 --- /dev/null +++ b/platform/linux-generic/include/odp_ring_mpmc_internal.h @@ -0,0 +1,169 @@ +/* Copyright (c) 2018, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_MPMC_INTERNAL_H_ +#define ODP_RING_MPMC_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include + +/* Ring of uint32_t data + * + * Ring stores head and tail counters. Ring indexes are formed from these + * counters with a mask (mask = ring_size - 1), which requires that ring size + * must be a power of two. + * + * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + * | E | E | | | | | | | | | | E | E | E | E | E | + * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ + * ^ ^ ^ ^ + * | | | | + * r_tail r_head w_tail w_head + * + */ +typedef struct { + odp_atomic_u32_t ODP_ALIGNED_CACHE r_head; + odp_atomic_u32_t r_tail; + + odp_atomic_u32_t ODP_ALIGNED_CACHE w_head; + odp_atomic_u32_t w_tail; + +} ring_mpmc_t; + +static inline int ring_mpmc_cas_u32(odp_atomic_u32_t *atom, + uint32_t *old_val, uint32_t new_val) +{ + return __atomic_compare_exchange_n(&atom->v, old_val, new_val, + 0 /* strong */, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED); +} + +/* Initialize ring */ +static inline void ring_mpmc_init(ring_mpmc_t *ring) +{ + odp_atomic_init_u32(&ring->w_head, 0); + odp_atomic_init_u32(&ring->w_tail, 0); + odp_atomic_init_u32(&ring->r_head, 0); + odp_atomic_init_u32(&ring->r_tail, 0); +} + +/* Dequeue data from the ring head. Num is smaller than ring size. */ +static inline uint32_t ring_mpmc_deq_multi(ring_mpmc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, + uint32_t data[], + uint32_t num) +{ + uint32_t old_head, new_head, w_tail, num_data, i; + + /* Load acquires ensure that w_tail load happens after r_head load, + * and thus r_head value is always behind or equal to w_tail value. + * When CAS operation succeeds, this thread owns data between old + * and new r_head. */ + do { + old_head = odp_atomic_load_acq_u32(&ring->r_head); + odp_prefetch(&ring_data[(old_head + 1) & ring_mask]); + w_tail = odp_atomic_load_acq_u32(&ring->w_tail); + num_data = w_tail - old_head; + + /* Ring is empty */ + if (num_data == 0) + return 0; + + /* Try to take all available */ + if (num > num_data) + num = num_data; + + new_head = old_head + num; + + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->r_head, &old_head, + new_head) == 0)); + + /* Read data. This will not move above load acquire of r_head. */ + for (i = 0; i < num; i++) + data[i] = ring_data[(old_head + 1 + i) & ring_mask]; + + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->r_tail) != old_head)) + odp_cpu_pause(); + + /* Release the new reader tail, writers acquire it. */ + odp_atomic_store_rel_u32(&ring->r_tail, new_head); + + return num; +} + +/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */ +static inline uint32_t ring_mpmc_enq_multi(ring_mpmc_t *ring, + uint32_t *ring_data, + uint32_t ring_mask, + const uint32_t data[], + uint32_t num) +{ + uint32_t old_head, new_head, r_tail, num_free, i; + uint32_t size = ring_mask + 1; + + /* Load acquires ensure that w_head load happens after r_tail load, + * and thus r_tail value is always behind or equal to w_head value. + * When CAS operation succeeds, this thread owns data between old + * and new w_head. */ + do { + r_tail = odp_atomic_load_acq_u32(&ring->r_tail); + old_head = odp_atomic_load_acq_u32(&ring->w_head); + + num_free = size - (old_head - r_tail); + + /* Ring is full */ + if (num_free == 0) + return 0; + + /* Try to use all available */ + if (num > num_free) + num = num_free; + + new_head = old_head + num; + + } while (odp_unlikely(ring_mpmc_cas_u32(&ring->w_head, &old_head, + new_head) == 0)); + + /* Write data. This will not move above load acquire of w_head. */ + for (i = 0; i < num; i++) + ring_data[(old_head + 1 + i) & ring_mask] = data[i]; + + /* Wait until other writers have updated the tail */ + while (odp_unlikely(odp_atomic_load_u32(&ring->w_tail) != old_head)) + odp_cpu_pause(); + + /* Release the new writer tail, readers acquire it. */ + odp_atomic_store_rel_u32(&ring->w_tail, new_head); + + return num; +} + +/* Check if ring is empty */ +static inline int ring_mpmc_is_empty(ring_mpmc_t *ring) +{ + uint32_t head = odp_atomic_load_u32(&ring->r_head); + uint32_t tail = odp_atomic_load_u32(&ring->w_tail); + + return head == tail; +} + +#ifdef __cplusplus +} +#endif + +#endif