diff mbox series

[v2,5/7] hash: fix rw concurrency while moving keys

Message ID 1539233972-49860-6-git-send-email-honnappa.nagarahalli@arm.com
State New
Headers show
Series Address reader-writer concurrency in rte_hash | expand

Commit Message

Honnappa Nagarahalli Oct. 11, 2018, 4:59 a.m. UTC
Reader-writer concurrency issue, caused by moving the keys
to their alternative locations during key insert, is solved
by introducing a global counter(tbl_chng_cnt) indicating a
change in table.

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>

Reviewed-by: Steve Capper <steve.capper@arm.com>

Reviewed-by: Yipeng Wang <yipeng1.wang@intel.com>

---
 lib/librte_hash/rte_cuckoo_hash.c | 306 +++++++++++++++++++++++++-------------
 lib/librte_hash/rte_cuckoo_hash.h |   3 +
 2 files changed, 209 insertions(+), 100 deletions(-)

-- 
2.7.4
diff mbox series

Patch

diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index e2b0260..dfd5f2a 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -96,6 +96,7 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	unsigned int readwrite_concur_support = 0;
 	unsigned int writer_takes_lock = 0;
 	unsigned int recycle_on_del = 1;
+	uint32_t *tbl_chng_cnt = NULL;
 
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -210,6 +211,14 @@  rte_hash_create(const struct rte_hash_parameters *params)
 		goto err_unlock;
 	}
 
+	tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+			RTE_CACHE_LINE_SIZE, params->socket_id);
+
+	if (tbl_chng_cnt == NULL) {
+		RTE_LOG(ERR, HASH, "memory allocation failed\n");
+		goto err_unlock;
+	}
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -276,6 +285,8 @@  rte_hash_create(const struct rte_hash_parameters *params)
 		default_hash_func : params->hash_func;
 	h->key_store = k;
 	h->free_slots = r;
+	h->tbl_chng_cnt = tbl_chng_cnt;
+	*h->tbl_chng_cnt = 0;
 	h->hw_trans_mem_support = hw_trans_mem_support;
 	h->multi_writer_support = multi_writer_support;
 	h->readwrite_concur_support = readwrite_concur_support;
@@ -321,6 +332,7 @@  rte_hash_create(const struct rte_hash_parameters *params)
 	rte_free(h);
 	rte_free(buckets);
 	rte_free(k);
+	rte_free(tbl_chng_cnt);
 	return NULL;
 }
 
@@ -359,6 +371,7 @@  rte_hash_free(struct rte_hash *h)
 	rte_ring_free(h->free_slots);
 	rte_free(h->key_store);
 	rte_free(h->buckets);
+	rte_free(h->tbl_chng_cnt);
 	rte_free(h);
 	rte_free(te);
 }
@@ -456,6 +469,7 @@  rte_hash_reset(struct rte_hash *h)
 	__hash_rw_writer_lock(h);
 	memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
 	memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+	*h->tbl_chng_cnt = 0;
 
 	/* clear the free ring */
 	while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -650,11 +664,27 @@  rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		if (unlikely(&h->buckets[prev_alt_bkt_idx]
 				!= curr_bkt)) {
 			/* revert it to empty, otherwise duplicated keys */
-			curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+			__atomic_store_n(&curr_bkt->key_idx[curr_slot],
+				EMPTY_SLOT,
+				__ATOMIC_RELEASE);
 			__hash_rw_writer_unlock(h);
 			return -1;
 		}
 
+		/* Inform the previous move. The current move need
+		 * not be informed now as the current bucket entry
+		 * is present in both primary and secondary.
+		 * Since there is one writer, load acquires on
+		 * tbl_chng_cnt are not required.
+		 */
+		__atomic_store_n(h->tbl_chng_cnt,
+				 *h->tbl_chng_cnt + 1,
+				 __ATOMIC_RELEASE);
+		/* The stores to sig_alt and sig_current should not
+		 * move above the store to tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_RELEASE);
+
 		/* Need to swap current/alt sig to allow later
 		 * Cuckoo insert to move elements back to its
 		 * primary bucket if available
@@ -673,6 +703,20 @@  rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
 		curr_bkt = curr_node->bkt;
 	}
 
+	/* Inform the previous move. The current move need
+	 * not be informed now as the current bucket entry
+	 * is present in both primary and secondary.
+	 * Since there is one writer, load acquires on
+	 * tbl_chng_cnt are not required.
+	 */
+	__atomic_store_n(h->tbl_chng_cnt,
+			 *h->tbl_chng_cnt + 1,
+			 __ATOMIC_RELEASE);
+	/* The stores to sig_alt and sig_current should not
+	 * move above the store to tbl_chng_cnt.
+	 */
+	__atomic_thread_fence(__ATOMIC_RELEASE);
+
 	curr_bkt->sig_current[curr_slot] = sig;
 	curr_bkt->sig_alt[curr_slot] = alt_hash;
 	/* Release the new bucket entry */
