diff mbox

[PATCHv2,2/3] linux-generic: buffers: improve lockless allocator

Message ID 1420998638-23980-3-git-send-email-bill.fischofer@linaro.org
State New
Headers show

Commit Message

Bill Fischofer Jan. 11, 2015, 5:50 p.m. UTC
Improve lockless buffer allocation by using offsets to extend tag length to
avoid ABA issues. This also resolves Bug 1051.

Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 .../include/odp_buffer_pool_internal.h             | 89 ++++++++++++++--------
 platform/linux-generic/odp_buffer_pool.c           |  7 +-
 2 files changed, 59 insertions(+), 37 deletions(-)

Comments

Taras Kondratiuk Jan. 13, 2015, 8:57 p.m. UTC | #1
On 01/11/2015 07:50 PM, Bill Fischofer wrote:
> Improve lockless buffer allocation by using offsets to extend tag length to
> avoid ABA issues. This also resolves Bug 1051.
> 
> Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
> ---
>  .../include/odp_buffer_pool_internal.h             | 89 ++++++++++++++--------
>  platform/linux-generic/odp_buffer_pool.c           |  7 +-
>  2 files changed, 59 insertions(+), 37 deletions(-)
> 
> diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h b/platform/linux-generic/include/odp_buffer_pool_internal.h
> index 2e48ac3..169e02d 100644
> --- a/platform/linux-generic/include/odp_buffer_pool_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
> @@ -107,8 +107,8 @@ struct pool_entry_s {
>  	size_t                  pool_size;
>  	uint32_t                buf_align;
>  	uint32_t                buf_stride;
> -	_odp_atomic_ptr_t       buf_freelist;
> -	_odp_atomic_ptr_t       blk_freelist;
> +	odp_atomic_u64_t        buf_freelist;
> +	odp_atomic_u64_t        blk_freelist;
>  	odp_atomic_u32_t        bufcount;
>  	odp_atomic_u32_t        blkcount;
>  	odp_atomic_u64_t        bufallocs;
> @@ -142,57 +142,77 @@ extern void *pool_entry_ptr[];
>  #define pool_is_secure(pool) 0
>  #endif
>  
> -#define TAG_ALIGN ((size_t)16)
> +#define OFFSET_BITS ODP_BITSIZE((uint64_t)ODP_BUFFER_MAX_BUFFERS * \
> +				(uint32_t)ODP_CONFIG_PACKET_BUF_LEN_MAX)

It seems with a current odp_config.h settings OFFSET_BITS will be ~41
bits. On 32-bit machine it is too much. I've made calculation manually,
so I can be wrong.

> +
> +#define MAX_OFFSET (1ULL << BUF_OFFSET_BITS)
> +#define TAG_BITS (64 - OFFSET_BITS)
> +#define TAG_ALIGN (1ULL << TAG_BITS)
> +
> +#define NULL_OFFSET (~0ULL)
> +#define NULL_DETAGGED_OFFSET ((~0ULL) >> TAG_BITS)
>  
>  #define odp_cs(ptr, old, new) \
> -	_odp_atomic_ptr_cmp_xchg_strong(&ptr, (void **)&old, (void *)new, \
> -					_ODP_MEMMODEL_SC, \
> -					_ODP_MEMMODEL_SC)
> +	_odp_atomic_u64_cmp_xchg_strong_mm(&ptr, (uint64_t *)&old, \
> +					   (uint64_t)new, \
> +					   _ODP_MEMMODEL_SC, _ODP_MEMMODEL_SC)

On 32-bit machine this doubles a size of data to exchange. What is an
expected performance impact?

