diff mbox

[PATCHv2] linux-generic: test: fix ring resource leaks

Message ID 1464190216-13226-1-git-send-email-maxim.uvarov@linaro.org
State Accepted
Commit d585b3f6ea6e28e2fa086501378cbb7fa5e9fe02
Headers show

Commit Message

Maxim Uvarov May 25, 2016, 3:30 p.m. UTC
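Make the test a little simpler. Free the allocated memory and
take care of overflow (wrap-around) by casting the counter to a signed int:
(int32_t)odp_atomic_load_u32(consume_count)
This is needed because a number of consumer threads can dequeue from the
ring and decrement the atomic u32 at the same time, taking it past zero.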

Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
---
 v2: some cosmetic fixes:
	- remove the unneeded usleep();
	- add a barrier so that enqueue/dequeue on the ring run at the same time;
	- use the more accurate cast (int32_t) instead of (int);

 Note: this test hangs with both -mcx16 and -O3 on gcc <= 4.9; that
	will be fixed in a separate patch.


 platform/linux-generic/test/ring/ring_stress.c | 82 ++++++++++++++------------
 1 file changed, 43 insertions(+), 39 deletions(-)

Comments

Bill Fischofer May 25, 2016, 4:25 p.m. UTC | #1
This version looks good too, so my previous review stands.

On Wed, May 25, 2016 at 10:30 AM, Maxim Uvarov <maxim.uvarov@linaro.org>
wrote:

> Make test a little bit simple. Add memory free and
> take care about overflow using cast to int:
> (int32_t)odp_atomic_load_u32(consume_count)
> Where number of consumer threads can dequeue from ring
> and decrease atomic u32.
>
> Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org>
> Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org>
> ---
>  v2: some cosmetic fixes:
>         - remove not needed usleep();
>         - add barrier to run queue/deq to ring at the same time;
>         - more accurate cast to (int32_t) instead of (int);
>
>  Note: this test hangs with both -mcx16 and -O3 on gcc <= 4.9, that
>         will be fixed in separate patch.
>
>
>  platform/linux-generic/test/ring/ring_stress.c | 82
> ++++++++++++++------------
>  1 file changed, 43 insertions(+), 39 deletions(-)
>
> diff --git a/platform/linux-generic/test/ring/ring_stress.c
> b/platform/linux-generic/test/ring/ring_stress.c
> index c68419f..8b6d9ae 100644
> --- a/platform/linux-generic/test/ring/ring_stress.c
> +++ b/platform/linux-generic/test/ring/ring_stress.c
> @@ -54,6 +54,9 @@ static odp_atomic_u32_t *retrieve_consume_count(void);
>  static const char *ring_name = "stress ring";
>  static const char *consume_count_name = "stress ring consume count";
>
> +/* barrier to run threads at the same time */
> +static odp_barrier_t barrier;
> +
>  int ring_test_stress_start(void)
>  {
>         odp_shm_t shared;
> @@ -120,6 +123,8 @@ void ring_test_stress_1_1_producer_consumer(void)
>          */
>         odp_atomic_init_u32(consume_count, 1);
>
> +       odp_barrier_init(&barrier, 2);
> +
>         /* kick the workers */
>         odp_cunit_thread_create(stress_worker, &worker_param);
>
> @@ -156,12 +161,13 @@ void ring_test_stress_N_M_producer_consumer(void)
>         consume_count = retrieve_consume_count();
>         CU_ASSERT(consume_count != NULL);
>
> -       /* in N:M test case, producer threads are always
> -        * greater or equal to consumer threads, thus produce
> -        * enought "goods" to be consumed by consumer threads.
> +       /* all producer threads try to fill ring to RING_SIZE,
> +        * while consumers threads dequeue from ring with PIECE_BULK
> +        * blocks. Multiply on 100 to add more tries.
>          */
> -       odp_atomic_init_u32(consume_count,
> -                           (worker_param.numthrds) / 2);
> +       odp_atomic_init_u32(consume_count, RING_SIZE / PIECE_BULK * 100);
> +
> +       odp_barrier_init(&barrier, worker_param.numthrds);
>
>         /* kick the workers */
>         odp_cunit_thread_create(stress_worker, &worker_param);
> @@ -202,8 +208,15 @@ static odp_atomic_u32_t *retrieve_consume_count(void)
>  /* worker function for multiple producer instances */
>  static int do_producer(_ring_t *r)
>  {
> -       int i, result = 0;
> +       int i;
>         void **enq = NULL;
> +       odp_atomic_u32_t *consume_count;
> +
> +       consume_count = retrieve_consume_count();
> +       if (consume_count == NULL) {
> +               LOG_ERR("cannot retrieve expected consume count.\n");
> +               return -1;
> +       }
>
>         /* allocate dummy object pointers for enqueue */
>         enq = malloc(PIECE_BULK * 2 * sizeof(void *));
> @@ -216,26 +229,29 @@ static int do_producer(_ring_t *r)
>         for (i = 0; i < PIECE_BULK; i++)
>                 enq[i] = (void *)(unsigned long)i;
>
> -       do {
> -               result = _ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
> -               if (0 == result) {
> -                       free(enq);
> -                       return 0;
> -               }
> -               usleep(10); /* wait for consumer threads */
> -       } while (!_ring_full(r));
> +       odp_barrier_wait(&barrier);
>
> +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
> +               /* produce as much data as we can to the ring */
> +               (void)_ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
> +       }
> +
> +       free(enq);
>         return 0;
>  }
>
>  /* worker function for multiple consumer instances */
>  static int do_consumer(_ring_t *r)
>  {
> -       int i, result = 0;
> +       int i;
>         void **deq = NULL;
> -       odp_atomic_u32_t *consume_count = NULL;
> -       const char *message = "test OK!";
> -       const char *mismatch = "data mismatch..lockless enq/deq failed.";
> +       odp_atomic_u32_t *consume_count;
> +
> +       consume_count = retrieve_consume_count();
> +       if (consume_count == NULL) {
> +               LOG_ERR("cannot retrieve expected consume count.\n");
> +               return -1;
> +       }
>
>         /* allocate dummy object pointers for dequeue */
>         deq = malloc(PIECE_BULK * 2 * sizeof(void *));
> @@ -244,31 +260,19 @@ static int do_consumer(_ring_t *r)
>                 return 0; /* not failure, skip for insufficient memory */
>         }
>
> -       consume_count = retrieve_consume_count();
> -       if (consume_count == NULL) {
> -               LOG_ERR("cannot retrieve expected consume count.\n");
> -               return -1;
> -       }
> +       odp_barrier_wait(&barrier);
>
> -       while (odp_atomic_load_u32(consume_count) > 0) {
> -               result = _ring_mc_dequeue_bulk(r, deq, PIECE_BULK);
> -               if (0 == result) {
> -                       /* evaluate the data pattern */
> -                       for (i = 0; i < PIECE_BULK; i++) {
> -                               if (deq[i] != (void *)(unsigned long)i) {
> -                                       result = -1;
> -                                       message = mismatch;
> -                                       break;
> -                               }
> -                       }
> -
> -                       free(deq);
> -                       LOG_ERR("%s\n", message);
> +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
> +               if (!_ring_mc_dequeue_bulk(r, deq, PIECE_BULK)) {
>                         odp_atomic_dec_u32(consume_count);
> -                       return result;
> +
> +                       /* evaluate the data pattern */
> +                       for (i = 0; i < PIECE_BULK; i++)
> +                               CU_ASSERT(deq[i] == (void *)(unsigned
> long)i);
>                 }
> -               usleep(10); /* wait for producer threads */
>         }
> +
> +       free(deq);
>         return 0;
>  }
>
> --
> 2.7.1.250.gff4ea60
>
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> https://lists.linaro.org/mailman/listinfo/lng-odp
>
Maxim Uvarov May 25, 2016, 6:09 p.m. UTC | #2
On 05/25/16 19:25, Bill Fischofer wrote:
> This version looks good too, so my previous review stands.
>

OK, thanks. This patch will go in together with the configure.ac fix for
-mcx16, because in my case it fixes the clang hang bug but reproduces
the hang on old gcc. So I will merge both patches at the same time.

Maxim.
> On Wed, May 25, 2016 at 10:30 AM, Maxim Uvarov 
> <maxim.uvarov@linaro.org <mailto:maxim.uvarov@linaro.org>> wrote:
>
>     Make test a little bit simple. Add memory free and
>     take care about overflow using cast to int:
>     (int32_t)odp_atomic_load_u32(consume_count)
>     Where number of consumer threads can dequeue from ring
>     and decrease atomic u32.
>
>     Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org
>     <mailto:maxim.uvarov@linaro.org>>
>     Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org
>     <mailto:bill.fischofer@linaro.org>>
>     ---
>      v2: some cosmetic fixes:
>             - remove not needed usleep();
>             - add barrier to run queue/deq to ring at the same time;
>             - more accurate cast to (int32_t) instead of (int);
>
>      Note: this test hangs with both -mcx16 and -O3 on gcc <= 4.9, that
>             will be fixed in separate patch.
>
>
>      platform/linux-generic/test/ring/ring_stress.c | 82
>     ++++++++++++++------------
>      1 file changed, 43 insertions(+), 39 deletions(-)
>
>     diff --git a/platform/linux-generic/test/ring/ring_stress.c
>     b/platform/linux-generic/test/ring/ring_stress.c
>     index c68419f..8b6d9ae 100644
>     --- a/platform/linux-generic/test/ring/ring_stress.c
>     +++ b/platform/linux-generic/test/ring/ring_stress.c
>     @@ -54,6 +54,9 @@ static odp_atomic_u32_t
>     *retrieve_consume_count(void);
>      static const char *ring_name = "stress ring";
>      static const char *consume_count_name = "stress ring consume count";
>
>     +/* barrier to run threads at the same time */
>     +static odp_barrier_t barrier;
>     +
>      int ring_test_stress_start(void)
>      {
>             odp_shm_t shared;
>     @@ -120,6 +123,8 @@ void ring_test_stress_1_1_producer_consumer(void)
>              */
>             odp_atomic_init_u32(consume_count, 1);
>
>     +       odp_barrier_init(&barrier, 2);
>     +
>             /* kick the workers */
>             odp_cunit_thread_create(stress_worker, &worker_param);
>
>     @@ -156,12 +161,13 @@ void
>     ring_test_stress_N_M_producer_consumer(void)
>             consume_count = retrieve_consume_count();
>             CU_ASSERT(consume_count != NULL);
>
>     -       /* in N:M test case, producer threads are always
>     -        * greater or equal to consumer threads, thus produce
>     -        * enought "goods" to be consumed by consumer threads.
>     +       /* all producer threads try to fill ring to RING_SIZE,
>     +        * while consumers threads dequeue from ring with PIECE_BULK
>     +        * blocks. Multiply on 100 to add more tries.
>              */
>     -       odp_atomic_init_u32(consume_count,
>     -                           (worker_param.numthrds) / 2);
>     +       odp_atomic_init_u32(consume_count, RING_SIZE / PIECE_BULK
>     * 100);
>     +
>     +       odp_barrier_init(&barrier, worker_param.numthrds);
>
>             /* kick the workers */
>             odp_cunit_thread_create(stress_worker, &worker_param);
>     @@ -202,8 +208,15 @@ static odp_atomic_u32_t
>     *retrieve_consume_count(void)
>      /* worker function for multiple producer instances */
>      static int do_producer(_ring_t *r)
>      {
>     -       int i, result = 0;
>     +       int i;
>             void **enq = NULL;
>     +       odp_atomic_u32_t *consume_count;
>     +
>     +       consume_count = retrieve_consume_count();
>     +       if (consume_count == NULL) {
>     +               LOG_ERR("cannot retrieve expected consume count.\n");
>     +               return -1;
>     +       }
>
>             /* allocate dummy object pointers for enqueue */
>             enq = malloc(PIECE_BULK * 2 * sizeof(void *));
>     @@ -216,26 +229,29 @@ static int do_producer(_ring_t *r)
>             for (i = 0; i < PIECE_BULK; i++)
>                     enq[i] = (void *)(unsigned long)i;
>
>     -       do {
>     -               result = _ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
>     -               if (0 == result) {
>     -                       free(enq);
>     -                       return 0;
>     -               }
>     -               usleep(10); /* wait for consumer threads */
>     -       } while (!_ring_full(r));
>     +       odp_barrier_wait(&barrier);
>
>     +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
>     +               /* produce as much data as we can to the ring */
>     +               (void)_ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
>     +       }
>     +
>     +       free(enq);
>             return 0;
>      }
>
>      /* worker function for multiple consumer instances */
>      static int do_consumer(_ring_t *r)
>      {
>     -       int i, result = 0;
>     +       int i;
>             void **deq = NULL;
>     -       odp_atomic_u32_t *consume_count = NULL;
>     -       const char *message = "test OK!";
>     -       const char *mismatch = "data mismatch..lockless enq/deq
>     failed.";
>     +       odp_atomic_u32_t *consume_count;
>     +
>     +       consume_count = retrieve_consume_count();
>     +       if (consume_count == NULL) {
>     +               LOG_ERR("cannot retrieve expected consume count.\n");
>     +               return -1;
>     +       }
>
>             /* allocate dummy object pointers for dequeue */
>             deq = malloc(PIECE_BULK * 2 * sizeof(void *));
>     @@ -244,31 +260,19 @@ static int do_consumer(_ring_t *r)
>                     return 0; /* not failure, skip for insufficient
>     memory */
>             }
>
>     -       consume_count = retrieve_consume_count();
>     -       if (consume_count == NULL) {
>     -               LOG_ERR("cannot retrieve expected consume count.\n");
>     -               return -1;
>     -       }
>     +       odp_barrier_wait(&barrier);
>
>     -       while (odp_atomic_load_u32(consume_count) > 0) {
>     -               result = _ring_mc_dequeue_bulk(r, deq, PIECE_BULK);
>     -               if (0 == result) {
>     -                       /* evaluate the data pattern */
>     -                       for (i = 0; i < PIECE_BULK; i++) {
>     -                               if (deq[i] != (void *)(unsigned
>     long)i) {
>     -                                       result = -1;
>     -                                       message = mismatch;
>     -                                       break;
>     -                               }
>     -                       }
>     -
>     -                       free(deq);
>     -                       LOG_ERR("%s\n", message);
>     +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
>     +               if (!_ring_mc_dequeue_bulk(r, deq, PIECE_BULK)) {
>                             odp_atomic_dec_u32(consume_count);
>     -                       return result;
>     +
>     +                       /* evaluate the data pattern */
>     +                       for (i = 0; i < PIECE_BULK; i++)
>     +                               CU_ASSERT(deq[i] == (void
>     *)(unsigned long)i);
>                     }
>     -               usleep(10); /* wait for producer threads */
>             }
>     +
>     +       free(deq);
>             return 0;
>      }
>
>     --
>     2.7.1.250.gff4ea60
>
>     _______________________________________________
>     lng-odp mailing list
>     lng-odp@lists.linaro.org <mailto:lng-odp@lists.linaro.org>
>     https://lists.linaro.org/mailman/listinfo/lng-odp
>
>
Maxim Uvarov May 27, 2016, 3:29 p.m. UTC | #3
Merged.

The ring should now work and not hang.

Maxim.

On 05/25/16 19:25, Bill Fischofer wrote:
> This version looks good too, so my previous review stands.
>
> On Wed, May 25, 2016 at 10:30 AM, Maxim Uvarov 
> <maxim.uvarov@linaro.org <mailto:maxim.uvarov@linaro.org>> wrote:
>
>     Make test a little bit simple. Add memory free and
>     take care about overflow using cast to int:
>     (int32_t)odp_atomic_load_u32(consume_count)
>     Where number of consumer threads can dequeue from ring
>     and decrease atomic u32.
>
>     Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org
>     <mailto:maxim.uvarov@linaro.org>>
>     Reviewed-and-tested-by: Bill Fischofer <bill.fischofer@linaro.org
>     <mailto:bill.fischofer@linaro.org>>
>     ---
>      v2: some cosmetic fixes:
>             - remove not needed usleep();
>             - add barrier to run queue/deq to ring at the same time;
>             - more accurate cast to (int32_t) instead of (int);
>
>      Note: this test hangs with both -mcx16 and -O3 on gcc <= 4.9, that
>             will be fixed in separate patch.
>
>
>      platform/linux-generic/test/ring/ring_stress.c | 82
>     ++++++++++++++------------
>      1 file changed, 43 insertions(+), 39 deletions(-)
>
>     diff --git a/platform/linux-generic/test/ring/ring_stress.c
>     b/platform/linux-generic/test/ring/ring_stress.c
>     index c68419f..8b6d9ae 100644
>     --- a/platform/linux-generic/test/ring/ring_stress.c
>     +++ b/platform/linux-generic/test/ring/ring_stress.c
>     @@ -54,6 +54,9 @@ static odp_atomic_u32_t
>     *retrieve_consume_count(void);
>      static const char *ring_name = "stress ring";
>      static const char *consume_count_name = "stress ring consume count";
>
>     +/* barrier to run threads at the same time */
>     +static odp_barrier_t barrier;
>     +
>      int ring_test_stress_start(void)
>      {
>             odp_shm_t shared;
>     @@ -120,6 +123,8 @@ void ring_test_stress_1_1_producer_consumer(void)
>              */
>             odp_atomic_init_u32(consume_count, 1);
>
>     +       odp_barrier_init(&barrier, 2);
>     +
>             /* kick the workers */
>             odp_cunit_thread_create(stress_worker, &worker_param);
>
>     @@ -156,12 +161,13 @@ void
>     ring_test_stress_N_M_producer_consumer(void)
>             consume_count = retrieve_consume_count();
>             CU_ASSERT(consume_count != NULL);
>
>     -       /* in N:M test case, producer threads are always
>     -        * greater or equal to consumer threads, thus produce
>     -        * enought "goods" to be consumed by consumer threads.
>     +       /* all producer threads try to fill ring to RING_SIZE,
>     +        * while consumers threads dequeue from ring with PIECE_BULK
>     +        * blocks. Multiply on 100 to add more tries.
>              */
>     -       odp_atomic_init_u32(consume_count,
>     -                           (worker_param.numthrds) / 2);
>     +       odp_atomic_init_u32(consume_count, RING_SIZE / PIECE_BULK
>     * 100);
>     +
>     +       odp_barrier_init(&barrier, worker_param.numthrds);
>
>             /* kick the workers */
>             odp_cunit_thread_create(stress_worker, &worker_param);
>     @@ -202,8 +208,15 @@ static odp_atomic_u32_t
>     *retrieve_consume_count(void)
>      /* worker function for multiple producer instances */
>      static int do_producer(_ring_t *r)
>      {
>     -       int i, result = 0;
>     +       int i;
>             void **enq = NULL;
>     +       odp_atomic_u32_t *consume_count;
>     +
>     +       consume_count = retrieve_consume_count();
>     +       if (consume_count == NULL) {
>     +               LOG_ERR("cannot retrieve expected consume count.\n");
>     +               return -1;
>     +       }
>
>             /* allocate dummy object pointers for enqueue */
>             enq = malloc(PIECE_BULK * 2 * sizeof(void *));
>     @@ -216,26 +229,29 @@ static int do_producer(_ring_t *r)
>             for (i = 0; i < PIECE_BULK; i++)
>                     enq[i] = (void *)(unsigned long)i;
>
>     -       do {
>     -               result = _ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
>     -               if (0 == result) {
>     -                       free(enq);
>     -                       return 0;
>     -               }
>     -               usleep(10); /* wait for consumer threads */
>     -       } while (!_ring_full(r));
>     +       odp_barrier_wait(&barrier);
>
>     +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
>     +               /* produce as much data as we can to the ring */
>     +               (void)_ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
>     +       }
>     +
>     +       free(enq);
>             return 0;
>      }
>
>      /* worker function for multiple consumer instances */
>      static int do_consumer(_ring_t *r)
>      {
>     -       int i, result = 0;
>     +       int i;
>             void **deq = NULL;
>     -       odp_atomic_u32_t *consume_count = NULL;
>     -       const char *message = "test OK!";
>     -       const char *mismatch = "data mismatch..lockless enq/deq
>     failed.";
>     +       odp_atomic_u32_t *consume_count;
>     +
>     +       consume_count = retrieve_consume_count();
>     +       if (consume_count == NULL) {
>     +               LOG_ERR("cannot retrieve expected consume count.\n");
>     +               return -1;
>     +       }
>
>             /* allocate dummy object pointers for dequeue */
>             deq = malloc(PIECE_BULK * 2 * sizeof(void *));
>     @@ -244,31 +260,19 @@ static int do_consumer(_ring_t *r)
>                     return 0; /* not failure, skip for insufficient
>     memory */
>             }
>
>     -       consume_count = retrieve_consume_count();
>     -       if (consume_count == NULL) {
>     -               LOG_ERR("cannot retrieve expected consume count.\n");
>     -               return -1;
>     -       }
>     +       odp_barrier_wait(&barrier);
>
>     -       while (odp_atomic_load_u32(consume_count) > 0) {
>     -               result = _ring_mc_dequeue_bulk(r, deq, PIECE_BULK);
>     -               if (0 == result) {
>     -                       /* evaluate the data pattern */
>     -                       for (i = 0; i < PIECE_BULK; i++) {
>     -                               if (deq[i] != (void *)(unsigned
>     long)i) {
>     -                                       result = -1;
>     -                                       message = mismatch;
>     -                                       break;
>     -                               }
>     -                       }
>     -
>     -                       free(deq);
>     -                       LOG_ERR("%s\n", message);
>     +       while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
>     +               if (!_ring_mc_dequeue_bulk(r, deq, PIECE_BULK)) {
>                             odp_atomic_dec_u32(consume_count);
>     -                       return result;
>     +
>     +                       /* evaluate the data pattern */
>     +                       for (i = 0; i < PIECE_BULK; i++)
>     +                               CU_ASSERT(deq[i] == (void
>     *)(unsigned long)i);
>                     }
>     -               usleep(10); /* wait for producer threads */
>             }
>     +
>     +       free(deq);
>             return 0;
>      }
>
>     --
>     2.7.1.250.gff4ea60
>
>     _______________________________________________
>     lng-odp mailing list
>     lng-odp@lists.linaro.org <mailto:lng-odp@lists.linaro.org>
>     https://lists.linaro.org/mailman/listinfo/lng-odp
>
>
diff mbox

Patch

diff --git a/platform/linux-generic/test/ring/ring_stress.c b/platform/linux-generic/test/ring/ring_stress.c
index c68419f..8b6d9ae 100644
--- a/platform/linux-generic/test/ring/ring_stress.c
+++ b/platform/linux-generic/test/ring/ring_stress.c
@@ -54,6 +54,9 @@  static odp_atomic_u32_t *retrieve_consume_count(void);
 static const char *ring_name = "stress ring";
 static const char *consume_count_name = "stress ring consume count";
 
+/* barrier to run threads at the same time */
+static odp_barrier_t barrier;
+
 int ring_test_stress_start(void)
 {
 	odp_shm_t shared;
@@ -120,6 +123,8 @@  void ring_test_stress_1_1_producer_consumer(void)
 	 */
 	odp_atomic_init_u32(consume_count, 1);
 
+	odp_barrier_init(&barrier, 2);
+
 	/* kick the workers */
 	odp_cunit_thread_create(stress_worker, &worker_param);
 
@@ -156,12 +161,13 @@  void ring_test_stress_N_M_producer_consumer(void)
 	consume_count = retrieve_consume_count();
 	CU_ASSERT(consume_count != NULL);
 
-	/* in N:M test case, producer threads are always
-	 * greater or equal to consumer threads, thus produce
-	 * enought "goods" to be consumed by consumer threads.
+	/* all producer threads try to fill ring to RING_SIZE,
+	 * while consumers threads dequeue from ring with PIECE_BULK
+	 * blocks. Multiply on 100 to add more tries.
 	 */
-	odp_atomic_init_u32(consume_count,
-			    (worker_param.numthrds) / 2);
+	odp_atomic_init_u32(consume_count, RING_SIZE / PIECE_BULK * 100);
+
+	odp_barrier_init(&barrier, worker_param.numthrds);
 
 	/* kick the workers */
 	odp_cunit_thread_create(stress_worker, &worker_param);
@@ -202,8 +208,15 @@  static odp_atomic_u32_t *retrieve_consume_count(void)
 /* worker function for multiple producer instances */
 static int do_producer(_ring_t *r)
 {
-	int i, result = 0;
+	int i;
 	void **enq = NULL;
+	odp_atomic_u32_t *consume_count;
+
+	consume_count = retrieve_consume_count();
+	if (consume_count == NULL) {
+		LOG_ERR("cannot retrieve expected consume count.\n");
+		return -1;
+	}
 
 	/* allocate dummy object pointers for enqueue */
 	enq = malloc(PIECE_BULK * 2 * sizeof(void *));
@@ -216,26 +229,29 @@  static int do_producer(_ring_t *r)
 	for (i = 0; i < PIECE_BULK; i++)
 		enq[i] = (void *)(unsigned long)i;
 
-	do {
-		result = _ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
-		if (0 == result) {
-			free(enq);
-			return 0;
-		}
-		usleep(10); /* wait for consumer threads */
-	} while (!_ring_full(r));
+	odp_barrier_wait(&barrier);
 
+	while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
+		/* produce as much data as we can to the ring */
+		(void)_ring_mp_enqueue_bulk(r, enq, PIECE_BULK);
+	}
+
+	free(enq);
 	return 0;
 }
 
 /* worker function for multiple consumer instances */
 static int do_consumer(_ring_t *r)
 {
-	int i, result = 0;
+	int i;
 	void **deq = NULL;
-	odp_atomic_u32_t *consume_count = NULL;
-	const char *message = "test OK!";
-	const char *mismatch = "data mismatch..lockless enq/deq failed.";
+	odp_atomic_u32_t *consume_count;
+
+	consume_count = retrieve_consume_count();
+	if (consume_count == NULL) {
+		LOG_ERR("cannot retrieve expected consume count.\n");
+		return -1;
+	}
 
 	/* allocate dummy object pointers for dequeue */
 	deq = malloc(PIECE_BULK * 2 * sizeof(void *));
@@ -244,31 +260,19 @@  static int do_consumer(_ring_t *r)
 		return 0; /* not failure, skip for insufficient memory */
 	}
 
-	consume_count = retrieve_consume_count();
-	if (consume_count == NULL) {
-		LOG_ERR("cannot retrieve expected consume count.\n");
-		return -1;
-	}
+	odp_barrier_wait(&barrier);
 
-	while (odp_atomic_load_u32(consume_count) > 0) {
-		result = _ring_mc_dequeue_bulk(r, deq, PIECE_BULK);
-		if (0 == result) {
-			/* evaluate the data pattern */
-			for (i = 0; i < PIECE_BULK; i++) {
-				if (deq[i] != (void *)(unsigned long)i) {
-					result = -1;
-					message = mismatch;
-					break;
-				}
-			}
-
-			free(deq);
-			LOG_ERR("%s\n", message);
+	while ((int32_t)odp_atomic_load_u32(consume_count) > 0) {
+		if (!_ring_mc_dequeue_bulk(r, deq, PIECE_BULK)) {
 			odp_atomic_dec_u32(consume_count);
-			return result;
+
+			/* evaluate the data pattern */
+			for (i = 0; i < PIECE_BULK; i++)
+				CU_ASSERT(deq[i] == (void *)(unsigned long)i);
 		}
-		usleep(10); /* wait for producer threads */
 	}
+
+	free(deq);
 	return 0;
 }