@@ -937,30 +981,56 @@  __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 	uint32_t bucket_idx;
 	hash_sig_t alt_hash;
 	struct rte_hash_bucket *bkt;
+	uint32_t cnt_b, cnt_a;
 	int ret;
 
-	bucket_idx = sig & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
-
 	__hash_rw_reader_lock(h);
 
-	/* Check if key is in primary location */
-	ret = search_one_bucket(h, key, sig, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
-	/* Calculate secondary hash */
-	alt_hash = rte_hash_secondary_hash(sig);
-	bucket_idx = alt_hash & h->bucket_bitmask;
-	bkt = &h->buckets[bucket_idx];
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in search_one_bucket are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+				__ATOMIC_ACQUIRE);
+
+		bucket_idx = sig & h->bucket_bitmask;
+		bkt = &h->buckets[bucket_idx];
+
+		/* Check if key is in primary location */
+		ret = search_one_bucket(h, key, sig, data, bkt);
+		if (ret != -1) {
+			__hash_rw_reader_unlock(h);
+			return ret;
+		}
+		/* Calculate secondary hash */
+		alt_hash = rte_hash_secondary_hash(sig);
+		bucket_idx = alt_hash & h->bucket_bitmask;
+		bkt = &h->buckets[bucket_idx];
+
+		/* Check if key is in secondary location */
+		ret = search_one_bucket(h, key, alt_hash, data, bkt);
+		if (ret != -1) {
+			__hash_rw_reader_unlock(h);
+			return ret;
+		}
+
+		/* The loads of sig_current in search_one_bucket
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, key index in primary bucket
+		 * and key index in secondary bucket will make sure
+		 * that it does not get hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
-	/* Check if key is in secondary location */
-	ret = search_one_bucket(h, key, alt_hash, data, bkt);
-	if (ret != -1) {
-		__hash_rw_reader_unlock(h);
-		return ret;
-	}
 	__hash_rw_reader_unlock(h);
 	return -ENOENT;
 }
@@ -1242,6 +1312,7 @@  __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
 	void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+	uint32_t cnt_b, cnt_a;
 
 	/* Prefetch first keys */
 	for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1277,102 +1348,137 @@  __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
 	}
 
 	__hash_rw_reader_lock(h);