> +
> +/* Helper functions for offset tagging to avoid ABA race conditions */
> +#define odp_tag(offset) \
> +	(((uint64_t)offset) & (TAG_ALIGN - 1))
>  
> -/* Helper functions for pointer tagging to avoid ABA race conditions */
> -#define odp_tag(ptr) \
> -	(((size_t)ptr) & (TAG_ALIGN - 1))
> +#define off2ptr(offset, base) \
> +	((void *)((size_t)base + ((size_t)((uint64_t)offset >> TAG_BITS))))
>  
> -#define odp_detag(ptr) \
> -	((void *)(((size_t)ptr) & -TAG_ALIGN))
> +#define odp_detag(offset, base) \
> +	(((uint64_t)offset >> TAG_BITS) == NULL_DETAGGED_OFFSET ? NULL : \
> +	 off2ptr(offset, base))
>  
> -#define odp_retag(ptr, tag) \
> -	((void *)(((size_t)ptr) | odp_tag(tag)))
> +#define ptr2off(ptr, base) \
> +	((uint64_t)(ptr == NULL ? (NULL_DETAGGED_OFFSET << TAG_BITS) : \
> +		    (((uint64_t)((size_t)ptr - (size_t)base)) << TAG_BITS)))
> +
> +#define odp_retag(ptr, base, tag) (ptr2off(ptr, base) | odp_tag(tag))
>  
>  
>  static inline void *get_blk(struct pool_entry_s *pool)
>  {
> -	void *oldhead, *myhead, *newhead;
> +	uint64_t oldhead, newhead;
> +	void *myhead;
>  
> -	oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
> +	oldhead = odp_atomic_load_u64(&pool->blk_freelist);
>  
>  	do {
> -		size_t tag = odp_tag(oldhead);
> -		myhead = odp_detag(oldhead);
> +		uint64_t tag = odp_tag(oldhead);
> +		myhead = odp_detag(oldhead, pool->pool_base_addr);
>  		if (odp_unlikely(myhead == NULL))
>  			break;
> -		newhead = odp_retag(((odp_buf_blk_t *)myhead)->next, tag + 1);
> +		newhead = odp_retag(((odp_buf_blk_t *)myhead)->next,
> +				    pool->pool_base_addr, tag + 1);
>  	} while (odp_cs(pool->blk_freelist, oldhead, newhead) == 0);
>  
> -	if (odp_unlikely(myhead == NULL))
> +	if (odp_unlikely(myhead == NULL)) {
>  		odp_atomic_inc_u64(&pool->blkempty);
> -	else
> +	} else {
> +		odp_atomic_inc_u64(&pool->blkallocs);

This seems to be a not related change.

>  		odp_atomic_dec_u32(&pool->blkcount);
> +	}
>  
> -	return (void *)myhead;
> +	return myhead;
>  }
Bill Fischofer Jan. 13, 2015, 9:08 p.m. UTC | #2
OFFSET_BITS is just the theoretical maximum and doesn't reflect reality.
With the current ODP_CONFIG values it evaluates to 39, which means buffer
pools can theoretically be up to 512GB is size, which we're clearly not
going to see.  The purpose here is simply to have a significantly larger
set of TAG_BITS (in this case, 25) to avoid ABA conditions.

If a platform does not support 64-bit atomic operations then the lock-based
allocator should be used.  One of the reasons both are provided is to
enable different implementations to pick which works best for that
platform.  Since a local cache is used in any case, the performance
difference between lock-based and lockless allocation is minor, however
having both means we can do scalability analysis when we start performance
tuning.

On Tue, Jan 13, 2015 at 2:57 PM, Taras Kondratiuk <
taras.kondratiuk@linaro.org> wrote:

