From patchwork Thu Sep 6 17:12:16 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Honnappa Nagarahalli X-Patchwork-Id: 146117 Delivered-To: patch@linaro.org Received: by 2002:a2e:1648:0:0:0:0:0 with SMTP id 8-v6csp126248ljw; Thu, 6 Sep 2018 10:12:46 -0700 (PDT) X-Google-Smtp-Source: ANB0VdZ8VrOgdXjUGndF3QyeBjE8Yj8NZLVZwgh18Tge5XByYYtq2+PLQX1vXRQ3ShxC0WTfqko1 X-Received: by 2002:a1c:7305:: with SMTP id d5-v6mr2832395wmb.53.1536253966441; Thu, 06 Sep 2018 10:12:46 -0700 (PDT) ARC-Seal: i=1; a=rsa-sha256; t=1536253966; cv=none; d=google.com; s=arc-20160816; b=I7xEFQA6Ru5j05g2zzGSDzpDx4D7LSbHqKpTD7lULjHRX9Dvh7BnAWg1qGs5tJE31y Vm1kJ1nhGnKFAko5Z3zZqKo2YjfUNlx6uaSb7FzlRo1PGmGkIt2EffUyz1Grr4tP8tGs er+w6bGZ5Z5+3MlcSlMtgZpqmKr+t4h+0XY4PsA1JEb2WHnjPQrx/O924V0MN2PN4pN7 BP2IjPIWsuufMA3dNFyLYEXc7NVnTyY3gvjC5SGPJMOZ/k+8qFKi3YpGfQ6uuqrl5I/f zFuzuUjbbxkOHSAbudcOpz4XX9L/B8hDCjoe6RiB/KFXkQ55ibpZDlzXo3H/VE+EVT4l n3gA== ARC-Message-Signature: i=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=arc-20160816; h=sender:errors-to:list-subscribe:list-help:list-post:list-archive :list-unsubscribe:list-id:precedence:subject:references:in-reply-to :message-id:date:cc:to:from; bh=bqXTyh77tYNL9Yz8I2eLGUSIVVpZzKY3rDnzOgyiycw=; b=IXxKv/rcWgRXlPmkbOXC8n5C67wveY6P+NTWsMTeabSKHzNxrK7m6gdzJsrXYD+Hiz FMcUFysFnF1Mh3JpLtIy6djuLwUYOMQfvCD0n4pI7i23DHqVXDCJKZonwH8maS04796U iDTBlmy/u3k6+WlX1/jYZ6FFvo1sb7V+CVdC2r2Z/4lj1okPktHexA8ksxty1Ny3UCH0 9eSJmcMe/hMSwrAsCiIJdUwE+FjnGoxGVy7XbRri41HucPwT1jnbPy6ge7rd2F93DvMS DyQzMoMl/QQXeMM8yWz1PWGmiSv/cjpdE7/3T5WZHmMY3CAkHLRsIVqM6Fosl8Aw0inQ rRyw== ARC-Authentication-Results: i=1; mx.google.com; spf=pass (google.com: domain of dev-bounces@dpdk.org designates 92.243.14.124 as permitted sender) smtp.mailfrom=dev-bounces@dpdk.org Return-Path: Received: from dpdk.org (dpdk.org. [92.243.14.124]) by mx.google.com with ESMTP id i4-v6si5809882wrc.177.2018.09.06.10.12.46; Thu, 06 Sep 2018 10:12:46 -0700 (PDT) Received-SPF: pass (google.com: domain of dev-bounces@dpdk.org designates 92.243.14.124 as permitted sender) client-ip=92.243.14.124; Authentication-Results: mx.google.com; spf=pass (google.com: domain of dev-bounces@dpdk.org designates 92.243.14.124 as permitted sender) smtp.mailfrom=dev-bounces@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id A7E854C9D; Thu, 6 Sep 2018 19:12:33 +0200 (CEST) Received: from foss.arm.com (foss.arm.com [217.140.101.70]) by dpdk.org (Postfix) with ESMTP id 3C3254C88 for ; Thu, 6 Sep 2018 19:12:32 +0200 (CEST) Received: from usa-sjc-imap-foss1.foss.arm.com (unknown [10.72.51.249]) by usa-sjc-mx-foss1.foss.arm.com (Postfix) with ESMTP id A27097A9; Thu, 6 Sep 2018 10:12:31 -0700 (PDT) Received: from 2p2660v4-1.austin.arm.com (2p2660v4-1.austin.arm.com [10.118.12.164]) by usa-sjc-imap-foss1.foss.arm.com (Postfix) with ESMTPSA id 342633F557; Thu, 6 Sep 2018 10:12:31 -0700 (PDT) From: Honnappa Nagarahalli To: bruce.richardson@intel.com, pablo.de.lara.guarch@intel.com Cc: dev@dpdk.org, honnappa.nagarahalli@dpdk.org, gavin.hu@arm.com, steve.capper@arm.com, ola.liljedahl@arm.com, nd@arm.com, Honnappa Nagarahalli Date: Thu, 6 Sep 2018 12:12:16 -0500 Message-Id: <1536253938-192391-3-git-send-email-honnappa.nagarahalli@arm.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1536253938-192391-1-git-send-email-honnappa.nagarahalli@arm.com> References: <1536253938-192391-1-git-send-email-honnappa.nagarahalli@arm.com> Subject: [dpdk-dev] [PATCH 2/4] hash: add memory ordering to avoid race conditions X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" Only race condition that can occur is - using the key store element before the key write is completed. Hence, while inserting the element the release memory order is used. Any other race condition is caught by the key comparison. Memory orderings are added only where needed. For ex: reads in the writer's context do not need memory ordering as there is a single writer. key_idx in the bucket entry and pdata in the key store element are used for synchronisation. key_idx is used to release an inserted entry in the bucket to the reader. Use of pdata for synchronisation is required due to updation of an existing entry where-in only the pdata is updated without updating key_idx. Signed-off-by: Honnappa Nagarahalli Reviewed-by: Gavin Hu Reviewed-by: Ola Liljedahl Reviewed-by: Steve Capper --- lib/librte_hash/rte_cuckoo_hash.c | 111 ++++++++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 29 deletions(-) -- 2.7.4 diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index 33acfc9..2d89158 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -485,7 +485,9 @@ enqueue_slot_back(const struct rte_hash *h, rte_ring_sp_enqueue(h->free_slots, slot_id); } -/* Search a key from bucket and update its data */ +/* Search a key from bucket and update its data. + * Writer holds the lock before calling this. + */ static inline int32_t search_and_update(const struct rte_hash *h, void *data, const void *key, struct rte_hash_bucket *bkt, hash_sig_t sig, hash_sig_t alt_hash) @@ -499,8 +501,13 @@ search_and_update(const struct rte_hash *h, void *data, const void *key, k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (rte_hash_cmp_eq(key, k->key, h) == 0) { - /* Update data */ - k->pdata = data; + /* 'pdata' acts as the synchronization point + * when an existing hash entry is updated. + * Key is not updated in this case. + */ + __atomic_store_n(&k->pdata, + data, + __ATOMIC_RELEASE); /* * Return index where key is stored, * subtracting the first dummy index @@ -554,7 +561,15 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h, if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) { prim_bkt->sig_current[i] = sig; prim_bkt->sig_alt[i] = alt_hash; - prim_bkt->key_idx[i] = new_idx; + /* Key can be of arbitrary length, so it is + * not possible to store it atomically. + * Hence the new key element's memory stores + * (key as well as data) should be complete + * before it is referenced. + */ + __atomic_store_n(&prim_bkt->key_idx[i], + new_idx, + __ATOMIC_RELEASE); break; } } @@ -637,8 +652,10 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h, prev_bkt->sig_current[prev_slot]; curr_bkt->sig_current[curr_slot] = prev_bkt->sig_alt[prev_slot]; - curr_bkt->key_idx[curr_slot] = - prev_bkt->key_idx[prev_slot]; + /* Release the updated bucket entry */ + __atomic_store_n(&curr_bkt->key_idx[curr_slot], + prev_bkt->key_idx[prev_slot], + __ATOMIC_RELEASE); curr_slot = prev_slot; curr_node = prev_node; @@ -647,7 +664,10 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h, curr_bkt->sig_current[curr_slot] = sig; curr_bkt->sig_alt[curr_slot] = alt_hash; - curr_bkt->key_idx[curr_slot] = new_idx; + /* Release the new bucket entry */ + __atomic_store_n(&curr_bkt->key_idx[curr_slot], + new_idx, + __ATOMIC_RELEASE); __hash_rw_writer_unlock(h); @@ -778,8 +798,15 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, new_idx = (uint32_t)((uintptr_t) slot_id); /* Copy key */ rte_memcpy(new_k->key, key, h->key_len); - new_k->pdata = data; - + /* Key can be of arbitrary length, so it is not possible to store + * it atomically. Hence the new key element's memory stores + * (key as well as data) should be complete before it is referenced. + * 'pdata' acts as the synchronization point when an existing hash + * entry is updated. + */ + __atomic_store_n(&new_k->pdata, + data, + __ATOMIC_RELEASE); /* Find an empty slot and insert */ ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data, @@ -865,21 +892,27 @@ search_one_bucket(const struct rte_hash *h, const void *key, hash_sig_t sig, void **data, const struct rte_hash_bucket *bkt) { int i; + uint32_t key_idx; + void *pdata; struct rte_hash_key *k, *keys = h->key_store; for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { - if (bkt->sig_current[i] == sig && - bkt->key_idx[i] != EMPTY_SLOT) { + key_idx = __atomic_load_n(&bkt->key_idx[i], + __ATOMIC_ACQUIRE); + if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) { k = (struct rte_hash_key *) ((char *)keys + - bkt->key_idx[i] * h->key_entry_size); + key_idx * h->key_entry_size); + pdata = __atomic_load_n(&k->pdata, + __ATOMIC_ACQUIRE); + if (rte_hash_cmp_eq(key, k->key, h) == 0) { if (data != NULL) - *data = k->pdata; + *data = pdata; /* * Return index where key is stored, * subtracting the first dummy index */ - return bkt->key_idx[i] - 1; + return key_idx - 1; } } } @@ -980,31 +1013,36 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) } } -/* Search one bucket and remove the matched key */ +/* Search one bucket and remove the matched key. + * Writer is expected to hold the lock while calling this + * function. + */ static inline int32_t search_and_remove(const struct rte_hash *h, const void *key, struct rte_hash_bucket *bkt, hash_sig_t sig) { struct rte_hash_key *k, *keys = h->key_store; unsigned int i; - int32_t ret; + uint32_t key_idx; /* Check if key is in primary location */ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { - if (bkt->sig_current[i] == sig && - bkt->key_idx[i] != EMPTY_SLOT) { + key_idx = __atomic_load_n(&bkt->key_idx[i], + __ATOMIC_ACQUIRE); + if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) { k = (struct rte_hash_key *) ((char *)keys + - bkt->key_idx[i] * h->key_entry_size); + key_idx * h->key_entry_size); if (rte_hash_cmp_eq(key, k->key, h) == 0) { remove_entry(h, bkt, i); + __atomic_store_n(&bkt->key_idx[i], + EMPTY_SLOT, + __ATOMIC_RELEASE); /* * Return index where key is stored, * subtracting the first dummy index */ - ret = bkt->key_idx[i] - 1; - bkt->key_idx[i] = EMPTY_SLOT; - return ret; + return key_idx - 1; } } } @@ -1151,6 +1189,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; + void *pdata[RTE_HASH_LOOKUP_BULK_MAX]; /* Prefetch first keys */ for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++) @@ -1220,18 +1259,25 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, while (prim_hitmask[i]) { uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]); - uint32_t key_idx = primary_bkt[i]->key_idx[hit_index]; + uint32_t key_idx = + __atomic_load_n( + &primary_bkt[i]->key_idx[hit_index], + __ATOMIC_ACQUIRE); const struct rte_hash_key *key_slot = (const struct rte_hash_key *)( (const char *)h->key_store + key_idx * h->key_entry_size); + + if (key_idx != EMPTY_SLOT) + pdata[i] = __atomic_load_n(&key_slot->pdata, + __ATOMIC_ACQUIRE); /* * If key index is 0, do not compare key, * as it is checking the dummy slot */ if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = key_slot->pdata; + data[i] = pdata[i]; hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -1243,11 +1289,19 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, while (sec_hitmask[i]) { uint32_t hit_index = __builtin_ctzl(sec_hitmask[i]); - uint32_t key_idx = secondary_bkt[i]->key_idx[hit_index]; + uint32_t key_idx = + __atomic_load_n( + &secondary_bkt[i]->key_idx[hit_index], + __ATOMIC_ACQUIRE); const struct rte_hash_key *key_slot = (const struct rte_hash_key *)( (const char *)h->key_store + key_idx * h->key_entry_size); + + if (key_idx != EMPTY_SLOT) + pdata[i] = __atomic_load_n(&key_slot->pdata, + __ATOMIC_ACQUIRE); + /* * If key index is 0, do not compare key, * as it is checking the dummy slot @@ -1255,7 +1309,7 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = key_slot->pdata; + data[i] = pdata[i]; hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -1320,7 +1374,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32 idx = *next % RTE_HASH_BUCKET_ENTRIES; /* If current position is empty, go to the next one */ - while (h->buckets[bucket_idx].key_idx[idx] == EMPTY_SLOT) { + while ((position = __atomic_load_n(&h->buckets[bucket_idx].key_idx[idx], + __ATOMIC_ACQUIRE)) == EMPTY_SLOT) { (*next)++; /* End of table */ if (*next == total_entries) @@ -1329,8 +1384,6 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32 idx = *next % RTE_HASH_BUCKET_ENTRIES; } __hash_rw_reader_lock(h); - /* Get position of entry in key table */ - position = h->buckets[bucket_idx].key_idx[idx]; next_key = (struct rte_hash_key *) ((char *)h->key_store + position * h->key_entry_size); /* Return key and data */