-	/* Compare signatures and prefetch key slot of first hit */
-	for (i = 0; i < num_keys; i++) {
-		compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+	do {
+		/* Load the table change counter before the lookup
+		 * starts. Acquire semantics will make sure that
+		 * loads in compare_signatures are not hoisted.
+		 */
+		cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+
+		/* Compare signatures and prefetch key slot of first hit */
+		for (i = 0; i < num_keys; i++) {
+			compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
 				primary_bkt[i], secondary_bkt[i],
 				prim_hash[i], sec_hash[i], h->sig_cmp_fn);
 
-		if (prim_hitmask[i]) {
-			uint32_t first_hit = __builtin_ctzl(prim_hitmask[i]);
-			uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
-			continue;
-		}
+			if (prim_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(prim_hitmask[i]);
+				uint32_t key_idx =
+					primary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+				continue;
+			}
 
-		if (sec_hitmask[i]) {
-			uint32_t first_hit = __builtin_ctzl(sec_hitmask[i]);
-			uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-			rte_prefetch0(key_slot);
+			if (sec_hitmask[i]) {
+				uint32_t first_hit =
+						__builtin_ctzl(sec_hitmask[i]);
+				uint32_t key_idx =
+					secondary_bkt[i]->key_idx[first_hit];
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
+				rte_prefetch0(key_slot);
+			}
 		}
-	}
 
-	/* Compare keys, first hits in primary first */
-	for (i = 0; i < num_keys; i++) {
-		positions[i] = -ENOENT;
-		while (prim_hitmask[i]) {
-			uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]);
+		/* Compare keys, first hits in primary first */
+		for (i = 0; i < num_keys; i++) {
+			positions[i] = -ENOENT;
+			while (prim_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(prim_hitmask[i]);
 
-			uint32_t key_idx =
-			__atomic_load_n(
-				&primary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+				uint32_t key_idx =
+				__atomic_load_n(
+					&primary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				prim_hitmask[i] &= ~(1 << (hit_index));
 			}
-			prim_hitmask[i] &= ~(1 << (hit_index));
-		}
 
-		while (sec_hitmask[i]) {
-			uint32_t hit_index = __builtin_ctzl(sec_hitmask[i]);
+			while (sec_hitmask[i]) {
+				uint32_t hit_index =
+						__builtin_ctzl(sec_hitmask[i]);
 
-			uint32_t key_idx =
-			__atomic_load_n(
-				&secondary_bkt[i]->key_idx[hit_index],
-				__ATOMIC_ACQUIRE);
-			const struct rte_hash_key *key_slot =
-				(const struct rte_hash_key *)(
-				(const char *)h->key_store +
-				key_idx * h->key_entry_size);
-
-			if (key_idx != EMPTY_SLOT)
-				pdata[i] = __atomic_load_n(&key_slot->pdata,
-						__ATOMIC_ACQUIRE);
-
-			/*
-			 * If key index is 0, do not compare key,
-			 * as it is checking the dummy slot
-			 */
+				uint32_t key_idx =
+				__atomic_load_n(
+					&secondary_bkt[i]->key_idx[hit_index],
+					__ATOMIC_ACQUIRE);
+				const struct rte_hash_key *key_slot =
+					(const struct rte_hash_key *)(
+					(const char *)h->key_store +
+					key_idx * h->key_entry_size);
 
-			if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-				if (data != NULL)
-					data[i] = pdata[i];
+				if (key_idx != EMPTY_SLOT)
+					pdata[i] = __atomic_load_n(
+							&key_slot->pdata,
+							__ATOMIC_ACQUIRE);
+				/*
+				 * If key index is 0, do not compare key,
+				 * as it is checking the dummy slot
+				 */
 
-				hits |= 1ULL << i;
-				positions[i] = key_idx - 1;
-				goto next_key;
+				if (!!key_idx &
+					!rte_hash_cmp_eq(
+						key_slot->key, keys[i], h)) {
+					if (data != NULL)
+						data[i] = pdata[i];
+
+					hits |= 1ULL << i;
+					positions[i] = key_idx - 1;
+					goto next_key;
+				}
+				sec_hitmask[i] &= ~(1 << (hit_index));
 			}
-			sec_hitmask[i] &= ~(1 << (hit_index));
-		}
 
 next_key:
-		continue;
-	}
+			continue;
+		}
+
+		/* The loads of sig_current in compare_signatures
+		 * should not move below the load from tbl_chng_cnt.
+		 */
+		__atomic_thread_fence(__ATOMIC_ACQUIRE);
+		/* Re-read the table change counter to check if the
+		 * table has changed during search. If yes, re-do
+		 * the search.
+		 * This load should not get hoisted. The load
+		 * acquires on cnt_b, primary key index and secondary
+		 * key index will make sure that it does not get
+		 * hoisted.
+		 */
+		cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+					__ATOMIC_ACQUIRE);
+	} while (cnt_b != cnt_a);
 
 	__hash_rw_reader_unlock(h);
 
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index a44c6be..cf50ada 100644
--- a/lib/librte_hash/rte_cuckoo_hash.h
+++ b/lib/librte_hash/rte_cuckoo_hash.h
@@ -1,5 +1,6 @@ 
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -196,6 +197,8 @@  struct rte_hash {
 	 * to the key table.
 	 */
 	rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
+	uint32_t *tbl_chng_cnt;
+	/**< Indicates if the hash table changed from last read. */
 } __rte_cache_aligned;
 
 struct queue_node {