> On 01/11/2015 07:50 PM, Bill Fischofer wrote:
> > Improve lockless buffer allocation by using offsets to extend tag length
> to
> > avoid ABA issues. This also resolves Bug 1051.
> >
> > Signed-off-by: Bill Fischofer <bill.fischofer@linaro.org>
> > ---
> >  .../include/odp_buffer_pool_internal.h             | 89
> ++++++++++++++--------
> >  platform/linux-generic/odp_buffer_pool.c           |  7 +-
> >  2 files changed, 59 insertions(+), 37 deletions(-)
> >
> > diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h
> b/platform/linux-generic/include/odp_buffer_pool_internal.h
> > index 2e48ac3..169e02d 100644
> > --- a/platform/linux-generic/include/odp_buffer_pool_internal.h
> > +++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
> > @@ -107,8 +107,8 @@ struct pool_entry_s {
> >       size_t                  pool_size;
> >       uint32_t                buf_align;
> >       uint32_t                buf_stride;
> > -     _odp_atomic_ptr_t       buf_freelist;
> > -     _odp_atomic_ptr_t       blk_freelist;
> > +     odp_atomic_u64_t        buf_freelist;
> > +     odp_atomic_u64_t        blk_freelist;
> >       odp_atomic_u32_t        bufcount;
> >       odp_atomic_u32_t        blkcount;
> >       odp_atomic_u64_t        bufallocs;
> > @@ -142,57 +142,77 @@ extern void *pool_entry_ptr[];
> >  #define pool_is_secure(pool) 0
> >  #endif
> >
> > -#define TAG_ALIGN ((size_t)16)
> > +#define OFFSET_BITS ODP_BITSIZE((uint64_t)ODP_BUFFER_MAX_BUFFERS * \
> > +                             (uint32_t)ODP_CONFIG_PACKET_BUF_LEN_MAX)
>
> It seems with a current odp_config.h settings OFFSET_BITS will be ~41
> bits. On 32-bit machine it is too much. I've made calculation manually,
> so I can be wrong.
>
> > +
> > +#define MAX_OFFSET (1ULL << BUF_OFFSET_BITS)
> > +#define TAG_BITS (64 - OFFSET_BITS)
> > +#define TAG_ALIGN (1ULL << TAG_BITS)
> > +
> > +#define NULL_OFFSET (~0ULL)
> > +#define NULL_DETAGGED_OFFSET ((~0ULL) >> TAG_BITS)
> >
> >  #define odp_cs(ptr, old, new) \
> > -     _odp_atomic_ptr_cmp_xchg_strong(&ptr, (void **)&old, (void *)new, \
> > -                                     _ODP_MEMMODEL_SC, \
> > -                                     _ODP_MEMMODEL_SC)
> > +     _odp_atomic_u64_cmp_xchg_strong_mm(&ptr, (uint64_t *)&old, \
> > +                                        (uint64_t)new, \
> > +                                        _ODP_MEMMODEL_SC,
> _ODP_MEMMODEL_SC)
>
> On 32-bit machine this doubles a size of data to exchange. What is an
> expected performance impact?
>
> > +
> > +/* Helper functions for offset tagging to avoid ABA race conditions */
> > +#define odp_tag(offset) \
> > +     (((uint64_t)offset) & (TAG_ALIGN - 1))
> >
> > -/* Helper functions for pointer tagging to avoid ABA race conditions */
> > -#define odp_tag(ptr) \
> > -     (((size_t)ptr) & (TAG_ALIGN - 1))
> > +#define off2ptr(offset, base) \
> > +     ((void *)((size_t)base + ((size_t)((uint64_t)offset >> TAG_BITS))))
> >
> > -#define odp_detag(ptr) \
> > -     ((void *)(((size_t)ptr) & -TAG_ALIGN))
> > +#define odp_detag(offset, base) \
> > +     (((uint64_t)offset >> TAG_BITS) == NULL_DETAGGED_OFFSET ? NULL : \
> > +      off2ptr(offset, base))
> >
> > -#define odp_retag(ptr, tag) \
> > -     ((void *)(((size_t)ptr) | odp_tag(tag)))
> > +#define ptr2off(ptr, base) \
> > +     ((uint64_t)(ptr == NULL ? (NULL_DETAGGED_OFFSET << TAG_BITS) : \
> > +                 (((uint64_t)((size_t)ptr - (size_t)base)) <<
> TAG_BITS)))
> > +
> > +#define odp_retag(ptr, base, tag) (ptr2off(ptr, base) | odp_tag(tag))
> >
> >
> >  static inline void *get_blk(struct pool_entry_s *pool)
> >  {
> > -     void *oldhead, *myhead, *newhead;
> > +     uint64_t oldhead, newhead;
> > +     void *myhead;
> >
> > -     oldhead = _odp_atomic_ptr_load(&pool->blk_freelist,
> _ODP_MEMMODEL_ACQ);
> > +     oldhead = odp_atomic_load_u64(&pool->blk_freelist);
> >
> >       do {
> > -             size_t tag = odp_tag(oldhead);
> > -             myhead = odp_detag(oldhead);
> > +             uint64_t tag = odp_tag(oldhead);
> > +             myhead = odp_detag(oldhead, pool->pool_base_addr);
> >               if (odp_unlikely(myhead == NULL))
> >                       break;
> > -             newhead = odp_retag(((odp_buf_blk_t *)myhead)->next, tag +
> 1);
> > +             newhead = odp_retag(((odp_buf_blk_t *)myhead)->next,
> > +                                 pool->pool_base_addr, tag + 1);
> >       } while (odp_cs(pool->blk_freelist, oldhead, newhead) == 0);
> >
> > -     if (odp_unlikely(myhead == NULL))
> > +     if (odp_unlikely(myhead == NULL)) {
> >               odp_atomic_inc_u64(&pool->blkempty);
> > -     else
> > +     } else {
> > +             odp_atomic_inc_u64(&pool->blkallocs);
>
> This seems to be a not related change.
>
> >               odp_atomic_dec_u32(&pool->blkcount);
> > +     }
> >
> > -     return (void *)myhead;
> > +     return myhead;
> >  }
>
>
> --
> Taras Kondratiuk
>
Taras Kondratiuk Jan. 13, 2015, 9:31 p.m. UTC | #3
On 01/13/2015 11:08 PM, Bill Fischofer wrote:
> If a platform does not support 64-bit atomic operations then the
> lock-based allocator should be used.

It would be nice to mention this in the commit message.
Bill Fischofer Jan. 13, 2015, 9:43 p.m. UTC | #4
I can include that in a v3 if needed.  Would that go above or below the -- ?

On Tue, Jan 13, 2015 at 3:31 PM, Taras Kondratiuk <
taras.kondratiuk@linaro.org> wrote:

> On 01/13/2015 11:08 PM, Bill Fischofer wrote:
> > If a platform does not support 64-bit atomic operations then the
> > lock-based allocator should be used.
>
> It would be nice to mention this in the commit message.
>
> --
> Taras Kondratiuk
>
Taras Kondratiuk Jan. 13, 2015, 9:49 p.m. UTC | #5
On 01/13/2015 11:43 PM, Bill Fischofer wrote:
> I can include that in a v3 if needed.  Would that go above or below the -- ?

I think it should be in a git history, so above.
diff mbox

Patch

diff --git a/platform/linux-generic/include/odp_buffer_pool_internal.h b/platform/linux-generic/include/odp_buffer_pool_internal.h
index 2e48ac3..169e02d 100644
--- a/platform/linux-generic/include/odp_buffer_pool_internal.h
+++ b/platform/linux-generic/include/odp_buffer_pool_internal.h
@@ -107,8 +107,8 @@  struct pool_entry_s {
 	size_t                  pool_size;
 	uint32_t                buf_align;
 	uint32_t                buf_stride;
-	_odp_atomic_ptr_t       buf_freelist;
-	_odp_atomic_ptr_t       blk_freelist;
+	odp_atomic_u64_t        buf_freelist;
+	odp_atomic_u64_t        blk_freelist;
 	odp_atomic_u32_t        bufcount;
 	odp_atomic_u32_t        blkcount;
 	odp_atomic_u64_t        bufallocs;
@@ -142,57 +142,77 @@  extern void *pool_entry_ptr[];
 #define pool_is_secure(pool) 0
 #endif
 
-#define TAG_ALIGN ((size_t)16)
+#define OFFSET_BITS ODP_BITSIZE((uint64_t)ODP_BUFFER_MAX_BUFFERS * \
+				(uint32_t)ODP_CONFIG_PACKET_BUF_LEN_MAX)
+
+#define MAX_OFFSET (1ULL << BUF_OFFSET_BITS)
+#define TAG_BITS (64 - OFFSET_BITS)
+#define TAG_ALIGN (1ULL << TAG_BITS)
+
+#define NULL_OFFSET (~0ULL)
+#define NULL_DETAGGED_OFFSET ((~0ULL) >> TAG_BITS)
 
 #define odp_cs(ptr, old, new) \
-	_odp_atomic_ptr_cmp_xchg_strong(&ptr, (void **)&old, (void *)new, \
-					_ODP_MEMMODEL_SC, \
-					_ODP_MEMMODEL_SC)
+	_odp_atomic_u64_cmp_xchg_strong_mm(&ptr, (uint64_t *)&old, \
+					   (uint64_t)new, \
+					   _ODP_MEMMODEL_SC, _ODP_MEMMODEL_SC)
+
+/* Helper functions for offset tagging to avoid ABA race conditions */
+#define odp_tag(offset) \
+	(((uint64_t)offset) & (TAG_ALIGN - 1))
 
-/* Helper functions for pointer tagging to avoid ABA race conditions */
-#define odp_tag(ptr) \
-	(((size_t)ptr) & (TAG_ALIGN - 1))
+#define off2ptr(offset, base) \
+	((void *)((size_t)base + ((size_t)((uint64_t)offset >> TAG_BITS))))
 
-#define odp_detag(ptr) \
-	((void *)(((size_t)ptr) & -TAG_ALIGN))
+#define odp_detag(offset, base) \
+	(((uint64_t)offset >> TAG_BITS) == NULL_DETAGGED_OFFSET ? NULL : \
+	 off2ptr(offset, base))
 
-#define odp_retag(ptr, tag) \
-	((void *)(((size_t)ptr) | odp_tag(tag)))
+#define ptr2off(ptr, base) \
+	((uint64_t)(ptr == NULL ? (NULL_DETAGGED_OFFSET << TAG_BITS) : \
+		    (((uint64_t)((size_t)ptr - (size_t)base)) << TAG_BITS)))
+
+#define odp_retag(ptr, base, tag) (ptr2off(ptr, base) | odp_tag(tag))
 
 
 static inline void *get_blk(struct pool_entry_s *pool)
 {
-	void *oldhead, *myhead, *newhead;
+	uint64_t oldhead, newhead;
+	void *myhead;
 
-	oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
+	oldhead = odp_atomic_load_u64(&pool->blk_freelist);
 
 	do {
-		size_t tag = odp_tag(oldhead);
-		myhead = odp_detag(oldhead);
+		uint64_t tag = odp_tag(oldhead);
+		myhead = odp_detag(oldhead, pool->pool_base_addr);
 		if (odp_unlikely(myhead == NULL))
 			break;
-		newhead = odp_retag(((odp_buf_blk_t *)myhead)->next, tag + 1);
+		newhead = odp_retag(((odp_buf_blk_t *)myhead)->next,
+				    pool->pool_base_addr, tag + 1);
 	} while (odp_cs(pool->blk_freelist, oldhead, newhead) == 0);
 
-	if (odp_unlikely(myhead == NULL))
+	if (odp_unlikely(myhead == NULL)) {
 		odp_atomic_inc_u64(&pool->blkempty);
-	else
+	} else {
+		odp_atomic_inc_u64(&pool->blkallocs);
 		odp_atomic_dec_u32(&pool->blkcount);
+	}
 
-	return (void *)myhead;
+	return myhead;
 }
 
 static inline void ret_blk(struct pool_entry_s *pool, void *block)
 {
-	void *oldhead, *myhead, *myblock;
+	uint64_t oldhead, myblock;
+	void *myhead;
 
-	oldhead = _odp_atomic_ptr_load(&pool->blk_freelist, _ODP_MEMMODEL_ACQ);
+	oldhead = odp_atomic_load_u64(&pool->blk_freelist);
 
 	do {
 		size_t tag = odp_tag(oldhead);
-		myhead = odp_detag(oldhead);
+		myhead = odp_detag(oldhead, pool->pool_base_addr);
 		((odp_buf_blk_t *)block)->next = myhead;
-		myblock = odp_retag(block, tag + 1);
+		myblock = odp_retag(block, pool->pool_base_addr, tag + 1);
 	} while (odp_cs(pool->blk_freelist, oldhead, myblock) == 0);
 
 	odp_atomic_inc_u32(&pool->blkcount);
@@ -201,16 +221,18 @@  static inline void ret_blk(struct pool_entry_s *pool, void *block)
 
 static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
 {
-	odp_buffer_hdr_t *oldhead, *myhead, *newhead;
+	uint64_t oldhead, newhead;
+	odp_buffer_hdr_t *myhead;
 
-	oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
+	oldhead = odp_atomic_load_u64(&pool->buf_freelist);
 
 	do {
 		size_t tag = odp_tag(oldhead);
-		myhead = odp_detag(oldhead);
+		myhead = odp_detag(oldhead, pool->pool_mdata_addr);
 		if (odp_unlikely(myhead == NULL))
 			break;
-		newhead = odp_retag(myhead->next, tag + 1);
+		newhead = odp_retag(myhead->next, pool->pool_mdata_addr,
+				    tag + 1);
 	} while (odp_cs(pool->buf_freelist, oldhead, newhead) == 0);
 
 	if (odp_unlikely(myhead == NULL)) {
@@ -235,7 +257,8 @@  static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
 
 static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t *buf)
 {
-	odp_buffer_hdr_t *oldhead, *myhead, *mybuf;
+	uint64_t oldhead, mybuf;
+	odp_buffer_hdr_t *myhead;
 
 	buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
 
@@ -249,13 +272,13 @@  static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t *buf)
 		buf->size = 0;
 	}
 
-	oldhead = _odp_atomic_ptr_load(&pool->buf_freelist, _ODP_MEMMODEL_ACQ);
+	oldhead = odp_atomic_load_u64(&pool->buf_freelist);
 
 	do {
 		size_t tag = odp_tag(oldhead);
-		myhead = odp_detag(oldhead);
+		myhead = odp_detag(oldhead, pool->pool_mdata_addr);
 		buf->next = myhead;
-		mybuf = odp_retag(buf, tag + 1);
+		mybuf = odp_retag(buf, pool->pool_mdata_addr, tag + 1);
 	} while (odp_cs(pool->buf_freelist, oldhead, mybuf) == 0);
 
 	uint64_t bufcount = odp_atomic_fetch_add_u32(&pool->bufcount, 1) + 1;
diff --git a/platform/linux-generic/odp_buffer_pool.c b/platform/linux-generic/odp_buffer_pool.c
index eedb380..5d181bb 100644
--- a/platform/linux-generic/odp_buffer_pool.c
+++ b/platform/linux-generic/odp_buffer_pool.c
@@ -286,10 +286,9 @@  odp_buffer_pool_t odp_buffer_pool_create(const char *name,
 		pool->s.pool_mdata_addr = mdata_base_addr;
 
 		pool->s.buf_stride = buf_stride;
-		_odp_atomic_ptr_store(&pool->s.buf_freelist, NULL,
-				      _ODP_MEMMODEL_RLX);
-		_odp_atomic_ptr_store(&pool->s.blk_freelist, NULL,
-				      _ODP_MEMMODEL_RLX);
+
+		odp_atomic_store_u64(&pool->s.buf_freelist, NULL_OFFSET);
+		odp_atomic_store_u64(&pool->s.blk_freelist, NULL_OFFSET);
 
 		/* Initialization will increment these to their target vals */
 		odp_atomic_store_u32(&pool->s.bufcount, 0);